1use crate::eresult::EResult;
2use crate::message::{EncodableMessage, MalformedBody, NetMessage};
3use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
4use bytes::{Buf, BufMut, Bytes, BytesMut};
5use protobuf::Message;
6use std::borrow::Cow;
7use std::fmt::Debug;
8use std::io::{Cursor, Seek, SeekFrom};
9use steam_vent_crypto::CryptError;
10use steam_vent_proto_common::{MsgKind, MsgKindEnum};
11use steam_vent_proto_steam::enums_clientserver::EMsg;
12use steam_vent_proto_steam::steammessages_base::CMsgProtoBufHeader;
13use steamid_ng::SteamID;
14use thiserror::Error;
15use tracing::{debug, trace};
16
17pub const PROTO_MASK: u32 = 0x80000000;
18
19#[derive(Debug, Error)]
20#[non_exhaustive]
21pub enum NetworkError {
22 #[error("{0}")]
23 IO(#[from] std::io::Error),
24 #[error("{0}")]
25 Ws(#[from] tokio_tungstenite::tungstenite::Error),
26 #[error("Invalid message header")]
27 InvalidHeader,
28 #[error("Invalid message kind {0}")]
29 InvalidMessageKind(i32),
30 #[error("Failed to perform crypto handshake")]
31 CryptoHandshakeFailed,
32 #[error("Different message expected, expected {0:?}, got {1:?}")]
33 DifferentMessage(MsgKind, MsgKind),
34 #[error("Different service method expected, expected {0:?}, got {1:?}")]
35 DifferentServiceMethod(&'static str, String),
36 #[error("{0}")]
37 MalformedBody(#[from] MalformedBody),
38 #[error("Crypto error: {0}")]
39 CryptoError(#[from] CryptError),
40 #[error("Unexpected end of stream")]
41 EOF,
42 #[error("Response timed out")]
43 Timeout,
44 #[error("Remote returned an error code: {0:?}")]
45 ApiError(EResult),
46}
47
48impl From<EResult> for NetworkError {
49 fn from(value: EResult) -> Self {
50 NetworkError::ApiError(value)
51 }
52}
53
54pub type Result<T, E = NetworkError> = std::result::Result<T, E>;
55
56#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
58pub struct JobId(pub(crate) u64);
59
60impl Default for JobId {
61 fn default() -> Self {
62 JobId::NONE
63 }
64}
65
66impl JobId {
67 pub const NONE: JobId = JobId(u64::MAX);
68}
69
70pub(crate) fn steam_id_nil() -> SteamID {
71 SteamID::try_from(0u64).unwrap()
72}
73
74#[derive(Debug, Clone)]
75pub struct NetMessageHeader {
76 pub source_job_id: JobId,
77 pub target_job_id: JobId,
78 pub steam_id: SteamID,
79 pub session_id: i32,
80 pub target_job_name: Option<Cow<'static, str>>,
81 pub result: Option<i32>,
82 pub source_app_id: Option<u32>,
83}
84
85impl Default for NetMessageHeader {
86 fn default() -> Self {
87 Self {
88 source_job_id: JobId::default(),
89 target_job_id: JobId::default(),
90 steam_id: steam_id_nil(),
91 session_id: 0,
92 target_job_name: None,
93 result: None,
94 source_app_id: None,
95 }
96 }
97}
98
99impl From<CMsgProtoBufHeader> for NetMessageHeader {
100 fn from(header: CMsgProtoBufHeader) -> Self {
101 NetMessageHeader {
102 source_job_id: JobId(header.jobid_source()),
103 target_job_id: JobId(header.jobid_target()),
104 steam_id: header.steamid().try_into().unwrap_or(steam_id_nil()),
105 session_id: header.client_sessionid(),
106 target_job_name: header
107 .has_target_job_name()
108 .then(|| header.target_job_name().to_string().into()),
109 result: header.eresult,
110 source_app_id: header.routing_appid,
111 }
112 }
113}
114
115impl NetMessageHeader {
116 fn read<R: ReadBytesExt + Seek>(
117 mut reader: R,
118 kind: MsgKind,
119 is_protobuf: bool,
120 ) -> Result<(Self, usize)> {
121 if is_protobuf {
122 let header_length = reader.read_u32::<LittleEndian>()?;
123 trace!("reading protobuf header of {} bytes", header_length);
124 let header = if header_length > 0 {
125 let mut bytes = vec![0; header_length as usize];
126 let num = reader.read(&mut bytes)?;
127 CMsgProtoBufHeader::parse_from_bytes(&bytes[0..num])
128 .map_err(|_| NetworkError::InvalidHeader)?
129 .into()
130 } else {
131 NetMessageHeader::default()
132 };
133 Ok((header, 8 + header_length as usize))
134 } else if kind == EMsg::k_EMsgChannelEncryptRequest
135 || kind == EMsg::k_EMsgChannelEncryptResult
136 {
137 let target_job_id = reader.read_u64::<LittleEndian>()?;
138 let source_job_id = reader.read_u64::<LittleEndian>()?;
139 Ok((
140 NetMessageHeader {
141 target_job_id: JobId(target_job_id),
142 source_job_id: JobId(source_job_id),
143 session_id: 0,
144 steam_id: steam_id_nil(),
145 ..NetMessageHeader::default()
146 },
147 4 + 8 + 8,
148 ))
149 } else {
150 reader.seek(SeekFrom::Current(3))?; let target_job_id = reader.read_u64::<LittleEndian>()?;
152 let source_job_id = reader.read_u64::<LittleEndian>()?;
153 reader.seek(SeekFrom::Current(1))?; let steam_id = reader
155 .read_u64::<LittleEndian>()?
156 .try_into()
157 .unwrap_or(steam_id_nil());
158 let session_id = reader.read_i32::<LittleEndian>()?;
159 Ok((
160 NetMessageHeader {
161 source_job_id: JobId(source_job_id),
162 target_job_id: JobId(target_job_id),
163 steam_id,
164 session_id,
165 target_job_name: None,
166 result: None,
167 source_app_id: None,
168 },
169 4 + 3 + 8 + 8 + 1 + 8 + 4,
170 ))
171 }
172 }
173
174 pub(crate) fn write<W: WriteBytesExt, K: MsgKindEnum>(
175 &self,
176 writer: &mut W,
177 kind: K,
178 proto: bool,
179 ) -> std::io::Result<()> {
180 if MsgKind::from(kind) == EMsg::k_EMsgChannelEncryptResponse {
181 writer.write_u32::<LittleEndian>(kind.value() as u32)?;
182 } else if proto {
183 trace!("writing header for {:?} protobuf message: {:?}", kind, self);
184 let proto_header = self.proto_header(kind.into());
185 writer.write_u32::<LittleEndian>(kind.encode_kind(true))?;
186 writer.write_u32::<LittleEndian>(proto_header.compute_size() as u32)?;
187 proto_header.write_to_writer(writer)?;
188 } else {
189 trace!("writing header for {:?} message: {:?}", kind, self);
190 writer.write_u32::<LittleEndian>(kind.value() as u32)?;
191 writer.write_u8(32)?;
192 writer.write_u16::<LittleEndian>(2)?;
193 writer.write_u64::<LittleEndian>(self.target_job_id.0)?;
194 writer.write_u64::<LittleEndian>(self.source_job_id.0)?;
195 writer.write_u8(239)?;
196 writer.write_u64::<LittleEndian>(self.steam_id.into())?;
197 writer.write_i32::<LittleEndian>(self.session_id)?;
198 }
199 Ok(())
200 }
201
202 fn proto_header(&self, kind: MsgKind) -> CMsgProtoBufHeader {
203 let mut proto_header = CMsgProtoBufHeader::new();
204 if self.source_job_id != JobId::NONE {
205 proto_header.set_jobid_source(self.source_job_id.0);
206 }
207 if self.target_job_id != JobId::NONE {
208 proto_header.set_jobid_target(self.target_job_id.0);
209 }
210 if self.steam_id != steam_id_nil() {
211 proto_header.set_steamid(
212 if kind == EMsg::k_EMsgServiceMethodCallFromClientNonAuthed {
213 0
214 } else {
215 self.steam_id.into()
216 },
217 );
218 }
219 if self.session_id != 0 {
220 proto_header.set_client_sessionid(self.session_id);
221 }
222 if kind == EMsg::k_EMsgServiceMethodCallFromClientNonAuthed
223 || kind == EMsg::k_EMsgServiceMethodCallFromClient
224 {
225 proto_header.set_realm(1);
226 }
227 if let Some(target_job_name) = self.target_job_name.as_deref() {
228 proto_header.set_target_job_name(target_job_name.into());
229 }
230 proto_header.routing_appid = self.source_app_id;
231 proto_header
232 }
233
234 pub fn encode_size(&self, kind: MsgKind, proto: bool) -> usize {
235 if kind == EMsg::k_EMsgChannelEncryptResponse {
236 4
237 } else if proto {
238 let proto_header = self.proto_header(kind);
239 4 + 4 + proto_header.compute_size() as usize
240 } else {
241 4 + 1 + 2 + 8 + 8 + 1 + 8 + 4 + 4
242 }
243 }
244}
245
246#[derive(Debug, Clone)]
247pub struct RawNetMessage {
248 pub kind: MsgKind,
249 pub is_protobuf: bool,
250 pub header: NetMessageHeader,
251 pub data: BytesMut,
252 pub(crate) frame_header_buffer: Option<BytesMut>,
253 pub(crate) iv_buffer: Option<BytesMut>,
254 pub(crate) header_buffer: BytesMut,
255}
256
257pub(crate) fn decode_kind(kind: u32) -> (MsgKind, bool) {
258 let is_protobuf = kind & PROTO_MASK == PROTO_MASK;
259 let kind = MsgKind((kind & !PROTO_MASK) as i32);
260 (kind, is_protobuf)
261}
262
263impl RawNetMessage {
264 pub fn read<Body: Into<Bytes>>(body: Body) -> Result<Self> {
265 let mut value = BytesMut::from(body.into());
266 let mut reader = Cursor::new(&value);
267 let kind = reader
268 .read_u32::<LittleEndian>()
269 .map_err(|_| NetworkError::InvalidHeader)?;
270
271 let is_protobuf = kind & PROTO_MASK == PROTO_MASK;
272 let kind = MsgKind((kind & !PROTO_MASK) as i32);
273
274 trace!(
275 "reading header for {:?} {}message",
276 kind,
277 if is_protobuf { "protobuf " } else { "" }
278 );
279
280 let header_start = reader.position() as usize;
281 let (header, body_start) = NetMessageHeader::read(&mut reader, kind, is_protobuf)?;
282
283 value.advance(header_start);
284 let header_buffer = value.split_to(body_start - header_start);
285
286 Ok(RawNetMessage {
287 kind,
288 is_protobuf,
289 header,
290 data: value,
291 frame_header_buffer: None,
292 iv_buffer: None,
293 header_buffer,
294 })
295 }
296
297 pub fn from_message<T: NetMessage>(header: NetMessageHeader, message: T) -> Result<Self> {
298 Self::from_message_with_kind(header, message, T::KIND, T::IS_PROTOBUF)
299 }
300
301 pub fn from_message_with_kind<T: EncodableMessage, K: MsgKindEnum>(
302 mut header: NetMessageHeader,
303 message: T,
304 kind: K,
305 is_protobuf: bool,
306 ) -> Result<Self> {
307 debug!("writing raw {:?} message", kind);
308
309 message.process_header(&mut header);
310
311 let body_size = message.encode_size();
312
313 let mut buff = BytesMut::with_capacity(
319 8 + 16 + header.encode_size(kind.into(), is_protobuf) + body_size + 16,
320 );
321 buff.extend([0; 8 + 16]);
322 let frame_header_buffer = buff.split_to(8);
323 let iv_buffer = buff.split_to(16);
324
325 {
326 let mut writer = (&mut buff).writer();
327 header.write(&mut writer, kind, is_protobuf)?;
328 }
329
330 let header_buffer = buff.split();
331 let mut writer = (&mut buff).writer();
332 message.write_body(&mut writer)?;
333 trace!("encoded body({} bytes): {:x?}", buff.len(), buff.as_ref());
334
335 Ok(RawNetMessage {
336 kind: kind.into(),
337 is_protobuf,
338 header,
339 data: buff,
340 frame_header_buffer: Some(frame_header_buffer),
341 iv_buffer: Some(iv_buffer),
342 header_buffer,
343 })
344 }
345
346 pub fn into_bytes(self) -> BytesMut {
348 let mut body = self.header_buffer;
349 body.unsplit(self.data);
350 body
351 }
352}
353
354impl RawNetMessage {
355 pub fn into_header_and_message<T: NetMessage>(self) -> Result<(NetMessageHeader, T)> {
356 if let Some(result) = self.header.result {
357 EResult::from_result(result)?;
358 }
359 if self.kind == T::KIND {
360 trace!(
361 "reading body of {:?} message({} bytes)",
362 self.kind,
363 self.data.len()
364 );
365 let body = T::read_body(self.data, &self.header)?;
366 Ok((self.header, body))
367 } else {
368 Err(NetworkError::DifferentMessage(T::KIND.into(), self.kind))
369 }
370 }
371
372 pub fn into_message<T: NetMessage>(self) -> Result<T> {
373 self.into_header_and_message().map(|(_, msg)| msg)
374 }
375}