1use prost::Message;
8use steam_enums::{ECsgoGCMsg, EMsg};
9
10use crate::{error::SteamError, protocol::header::CMsgProtoBufHeader, SteamClient};
11
12const PROTO_MASK: u32 = 0x80000000;
14
15#[derive(Debug, Clone)]
17pub struct GCMessage {
18 pub appid: u32,
20 pub msg_type: u32,
22 pub is_protobuf: bool,
24 pub payload: Vec<u8>,
26 pub target_job_id: Option<u64>,
28 pub source_job_id: Option<u64>,
30}
31
32#[derive(Debug, Clone, Default)]
34pub struct GCSendOptions {
35 pub proto_header: Option<GCProtoHeader>,
37 pub legacy_header: Option<GCLegacyHeader>,
39}
40
41#[derive(Debug, Clone)]
43pub struct GCProtoHeader {
44 pub job_id_source: u64,
46 pub job_id_target: u64,
48 pub target_job_name: Option<String>,
50}
51
52impl Default for GCProtoHeader {
53 fn default() -> Self {
54 Self { job_id_source: u64::MAX, job_id_target: u64::MAX, target_job_name: None }
55 }
56}
57
58#[derive(Debug, Clone)]
60pub struct GCLegacyHeader {
61 pub job_id_source: u64,
63 pub job_id_target: u64,
65}
66
67impl Default for GCLegacyHeader {
68 fn default() -> Self {
69 Self { job_id_source: u64::MAX, job_id_target: u64::MAX }
70 }
71}
72
73impl SteamClient {
74 pub async fn send_to_gc(&mut self, appid: u32, msg_type: u32, payload: &[u8]) -> Result<(), SteamError> {
90 self.send_to_gc_with_options(appid, msg_type, payload, GCSendOptions::default()).await
91 }
92
93 pub async fn send_to_gc_proto(&mut self, appid: u32, msg_type: u32, payload: &[u8], header: GCProtoHeader) -> Result<(), SteamError> {
101 self.send_to_gc_with_options(appid, msg_type, payload, GCSendOptions { proto_header: Some(header), legacy_header: None }).await
102 }
103
104 pub async fn send_to_gc_with_options(&mut self, appid: u32, msg_type: u32, payload: &[u8], options: GCSendOptions) -> Result<(), SteamError> {
106 if !self.is_logged_in() {
107 return Err(SteamError::NotLoggedOn);
108 }
109
110 let is_protobuf = options.proto_header.is_some();
112
113 let gc_payload = if let Some(proto_header) = options.proto_header {
114 let msg_type_with_proto = msg_type | PROTO_MASK;
116
117 let proto_header_bytes = encode_gc_proto_header(&proto_header);
119
120 let mut header = Vec::with_capacity(8 + proto_header_bytes.len());
122 header.extend_from_slice(&msg_type_with_proto.to_le_bytes());
123 header.extend_from_slice(&(proto_header_bytes.len() as i32).to_le_bytes());
124 header.extend_from_slice(&proto_header_bytes);
125 header.extend_from_slice(payload);
126 header
127 } else {
128 let (job_id_source, job_id_target) = if let Some(header) = options.legacy_header { (header.job_id_source, header.job_id_target) } else { (u64::MAX, u64::MAX) };
130
131 let mut header = Vec::with_capacity(18 + payload.len());
132 header.extend_from_slice(&1u16.to_le_bytes()); header.extend_from_slice(&job_id_target.to_le_bytes()); header.extend_from_slice(&job_id_source.to_le_bytes()); header.extend_from_slice(payload);
136 header
137 };
138
139 let final_msg_type = if is_protobuf { msg_type | PROTO_MASK } else { msg_type };
143
144 let gc_msg = steam_protos::CMsgGCClient { appid: Some(appid), msgtype: Some(final_msg_type), payload: Some(gc_payload), ..Default::default() };
145
146 self.send_message_with_routing(EMsg::ClientToGC, appid, &gc_msg).await
147 }
148
149 pub async fn request_players_profile(&mut self, steam_id: steamid::SteamID) -> Result<(), SteamError> {
154 if !self.is_logged_in() {
155 return Err(SteamError::NotLoggedOn);
156 }
157
158 let msg = steam_protos::CMsgGccStrike15V2ClientRequestPlayersProfile {
159 account_id: Some(steam_id.account_id),
160 request_level: Some(32), };
162
163 self.send_to_gc_proto(crate::services::csgo::APP_ID, ECsgoGCMsg::ClientRequestPlayersProfile as u32, &msg.encode_to_vec(), GCProtoHeader::default()).await
165 }
166}
167
168fn encode_gc_proto_header(header: &GCProtoHeader) -> Vec<u8> {
170 let msg = CMsgProtoBufHeader {
171 jobid_source: Some(header.job_id_source),
172 jobid_target: Some(header.job_id_target),
173 target_job_name: header.target_job_name.clone(),
174 ..Default::default()
175 };
176 msg.encode_to_vec()
177}
178
179pub fn parse_gc_message(body: &steam_protos::CMsgGCClient) -> Option<GCMessage> {
181 let appid = body.appid?;
182 let raw_msg_type = body.msgtype?;
183 let payload_bytes = body.payload.as_ref()?;
184
185 let is_protobuf = (raw_msg_type & PROTO_MASK) != 0;
186 let msg_type = raw_msg_type & !PROTO_MASK;
187
188 let (payload, target_job_id, source_job_id) = if is_protobuf {
189 if payload_bytes.len() < 8 {
191 return None;
192 }
193 let header_len = i32::from_le_bytes([payload_bytes[4], payload_bytes[5], payload_bytes[6], payload_bytes[7]]) as usize;
194
195 if payload_bytes.len() < 8 + header_len {
196 return None;
197 }
198
199 let proto_header_bytes = &payload_bytes[8..8 + header_len];
201 let proto_header = match CMsgProtoBufHeader::decode(proto_header_bytes) {
202 Ok(header) => header,
203 Err(_) => return None,
204 };
205
206 (payload_bytes[8 + header_len..].to_vec(), proto_header.jobid_target, proto_header.jobid_source)
207 } else {
208 if payload_bytes.len() < 18 {
211 return None;
212 }
213
214 let target_job_id = u64::from_le_bytes([payload_bytes[2], payload_bytes[3], payload_bytes[4], payload_bytes[5], payload_bytes[6], payload_bytes[7], payload_bytes[8], payload_bytes[9]]);
215
216 let source_job_id = u64::from_le_bytes([payload_bytes[10], payload_bytes[11], payload_bytes[12], payload_bytes[13], payload_bytes[14], payload_bytes[15], payload_bytes[16], payload_bytes[17]]);
217
218 (payload_bytes[18..].to_vec(), Some(target_job_id), Some(source_job_id))
219 };
220
221 Some(GCMessage { appid, msg_type, is_protobuf, payload, target_job_id, source_job_id })
222}
223
224use std::{
229 collections::HashMap,
230 sync::{Arc, Mutex},
231 time::{Duration, Instant},
232};
233
234use tokio::sync::oneshot;
235use tracing::{debug, warn};
236
237#[derive(Debug)]
239pub enum GCJobResponse {
240 Success(Vec<u8>),
242 Timeout,
244}
245
246pub struct PendingGCJob {
248 pub created_at: Instant,
250 pub timeout: Duration,
252 pub response_tx: oneshot::Sender<GCJobResponse>,
254}
255
256#[derive(Debug, Clone, Hash, PartialEq, Eq)]
258pub struct GCJobKey {
259 pub appid: u32,
261 pub response_msg_type: u32,
263}
264
265#[derive(Clone)]
267pub struct GCJobManager {
268 inner: Arc<GCJobManagerInner>,
269}
270
271struct GCJobManagerInner {
272 pending_jobs: Mutex<HashMap<GCJobKey, PendingGCJob>>,
274 default_timeout: Duration,
276}
277
278impl GCJobManager {
279 pub fn new() -> Self {
281 Self::with_timeout(Duration::from_secs(30))
282 }
283
284 pub fn with_timeout(default_timeout: Duration) -> Self {
286 Self { inner: Arc::new(GCJobManagerInner { pending_jobs: Mutex::new(HashMap::new()), default_timeout }) }
287 }
288
289 pub fn create_job(&self, appid: u32, response_msg_type: u32) -> oneshot::Receiver<GCJobResponse> {
294 self.create_job_with_timeout(appid, response_msg_type, self.inner.default_timeout)
295 }
296
297 pub fn create_job_with_timeout(&self, appid: u32, response_msg_type: u32, timeout: Duration) -> oneshot::Receiver<GCJobResponse> {
299 let (tx, rx) = oneshot::channel();
300 let key = GCJobKey { appid, response_msg_type };
301
302 let job = PendingGCJob { created_at: Instant::now(), timeout, response_tx: tx };
303
304 self.inner.pending_jobs.lock().expect("mutex poisoned").insert(key.clone(), job);
305 debug!("Created GC job for appid={}, response_msg_type={}", appid, response_msg_type);
306
307 rx
308 }
309
310 pub fn try_complete(&self, appid: u32, msg_type: u32, payload: Vec<u8>) -> bool {
314 let key = GCJobKey { appid, response_msg_type: msg_type };
315
316 if let Some(job) = self.inner.pending_jobs.lock().expect("mutex poisoned").remove(&key) {
317 debug!("Completing GC job for appid={}, msg_type={}", appid, msg_type);
318 let _ = job.response_tx.send(GCJobResponse::Success(payload));
319 true
320 } else {
321 false
322 }
323 }
324
325 pub fn cleanup_expired(&self) {
327 let now = Instant::now();
328 let mut pending = self.inner.pending_jobs.lock().expect("mutex poisoned");
329 let expired_keys: Vec<GCJobKey> = pending.iter().filter(|(_, job)| now.duration_since(job.created_at) > job.timeout).map(|(key, _)| key.clone()).collect();
330
331 for key in expired_keys {
332 if let Some(job) = pending.remove(&key) {
333 warn!("GC job expired for appid={}, response_msg_type={}", key.appid, key.response_msg_type);
334 let _ = job.response_tx.send(GCJobResponse::Timeout);
335 }
336 }
337 }
338
339 pub fn pending_count(&self) -> usize {
341 self.inner.pending_jobs.lock().expect("mutex poisoned").len()
342 }
343}
344
345impl Default for GCJobManager {
346 fn default() -> Self {
347 Self::new()
348 }
349}
350
351#[cfg(test)]
352mod tests {
353 use steam_protos::CMsgGCClient;
354
355 use super::*;
356 use crate::protocol::header::CMsgProtoBufHeader;
357
358 #[test]
359 fn test_parse_gc_message_protobuf() -> Result<(), Box<dyn std::error::Error>> {
360 let mut payload = Vec::new();
363 let msg_type_raw: u32 = 1000 | PROTO_MASK;
364 payload.extend_from_slice(&msg_type_raw.to_le_bytes());
365
366 let header = CMsgProtoBufHeader { jobid_source: Some(1), jobid_target: Some(2), ..Default::default() };
367 let header_bytes = header.encode_to_vec();
368
369 payload.extend_from_slice(&(header_bytes.len() as i32).to_le_bytes());
370 payload.extend_from_slice(&header_bytes);
371
372 let body_content = vec![0x01, 0x02, 0x03];
373 payload.extend_from_slice(&body_content);
374
375 let input = CMsgGCClient {
376 appid: Some(crate::services::csgo::APP_ID),
377 msgtype: Some(msg_type_raw),
378 payload: Some(payload),
379 ..Default::default()
380 };
381
382 let result = parse_gc_message(&input).ok_or_else(|| std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid data"))?;
383
384 assert_eq!(result.appid, crate::services::csgo::APP_ID);
385 assert_eq!(result.msg_type, 1000);
386 assert!(result.is_protobuf);
387 assert_eq!(result.payload, body_content);
388 assert_eq!(result.target_job_id, Some(2));
389 assert_eq!(result.source_job_id, Some(1));
390 Ok(())
391 }
392
393 #[test]
394 fn test_parse_gc_message_legacy() -> Result<(), Box<dyn std::error::Error>> {
395 let mut payload = Vec::new();
398 payload.extend_from_slice(&1u16.to_le_bytes()); payload.extend_from_slice(&u64::MAX.to_le_bytes()); payload.extend_from_slice(&u64::MAX.to_le_bytes()); let body_content = vec![0xAA, 0xBB, 0xCC];
403 payload.extend_from_slice(&body_content);
404
405 let input = CMsgGCClient {
406 appid: Some(440),
407 msgtype: Some(2000), payload: Some(payload),
409 ..Default::default()
410 };
411
412 let result = parse_gc_message(&input).ok_or_else(|| std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid data"))?;
413
414 assert_eq!(result.appid, 440);
415 assert_eq!(result.msg_type, 2000);
416 assert!(!result.is_protobuf);
417 assert_eq!(result.payload, body_content);
418 assert_eq!(result.target_job_id, Some(u64::MAX));
419 assert_eq!(result.source_job_id, Some(u64::MAX));
420 Ok(())
421 }
422
423 #[test]
424 fn test_parse_gc_message_truncated() {
425 let input = CMsgGCClient {
427 appid: Some(730),
428 msgtype: Some(1000 | PROTO_MASK),
429 payload: Some(vec![0x00]), ..Default::default()
431 };
432
433 let result = parse_gc_message(&input);
434 assert!(result.is_none());
435 }
436}