#![allow(dead_code)]
use std::io::{self, ErrorKind};
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use bytes::{Buf, Bytes, BytesMut};
use steam_crypto::{calculate_key_crc, decrypt_with_hmac_iv, encrypt_with_hmac_iv, generate_session_key, SessionKey};
use steam_enums::{EMsg, EResult};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::TcpStream,
time::{timeout, Duration},
};
use tracing::{debug, error, info};
use crate::{connection::CmServer, error::SteamError};
const VT01_MAGIC: &[u8; 4] = b"VT01";
const CONNECTION_TIMEOUT_SECS: u64 = 10;
const HANDSHAKE_TIMEOUT_SECS: u64 = 5;
pub struct TcpConnection {
stream: TcpStream,
server: CmServer,
session_key: Option<SessionKey>,
read_buffer: BytesMut,
expected_length: Option<u32>,
}
impl TcpConnection {
pub async fn connect(server: CmServer) -> Result<Self, SteamError> {
let endpoint = &server.endpoint;
info!("Connecting to TCP CM: {}", endpoint);
let stream = timeout(Duration::from_secs(CONNECTION_TIMEOUT_SECS), TcpStream::connect(endpoint)).await.map_err(|_| SteamError::Timeout)?.map_err(|e| SteamError::ConnectionError(format!("TCP connect failed: {}", e)))?;
debug!("TCP connection established to {}", endpoint);
Ok(Self { stream, server, session_key: None, read_buffer: BytesMut::with_capacity(8192), expected_length: None })
}
pub async fn wait_for_encrypt_request(&mut self) -> Result<(u32, u32, [u8; 16]), SteamError> {
debug!("Waiting for ChannelEncryptRequest...");
let msg = timeout(Duration::from_secs(HANDSHAKE_TIMEOUT_SECS), self.recv_raw()).await.map_err(|_| SteamError::Timeout)??.ok_or_else(|| SteamError::ConnectionError("Connection closed".into()))?;
if msg.len() < 20 + 16 + 8 {
return Err(SteamError::ProtocolError("ChannelEncryptRequest too short".into()));
}
let mut cursor = std::io::Cursor::new(&msg);
let raw_emsg = ReadBytesExt::read_u32::<LittleEndian>(&mut cursor).map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
let emsg = EMsg::from_i32((raw_emsg & !0x80000000) as i32).unwrap_or(EMsg::Invalid);
if emsg != EMsg::ChannelEncryptRequest {
return Err(SteamError::ProtocolError(format!("Expected ChannelEncryptRequest, got {:?}", emsg)));
}
cursor.set_position(20);
let protocol = ReadBytesExt::read_u32::<LittleEndian>(&mut cursor).map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
let universe = ReadBytesExt::read_u32::<LittleEndian>(&mut cursor).map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
let mut nonce = [0u8; 16];
std::io::Read::read_exact(&mut cursor, &mut nonce).map_err(|e| SteamError::ProtocolError(format!("Failed to read nonce: {}", e)))?;
debug!("ChannelEncryptRequest: protocol={}, universe={}, nonce={}", protocol, universe, hex::encode(nonce));
Ok((protocol, universe, nonce))
}
pub async fn complete_handshake(&mut self, protocol: u32, nonce: &[u8; 16]) -> Result<(), SteamError> {
let nonce_owned: [u8; 16] = *nonce;
let key_pair = tokio::task::spawn_blocking(move || generate_session_key(&nonce_owned))
.await
.map_err(|e| SteamError::ProtocolError(format!("Key generation task join failed: {}", e)))?
.map_err(|e| SteamError::ProtocolError(format!("Key generation failed: {}", e)))?;
let crc = calculate_key_crc(&key_pair.encrypted);
debug!("Generated session key, encrypted length={}, crc=0x{:08x}", key_pair.encrypted.len(), crc);
let body_len = 4 + 4 + key_pair.encrypted.len() + 4 + 4;
let mut response = Vec::with_capacity(20 + body_len);
WriteBytesExt::write_u32::<LittleEndian>(&mut response, EMsg::ChannelEncryptResponse as u32)?;
WriteBytesExt::write_u64::<LittleEndian>(&mut response, u64::MAX)?; WriteBytesExt::write_u64::<LittleEndian>(&mut response, u64::MAX)?;
WriteBytesExt::write_u32::<LittleEndian>(&mut response, protocol)?;
WriteBytesExt::write_u32::<LittleEndian>(&mut response, key_pair.encrypted.len() as u32)?;
response.extend_from_slice(&key_pair.encrypted);
WriteBytesExt::write_u32::<LittleEndian>(&mut response, crc)?;
WriteBytesExt::write_u32::<LittleEndian>(&mut response, 0)?;
self.send_raw(&response).await?;
let result_msg = timeout(Duration::from_secs(HANDSHAKE_TIMEOUT_SECS), self.recv_raw()).await.map_err(|_| SteamError::Timeout)??.ok_or_else(|| SteamError::ConnectionError("Connection closed".into()))?;
if result_msg.len() < 24 {
return Err(SteamError::ProtocolError("ChannelEncryptResult too short".into()));
}
let mut cursor = std::io::Cursor::new(&result_msg);
let raw_emsg = ReadBytesExt::read_u32::<LittleEndian>(&mut cursor).map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
let emsg = EMsg::from_i32((raw_emsg & !0x80000000) as i32).unwrap_or(EMsg::Invalid);
if emsg != EMsg::ChannelEncryptResult {
return Err(SteamError::ProtocolError(format!("Expected ChannelEncryptResult, got {:?}", emsg)));
}
cursor.set_position(20);
let eresult = EResult::from_i32(ReadBytesExt::read_u32::<LittleEndian>(&mut cursor).map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))? as i32).unwrap_or(EResult::Fail);
if eresult != EResult::OK {
return Err(SteamError::ProtocolError(format!("ChannelEncryptResult failed with eresult={:?}", eresult)));
}
self.session_key = Some(key_pair.plain);
info!("Encryption handshake completed successfully");
Ok(())
}
pub async fn send(&mut self, data: Vec<u8>) -> Result<(), SteamError> {
let to_send = if let Some(key) = self.session_key.clone() {
tokio::task::spawn_blocking(move || encrypt_with_hmac_iv(&key, &data))
.await
.map_err(|e| SteamError::ProtocolError(format!("Encryption task join failed: {}", e)))?
.map_err(|e| SteamError::ProtocolError(format!("Encryption failed: {}", e)))?
} else {
data
};
self.send_raw(&to_send).await
}
async fn send_raw(&mut self, data: &[u8]) -> Result<(), SteamError> {
let mut frame = Vec::with_capacity(8 + data.len());
WriteBytesExt::write_u32::<LittleEndian>(&mut frame, data.len() as u32)?;
frame.extend_from_slice(VT01_MAGIC);
frame.extend_from_slice(data);
timeout(Duration::from_secs(30), self.stream.write_all(&frame))
.await
.map_err(|_| SteamError::NetworkError(std::io::Error::from(std::io::ErrorKind::TimedOut)))?
.map_err(|e| SteamError::ConnectionError(format!("Write failed: {}", e)))?;
Ok(())
}
pub async fn recv(&mut self) -> Result<Option<Bytes>, SteamError> {
let raw = match self.recv_raw().await {
Ok(Some(data)) => data,
Ok(None) => return Ok(None),
Err(e) => return Err(e),
};
if let Some(key) = self.session_key.clone() {
let raw_vec = raw.to_vec();
let decrypted = tokio::task::spawn_blocking(move || decrypt_with_hmac_iv(&key, &raw_vec))
.await
.map_err(|e| SteamError::ProtocolError(format!("Decryption task join failed: {}", e)))?
.map_err(|e| SteamError::ProtocolError(format!("Decryption failed: {}", e)))?;
Ok(Some(Bytes::from(decrypted)))
} else {
Ok(Some(raw))
}
}
async fn recv_raw(&mut self) -> Result<Option<Bytes>, SteamError> {
loop {
if let Some(expected) = self.expected_length {
if self.read_buffer.len() >= expected as usize {
let msg = self.read_buffer.split_to(expected as usize).freeze();
self.expected_length = None;
return Ok(Some(msg));
}
} else if self.read_buffer.len() >= 8 {
let mut header = &self.read_buffer[..8];
let length = byteorder::ReadBytesExt::read_u32::<LittleEndian>(&mut header).map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
let magic = &self.read_buffer[4..8];
if magic != VT01_MAGIC {
return Err(SteamError::ProtocolError("Invalid VT01 magic".into()));
}
self.read_buffer.advance(8);
self.expected_length = Some(length);
if self.read_buffer.len() >= length as usize {
let msg = self.read_buffer.split_to(length as usize).freeze();
self.expected_length = None;
return Ok(Some(msg));
}
}
let read_result = timeout(Duration::from_secs(30), self.stream.read_buf(&mut self.read_buffer))
.await
.map_err(|_| SteamError::NetworkError(std::io::Error::from(std::io::ErrorKind::TimedOut)))?;
match read_result {
Ok(0) => {
if self.read_buffer.is_empty() {
return Ok(None);
} else {
return Err(SteamError::ConnectionError("Connection closed mid-message".into()));
}
}
Ok(_) => {
}
Err(ref e) if e.kind() == ErrorKind::WouldBlock => {
continue;
}
Err(e) => {
error!("TCP read error: {}", e);
return Err(SteamError::ConnectionError(format!("Read failed: {}", e)));
}
}
}
}
pub async fn close(mut self) -> Result<(), SteamError> {
timeout(Duration::from_secs(10), self.stream.shutdown())
.await
.map_err(|_| SteamError::NetworkError(std::io::Error::from(std::io::ErrorKind::TimedOut)))?
.map_err(|e| SteamError::ConnectionError(format!("Shutdown failed: {}", e)))
}
pub fn server(&self) -> &CmServer {
&self.server
}
pub fn is_encrypted(&self) -> bool {
self.session_key.is_some()
}
}
use async_trait::async_trait;
use super::traits::SteamConnection;
#[async_trait]
impl SteamConnection for TcpConnection {
async fn send(&mut self, data: Vec<u8>) -> Result<(), SteamError> {
TcpConnection::send(self, data).await
}
async fn recv(&mut self) -> Result<Option<Bytes>, SteamError> {
TcpConnection::recv(self).await
}
async fn close(self: Box<Self>) -> Result<(), SteamError> {
TcpConnection::close(*self).await
}
fn server(&self) -> &CmServer {
TcpConnection::server(self)
}
fn set_session_key(&mut self, key: Option<Vec<u8>>) {
self.session_key = key.and_then(|k| SessionKey::from_bytes(&k));
}
}
impl From<io::Error> for SteamError {
fn from(e: io::Error) -> Self {
SteamError::ConnectionError(e.to_string())
}
}