use crate::eresult::EResult;
use crate::message::{EncodableMessage, MalformedBody, NetMessage};
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use protobuf::Message;
use std::borrow::Cow;
use std::fmt::Debug;
use std::io::{Cursor, Seek, SeekFrom};
use steam_vent_crypto::CryptError;
use steam_vent_proto_common::{MsgKind, MsgKindEnum};
use steam_vent_proto_steam::enums_clientserver::EMsg;
use steam_vent_proto_steam::steammessages_base::CMsgProtoBufHeader;
use steamid_ng::SteamID;
use thiserror::Error;
use tracing::{debug, trace};
/// Highest bit of the encoded 32 bit message kind; when set, the message is flagged as protobuf.
pub const PROTO_MASK: u32 = 1 << 31;
/// Errors produced while reading, writing or interpreting network messages.
///
/// The enum is `#[non_exhaustive]`, so downstream matches need a wildcard arm.
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum NetworkError {
/// An underlying I/O operation failed; the message is forwarded from the wrapped error.
#[error("{0}")]
IO(#[from] std::io::Error),
/// The websocket layer (tungstenite) reported an error.
#[error("{0}")]
Ws(#[from] tokio_tungstenite::tungstenite::Error),
/// The message kind or the message header could not be decoded.
#[error("Invalid message header")]
InvalidHeader,
/// A message kind value was not valid; carries the offending raw value.
#[error("Invalid message kind {0}")]
InvalidMessageKind(i32),
/// The crypto handshake did not complete successfully.
#[error("Failed to perform crypto handshake")]
CryptoHandshakeFailed,
/// A message of one kind was expected but another was received: `(expected, got)`.
#[error("Different message expected, expected {0:?}, got {1:?}")]
DifferentMessage(MsgKind, MsgKind),
/// A different service method was expected: `(expected, got)`.
#[error("Different service method expected, expected {0:?}, got {1:?}")]
DifferentServiceMethod(&'static str, String),
/// The message body could not be decoded.
#[error("{0}")]
MalformedBody(#[from] MalformedBody),
/// An error reported by the crypto layer.
#[error("Crypto error: {0}")]
CryptoError(#[from] CryptError),
/// The stream ended unexpectedly.
#[error("Unexpected end of stream")]
EOF,
/// A response did not arrive in time.
#[error("Response timed out")]
Timeout,
/// The remote side answered with an error result code.
#[error("Remote returned an error code: {0:?}")]
ApiError(EResult),
}
impl From<EResult> for NetworkError {
fn from(value: EResult) -> Self {
NetworkError::ApiError(value)
}
}
/// Result alias whose error type defaults to [`NetworkError`].
pub type Result<T, E = NetworkError> = std::result::Result<T, E>;
/// Identifier of a job, stored as a raw 64 bit value.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
pub struct JobId(pub(crate) u64);

impl JobId {
    /// Sentinel value (`u64::MAX`) that stands for "no job id"; also the [`Default`] value.
    pub const NONE: Self = Self(u64::MAX);
}

impl Default for JobId {
    /// Returns [`JobId::NONE`].
    fn default() -> Self {
        Self::NONE
    }
}
/// Returns the [`SteamID`] built from the raw value `0`, used in this module as the
/// "no steam id" placeholder.
///
/// # Panics
///
/// Panics if `SteamID` rejects the raw value `0`.
pub(crate) fn steam_id_nil() -> SteamID {
    let raw_id: u64 = 0;
    SteamID::try_from(raw_id).unwrap()
}
/// Decoded header of a network message.
#[derive(Debug, Clone)]
pub struct NetMessageHeader {
/// Job id of the sender side; `JobId::NONE` is left out of protobuf headers.
pub source_job_id: JobId,
/// Job id on the receiving side; `JobId::NONE` is left out of protobuf headers.
pub target_job_id: JobId,
/// Steam id attached to the message; the id built from `0` means "not set".
pub steam_id: SteamID,
/// Client session id; `0` is left out of protobuf headers.
pub session_id: i32,
/// Name of the targeted job, only carried by protobuf headers.
pub target_job_name: Option<Cow<'static, str>>,
/// Raw result code (`eresult`) taken from a protobuf header, if present.
pub result: Option<i32>,
/// Mapped to the `routing_appid` field of the protobuf header.
pub source_app_id: Option<u32>,
}
impl Default for NetMessageHeader {
fn default() -> Self {
Self {
source_job_id: JobId::default(),
target_job_id: JobId::default(),
steam_id: steam_id_nil(),
session_id: 0,
target_job_name: None,
result: None,
source_app_id: None,
}
}
}
impl From<CMsgProtoBufHeader> for NetMessageHeader {
fn from(header: CMsgProtoBufHeader) -> Self {
NetMessageHeader {
source_job_id: JobId(header.jobid_source()),
target_job_id: JobId(header.jobid_target()),
steam_id: header.steamid().try_into().unwrap_or(steam_id_nil()),
session_id: header.client_sessionid(),
target_job_name: header
.has_target_job_name()
.then(|| header.target_job_name().to_string().into()),
result: header.eresult,
source_app_id: header.routing_appid,
}
}
}
impl NetMessageHeader {
fn read<R: ReadBytesExt + Seek>(
mut reader: R,
kind: MsgKind,
is_protobuf: bool,
) -> Result<(Self, usize)> {
if is_protobuf {
let header_length = reader.read_u32::<LittleEndian>()?;
trace!("reading protobuf header of {} bytes", header_length);
let header = if header_length > 0 {
let mut bytes = vec![0; header_length as usize];
let num = reader.read(&mut bytes)?;
CMsgProtoBufHeader::parse_from_bytes(&bytes[0..num])
.map_err(|_| NetworkError::InvalidHeader)?
.into()
} else {
NetMessageHeader::default()
};
Ok((header, 8 + header_length as usize))
} else if kind == EMsg::k_EMsgChannelEncryptRequest
|| kind == EMsg::k_EMsgChannelEncryptResult
{
let target_job_id = reader.read_u64::<LittleEndian>()?;
let source_job_id = reader.read_u64::<LittleEndian>()?;
Ok((
NetMessageHeader {
target_job_id: JobId(target_job_id),
source_job_id: JobId(source_job_id),
session_id: 0,
steam_id: steam_id_nil(),
..NetMessageHeader::default()
},
4 + 8 + 8,
))
} else {
reader.seek(SeekFrom::Current(3))?; let target_job_id = reader.read_u64::<LittleEndian>()?;
let source_job_id = reader.read_u64::<LittleEndian>()?;
reader.seek(SeekFrom::Current(1))?; let steam_id = reader
.read_u64::<LittleEndian>()?
.try_into()
.unwrap_or(steam_id_nil());
let session_id = reader.read_i32::<LittleEndian>()?;
Ok((
NetMessageHeader {
source_job_id: JobId(source_job_id),
target_job_id: JobId(target_job_id),
steam_id,
session_id,
target_job_name: None,
result: None,
source_app_id: None,
},
4 + 3 + 8 + 8 + 1 + 8 + 4,
))
}
}
pub(crate) fn write<W: WriteBytesExt, K: MsgKindEnum>(
&self,
writer: &mut W,
kind: K,
proto: bool,
) -> std::io::Result<()> {
if MsgKind::from(kind) == EMsg::k_EMsgChannelEncryptResponse {
writer.write_u32::<LittleEndian>(kind.value() as u32)?;
} else if proto {
trace!("writing header for {:?} protobuf message: {:?}", kind, self);
let proto_header = self.proto_header(kind.into());
writer.write_u32::<LittleEndian>(kind.encode_kind(true))?;
writer.write_u32::<LittleEndian>(proto_header.compute_size() as u32)?;
proto_header.write_to_writer(writer)?;
} else {
trace!("writing header for {:?} message: {:?}", kind, self);
writer.write_u32::<LittleEndian>(kind.value() as u32)?;
writer.write_u8(32)?;
writer.write_u16::<LittleEndian>(2)?;
writer.write_u64::<LittleEndian>(self.target_job_id.0)?;
writer.write_u64::<LittleEndian>(self.source_job_id.0)?;
writer.write_u8(239)?;
writer.write_u64::<LittleEndian>(self.steam_id.into())?;
writer.write_i32::<LittleEndian>(self.session_id)?;
}
Ok(())
}
fn proto_header(&self, kind: MsgKind) -> CMsgProtoBufHeader {
let mut proto_header = CMsgProtoBufHeader::new();
if self.source_job_id != JobId::NONE {
proto_header.set_jobid_source(self.source_job_id.0);
}
if self.target_job_id != JobId::NONE {
proto_header.set_jobid_target(self.target_job_id.0);
}
if self.steam_id != steam_id_nil() {
proto_header.set_steamid(
if kind == EMsg::k_EMsgServiceMethodCallFromClientNonAuthed {
0
} else {
self.steam_id.into()
},
);
}
if self.session_id != 0 {
proto_header.set_client_sessionid(self.session_id);
}
if kind == EMsg::k_EMsgServiceMethodCallFromClientNonAuthed
|| kind == EMsg::k_EMsgServiceMethodCallFromClient
{
proto_header.set_realm(1);
}
if let Some(target_job_name) = self.target_job_name.as_deref() {
proto_header.set_target_job_name(target_job_name.into());
}
proto_header.routing_appid = self.source_app_id;
proto_header
}
pub fn encode_size(&self, kind: MsgKind, proto: bool) -> usize {
if kind == EMsg::k_EMsgChannelEncryptResponse {
4
} else if proto {
let proto_header = self.proto_header(kind);
4 + 4 + proto_header.compute_size() as usize
} else {
4 + 1 + 2 + 8 + 8 + 1 + 8 + 4 + 4
}
}
}
/// A network message whose header has been decoded while the body is still raw bytes.
#[derive(Debug, Clone)]
pub struct RawNetMessage {
/// Kind of the message, with the protobuf flag already removed.
pub kind: MsgKind,
/// Whether the message is flagged as (or should be encoded as) protobuf.
pub is_protobuf: bool,
/// Decoded message header.
pub header: NetMessageHeader,
/// Raw, still encoded message body.
pub data: BytesMut,
/// 8 zeroed bytes reserved in front of an outgoing message; `None` for messages created by `read`.
pub(crate) frame_header_buffer: Option<BytesMut>,
/// 16 zeroed bytes reserved in front of an outgoing message; `None` for messages created by `read`.
pub(crate) iv_buffer: Option<BytesMut>,
/// Encoded header bytes that directly precede `data`.
pub(crate) header_buffer: BytesMut,
}
/// Splits an encoded 32 bit message kind into the bare [`MsgKind`] and a flag telling whether
/// the [`PROTO_MASK`] bit was set.
pub(crate) fn decode_kind(kind: u32) -> (MsgKind, bool) {
    let proto_bit = kind & PROTO_MASK;
    let bare_kind = kind & !PROTO_MASK;
    (MsgKind(bare_kind as i32), proto_bit == PROTO_MASK)
}
impl RawNetMessage {
pub fn read<Body: Into<Bytes>>(body: Body) -> Result<Self> {
let mut value = BytesMut::from(body.into());
let mut reader = Cursor::new(&value);
let kind = reader
.read_u32::<LittleEndian>()
.map_err(|_| NetworkError::InvalidHeader)?;
let is_protobuf = kind & PROTO_MASK == PROTO_MASK;
let kind = MsgKind((kind & !PROTO_MASK) as i32);
trace!(
"reading header for {:?} {}message",
kind,
if is_protobuf { "protobuf " } else { "" }
);
let header_start = reader.position() as usize;
let (header, body_start) = NetMessageHeader::read(&mut reader, kind, is_protobuf)?;
value.advance(header_start);
let header_buffer = value.split_to(body_start - header_start);
Ok(RawNetMessage {
kind,
is_protobuf,
header,
data: value,
frame_header_buffer: None,
iv_buffer: None,
header_buffer,
})
}
pub fn from_message<T: NetMessage>(header: NetMessageHeader, message: T) -> Result<Self> {
Self::from_message_with_kind(header, message, T::KIND, T::IS_PROTOBUF)
}
pub fn from_message_with_kind<T: EncodableMessage, K: MsgKindEnum>(
mut header: NetMessageHeader,
message: T,
kind: K,
is_protobuf: bool,
) -> Result<Self> {
debug!("writing raw {:?} message", kind);
message.process_header(&mut header);
let body_size = message.encode_size();
let mut buff = BytesMut::with_capacity(
8 + 16 + header.encode_size(kind.into(), is_protobuf) + body_size + 16,
);
buff.extend([0; 8 + 16]);
let frame_header_buffer = buff.split_to(8);
let iv_buffer = buff.split_to(16);
{
let mut writer = (&mut buff).writer();
header.write(&mut writer, kind, is_protobuf)?;
}
let header_buffer = buff.split();
let mut writer = (&mut buff).writer();
message.write_body(&mut writer)?;
trace!("encoded body({} bytes): {:x?}", buff.len(), buff.as_ref());
Ok(RawNetMessage {
kind: kind.into(),
is_protobuf,
header,
data: buff,
frame_header_buffer: Some(frame_header_buffer),
iv_buffer: Some(iv_buffer),
header_buffer,
})
}
pub fn into_bytes(self) -> BytesMut {
let mut body = self.header_buffer;
body.unsplit(self.data);
body
}
}
impl RawNetMessage {
    /// Decodes the body as a `T` and returns it together with the message header.
    ///
    /// # Errors
    ///
    /// * The error produced by `EResult::from_result` when the header carries a result code
    ///   that it rejects (surfaced as [`NetworkError::ApiError`]).
    /// * [`NetworkError::DifferentMessage`] when the message kind is not `T::KIND`.
    /// * Any error reported while decoding the body.
    pub fn into_header_and_message<T: NetMessage>(self) -> Result<(NetMessageHeader, T)> {
        let Self {
            kind, header, data, ..
        } = self;

        // A result code in the header is checked before the kind or the body is looked at.
        if let Some(code) = header.result {
            EResult::from_result(code)?;
        }

        if kind != T::KIND {
            return Err(NetworkError::DifferentMessage(T::KIND.into(), kind));
        }

        trace!(
            "reading body of {:?} message({} bytes)",
            kind,
            data.len()
        );
        let message = T::read_body(data, &header)?;
        Ok((header, message))
    }

    /// Decodes the body as a `T`, discarding the header.
    ///
    /// # Errors
    ///
    /// Same as [`RawNetMessage::into_header_and_message`].
    pub fn into_message<T: NetMessage>(self) -> Result<T> {
        let (_, message) = self.into_header_and_message::<T>()?;
        Ok(message)
    }
}