1use crate::net::{NetMessageHeader, NetworkError, RawNetMessage};
2use crate::service_method::ServiceMethodRequest;
3use binrw::BinRead;
4use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
5use bytes::{Buf, BytesMut};
6use crc::{Crc, CRC_32_ISO_HDLC};
7use flate2::read::GzDecoder;
8use futures_util::{
9 future::ready,
10 stream::{iter, once},
11 StreamExt,
12};
13use num_bigint_dig::ParseBigIntError;
14use protobuf::Message;
15use std::any::type_name;
16use std::fmt::Debug;
17use std::io::{Cursor, Read, Write};
18use steam_vent_proto::enums_clientserver::EMsg;
19use steam_vent_proto::steammessages_base::CMsgMulti;
20use steam_vent_proto::{MsgKind, MsgKindEnum, RpcMessage, RpcMessageWithKind};
21use thiserror::Error;
22use tokio_stream::Stream;
23use tracing::{debug, trace};
24
25#[derive(Error, Debug)]
27#[error("Malformed message body for {0:?}: {1}")]
28pub struct MalformedBody(MsgKind, MessageBodyError);
29
30impl MalformedBody {
31 pub fn new<K: Into<MsgKind>>(kind: K, err: impl Into<MessageBodyError>) -> Self {
32 MalformedBody(kind.into(), err.into())
33 }
34}
35
36#[derive(Error, Debug)]
38#[non_exhaustive]
39pub enum MessageBodyError {
40 #[error("{0}")]
41 Protobuf(#[from] protobuf::Error),
42 #[error("{0}")]
43 BinRead(#[from] binrw::Error),
44 #[error("{0}")]
45 IO(#[from] std::io::Error),
46 #[error("{0}")]
47 Other(String),
48 #[error("malformed big int: {0:#}")]
49 BigInt(#[from] ParseBigIntError),
50 #[error("invalid rsa key: {0:#}")]
51 Rsa(#[from] rsa::Error),
52}
53
54impl From<String> for MessageBodyError {
55 fn from(e: String) -> Self {
56 MessageBodyError::Other(e)
57 }
58}
59
60pub trait EncodableMessage: Sized + Debug + Send {
66 fn read_body(_data: BytesMut, _header: &NetMessageHeader) -> Result<Self, MalformedBody> {
67 panic!("Reading not implemented for {}", type_name::<Self>())
68 }
69
70 fn write_body<W: Write>(&self, _writer: W) -> Result<(), std::io::Error> {
71 panic!("Writing not implemented for {}", type_name::<Self>())
72 }
73
74 fn encode_size(&self) -> usize {
75 panic!("Writing not implemented for {}", type_name::<Self>())
76 }
77
78 fn process_header(&self, _header: &mut NetMessageHeader) {}
79}
80
/// An [`EncodableMessage`] that is tied to one fixed message kind.
pub trait NetMessage: EncodableMessage {
    /// Enum type that [`Self::KIND`] belongs to.
    type KindEnum: MsgKindEnum;
    /// The message kind associated with this message type.
    const KIND: Self::KindEnum;
    /// Whether the message is flagged as a protobuf message; defaults to `false`.
    const IS_PROTOBUF: bool = false;
}
87
88#[derive(Debug, BinRead)]
89#[brw(little)]
90pub(crate) struct ChannelEncryptRequest {
91 pub protocol: u32,
92 #[allow(dead_code)]
93 pub universe: u32,
94 pub nonce: [u8; 16],
95}
96
97impl EncodableMessage for ChannelEncryptRequest {
98 fn read_body(data: BytesMut, _header: &NetMessageHeader) -> Result<Self, MalformedBody> {
99 trace!("reading body of {:?} message", Self::KIND);
100 let mut reader = Cursor::new(data);
101 ChannelEncryptRequest::read(&mut reader).map_err(|e| MalformedBody::new(Self::KIND, e))
102 }
103}
104
105impl NetMessage for ChannelEncryptRequest {
106 type KindEnum = EMsg;
107 const KIND: Self::KindEnum = EMsg::k_EMsgChannelEncryptRequest;
108}
109
110#[derive(Debug, BinRead)]
111#[brw(little)]
112pub(crate) struct ChannelEncryptResult {
113 pub result: u32,
114}
115
116impl EncodableMessage for ChannelEncryptResult {
117 fn read_body(data: BytesMut, _header: &NetMessageHeader) -> Result<Self, MalformedBody> {
118 trace!("reading body of {:?} message", Self::KIND);
119 let mut reader = Cursor::new(data);
120 ChannelEncryptResult::read(&mut reader).map_err(|e| MalformedBody::new(Self::KIND, e))
121 }
122}
123
124impl NetMessage for ChannelEncryptResult {
125 type KindEnum = EMsg;
126 const KIND: Self::KindEnum = EMsg::k_EMsgChannelEncryptResult;
127}
128
129#[derive(Debug)]
130pub(crate) struct ClientEncryptResponse {
131 pub protocol: u32,
132 pub encrypted_key: Vec<u8>,
133}
134
135const CRC: Crc<u32> = Crc::<u32>::new(&CRC_32_ISO_HDLC);
136
137impl EncodableMessage for ClientEncryptResponse {
138 fn write_body<W: Write>(&self, mut writer: W) -> Result<(), std::io::Error> {
139 trace!("writing body of {:?} message", Self::KIND);
140 writer.write_u64::<LittleEndian>(u64::MAX)?;
141 writer.write_u64::<LittleEndian>(u64::MAX)?;
142 writer.write_u32::<LittleEndian>(self.protocol)?;
143 writer.write_u32::<LittleEndian>(self.encrypted_key.len() as u32)?;
144 writer.write_all(&self.encrypted_key)?;
145
146 let mut digest = CRC.digest();
147 digest.update(&self.encrypted_key);
148 writer.write_u32::<LittleEndian>(digest.finalize())?;
149 writer.write_u32::<LittleEndian>(0)?;
150 Ok(())
151 }
152
153 fn encode_size(&self) -> usize {
154 8 + 8 + 4 + 4 + self.encrypted_key.len() + 4 + 4
155 }
156}
157
158impl NetMessage for ClientEncryptResponse {
159 type KindEnum = EMsg;
160 const KIND: Self::KindEnum = EMsg::k_EMsgChannelEncryptResponse;
161}
162
163enum MaybeZipReader {
164 Raw(Cursor<Vec<u8>>),
165 Zipped(Box<GzDecoder<Cursor<Vec<u8>>>>),
166}
167
168impl Read for MaybeZipReader {
169 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
170 match self {
171 MaybeZipReader::Raw(raw) => raw.read(buf),
172 MaybeZipReader::Zipped(zipped) => zipped.read(buf),
173 }
174 }
175}
176
177pub(crate) fn flatten_multi<S: Stream<Item = Result<RawNetMessage, NetworkError>>>(
179 source: S,
180) -> impl Stream<Item = Result<RawNetMessage, NetworkError>> {
181 source.flat_map(|res| match res {
182 Ok(next) if next.kind == EMsg::k_EMsgMulti => {
183 let reader = Cursor::new(next.data);
184 let multi = match MultiBodyIter::new(reader) {
185 Err(e) => return once(ready(Err(e.into()))).right_stream(),
186 Ok(iter) => iter,
187 };
188 iter(multi).left_stream()
189 }
190 res => once(ready(res)).right_stream(),
191 })
192}
193
194struct MultiBodyIter<R> {
195 reader: R,
196}
197
198impl MultiBodyIter<MaybeZipReader> {
199 pub fn new<R: Read>(mut reader: R) -> Result<Self, MalformedBody> {
200 let mut multi = CMsgMulti::parse_from_reader(&mut reader)
201 .map_err(|e| MalformedBody(EMsg::k_EMsgMulti.into(), e.into()))?;
202
203 let data = match multi.size_unzipped() {
204 0 => MaybeZipReader::Raw(Cursor::new(multi.take_message_body())),
205 _ => MaybeZipReader::Zipped(Box::new(GzDecoder::new(Cursor::new(
206 multi.take_message_body(),
207 )))),
208 };
209
210 Ok(MultiBodyIter { reader: data })
211 }
212}
213
214impl<R: Read> Iterator for MultiBodyIter<R> {
215 type Item = Result<RawNetMessage, NetworkError>;
216
217 fn next(&mut self) -> Option<Self::Item> {
218 let size = match self.reader.read_u32::<LittleEndian>() {
219 Ok(size) => size,
220 Err(_) => return None,
221 };
222
223 let mut msg_data = BytesMut::with_capacity(size as usize);
224 msg_data.resize(size as usize, 0);
225 if let Err(e) = self.reader.read_exact(&mut msg_data) {
226 return Some(Err(NetworkError::IO(e)));
227 }
228 let raw = match RawNetMessage::read(msg_data) {
229 Ok(raw) => raw,
230 Err(e) => return Some(Err(e)),
231 };
232
233 debug!("Reading child message {:?}", raw.kind);
234
235 Some(Ok(raw))
236 }
237}
238
239#[derive(Debug)]
240pub(crate) struct ServiceMethodMessage<Request: Debug>(pub Request);
241
242impl<Request: ServiceMethodRequest + Debug> EncodableMessage for ServiceMethodMessage<Request> {
243 fn read_body(data: BytesMut, _header: &NetMessageHeader) -> Result<Self, MalformedBody> {
244 trace!("reading body of protobuf message {:?}", Self::KIND);
245 Request::parse(&mut data.reader())
246 .map_err(|e| MalformedBody::new(Self::KIND, e))
247 .map(ServiceMethodMessage)
248 }
249
250 fn write_body<W: Write>(&self, mut writer: W) -> Result<(), std::io::Error> {
251 trace!("writing body of protobuf message {:?}", Self::KIND);
252 self.0
253 .write(&mut writer)
254 .map_err(|_| std::io::Error::from(std::io::ErrorKind::InvalidData))
255 }
256
257 fn encode_size(&self) -> usize {
258 self.0.compute_size() as usize
259 }
260
261 fn process_header(&self, header: &mut NetMessageHeader) {
262 header.target_job_name = Some(Request::REQ_NAME.into())
263 }
264}
265
266impl<Request: ServiceMethodRequest + Debug> NetMessage for ServiceMethodMessage<Request> {
267 type KindEnum = EMsg;
268 const KIND: Self::KindEnum = EMsg::k_EMsgServiceMethodCallFromClient;
269 const IS_PROTOBUF: bool = true;
270}
271
272#[derive(Debug)]
273pub(crate) struct ServiceMethodResponseMessage {
274 job_name: String,
275 body: BytesMut,
276}
277
278impl ServiceMethodResponseMessage {
279 pub fn into_response<Request: ServiceMethodRequest>(
280 self,
281 ) -> Result<Request::Response, NetworkError> {
282 if self.job_name == Request::REQ_NAME {
283 Ok(Request::Response::parse(&mut self.body.reader())
284 .map_err(|e| MalformedBody::new(Self::KIND, e))?)
285 } else {
286 Err(NetworkError::DifferentServiceMethod(
287 Request::REQ_NAME,
288 self.job_name,
289 ))
290 }
291 }
292}
293
294impl EncodableMessage for ServiceMethodResponseMessage {
295 fn read_body(data: BytesMut, header: &NetMessageHeader) -> Result<Self, MalformedBody> {
296 trace!("reading body of protobuf message {:?}", Self::KIND);
297 Ok(ServiceMethodResponseMessage {
298 job_name: header
299 .target_job_name
300 .as_deref()
301 .unwrap_or_default()
302 .to_string(),
303 body: data,
304 })
305 }
306}
307
308impl NetMessage for ServiceMethodResponseMessage {
309 type KindEnum = EMsg;
310 const KIND: Self::KindEnum = EMsg::k_EMsgServiceMethodResponse;
311 const IS_PROTOBUF: bool = true;
312}
313
314#[derive(Debug, Clone)]
315pub(crate) struct ServiceMethodNotification {
316 pub(crate) job_name: String,
317 body: BytesMut,
318}
319
320impl ServiceMethodNotification {
321 pub fn into_notification<Request: ServiceMethodRequest>(self) -> Result<Request, NetworkError> {
322 if self.job_name == Request::REQ_NAME {
323 Ok(Request::parse(&mut self.body.reader())
324 .map_err(|e| MalformedBody::new(Self::KIND, e))?)
325 } else {
326 Err(NetworkError::DifferentServiceMethod(
327 Request::REQ_NAME,
328 self.job_name,
329 ))
330 }
331 }
332}
333
334impl EncodableMessage for ServiceMethodNotification {
335 fn read_body(data: BytesMut, header: &NetMessageHeader) -> Result<Self, MalformedBody> {
336 trace!("reading body of protobuf message {:?}", Self::KIND);
337 Ok(ServiceMethodNotification {
338 job_name: header
339 .target_job_name
340 .as_deref()
341 .unwrap_or_default()
342 .to_string(),
343 body: data,
344 })
345 }
346}
347
348impl NetMessage for ServiceMethodNotification {
349 type KindEnum = EMsg;
350 const KIND: Self::KindEnum = EMsg::k_EMsgServiceMethod;
351 const IS_PROTOBUF: bool = true;
352}
353
354impl<ProtoMsg: RpcMessageWithKind + Send> EncodableMessage for ProtoMsg {
355 fn read_body(data: BytesMut, _header: &NetMessageHeader) -> Result<Self, MalformedBody> {
356 trace!("reading body of protobuf message {:?}", Self::KIND);
357 Self::parse(&mut data.reader()).map_err(|e| MalformedBody::new(Self::KIND, e))
358 }
359
360 fn write_body<W: Write>(&self, mut writer: W) -> Result<(), std::io::Error> {
361 trace!("writing body of protobuf message {:?}", Self::KIND);
362 self.write(&mut writer)
363 .map_err(|_| std::io::Error::from(std::io::ErrorKind::InvalidData))
364 }
365
366 fn encode_size(&self) -> usize {
367 <Self as RpcMessage>::encode_size(self)
368 }
369}
370
371impl<ProtoMsg: RpcMessageWithKind + Send> NetMessage for ProtoMsg {
372 type KindEnum = ProtoMsg::KindEnum;
373 const KIND: Self::KindEnum = <ProtoMsg as RpcMessageWithKind>::KIND;
374 const IS_PROTOBUF: bool = true;
375}