steam_vent/
net.rs

1use crate::eresult::EResult;
2use crate::message::{EncodableMessage, MalformedBody, NetMessage};
3use crate::proto::steammessages_base::CMsgProtoBufHeader;
4use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
5use bytes::{Buf, BufMut, Bytes, BytesMut};
6use protobuf::Message;
7use std::borrow::Cow;
8use std::fmt::Debug;
9use std::io::{Cursor, Seek, SeekFrom};
10use steam_vent_crypto::CryptError;
11use steam_vent_proto::enums_clientserver::EMsg;
12use steam_vent_proto::{MsgKind, MsgKindEnum};
13use steamid_ng::SteamID;
14use thiserror::Error;
15use tracing::{debug, trace};
16
17pub const PROTO_MASK: u32 = 0x80000000;
18
19#[derive(Debug, Error)]
20#[non_exhaustive]
21pub enum NetworkError {
22    #[error("{0}")]
23    IO(#[from] std::io::Error),
24    #[error("{0}")]
25    Ws(#[from] tokio_tungstenite::tungstenite::Error),
26    #[error("Invalid message header")]
27    InvalidHeader,
28    #[error("Invalid message kind {0}")]
29    InvalidMessageKind(i32),
30    #[error("Failed to perform crypto handshake")]
31    CryptoHandshakeFailed,
32    #[error("Different message expected, expected {0:?}, got {1:?}")]
33    DifferentMessage(MsgKind, MsgKind),
34    #[error("Different service method expected, expected {0:?}, got {1:?}")]
35    DifferentServiceMethod(&'static str, String),
36    #[error("{0}")]
37    MalformedBody(#[from] MalformedBody),
38    #[error("Crypto error: {0}")]
39    CryptoError(#[from] CryptError),
40    #[error("Unexpected end of stream")]
41    EOF,
42    #[error("Response timed out")]
43    Timeout,
44    #[error("Remote returned an error code: {0:?}")]
45    ApiError(EResult),
46}
47
48impl From<EResult> for NetworkError {
49    fn from(value: EResult) -> Self {
50        NetworkError::ApiError(value)
51    }
52}
53
54pub type Result<T, E = NetworkError> = std::result::Result<T, E>;
55
56/// A unique (per-session) identifier that links request-response pairs
57#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
58pub struct JobId(pub(crate) u64);
59
60impl Default for JobId {
61    fn default() -> Self {
62        JobId::NONE
63    }
64}
65
66impl JobId {
67    pub const NONE: JobId = JobId(u64::MAX);
68}
69
70#[derive(Debug, Default, Clone)]
71pub struct NetMessageHeader {
72    pub source_job_id: JobId,
73    pub target_job_id: JobId,
74    pub steam_id: SteamID,
75    pub session_id: i32,
76    pub target_job_name: Option<Cow<'static, str>>,
77    pub result: Option<i32>,
78    pub source_app_id: Option<u32>,
79}
80
81impl From<CMsgProtoBufHeader> for NetMessageHeader {
82    fn from(header: CMsgProtoBufHeader) -> Self {
83        NetMessageHeader {
84            source_job_id: JobId(header.jobid_source()),
85            target_job_id: JobId(header.jobid_target()),
86            steam_id: header.steamid().try_into().unwrap_or_default(),
87            session_id: header.client_sessionid(),
88            target_job_name: header
89                .has_target_job_name()
90                .then(|| header.target_job_name().to_string().into()),
91            result: header.eresult,
92            source_app_id: header.routing_appid,
93        }
94    }
95}
96
97impl NetMessageHeader {
98    fn read<R: ReadBytesExt + Seek>(
99        mut reader: R,
100        kind: MsgKind,
101        is_protobuf: bool,
102    ) -> Result<(Self, usize)> {
103        if is_protobuf {
104            let header_length = reader.read_u32::<LittleEndian>()?;
105            trace!("reading protobuf header of {} bytes", header_length);
106            let header = if header_length > 0 {
107                let mut bytes = vec![0; header_length as usize];
108                let num = reader.read(&mut bytes)?;
109                CMsgProtoBufHeader::parse_from_bytes(&bytes[0..num])
110                    .map_err(|_| NetworkError::InvalidHeader)?
111                    .into()
112            } else {
113                NetMessageHeader::default()
114            };
115            Ok((header, 8 + header_length as usize))
116        } else if kind == EMsg::k_EMsgChannelEncryptRequest
117            || kind == EMsg::k_EMsgChannelEncryptResult
118        {
119            let target_job_id = reader.read_u64::<LittleEndian>()?;
120            let source_job_id = reader.read_u64::<LittleEndian>()?;
121            Ok((
122                NetMessageHeader {
123                    target_job_id: JobId(target_job_id),
124                    source_job_id: JobId(source_job_id),
125                    session_id: 0,
126                    steam_id: SteamID::default(),
127                    ..NetMessageHeader::default()
128                },
129                4 + 8 + 8,
130            ))
131        } else {
132            reader.seek(SeekFrom::Current(3))?; // 1 byte (fixed) header size, 2 bytes (fixed) header version
133            let target_job_id = reader.read_u64::<LittleEndian>()?;
134            let source_job_id = reader.read_u64::<LittleEndian>()?;
135            reader.seek(SeekFrom::Current(1))?; // header canary (fixed)
136            let steam_id = reader
137                .read_u64::<LittleEndian>()?
138                .try_into()
139                .unwrap_or_default();
140            let session_id = reader.read_i32::<LittleEndian>()?;
141            Ok((
142                NetMessageHeader {
143                    source_job_id: JobId(source_job_id),
144                    target_job_id: JobId(target_job_id),
145                    steam_id,
146                    session_id,
147                    target_job_name: None,
148                    result: None,
149                    source_app_id: None,
150                },
151                4 + 3 + 8 + 8 + 1 + 8 + 4,
152            ))
153        }
154    }
155
156    pub(crate) fn write<W: WriteBytesExt, K: MsgKindEnum>(
157        &self,
158        writer: &mut W,
159        kind: K,
160        proto: bool,
161    ) -> std::io::Result<()> {
162        if MsgKind::from(kind) == EMsg::k_EMsgChannelEncryptResponse {
163            writer.write_u32::<LittleEndian>(kind.value() as u32)?;
164        } else if proto {
165            trace!("writing header for {:?} protobuf message: {:?}", kind, self);
166            let proto_header = self.proto_header(kind.into());
167            writer.write_u32::<LittleEndian>(kind.encode_kind(true))?;
168            writer.write_u32::<LittleEndian>(proto_header.compute_size() as u32)?;
169            proto_header.write_to_writer(writer)?;
170        } else {
171            trace!("writing header for {:?} message: {:?}", kind, self);
172            writer.write_u32::<LittleEndian>(kind.value() as u32)?;
173            writer.write_u8(32)?;
174            writer.write_u16::<LittleEndian>(2)?;
175            writer.write_u64::<LittleEndian>(self.target_job_id.0)?;
176            writer.write_u64::<LittleEndian>(self.source_job_id.0)?;
177            writer.write_u8(239)?;
178            writer.write_u64::<LittleEndian>(self.steam_id.into())?;
179            writer.write_i32::<LittleEndian>(self.session_id)?;
180        }
181        Ok(())
182    }
183
184    fn proto_header(&self, kind: MsgKind) -> CMsgProtoBufHeader {
185        let mut proto_header = CMsgProtoBufHeader::new();
186        if self.source_job_id != JobId::NONE {
187            proto_header.set_jobid_source(self.source_job_id.0);
188        }
189        if self.target_job_id != JobId::NONE {
190            proto_header.set_jobid_target(self.target_job_id.0);
191        }
192        if self.steam_id != SteamID::default() {
193            proto_header.set_steamid(
194                if kind == EMsg::k_EMsgServiceMethodCallFromClientNonAuthed {
195                    0
196                } else {
197                    self.steam_id.into()
198                },
199            );
200        }
201        if self.session_id != 0 {
202            proto_header.set_client_sessionid(self.session_id);
203        }
204        if kind == EMsg::k_EMsgServiceMethodCallFromClientNonAuthed
205            || kind == EMsg::k_EMsgServiceMethodCallFromClient
206        {
207            proto_header.set_realm(1);
208        }
209        if let Some(target_job_name) = self.target_job_name.as_deref() {
210            proto_header.set_target_job_name(target_job_name.into());
211        }
212        proto_header.routing_appid = self.source_app_id;
213        proto_header
214    }
215
216    pub fn encode_size(&self, kind: MsgKind, proto: bool) -> usize {
217        if kind == EMsg::k_EMsgChannelEncryptResponse {
218            4
219        } else if proto {
220            let proto_header = self.proto_header(kind);
221            4 + 4 + proto_header.compute_size() as usize
222        } else {
223            4 + 1 + 2 + 8 + 8 + 1 + 8 + 4 + 4
224        }
225    }
226}
227
228#[derive(Debug, Clone)]
229pub struct RawNetMessage {
230    pub kind: MsgKind,
231    pub is_protobuf: bool,
232    pub header: NetMessageHeader,
233    pub data: BytesMut,
234    pub(crate) frame_header_buffer: Option<BytesMut>,
235    pub(crate) iv_buffer: Option<BytesMut>,
236    pub(crate) header_buffer: BytesMut,
237}
238
239pub(crate) fn decode_kind(kind: u32) -> (MsgKind, bool) {
240    let is_protobuf = kind & PROTO_MASK == PROTO_MASK;
241    let kind = MsgKind((kind & !PROTO_MASK) as i32);
242    (kind, is_protobuf)
243}
244
245impl RawNetMessage {
246    pub fn read<Body: Into<Bytes>>(body: Body) -> Result<Self> {
247        let mut value = BytesMut::from(body.into());
248        let mut reader = Cursor::new(&value);
249        let kind = reader
250            .read_u32::<LittleEndian>()
251            .map_err(|_| NetworkError::InvalidHeader)?;
252
253        let is_protobuf = kind & PROTO_MASK == PROTO_MASK;
254        let kind = MsgKind((kind & !PROTO_MASK) as i32);
255
256        trace!(
257            "reading header for {:?} {}message",
258            kind,
259            if is_protobuf { "protobuf " } else { "" }
260        );
261
262        let header_start = reader.position() as usize;
263        let (header, body_start) = NetMessageHeader::read(&mut reader, kind, is_protobuf)?;
264
265        value.advance(header_start);
266        let header_buffer = value.split_to(body_start - header_start);
267
268        Ok(RawNetMessage {
269            kind,
270            is_protobuf,
271            header,
272            data: value,
273            frame_header_buffer: None,
274            iv_buffer: None,
275            header_buffer,
276        })
277    }
278
279    pub fn from_message<T: NetMessage>(header: NetMessageHeader, message: T) -> Result<Self> {
280        Self::from_message_with_kind(header, message, T::KIND, T::IS_PROTOBUF)
281    }
282
283    pub fn from_message_with_kind<T: EncodableMessage, K: MsgKindEnum>(
284        mut header: NetMessageHeader,
285        message: T,
286        kind: K,
287        is_protobuf: bool,
288    ) -> Result<Self> {
289        debug!("writing raw {:?} message", kind);
290
291        message.process_header(&mut header);
292
293        let body_size = message.encode_size();
294
295        // allocate the buffer with extra bytes and split those off
296        // this allows later re-joining the bytes and use the space for the frame header and iv
297        // without having to copy the message again
298        //
299        // 8 byte frame header, 16 byte iv, header, body, 16 byte encryption padding
300        let mut buff = BytesMut::with_capacity(
301            8 + 16 + header.encode_size(kind.into(), is_protobuf) + body_size + 16,
302        );
303        buff.extend([0; 8 + 16]);
304        let frame_header_buffer = buff.split_to(8);
305        let iv_buffer = buff.split_to(16);
306
307        {
308            let mut writer = (&mut buff).writer();
309            header.write(&mut writer, kind, is_protobuf)?;
310        }
311
312        let header_buffer = buff.split();
313        let mut writer = (&mut buff).writer();
314        message.write_body(&mut writer)?;
315        trace!("encoded body({} bytes): {:x?}", buff.len(), buff.as_ref());
316
317        Ok(RawNetMessage {
318            kind: kind.into(),
319            is_protobuf,
320            header,
321            data: buff,
322            frame_header_buffer: Some(frame_header_buffer),
323            iv_buffer: Some(iv_buffer),
324            header_buffer,
325        })
326    }
327
328    /// Return a buffer containing the raw message bytes
329    pub fn into_bytes(self) -> BytesMut {
330        let mut body = self.header_buffer;
331        body.unsplit(self.data);
332        body
333    }
334}
335
336impl RawNetMessage {
337    pub fn into_header_and_message<T: NetMessage>(self) -> Result<(NetMessageHeader, T)> {
338        if let Some(result) = self.header.result {
339            EResult::from_result(result)?;
340        }
341        if self.kind == T::KIND {
342            trace!(
343                "reading body of {:?} message({} bytes)",
344                self.kind,
345                self.data.len()
346            );
347            let body = T::read_body(self.data, &self.header)?;
348            Ok((self.header, body))
349        } else {
350            Err(NetworkError::DifferentMessage(T::KIND.into(), self.kind))
351        }
352    }
353
354    pub fn into_message<T: NetMessage>(self) -> Result<T> {
355        self.into_header_and_message().map(|(_, msg)| msg)
356    }
357}