1use crate::eresult::EResult;
2use crate::message::{EncodableMessage, MalformedBody, NetMessage};
3use crate::proto::steammessages_base::CMsgProtoBufHeader;
4use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
5use bytes::{Buf, BufMut, Bytes, BytesMut};
6use protobuf::Message;
7use std::borrow::Cow;
8use std::fmt::Debug;
9use std::io::{Cursor, Seek, SeekFrom};
10use steam_vent_crypto::CryptError;
11use steam_vent_proto::enums_clientserver::EMsg;
12use steam_vent_proto::{MsgKind, MsgKindEnum};
13use steamid_ng::SteamID;
14use thiserror::Error;
15use tracing::{debug, trace};
16
/// Highest bit of the raw 32-bit message kind; when set, the message is
/// treated as protobuf-encoded (see `decode_kind`).
pub const PROTO_MASK: u32 = 1 << 31;
18
19#[derive(Debug, Error)]
20#[non_exhaustive]
21pub enum NetworkError {
22 #[error("{0}")]
23 IO(#[from] std::io::Error),
24 #[error("{0}")]
25 Ws(#[from] tokio_tungstenite::tungstenite::Error),
26 #[error("Invalid message header")]
27 InvalidHeader,
28 #[error("Invalid message kind {0}")]
29 InvalidMessageKind(i32),
30 #[error("Failed to perform crypto handshake")]
31 CryptoHandshakeFailed,
32 #[error("Different message expected, expected {0:?}, got {1:?}")]
33 DifferentMessage(MsgKind, MsgKind),
34 #[error("Different service method expected, expected {0:?}, got {1:?}")]
35 DifferentServiceMethod(&'static str, String),
36 #[error("{0}")]
37 MalformedBody(#[from] MalformedBody),
38 #[error("Crypto error: {0}")]
39 CryptoError(#[from] CryptError),
40 #[error("Unexpected end of stream")]
41 EOF,
42 #[error("Response timed out")]
43 Timeout,
44 #[error("Remote returned an error code: {0:?}")]
45 ApiError(EResult),
46}
47
48impl From<EResult> for NetworkError {
49 fn from(value: EResult) -> Self {
50 NetworkError::ApiError(value)
51 }
52}
53
54pub type Result<T, E = NetworkError> = std::result::Result<T, E>;
55
/// Identifier of a job, stored as a raw `u64`.
///
/// The all-ones value is reserved as [`JobId::NONE`], which is also the
/// [`Default`].
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct JobId(pub(crate) u64);

impl JobId {
    /// Sentinel meaning "no job id" (`u64::MAX`).
    pub const NONE: Self = Self(u64::MAX);
}

impl Default for JobId {
    /// Returns [`JobId::NONE`].
    fn default() -> Self {
        Self::NONE
    }
}
69
70#[derive(Debug, Default, Clone)]
71pub struct NetMessageHeader {
72 pub source_job_id: JobId,
73 pub target_job_id: JobId,
74 pub steam_id: SteamID,
75 pub session_id: i32,
76 pub target_job_name: Option<Cow<'static, str>>,
77 pub result: Option<i32>,
78 pub source_app_id: Option<u32>,
79}
80
81impl From<CMsgProtoBufHeader> for NetMessageHeader {
82 fn from(header: CMsgProtoBufHeader) -> Self {
83 NetMessageHeader {
84 source_job_id: JobId(header.jobid_source()),
85 target_job_id: JobId(header.jobid_target()),
86 steam_id: header.steamid().try_into().unwrap_or_default(),
87 session_id: header.client_sessionid(),
88 target_job_name: header
89 .has_target_job_name()
90 .then(|| header.target_job_name().to_string().into()),
91 result: header.eresult,
92 source_app_id: header.routing_appid,
93 }
94 }
95}
96
97impl NetMessageHeader {
98 fn read<R: ReadBytesExt + Seek>(
99 mut reader: R,
100 kind: MsgKind,
101 is_protobuf: bool,
102 ) -> Result<(Self, usize)> {
103 if is_protobuf {
104 let header_length = reader.read_u32::<LittleEndian>()?;
105 trace!("reading protobuf header of {} bytes", header_length);
106 let header = if header_length > 0 {
107 let mut bytes = vec![0; header_length as usize];
108 let num = reader.read(&mut bytes)?;
109 CMsgProtoBufHeader::parse_from_bytes(&bytes[0..num])
110 .map_err(|_| NetworkError::InvalidHeader)?
111 .into()
112 } else {
113 NetMessageHeader::default()
114 };
115 Ok((header, 8 + header_length as usize))
116 } else if kind == EMsg::k_EMsgChannelEncryptRequest
117 || kind == EMsg::k_EMsgChannelEncryptResult
118 {
119 let target_job_id = reader.read_u64::<LittleEndian>()?;
120 let source_job_id = reader.read_u64::<LittleEndian>()?;
121 Ok((
122 NetMessageHeader {
123 target_job_id: JobId(target_job_id),
124 source_job_id: JobId(source_job_id),
125 session_id: 0,
126 steam_id: SteamID::default(),
127 ..NetMessageHeader::default()
128 },
129 4 + 8 + 8,
130 ))
131 } else {
132 reader.seek(SeekFrom::Current(3))?; let target_job_id = reader.read_u64::<LittleEndian>()?;
134 let source_job_id = reader.read_u64::<LittleEndian>()?;
135 reader.seek(SeekFrom::Current(1))?; let steam_id = reader
137 .read_u64::<LittleEndian>()?
138 .try_into()
139 .unwrap_or_default();
140 let session_id = reader.read_i32::<LittleEndian>()?;
141 Ok((
142 NetMessageHeader {
143 source_job_id: JobId(source_job_id),
144 target_job_id: JobId(target_job_id),
145 steam_id,
146 session_id,
147 target_job_name: None,
148 result: None,
149 source_app_id: None,
150 },
151 4 + 3 + 8 + 8 + 1 + 8 + 4,
152 ))
153 }
154 }
155
156 pub(crate) fn write<W: WriteBytesExt, K: MsgKindEnum>(
157 &self,
158 writer: &mut W,
159 kind: K,
160 proto: bool,
161 ) -> std::io::Result<()> {
162 if MsgKind::from(kind) == EMsg::k_EMsgChannelEncryptResponse {
163 writer.write_u32::<LittleEndian>(kind.value() as u32)?;
164 } else if proto {
165 trace!("writing header for {:?} protobuf message: {:?}", kind, self);
166 let proto_header = self.proto_header(kind.into());
167 writer.write_u32::<LittleEndian>(kind.encode_kind(true))?;
168 writer.write_u32::<LittleEndian>(proto_header.compute_size() as u32)?;
169 proto_header.write_to_writer(writer)?;
170 } else {
171 trace!("writing header for {:?} message: {:?}", kind, self);
172 writer.write_u32::<LittleEndian>(kind.value() as u32)?;
173 writer.write_u8(32)?;
174 writer.write_u16::<LittleEndian>(2)?;
175 writer.write_u64::<LittleEndian>(self.target_job_id.0)?;
176 writer.write_u64::<LittleEndian>(self.source_job_id.0)?;
177 writer.write_u8(239)?;
178 writer.write_u64::<LittleEndian>(self.steam_id.into())?;
179 writer.write_i32::<LittleEndian>(self.session_id)?;
180 }
181 Ok(())
182 }
183
184 fn proto_header(&self, kind: MsgKind) -> CMsgProtoBufHeader {
185 let mut proto_header = CMsgProtoBufHeader::new();
186 if self.source_job_id != JobId::NONE {
187 proto_header.set_jobid_source(self.source_job_id.0);
188 }
189 if self.target_job_id != JobId::NONE {
190 proto_header.set_jobid_target(self.target_job_id.0);
191 }
192 if self.steam_id != SteamID::default() {
193 proto_header.set_steamid(
194 if kind == EMsg::k_EMsgServiceMethodCallFromClientNonAuthed {
195 0
196 } else {
197 self.steam_id.into()
198 },
199 );
200 }
201 if self.session_id != 0 {
202 proto_header.set_client_sessionid(self.session_id);
203 }
204 if kind == EMsg::k_EMsgServiceMethodCallFromClientNonAuthed
205 || kind == EMsg::k_EMsgServiceMethodCallFromClient
206 {
207 proto_header.set_realm(1);
208 }
209 if let Some(target_job_name) = self.target_job_name.as_deref() {
210 proto_header.set_target_job_name(target_job_name.into());
211 }
212 proto_header.routing_appid = self.source_app_id;
213 proto_header
214 }
215
216 pub fn encode_size(&self, kind: MsgKind, proto: bool) -> usize {
217 if kind == EMsg::k_EMsgChannelEncryptResponse {
218 4
219 } else if proto {
220 let proto_header = self.proto_header(kind);
221 4 + 4 + proto_header.compute_size() as usize
222 } else {
223 4 + 1 + 2 + 8 + 8 + 1 + 8 + 4 + 4
224 }
225 }
226}
227
228#[derive(Debug, Clone)]
229pub struct RawNetMessage {
230 pub kind: MsgKind,
231 pub is_protobuf: bool,
232 pub header: NetMessageHeader,
233 pub data: BytesMut,
234 pub(crate) frame_header_buffer: Option<BytesMut>,
235 pub(crate) iv_buffer: Option<BytesMut>,
236 pub(crate) header_buffer: BytesMut,
237}
238
239pub(crate) fn decode_kind(kind: u32) -> (MsgKind, bool) {
240 let is_protobuf = kind & PROTO_MASK == PROTO_MASK;
241 let kind = MsgKind((kind & !PROTO_MASK) as i32);
242 (kind, is_protobuf)
243}
244
245impl RawNetMessage {
246 pub fn read<Body: Into<Bytes>>(body: Body) -> Result<Self> {
247 let mut value = BytesMut::from(body.into());
248 let mut reader = Cursor::new(&value);
249 let kind = reader
250 .read_u32::<LittleEndian>()
251 .map_err(|_| NetworkError::InvalidHeader)?;
252
253 let is_protobuf = kind & PROTO_MASK == PROTO_MASK;
254 let kind = MsgKind((kind & !PROTO_MASK) as i32);
255
256 trace!(
257 "reading header for {:?} {}message",
258 kind,
259 if is_protobuf { "protobuf " } else { "" }
260 );
261
262 let header_start = reader.position() as usize;
263 let (header, body_start) = NetMessageHeader::read(&mut reader, kind, is_protobuf)?;
264
265 value.advance(header_start);
266 let header_buffer = value.split_to(body_start - header_start);
267
268 Ok(RawNetMessage {
269 kind,
270 is_protobuf,
271 header,
272 data: value,
273 frame_header_buffer: None,
274 iv_buffer: None,
275 header_buffer,
276 })
277 }
278
279 pub fn from_message<T: NetMessage>(header: NetMessageHeader, message: T) -> Result<Self> {
280 Self::from_message_with_kind(header, message, T::KIND, T::IS_PROTOBUF)
281 }
282
283 pub fn from_message_with_kind<T: EncodableMessage, K: MsgKindEnum>(
284 mut header: NetMessageHeader,
285 message: T,
286 kind: K,
287 is_protobuf: bool,
288 ) -> Result<Self> {
289 debug!("writing raw {:?} message", kind);
290
291 message.process_header(&mut header);
292
293 let body_size = message.encode_size();
294
295 let mut buff = BytesMut::with_capacity(
301 8 + 16 + header.encode_size(kind.into(), is_protobuf) + body_size + 16,
302 );
303 buff.extend([0; 8 + 16]);
304 let frame_header_buffer = buff.split_to(8);
305 let iv_buffer = buff.split_to(16);
306
307 {
308 let mut writer = (&mut buff).writer();
309 header.write(&mut writer, kind, is_protobuf)?;
310 }
311
312 let header_buffer = buff.split();
313 let mut writer = (&mut buff).writer();
314 message.write_body(&mut writer)?;
315 trace!("encoded body({} bytes): {:x?}", buff.len(), buff.as_ref());
316
317 Ok(RawNetMessage {
318 kind: kind.into(),
319 is_protobuf,
320 header,
321 data: buff,
322 frame_header_buffer: Some(frame_header_buffer),
323 iv_buffer: Some(iv_buffer),
324 header_buffer,
325 })
326 }
327
328 pub fn into_bytes(self) -> BytesMut {
330 let mut body = self.header_buffer;
331 body.unsplit(self.data);
332 body
333 }
334}
335
336impl RawNetMessage {
337 pub fn into_header_and_message<T: NetMessage>(self) -> Result<(NetMessageHeader, T)> {
338 if let Some(result) = self.header.result {
339 EResult::from_result(result)?;
340 }
341 if self.kind == T::KIND {
342 trace!(
343 "reading body of {:?} message({} bytes)",
344 self.kind,
345 self.data.len()
346 );
347 let body = T::read_body(self.data, &self.header)?;
348 Ok((self.header, body))
349 } else {
350 Err(NetworkError::DifferentMessage(T::KIND.into(), self.kind))
351 }
352 }
353
354 pub fn into_message<T: NetMessage>(self) -> Result<T> {
355 self.into_header_and_message().map(|(_, msg)| msg)
356 }
357}