Skip to main content

steam_client/connection/
tcp.rs

1//! TCP connection to Steam CM servers.
2//!
3//! This module implements the raw TCP connection protocol used by Steam
4//! clients. TCP connections require an encryption handshake before any messages
5//! can be exchanged.
6
7#![allow(dead_code)]
8
9use std::io::{self, ErrorKind};
10
11use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
12use bytes::{Buf, Bytes, BytesMut};
13use steam_crypto::{calculate_key_crc, decrypt_with_hmac_iv, encrypt_with_hmac_iv, generate_session_key, SessionKey};
14use steam_enums::{EMsg, EResult};
15use tokio::{
16    io::{AsyncReadExt, AsyncWriteExt},
17    net::TcpStream,
18    time::{timeout, Duration},
19};
20use tracing::{debug, error, info};
21
22use crate::{connection::CmServer, error::SteamError};
23
24/// Magic bytes for the VT01 wire protocol.
25const VT01_MAGIC: &[u8; 4] = b"VT01";
26
27/// Default TCP connection timeout.
28const CONNECTION_TIMEOUT_SECS: u64 = 10;
29/// Timeout for waiting for the encryption handshake to complete.
30const HANDSHAKE_TIMEOUT_SECS: u64 = 5;
31
32/// TCP connection to a Steam CM server.
33pub struct TcpConnection {
34    stream: TcpStream,
35    server: CmServer,
36    session_key: Option<SessionKey>,
37    /// Buffer for incomplete messages.
38    read_buffer: BytesMut,
39    /// Expected message length (if we're in the middle of reading).
40    expected_length: Option<u32>,
41}
42
43impl TcpConnection {
44    /// Connect to a Steam CM server via TCP.
45    ///
46    /// This establishes the TCP connection but does NOT perform the encryption
47    /// handshake. Call [`wait_for_encrypt_request`] followed by
48    /// [`complete_handshake`] to set up encryption before sending any
49    /// messages.
50    pub async fn connect(server: CmServer) -> Result<Self, SteamError> {
51        // Parse endpoint (host:port)
52        let endpoint = &server.endpoint;
53        info!("Connecting to TCP CM: {}", endpoint);
54
55        let stream = timeout(Duration::from_secs(CONNECTION_TIMEOUT_SECS), TcpStream::connect(endpoint)).await.map_err(|_| SteamError::Timeout)?.map_err(|e| SteamError::ConnectionError(format!("TCP connect failed: {}", e)))?;
56
57        debug!("TCP connection established to {}", endpoint);
58
59        Ok(Self { stream, server, session_key: None, read_buffer: BytesMut::with_capacity(8192), expected_length: None })
60    }
61
62    /// Wait for the ChannelEncryptRequest from the server.
63    ///
64    /// Returns the protocol version, universe, and nonce from the request.
65    pub async fn wait_for_encrypt_request(&mut self) -> Result<(u32, u32, [u8; 16]), SteamError> {
66        debug!("Waiting for ChannelEncryptRequest...");
67
68        // Read the raw message
69        let msg = timeout(Duration::from_secs(HANDSHAKE_TIMEOUT_SECS), self.recv_raw()).await.map_err(|_| SteamError::Timeout)??.ok_or_else(|| SteamError::ConnectionError("Connection closed".into()))?;
70
71        // Parse minimal header for ChannelEncryptRequest
72        // Header format: EMsg (4 bytes) + targetJobID (8 bytes) + sourceJobID (8 bytes)
73        if msg.len() < 20 + 16 + 8 {
74            return Err(SteamError::ProtocolError("ChannelEncryptRequest too short".into()));
75        }
76
77        let mut cursor = std::io::Cursor::new(&msg);
78        let raw_emsg = ReadBytesExt::read_u32::<LittleEndian>(&mut cursor).map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
79        let emsg = EMsg::from_i32((raw_emsg & !0x80000000) as i32).unwrap_or(EMsg::Invalid);
80
81        if emsg != EMsg::ChannelEncryptRequest {
82            return Err(SteamError::ProtocolError(format!("Expected ChannelEncryptRequest, got {:?}", emsg)));
83        }
84
85        // Skip job IDs (8 + 8 bytes)
86        cursor.set_position(20);
87
88        // Read protocol, universe, nonce
89        let protocol = ReadBytesExt::read_u32::<LittleEndian>(&mut cursor).map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
90        let universe = ReadBytesExt::read_u32::<LittleEndian>(&mut cursor).map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
91
92        let mut nonce = [0u8; 16];
93        std::io::Read::read_exact(&mut cursor, &mut nonce).map_err(|e| SteamError::ProtocolError(format!("Failed to read nonce: {}", e)))?;
94
95        debug!("ChannelEncryptRequest: protocol={}, universe={}, nonce={}", protocol, universe, hex::encode(nonce));
96
97        Ok((protocol, universe, nonce))
98    }
99
100    /// Complete the encryption handshake.
101    ///
102    /// Generates a session key, sends ChannelEncryptResponse, and waits for
103    /// ChannelEncryptResult. After this succeeds, all messages will be
104    /// encrypted.
105    pub async fn complete_handshake(&mut self, protocol: u32, nonce: &[u8; 16]) -> Result<(), SteamError> {
106        // Generate session key and encrypt it
107        let key_pair = generate_session_key(nonce).map_err(|e| SteamError::ProtocolError(format!("Key generation failed: {}", e)))?;
108
109        let crc = calculate_key_crc(&key_pair.encrypted);
110
111        debug!("Generated session key, encrypted length={}, crc=0x{:08x}", key_pair.encrypted.len(), crc);
112
113        // Build ChannelEncryptResponse
114        // Header: EMsg (4) + targetJobID (8) + sourceJobID (8) = 20 bytes
115        // Body: protocol (4) + key_size (4) + encrypted_key + crc (4) + padding (4)
116        let body_len = 4 + 4 + key_pair.encrypted.len() + 4 + 4;
117        let mut response = Vec::with_capacity(20 + body_len);
118
119        // Write header
120        WriteBytesExt::write_u32::<LittleEndian>(&mut response, EMsg::ChannelEncryptResponse as u32)?;
121        WriteBytesExt::write_u64::<LittleEndian>(&mut response, u64::MAX)?; // targetJobID = JOBID_NONE
122        WriteBytesExt::write_u64::<LittleEndian>(&mut response, u64::MAX)?; // sourceJobID = JOBID_NONE
123
124        // Write body
125        WriteBytesExt::write_u32::<LittleEndian>(&mut response, protocol)?;
126        WriteBytesExt::write_u32::<LittleEndian>(&mut response, key_pair.encrypted.len() as u32)?;
127        response.extend_from_slice(&key_pair.encrypted);
128        WriteBytesExt::write_u32::<LittleEndian>(&mut response, crc)?;
129        WriteBytesExt::write_u32::<LittleEndian>(&mut response, 0)?; // padding
130
131        // Send response (unencrypted, with VT01 framing)
132        self.send_raw(&response).await?;
133
134        // Wait for ChannelEncryptResult
135        let result_msg = timeout(Duration::from_secs(HANDSHAKE_TIMEOUT_SECS), self.recv_raw()).await.map_err(|_| SteamError::Timeout)??.ok_or_else(|| SteamError::ConnectionError("Connection closed".into()))?;
136
137        // Parse ChannelEncryptResult
138        if result_msg.len() < 24 {
139            return Err(SteamError::ProtocolError("ChannelEncryptResult too short".into()));
140        }
141
142        let mut cursor = std::io::Cursor::new(&result_msg);
143        let raw_emsg = ReadBytesExt::read_u32::<LittleEndian>(&mut cursor).map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
144        let emsg = EMsg::from_i32((raw_emsg & !0x80000000) as i32).unwrap_or(EMsg::Invalid);
145
146        if emsg != EMsg::ChannelEncryptResult {
147            return Err(SteamError::ProtocolError(format!("Expected ChannelEncryptResult, got {:?}", emsg)));
148        }
149
150        // Skip job IDs
151        cursor.set_position(20);
152
153        let eresult = EResult::from_i32(ReadBytesExt::read_u32::<LittleEndian>(&mut cursor).map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))? as i32).unwrap_or(EResult::Fail);
154        if eresult != EResult::OK {
155            return Err(SteamError::ProtocolError(format!("ChannelEncryptResult failed with eresult={:?}", eresult)));
156        }
157
158        // Encryption is now active
159        self.session_key = Some(key_pair.plain);
160        info!("Encryption handshake completed successfully");
161
162        Ok(())
163    }
164
165    /// Send a binary message.
166    ///
167    /// If encryption is active, the message will be encrypted before sending.
168    pub async fn send(&mut self, data: Vec<u8>) -> Result<(), SteamError> {
169        let to_send = if let Some(ref key) = self.session_key { encrypt_with_hmac_iv(key, &data).map_err(|e| SteamError::ProtocolError(format!("Encryption failed: {}", e)))? } else { data };
170
171        self.send_raw(&to_send).await
172    }
173
174    /// Send raw bytes with VT01 framing (no encryption).
175    async fn send_raw(&mut self, data: &[u8]) -> Result<(), SteamError> {
176        // VT01 format: length (4 LE) + "VT01" + payload
177        let mut frame = Vec::with_capacity(8 + data.len());
178        WriteBytesExt::write_u32::<LittleEndian>(&mut frame, data.len() as u32)?;
179        frame.extend_from_slice(VT01_MAGIC);
180        frame.extend_from_slice(data);
181
182        self.stream.write_all(&frame).await.map_err(|e| SteamError::ConnectionError(format!("Write failed: {}", e)))?;
183
184        Ok(())
185    }
186
187    /// Receive a message.
188    ///
189    /// If encryption is active, the message will be decrypted.
190    pub async fn recv(&mut self) -> Result<Option<Bytes>, SteamError> {
191        let raw = match self.recv_raw().await {
192            Ok(Some(data)) => data,
193            Ok(None) => return Ok(None),
194            Err(e) => return Err(e),
195        };
196
197        if let Some(ref key) = self.session_key {
198            let decrypted = decrypt_with_hmac_iv(key, &raw).map_err(|e| SteamError::ProtocolError(format!("Decryption failed: {}", e)))?;
199            Ok(Some(Bytes::from(decrypted)))
200        } else {
201            Ok(Some(raw))
202        }
203    }
204
205    /// Receive raw bytes with VT01 framing (no decryption).
206    async fn recv_raw(&mut self) -> Result<Option<Bytes>, SteamError> {
207        loop {
208            // If we already know the expected length, try to read the message
209            if let Some(expected) = self.expected_length {
210                if self.read_buffer.len() >= expected as usize {
211                    let msg = self.read_buffer.split_to(expected as usize).freeze();
212                    self.expected_length = None;
213                    return Ok(Some(msg));
214                }
215            } else if self.read_buffer.len() >= 8 {
216                // Try to read the header
217                // We peek at the header first without consuming it yet
218                let mut header = &self.read_buffer[..8];
219                let length = byteorder::ReadBytesExt::read_u32::<LittleEndian>(&mut header).map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
220                let magic = &self.read_buffer[4..8];
221
222                if magic != VT01_MAGIC {
223                    return Err(SteamError::ProtocolError("Invalid VT01 magic".into()));
224                }
225
226                // Remove the header from buffer
227                self.read_buffer.advance(8);
228                self.expected_length = Some(length);
229
230                // Check if we already have the full message
231                if self.read_buffer.len() >= length as usize {
232                    let msg = self.read_buffer.split_to(length as usize).freeze();
233                    self.expected_length = None;
234                    return Ok(Some(msg));
235                }
236            }
237
238            // Need more data
239            // Read directly into the buffer, avoiding intermediate stack allocation
240            match self.stream.read_buf(&mut self.read_buffer).await {
241                Ok(0) => {
242                    // Connection closed
243                    if self.read_buffer.is_empty() {
244                        return Ok(None);
245                    } else {
246                        return Err(SteamError::ConnectionError("Connection closed mid-message".into()));
247                    }
248                }
249                Ok(_) => {
250                    // Data was read directly into read_buffer, loop to process
251                    // it
252                }
253                Err(ref e) if e.kind() == ErrorKind::WouldBlock => {
254                    continue;
255                }
256                Err(e) => {
257                    error!("TCP read error: {}", e);
258                    return Err(SteamError::ConnectionError(format!("Read failed: {}", e)));
259                }
260            }
261        }
262    }
263
264    /// Close the connection.
265    pub async fn close(mut self) -> Result<(), SteamError> {
266        self.stream.shutdown().await.map_err(|e| SteamError::ConnectionError(format!("Shutdown failed: {}", e)))
267    }
268
269    /// Get the connected server info.
270    pub fn server(&self) -> &CmServer {
271        &self.server
272    }
273
274    /// Check if encryption is active.
275    pub fn is_encrypted(&self) -> bool {
276        self.session_key.is_some()
277    }
278}
279
280use async_trait::async_trait;
281
282use super::traits::SteamConnection;
283
284#[async_trait]
285impl SteamConnection for TcpConnection {
286    async fn send(&mut self, data: Vec<u8>) -> Result<(), SteamError> {
287        TcpConnection::send(self, data).await
288    }
289
290    async fn recv(&mut self) -> Result<Option<Bytes>, SteamError> {
291        TcpConnection::recv(self).await
292    }
293
294    async fn close(self: Box<Self>) -> Result<(), SteamError> {
295        TcpConnection::close(*self).await
296    }
297
298    fn server(&self) -> &CmServer {
299        TcpConnection::server(self)
300    }
301
302    fn set_session_key(&mut self, key: Option<Vec<u8>>) {
303        self.session_key = key.and_then(|k| SessionKey::from_bytes(&k));
304    }
305}
306
307impl From<io::Error> for SteamError {
308    fn from(e: io::Error) -> Self {
309        SteamError::ConnectionError(e.to_string())
310    }
311}