Skip to main content

steam_client/connection/
tcp.rs

1//! TCP connection to Steam CM servers.
2//!
3//! This module implements the raw TCP connection protocol used by Steam
4//! clients. TCP connections require an encryption handshake before any messages
5//! can be exchanged.
6
7#![allow(dead_code)]
8
9use std::io::{self, ErrorKind};
10
11use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
12use bytes::{Buf, Bytes, BytesMut};
13use steam_crypto::{calculate_key_crc, decrypt_with_hmac_iv, encrypt_with_hmac_iv, generate_session_key, SessionKey};
14use steam_enums::{EMsg, EResult};
15use tokio::{
16    io::{AsyncReadExt, AsyncWriteExt},
17    net::TcpStream,
18    time::{timeout, Duration},
19};
20use tracing::{debug, error, info};
21
22use crate::{connection::CmServer, error::SteamError};
23
/// Four-byte tag that follows the length prefix in every VT01 frame.
const VT01_MAGIC: &[u8; 4] = b"VT01";

/// Seconds allowed for the initial TCP connect attempt.
const CONNECTION_TIMEOUT_SECS: u64 = 10;
/// Seconds allowed for each handshake message to arrive from the server.
const HANDSHAKE_TIMEOUT_SECS: u64 = 5;
31
32/// TCP connection to a Steam CM server.
33pub struct TcpConnection {
34    stream: TcpStream,
35    server: CmServer,
36    session_key: Option<SessionKey>,
37    /// Buffer for incomplete messages.
38    read_buffer: BytesMut,
39    /// Expected message length (if we're in the middle of reading).
40    expected_length: Option<u32>,
41}
42
43impl TcpConnection {
44    /// Connect to a Steam CM server via TCP.
45    ///
46    /// This establishes the TCP connection but does NOT perform the encryption
47    /// handshake. Call [`wait_for_encrypt_request`] followed by
48    /// [`complete_handshake`] to set up encryption before sending any
49    /// messages.
50    pub async fn connect(server: CmServer) -> Result<Self, SteamError> {
51        // Parse endpoint (host:port)
52        let endpoint = &server.endpoint;
53        info!("Connecting to TCP CM: {}", endpoint);
54
55        let stream = timeout(Duration::from_secs(CONNECTION_TIMEOUT_SECS), TcpStream::connect(endpoint)).await.map_err(|_| SteamError::Timeout)?.map_err(|e| SteamError::ConnectionError(format!("TCP connect failed: {}", e)))?;
56
57        debug!("TCP connection established to {}", endpoint);
58
59        Ok(Self { stream, server, session_key: None, read_buffer: BytesMut::with_capacity(8192), expected_length: None })
60    }
61
62    /// Wait for the ChannelEncryptRequest from the server.
63    ///
64    /// Returns the protocol version, universe, and nonce from the request.
65    pub async fn wait_for_encrypt_request(&mut self) -> Result<(u32, u32, [u8; 16]), SteamError> {
66        debug!("Waiting for ChannelEncryptRequest...");
67
68        // Read the raw message
69        let msg = timeout(Duration::from_secs(HANDSHAKE_TIMEOUT_SECS), self.recv_raw()).await.map_err(|_| SteamError::Timeout)??.ok_or_else(|| SteamError::ConnectionError("Connection closed".into()))?;
70
71        // Parse minimal header for ChannelEncryptRequest
72        // Header format: EMsg (4 bytes) + targetJobID (8 bytes) + sourceJobID (8 bytes)
73        if msg.len() < 20 + 16 + 8 {
74            return Err(SteamError::ProtocolError("ChannelEncryptRequest too short".into()));
75        }
76
77        let mut cursor = std::io::Cursor::new(&msg);
78        let raw_emsg = ReadBytesExt::read_u32::<LittleEndian>(&mut cursor).map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
79        let emsg = EMsg::from_i32((raw_emsg & !0x80000000) as i32).unwrap_or(EMsg::Invalid);
80
81        if emsg != EMsg::ChannelEncryptRequest {
82            return Err(SteamError::ProtocolError(format!("Expected ChannelEncryptRequest, got {:?}", emsg)));
83        }
84
85        // Skip job IDs (8 + 8 bytes)
86        cursor.set_position(20);
87
88        // Read protocol, universe, nonce
89        let protocol = ReadBytesExt::read_u32::<LittleEndian>(&mut cursor).map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
90        let universe = ReadBytesExt::read_u32::<LittleEndian>(&mut cursor).map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
91
92        let mut nonce = [0u8; 16];
93        std::io::Read::read_exact(&mut cursor, &mut nonce).map_err(|e| SteamError::ProtocolError(format!("Failed to read nonce: {}", e)))?;
94
95        debug!("ChannelEncryptRequest: protocol={}, universe={}, nonce={}", protocol, universe, hex::encode(nonce));
96
97        Ok((protocol, universe, nonce))
98    }
99
100    /// Complete the encryption handshake.
101    ///
102    /// Generates a session key, sends ChannelEncryptResponse, and waits for
103    /// ChannelEncryptResult. After this succeeds, all messages will be
104    /// encrypted.
105    pub async fn complete_handshake(&mut self, protocol: u32, nonce: &[u8; 16]) -> Result<(), SteamError> {
106        // Generate session key and encrypt it on a blocking thread (RSA-OAEP is CPU-bound, ~100us+).
107        let nonce_owned: [u8; 16] = *nonce;
108        let key_pair = tokio::task::spawn_blocking(move || generate_session_key(&nonce_owned))
109            .await
110            .map_err(|e| SteamError::ProtocolError(format!("Key generation task join failed: {}", e)))?
111            .map_err(|e| SteamError::ProtocolError(format!("Key generation failed: {}", e)))?;
112
113        let crc = calculate_key_crc(&key_pair.encrypted);
114
115        debug!("Generated session key, encrypted length={}, crc=0x{:08x}", key_pair.encrypted.len(), crc);
116
117        // Build ChannelEncryptResponse
118        // Header: EMsg (4) + targetJobID (8) + sourceJobID (8) = 20 bytes
119        // Body: protocol (4) + key_size (4) + encrypted_key + crc (4) + padding (4)
120        let body_len = 4 + 4 + key_pair.encrypted.len() + 4 + 4;
121        let mut response = Vec::with_capacity(20 + body_len);
122
123        // Write header
124        WriteBytesExt::write_u32::<LittleEndian>(&mut response, EMsg::ChannelEncryptResponse as u32)?;
125        WriteBytesExt::write_u64::<LittleEndian>(&mut response, u64::MAX)?; // targetJobID = JOBID_NONE
126        WriteBytesExt::write_u64::<LittleEndian>(&mut response, u64::MAX)?; // sourceJobID = JOBID_NONE
127
128        // Write body
129        WriteBytesExt::write_u32::<LittleEndian>(&mut response, protocol)?;
130        WriteBytesExt::write_u32::<LittleEndian>(&mut response, key_pair.encrypted.len() as u32)?;
131        response.extend_from_slice(&key_pair.encrypted);
132        WriteBytesExt::write_u32::<LittleEndian>(&mut response, crc)?;
133        WriteBytesExt::write_u32::<LittleEndian>(&mut response, 0)?; // padding
134
135        // Send response (unencrypted, with VT01 framing)
136        self.send_raw(&response).await?;
137
138        // Wait for ChannelEncryptResult
139        let result_msg = timeout(Duration::from_secs(HANDSHAKE_TIMEOUT_SECS), self.recv_raw()).await.map_err(|_| SteamError::Timeout)??.ok_or_else(|| SteamError::ConnectionError("Connection closed".into()))?;
140
141        // Parse ChannelEncryptResult
142        if result_msg.len() < 24 {
143            return Err(SteamError::ProtocolError("ChannelEncryptResult too short".into()));
144        }
145
146        let mut cursor = std::io::Cursor::new(&result_msg);
147        let raw_emsg = ReadBytesExt::read_u32::<LittleEndian>(&mut cursor).map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
148        let emsg = EMsg::from_i32((raw_emsg & !0x80000000) as i32).unwrap_or(EMsg::Invalid);
149
150        if emsg != EMsg::ChannelEncryptResult {
151            return Err(SteamError::ProtocolError(format!("Expected ChannelEncryptResult, got {:?}", emsg)));
152        }
153
154        // Skip job IDs
155        cursor.set_position(20);
156
157        let eresult = EResult::from_i32(ReadBytesExt::read_u32::<LittleEndian>(&mut cursor).map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))? as i32).unwrap_or(EResult::Fail);
158        if eresult != EResult::OK {
159            return Err(SteamError::ProtocolError(format!("ChannelEncryptResult failed with eresult={:?}", eresult)));
160        }
161
162        // Encryption is now active
163        self.session_key = Some(key_pair.plain);
164        info!("Encryption handshake completed successfully");
165
166        Ok(())
167    }
168
169    /// Send a binary message.
170    ///
171    /// If encryption is active, the message will be encrypted on a blocking
172    /// thread before sending so AES work doesn't stall the async runtime.
173    pub async fn send(&mut self, data: Vec<u8>) -> Result<(), SteamError> {
174        let to_send = if let Some(key) = self.session_key.clone() {
175            tokio::task::spawn_blocking(move || encrypt_with_hmac_iv(&key, &data))
176                .await
177                .map_err(|e| SteamError::ProtocolError(format!("Encryption task join failed: {}", e)))?
178                .map_err(|e| SteamError::ProtocolError(format!("Encryption failed: {}", e)))?
179        } else {
180            data
181        };
182
183        self.send_raw(&to_send).await
184    }
185
186    /// Send raw bytes with VT01 framing (no encryption).
187    async fn send_raw(&mut self, data: &[u8]) -> Result<(), SteamError> {
188        // VT01 format: length (4 LE) + "VT01" + payload
189        let mut frame = Vec::with_capacity(8 + data.len());
190        WriteBytesExt::write_u32::<LittleEndian>(&mut frame, data.len() as u32)?;
191        frame.extend_from_slice(VT01_MAGIC);
192        frame.extend_from_slice(data);
193
194        timeout(Duration::from_secs(30), self.stream.write_all(&frame))
195            .await
196            .map_err(|_| SteamError::NetworkError(std::io::Error::from(std::io::ErrorKind::TimedOut)))?
197            .map_err(|e| SteamError::ConnectionError(format!("Write failed: {}", e)))?;
198
199        Ok(())
200    }
201
202    /// Receive a message.
203    ///
204    /// If encryption is active, the message will be decrypted.
205    pub async fn recv(&mut self) -> Result<Option<Bytes>, SteamError> {
206        let raw = match self.recv_raw().await {
207            Ok(Some(data)) => data,
208            Ok(None) => return Ok(None),
209            Err(e) => return Err(e),
210        };
211
212        if let Some(key) = self.session_key.clone() {
213            let raw_vec = raw.to_vec();
214            let decrypted = tokio::task::spawn_blocking(move || decrypt_with_hmac_iv(&key, &raw_vec))
215                .await
216                .map_err(|e| SteamError::ProtocolError(format!("Decryption task join failed: {}", e)))?
217                .map_err(|e| SteamError::ProtocolError(format!("Decryption failed: {}", e)))?;
218            Ok(Some(Bytes::from(decrypted)))
219        } else {
220            Ok(Some(raw))
221        }
222    }
223
224    /// Receive raw bytes with VT01 framing (no decryption).
225    async fn recv_raw(&mut self) -> Result<Option<Bytes>, SteamError> {
226        loop {
227            // If we already know the expected length, try to read the message
228            if let Some(expected) = self.expected_length {
229                if self.read_buffer.len() >= expected as usize {
230                    let msg = self.read_buffer.split_to(expected as usize).freeze();
231                    self.expected_length = None;
232                    return Ok(Some(msg));
233                }
234            } else if self.read_buffer.len() >= 8 {
235                // Try to read the header
236                // We peek at the header first without consuming it yet
237                let mut header = &self.read_buffer[..8];
238                let length = byteorder::ReadBytesExt::read_u32::<LittleEndian>(&mut header).map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
239                let magic = &self.read_buffer[4..8];
240
241                if magic != VT01_MAGIC {
242                    return Err(SteamError::ProtocolError("Invalid VT01 magic".into()));
243                }
244
245                // Remove the header from buffer
246                self.read_buffer.advance(8);
247                self.expected_length = Some(length);
248
249                // Check if we already have the full message
250                if self.read_buffer.len() >= length as usize {
251                    let msg = self.read_buffer.split_to(length as usize).freeze();
252                    self.expected_length = None;
253                    return Ok(Some(msg));
254                }
255            }
256
257            // Need more data
258            // Read directly into the buffer, avoiding intermediate stack allocation
259            let read_result = timeout(Duration::from_secs(30), self.stream.read_buf(&mut self.read_buffer))
260                .await
261                .map_err(|_| SteamError::NetworkError(std::io::Error::from(std::io::ErrorKind::TimedOut)))?;
262            match read_result {
263                Ok(0) => {
264                    // Connection closed
265                    if self.read_buffer.is_empty() {
266                        return Ok(None);
267                    } else {
268                        return Err(SteamError::ConnectionError("Connection closed mid-message".into()));
269                    }
270                }
271                Ok(_) => {
272                    // Data was read directly into read_buffer, loop to process
273                    // it
274                }
275                Err(ref e) if e.kind() == ErrorKind::WouldBlock => {
276                    continue;
277                }
278                Err(e) => {
279                    error!("TCP read error: {}", e);
280                    return Err(SteamError::ConnectionError(format!("Read failed: {}", e)));
281                }
282            }
283        }
284    }
285
286    /// Close the connection.
287    pub async fn close(mut self) -> Result<(), SteamError> {
288        timeout(Duration::from_secs(10), self.stream.shutdown())
289            .await
290            .map_err(|_| SteamError::NetworkError(std::io::Error::from(std::io::ErrorKind::TimedOut)))?
291            .map_err(|e| SteamError::ConnectionError(format!("Shutdown failed: {}", e)))
292    }
293
294    /// Get the connected server info.
295    pub fn server(&self) -> &CmServer {
296        &self.server
297    }
298
299    /// Check if encryption is active.
300    pub fn is_encrypted(&self) -> bool {
301        self.session_key.is_some()
302    }
303}
304
305use async_trait::async_trait;
306
307use super::traits::SteamConnection;
308
309#[async_trait]
310impl SteamConnection for TcpConnection {
311    async fn send(&mut self, data: Vec<u8>) -> Result<(), SteamError> {
312        TcpConnection::send(self, data).await
313    }
314
315    async fn recv(&mut self) -> Result<Option<Bytes>, SteamError> {
316        TcpConnection::recv(self).await
317    }
318
319    async fn close(self: Box<Self>) -> Result<(), SteamError> {
320        TcpConnection::close(*self).await
321    }
322
323    fn server(&self) -> &CmServer {
324        TcpConnection::server(self)
325    }
326
327    fn set_session_key(&mut self, key: Option<Vec<u8>>) {
328        self.session_key = key.and_then(|k| SessionKey::from_bytes(&k));
329    }
330}
331
332impl From<io::Error> for SteamError {
333    fn from(e: io::Error) -> Self {
334        SteamError::ConnectionError(e.to_string())
335    }
336}