use prost::Message;
use steam_enums::{ECsgoGCMsg, EMsg};
use crate::{error::SteamError, protocol::header::CMsgProtoBufHeader, SteamClient};
const PROTO_MASK: u32 = 0x80000000;
#[derive(Debug, Clone)]
pub struct GCMessage {
pub appid: u32,
pub msg_type: u32,
pub is_protobuf: bool,
pub payload: Vec<u8>,
pub target_job_id: Option<u64>,
pub source_job_id: Option<u64>,
}
#[derive(Debug, Clone, Default)]
pub struct GCSendOptions {
pub proto_header: Option<GCProtoHeader>,
pub legacy_header: Option<GCLegacyHeader>,
}
#[derive(Debug, Clone)]
pub struct GCProtoHeader {
pub job_id_source: u64,
pub job_id_target: u64,
pub target_job_name: Option<String>,
}
impl Default for GCProtoHeader {
fn default() -> Self {
Self { job_id_source: u64::MAX, job_id_target: u64::MAX, target_job_name: None }
}
}
#[derive(Debug, Clone)]
pub struct GCLegacyHeader {
pub job_id_source: u64,
pub job_id_target: u64,
}
impl Default for GCLegacyHeader {
fn default() -> Self {
Self { job_id_source: u64::MAX, job_id_target: u64::MAX }
}
}
impl SteamClient {
pub async fn send_to_gc(&mut self, appid: u32, msg_type: u32, payload: &[u8]) -> Result<(), SteamError> {
self.send_to_gc_with_options(appid, msg_type, payload, GCSendOptions::default()).await
}
pub async fn send_to_gc_proto(&mut self, appid: u32, msg_type: u32, payload: &[u8], header: GCProtoHeader) -> Result<(), SteamError> {
self.send_to_gc_with_options(appid, msg_type, payload, GCSendOptions { proto_header: Some(header), legacy_header: None }).await
}
pub async fn send_to_gc_with_options(&mut self, appid: u32, msg_type: u32, payload: &[u8], options: GCSendOptions) -> Result<(), SteamError> {
if !self.is_logged_in() {
return Err(SteamError::NotLoggedOn);
}
let is_protobuf = options.proto_header.is_some();
let gc_payload = if let Some(proto_header) = options.proto_header {
let msg_type_with_proto = msg_type | PROTO_MASK;
let proto_header_bytes = encode_gc_proto_header(&proto_header);
let mut header = Vec::with_capacity(8 + proto_header_bytes.len());
header.extend_from_slice(&msg_type_with_proto.to_le_bytes());
header.extend_from_slice(&(proto_header_bytes.len() as i32).to_le_bytes());
header.extend_from_slice(&proto_header_bytes);
header.extend_from_slice(payload);
header
} else {
let (job_id_source, job_id_target) = if let Some(header) = options.legacy_header { (header.job_id_source, header.job_id_target) } else { (u64::MAX, u64::MAX) };
let mut header = Vec::with_capacity(18 + payload.len());
header.extend_from_slice(&1u16.to_le_bytes()); header.extend_from_slice(&job_id_target.to_le_bytes()); header.extend_from_slice(&job_id_source.to_le_bytes()); header.extend_from_slice(payload);
header
};
let final_msg_type = if is_protobuf { msg_type | PROTO_MASK } else { msg_type };
let gc_msg = steam_protos::CMsgGCClient { appid: Some(appid), msgtype: Some(final_msg_type), payload: Some(gc_payload), ..Default::default() };
self.send_message_with_routing(EMsg::ClientToGC, appid, &gc_msg).await
}
pub async fn request_players_profile(&mut self, steam_id: steamid::SteamID) -> Result<(), SteamError> {
if !self.is_logged_in() {
return Err(SteamError::NotLoggedOn);
}
let msg = steam_protos::CMsgGccStrike15V2ClientRequestPlayersProfile {
account_id: Some(steam_id.account_id),
request_level: Some(32), };
self.send_to_gc_proto(crate::services::csgo::APP_ID, ECsgoGCMsg::ClientRequestPlayersProfile as u32, &msg.encode_to_vec(), GCProtoHeader::default()).await
}
}
fn encode_gc_proto_header(header: &GCProtoHeader) -> Vec<u8> {
let msg = CMsgProtoBufHeader {
jobid_source: Some(header.job_id_source),
jobid_target: Some(header.job_id_target),
target_job_name: header.target_job_name.clone(),
..Default::default()
};
msg.encode_to_vec()
}
pub fn parse_gc_message(body: &steam_protos::CMsgGCClient) -> Option<GCMessage> {
let appid = body.appid?;
let raw_msg_type = body.msgtype?;
let payload_bytes = body.payload.as_ref()?;
let is_protobuf = (raw_msg_type & PROTO_MASK) != 0;
let msg_type = raw_msg_type & !PROTO_MASK;
let (payload, target_job_id, source_job_id) = if is_protobuf {
if payload_bytes.len() < 8 {
return None;
}
let header_len = i32::from_le_bytes([payload_bytes[4], payload_bytes[5], payload_bytes[6], payload_bytes[7]]) as usize;
if payload_bytes.len() < 8 + header_len {
return None;
}
let proto_header_bytes = &payload_bytes[8..8 + header_len];
let proto_header = match CMsgProtoBufHeader::decode(proto_header_bytes) {
Ok(header) => header,
Err(_) => return None,
};
(payload_bytes[8 + header_len..].to_vec(), proto_header.jobid_target, proto_header.jobid_source)
} else {
if payload_bytes.len() < 18 {
return None;
}
let target_job_id = u64::from_le_bytes([payload_bytes[2], payload_bytes[3], payload_bytes[4], payload_bytes[5], payload_bytes[6], payload_bytes[7], payload_bytes[8], payload_bytes[9]]);
let source_job_id = u64::from_le_bytes([payload_bytes[10], payload_bytes[11], payload_bytes[12], payload_bytes[13], payload_bytes[14], payload_bytes[15], payload_bytes[16], payload_bytes[17]]);
(payload_bytes[18..].to_vec(), Some(target_job_id), Some(source_job_id))
};
Some(GCMessage { appid, msg_type, is_protobuf, payload, target_job_id, source_job_id })
}
use std::{
collections::HashMap,
sync::{Arc, Mutex},
time::{Duration, Instant},
};
use tokio::sync::oneshot;
use tracing::{debug, warn};
#[derive(Debug)]
pub enum GCJobResponse {
Success(Vec<u8>),
Timeout,
}
pub struct PendingGCJob {
pub created_at: Instant,
pub timeout: Duration,
pub response_tx: oneshot::Sender<GCJobResponse>,
}
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub struct GCJobKey {
pub appid: u32,
pub response_msg_type: u32,
}
#[derive(Clone)]
pub struct GCJobManager {
inner: Arc<GCJobManagerInner>,
}
struct GCJobManagerInner {
pending_jobs: Mutex<HashMap<GCJobKey, PendingGCJob>>,
default_timeout: Duration,
}
impl GCJobManager {
pub fn new() -> Self {
Self::with_timeout(Duration::from_secs(30))
}
pub fn with_timeout(default_timeout: Duration) -> Self {
Self { inner: Arc::new(GCJobManagerInner { pending_jobs: Mutex::new(HashMap::new()), default_timeout }) }
}
pub fn create_job(&self, appid: u32, response_msg_type: u32) -> oneshot::Receiver<GCJobResponse> {
self.create_job_with_timeout(appid, response_msg_type, self.inner.default_timeout)
}
pub fn create_job_with_timeout(&self, appid: u32, response_msg_type: u32, timeout: Duration) -> oneshot::Receiver<GCJobResponse> {
let (tx, rx) = oneshot::channel();
let key = GCJobKey { appid, response_msg_type };
let job = PendingGCJob { created_at: Instant::now(), timeout, response_tx: tx };
self.inner.pending_jobs.lock().expect("mutex poisoned").insert(key.clone(), job);
debug!("Created GC job for appid={}, response_msg_type={}", appid, response_msg_type);
rx
}
pub fn try_complete(&self, appid: u32, msg_type: u32, payload: Vec<u8>) -> bool {
let key = GCJobKey { appid, response_msg_type: msg_type };
if let Some(job) = self.inner.pending_jobs.lock().expect("mutex poisoned").remove(&key) {
debug!("Completing GC job for appid={}, msg_type={}", appid, msg_type);
let _ = job.response_tx.send(GCJobResponse::Success(payload));
true
} else {
false
}
}
pub fn cleanup_expired(&self) {
let now = Instant::now();
let mut pending = self.inner.pending_jobs.lock().expect("mutex poisoned");
let expired_keys: Vec<GCJobKey> = pending.iter().filter(|(_, job)| now.duration_since(job.created_at) > job.timeout).map(|(key, _)| key.clone()).collect();
for key in expired_keys {
if let Some(job) = pending.remove(&key) {
warn!("GC job expired for appid={}, response_msg_type={}", key.appid, key.response_msg_type);
let _ = job.response_tx.send(GCJobResponse::Timeout);
}
}
}
pub fn pending_count(&self) -> usize {
self.inner.pending_jobs.lock().expect("mutex poisoned").len()
}
}
impl Default for GCJobManager {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use steam_protos::CMsgGCClient;
use super::*;
use crate::protocol::header::CMsgProtoBufHeader;
#[test]
fn test_parse_gc_message_protobuf() -> Result<(), Box<dyn std::error::Error>> {
let mut payload = Vec::new();
let msg_type_raw: u32 = 1000 | PROTO_MASK;
payload.extend_from_slice(&msg_type_raw.to_le_bytes());
let header = CMsgProtoBufHeader { jobid_source: Some(1), jobid_target: Some(2), ..Default::default() };
let header_bytes = header.encode_to_vec();
payload.extend_from_slice(&(header_bytes.len() as i32).to_le_bytes());
payload.extend_from_slice(&header_bytes);
let body_content = vec![0x01, 0x02, 0x03];
payload.extend_from_slice(&body_content);
let input = CMsgGCClient {
appid: Some(crate::services::csgo::APP_ID),
msgtype: Some(msg_type_raw),
payload: Some(payload),
..Default::default()
};
let result = parse_gc_message(&input).ok_or_else(|| std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid data"))?;
assert_eq!(result.appid, crate::services::csgo::APP_ID);
assert_eq!(result.msg_type, 1000);
assert!(result.is_protobuf);
assert_eq!(result.payload, body_content);
assert_eq!(result.target_job_id, Some(2));
assert_eq!(result.source_job_id, Some(1));
Ok(())
}
#[test]
fn test_parse_gc_message_legacy() -> Result<(), Box<dyn std::error::Error>> {
let mut payload = Vec::new();
payload.extend_from_slice(&1u16.to_le_bytes()); payload.extend_from_slice(&u64::MAX.to_le_bytes()); payload.extend_from_slice(&u64::MAX.to_le_bytes());
let body_content = vec![0xAA, 0xBB, 0xCC];
payload.extend_from_slice(&body_content);
let input = CMsgGCClient {
appid: Some(440),
msgtype: Some(2000), payload: Some(payload),
..Default::default()
};
let result = parse_gc_message(&input).ok_or_else(|| std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid data"))?;
assert_eq!(result.appid, 440);
assert_eq!(result.msg_type, 2000);
assert!(!result.is_protobuf);
assert_eq!(result.payload, body_content);
assert_eq!(result.target_job_id, Some(u64::MAX));
assert_eq!(result.source_job_id, Some(u64::MAX));
Ok(())
}
#[test]
fn test_parse_gc_message_truncated() {
let input = CMsgGCClient {
appid: Some(730),
msgtype: Some(1000 | PROTO_MASK),
payload: Some(vec![0x00]), ..Default::default()
};
let result = parse_gc_message(&input);
assert!(result.is_none());
}
}