Skip to main content

steam_vent/
net.rs

1use crate::eresult::EResult;
2use crate::message::{EncodableMessage, MalformedBody, NetMessage};
3use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
4use bytes::{Buf, BufMut, Bytes, BytesMut};
5use protobuf::Message;
6use std::borrow::Cow;
7use std::fmt::Debug;
8use std::io::{Cursor, Seek, SeekFrom};
9use steam_vent_crypto::CryptError;
10use steam_vent_proto_common::{MsgKind, MsgKindEnum};
11use steam_vent_proto_steam::enums_clientserver::EMsg;
12use steam_vent_proto_steam::steammessages_base::CMsgProtoBufHeader;
13use steamid_ng::SteamID;
14use thiserror::Error;
15use tracing::{debug, trace};
16
/// Bit of the encoded 32 bit message kind that flags the message as protobuf encoded.
///
/// Only the highest bit is set; the remaining 31 bits carry the message kind itself.
pub const PROTO_MASK: u32 = 1 << 31;
18
19#[derive(Debug, Error)]
20#[non_exhaustive]
21pub enum NetworkError {
22    #[error("{0}")]
23    IO(#[from] std::io::Error),
24    #[error("{0}")]
25    Ws(#[from] tokio_tungstenite::tungstenite::Error),
26    #[error("Invalid message header")]
27    InvalidHeader,
28    #[error("Invalid message kind {0}")]
29    InvalidMessageKind(i32),
30    #[error("Failed to perform crypto handshake")]
31    CryptoHandshakeFailed,
32    #[error("Different message expected, expected {0:?}, got {1:?}")]
33    DifferentMessage(MsgKind, MsgKind),
34    #[error("Different service method expected, expected {0:?}, got {1:?}")]
35    DifferentServiceMethod(&'static str, String),
36    #[error("{0}")]
37    MalformedBody(#[from] MalformedBody),
38    #[error("Crypto error: {0}")]
39    CryptoError(#[from] CryptError),
40    #[error("Unexpected end of stream")]
41    EOF,
42    #[error("Response timed out")]
43    Timeout,
44    #[error("Remote returned an error code: {0:?}")]
45    ApiError(EResult),
46}
47
48impl From<EResult> for NetworkError {
49    fn from(value: EResult) -> Self {
50        NetworkError::ApiError(value)
51    }
52}
53
54pub type Result<T, E = NetworkError> = std::result::Result<T, E>;
55
/// A unique (per-session) identifier that links request-response pairs
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
pub struct JobId(pub(crate) u64);

impl JobId {
    /// Sentinel meaning "no job"; uses the all-ones value `u64::MAX`.
    pub const NONE: JobId = JobId(u64::MAX);
}

/// The default job id is [`JobId::NONE`].
impl Default for JobId {
    fn default() -> Self {
        Self::NONE
    }
}
69
70pub(crate) fn steam_id_nil() -> SteamID {
71    SteamID::try_from(0u64).unwrap()
72}
73
74#[derive(Debug, Clone)]
75pub struct NetMessageHeader {
76    pub source_job_id: JobId,
77    pub target_job_id: JobId,
78    pub steam_id: SteamID,
79    pub session_id: i32,
80    pub target_job_name: Option<Cow<'static, str>>,
81    pub result: Option<i32>,
82    pub source_app_id: Option<u32>,
83}
84
85impl Default for NetMessageHeader {
86    fn default() -> Self {
87        Self {
88            source_job_id: JobId::default(),
89            target_job_id: JobId::default(),
90            steam_id: steam_id_nil(),
91            session_id: 0,
92            target_job_name: None,
93            result: None,
94            source_app_id: None,
95        }
96    }
97}
98
99impl From<CMsgProtoBufHeader> for NetMessageHeader {
100    fn from(header: CMsgProtoBufHeader) -> Self {
101        NetMessageHeader {
102            source_job_id: JobId(header.jobid_source()),
103            target_job_id: JobId(header.jobid_target()),
104            steam_id: header.steamid().try_into().unwrap_or(steam_id_nil()),
105            session_id: header.client_sessionid(),
106            target_job_name: header
107                .has_target_job_name()
108                .then(|| header.target_job_name().to_string().into()),
109            result: header.eresult,
110            source_app_id: header.routing_appid,
111        }
112    }
113}
114
115impl NetMessageHeader {
116    fn read<R: ReadBytesExt + Seek>(
117        mut reader: R,
118        kind: MsgKind,
119        is_protobuf: bool,
120    ) -> Result<(Self, usize)> {
121        if is_protobuf {
122            let header_length = reader.read_u32::<LittleEndian>()?;
123            trace!("reading protobuf header of {} bytes", header_length);
124            let header = if header_length > 0 {
125                let mut bytes = vec![0; header_length as usize];
126                let num = reader.read(&mut bytes)?;
127                CMsgProtoBufHeader::parse_from_bytes(&bytes[0..num])
128                    .map_err(|_| NetworkError::InvalidHeader)?
129                    .into()
130            } else {
131                NetMessageHeader::default()
132            };
133            Ok((header, 8 + header_length as usize))
134        } else if kind == EMsg::k_EMsgChannelEncryptRequest
135            || kind == EMsg::k_EMsgChannelEncryptResult
136        {
137            let target_job_id = reader.read_u64::<LittleEndian>()?;
138            let source_job_id = reader.read_u64::<LittleEndian>()?;
139            Ok((
140                NetMessageHeader {
141                    target_job_id: JobId(target_job_id),
142                    source_job_id: JobId(source_job_id),
143                    session_id: 0,
144                    steam_id: steam_id_nil(),
145                    ..NetMessageHeader::default()
146                },
147                4 + 8 + 8,
148            ))
149        } else {
150            reader.seek(SeekFrom::Current(3))?; // 1 byte (fixed) header size, 2 bytes (fixed) header version
151            let target_job_id = reader.read_u64::<LittleEndian>()?;
152            let source_job_id = reader.read_u64::<LittleEndian>()?;
153            reader.seek(SeekFrom::Current(1))?; // header canary (fixed)
154            let steam_id = reader
155                .read_u64::<LittleEndian>()?
156                .try_into()
157                .unwrap_or(steam_id_nil());
158            let session_id = reader.read_i32::<LittleEndian>()?;
159            Ok((
160                NetMessageHeader {
161                    source_job_id: JobId(source_job_id),
162                    target_job_id: JobId(target_job_id),
163                    steam_id,
164                    session_id,
165                    target_job_name: None,
166                    result: None,
167                    source_app_id: None,
168                },
169                4 + 3 + 8 + 8 + 1 + 8 + 4,
170            ))
171        }
172    }
173
174    pub(crate) fn write<W: WriteBytesExt, K: MsgKindEnum>(
175        &self,
176        writer: &mut W,
177        kind: K,
178        proto: bool,
179    ) -> std::io::Result<()> {
180        if MsgKind::from(kind) == EMsg::k_EMsgChannelEncryptResponse {
181            writer.write_u32::<LittleEndian>(kind.value() as u32)?;
182        } else if proto {
183            trace!("writing header for {:?} protobuf message: {:?}", kind, self);
184            let proto_header = self.proto_header(kind.into());
185            writer.write_u32::<LittleEndian>(kind.encode_kind(true))?;
186            writer.write_u32::<LittleEndian>(proto_header.compute_size() as u32)?;
187            proto_header.write_to_writer(writer)?;
188        } else {
189            trace!("writing header for {:?} message: {:?}", kind, self);
190            writer.write_u32::<LittleEndian>(kind.value() as u32)?;
191            writer.write_u8(32)?;
192            writer.write_u16::<LittleEndian>(2)?;
193            writer.write_u64::<LittleEndian>(self.target_job_id.0)?;
194            writer.write_u64::<LittleEndian>(self.source_job_id.0)?;
195            writer.write_u8(239)?;
196            writer.write_u64::<LittleEndian>(self.steam_id.into())?;
197            writer.write_i32::<LittleEndian>(self.session_id)?;
198        }
199        Ok(())
200    }
201
202    fn proto_header(&self, kind: MsgKind) -> CMsgProtoBufHeader {
203        let mut proto_header = CMsgProtoBufHeader::new();
204        if self.source_job_id != JobId::NONE {
205            proto_header.set_jobid_source(self.source_job_id.0);
206        }
207        if self.target_job_id != JobId::NONE {
208            proto_header.set_jobid_target(self.target_job_id.0);
209        }
210        if self.steam_id != steam_id_nil() {
211            proto_header.set_steamid(
212                if kind == EMsg::k_EMsgServiceMethodCallFromClientNonAuthed {
213                    0
214                } else {
215                    self.steam_id.into()
216                },
217            );
218        }
219        if self.session_id != 0 {
220            proto_header.set_client_sessionid(self.session_id);
221        }
222        if kind == EMsg::k_EMsgServiceMethodCallFromClientNonAuthed
223            || kind == EMsg::k_EMsgServiceMethodCallFromClient
224        {
225            proto_header.set_realm(1);
226        }
227        if let Some(target_job_name) = self.target_job_name.as_deref() {
228            proto_header.set_target_job_name(target_job_name.into());
229        }
230        proto_header.routing_appid = self.source_app_id;
231        proto_header
232    }
233
234    pub fn encode_size(&self, kind: MsgKind, proto: bool) -> usize {
235        if kind == EMsg::k_EMsgChannelEncryptResponse {
236            4
237        } else if proto {
238            let proto_header = self.proto_header(kind);
239            4 + 4 + proto_header.compute_size() as usize
240        } else {
241            4 + 1 + 2 + 8 + 8 + 1 + 8 + 4 + 4
242        }
243    }
244}
245
246#[derive(Debug, Clone)]
247pub struct RawNetMessage {
248    pub kind: MsgKind,
249    pub is_protobuf: bool,
250    pub header: NetMessageHeader,
251    pub data: BytesMut,
252    pub(crate) frame_header_buffer: Option<BytesMut>,
253    pub(crate) iv_buffer: Option<BytesMut>,
254    pub(crate) header_buffer: BytesMut,
255}
256
257pub(crate) fn decode_kind(kind: u32) -> (MsgKind, bool) {
258    let is_protobuf = kind & PROTO_MASK == PROTO_MASK;
259    let kind = MsgKind((kind & !PROTO_MASK) as i32);
260    (kind, is_protobuf)
261}
262
263impl RawNetMessage {
264    pub fn read<Body: Into<Bytes>>(body: Body) -> Result<Self> {
265        let mut value = BytesMut::from(body.into());
266        let mut reader = Cursor::new(&value);
267        let kind = reader
268            .read_u32::<LittleEndian>()
269            .map_err(|_| NetworkError::InvalidHeader)?;
270
271        let is_protobuf = kind & PROTO_MASK == PROTO_MASK;
272        let kind = MsgKind((kind & !PROTO_MASK) as i32);
273
274        trace!(
275            "reading header for {:?} {}message",
276            kind,
277            if is_protobuf { "protobuf " } else { "" }
278        );
279
280        let header_start = reader.position() as usize;
281        let (header, body_start) = NetMessageHeader::read(&mut reader, kind, is_protobuf)?;
282
283        value.advance(header_start);
284        let header_buffer = value.split_to(body_start - header_start);
285
286        Ok(RawNetMessage {
287            kind,
288            is_protobuf,
289            header,
290            data: value,
291            frame_header_buffer: None,
292            iv_buffer: None,
293            header_buffer,
294        })
295    }
296
297    pub fn from_message<T: NetMessage>(header: NetMessageHeader, message: T) -> Result<Self> {
298        Self::from_message_with_kind(header, message, T::KIND, T::IS_PROTOBUF)
299    }
300
301    pub fn from_message_with_kind<T: EncodableMessage, K: MsgKindEnum>(
302        mut header: NetMessageHeader,
303        message: T,
304        kind: K,
305        is_protobuf: bool,
306    ) -> Result<Self> {
307        debug!("writing raw {:?} message", kind);
308
309        message.process_header(&mut header);
310
311        let body_size = message.encode_size();
312
313        // allocate the buffer with extra bytes and split those off
314        // this allows later re-joining the bytes and use the space for the frame header and iv
315        // without having to copy the message again
316        //
317        // 8 byte frame header, 16 byte iv, header, body, 16 byte encryption padding
318        let mut buff = BytesMut::with_capacity(
319            8 + 16 + header.encode_size(kind.into(), is_protobuf) + body_size + 16,
320        );
321        buff.extend([0; 8 + 16]);
322        let frame_header_buffer = buff.split_to(8);
323        let iv_buffer = buff.split_to(16);
324
325        {
326            let mut writer = (&mut buff).writer();
327            header.write(&mut writer, kind, is_protobuf)?;
328        }
329
330        let header_buffer = buff.split();
331        let mut writer = (&mut buff).writer();
332        message.write_body(&mut writer)?;
333        trace!("encoded body({} bytes): {:x?}", buff.len(), buff.as_ref());
334
335        Ok(RawNetMessage {
336            kind: kind.into(),
337            is_protobuf,
338            header,
339            data: buff,
340            frame_header_buffer: Some(frame_header_buffer),
341            iv_buffer: Some(iv_buffer),
342            header_buffer,
343        })
344    }
345
346    /// Return a buffer containing the raw message bytes
347    pub fn into_bytes(self) -> BytesMut {
348        let mut body = self.header_buffer;
349        body.unsplit(self.data);
350        body
351    }
352}
353
354impl RawNetMessage {
355    pub fn into_header_and_message<T: NetMessage>(self) -> Result<(NetMessageHeader, T)> {
356        if let Some(result) = self.header.result {
357            EResult::from_result(result)?;
358        }
359        if self.kind == T::KIND {
360            trace!(
361                "reading body of {:?} message({} bytes)",
362                self.kind,
363                self.data.len()
364            );
365            let body = T::read_body(self.data, &self.header)?;
366            Ok((self.header, body))
367        } else {
368            Err(NetworkError::DifferentMessage(T::KIND.into(), self.kind))
369        }
370    }
371
372    pub fn into_message<T: NetMessage>(self) -> Result<T> {
373        self.into_header_and_message().map(|(_, msg)| msg)
374    }
375}