steam_vent/
message.rs

1use crate::net::{NetMessageHeader, NetworkError, RawNetMessage};
2use crate::service_method::ServiceMethodRequest;
3use binrw::BinRead;
4use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
5use bytes::{Buf, BytesMut};
6use crc::{Crc, CRC_32_ISO_HDLC};
7use flate2::read::GzDecoder;
8use futures_util::{
9    future::ready,
10    stream::{iter, once},
11    StreamExt,
12};
13use num_bigint_dig::ParseBigIntError;
14use protobuf::Message;
15use std::any::type_name;
16use std::fmt::Debug;
17use std::io::{Cursor, Read, Write};
18use steam_vent_proto::enums_clientserver::EMsg;
19use steam_vent_proto::steammessages_base::CMsgMulti;
20use steam_vent_proto::{MsgKind, MsgKindEnum, RpcMessage, RpcMessageWithKind};
21use thiserror::Error;
22use tokio_stream::Stream;
23use tracing::{debug, trace};
24
25/// Malformed message body
26#[derive(Error, Debug)]
27#[error("Malformed message body for {0:?}: {1}")]
28pub struct MalformedBody(MsgKind, MessageBodyError);
29
30impl MalformedBody {
31    pub fn new<K: Into<MsgKind>>(kind: K, err: impl Into<MessageBodyError>) -> Self {
32        MalformedBody(kind.into(), err.into())
33    }
34}
35
36/// Error while parsing the message body
37#[derive(Error, Debug)]
38#[non_exhaustive]
39pub enum MessageBodyError {
40    #[error("{0}")]
41    Protobuf(#[from] protobuf::Error),
42    #[error("{0}")]
43    BinRead(#[from] binrw::Error),
44    #[error("{0}")]
45    IO(#[from] std::io::Error),
46    #[error("{0}")]
47    Other(String),
48    #[error("malformed big int: {0:#}")]
49    BigInt(#[from] ParseBigIntError),
50    #[error("invalid rsa key: {0:#}")]
51    Rsa(#[from] rsa::Error),
52}
53
54impl From<String> for MessageBodyError {
55    fn from(e: String) -> Self {
56        MessageBodyError::Other(e)
57    }
58}
59
60/// A message which can be encoded and/or decoded
61///
62/// Applications can implement this trait on a struct to allow sending it using
63/// [`raw_send_with_kind`](crate::ConnectionTrait::raw_send_with_kind). To use the higher level messages a struct also needs to implement
64/// [`NetMessage`]
65pub trait EncodableMessage: Sized + Debug + Send {
66    fn read_body(_data: BytesMut, _header: &NetMessageHeader) -> Result<Self, MalformedBody> {
67        panic!("Reading not implemented for {}", type_name::<Self>())
68    }
69
70    fn write_body<W: Write>(&self, _writer: W) -> Result<(), std::io::Error> {
71        panic!("Writing not implemented for {}", type_name::<Self>())
72    }
73
74    fn encode_size(&self) -> usize {
75        panic!("Writing not implemented for {}", type_name::<Self>())
76    }
77
78    fn process_header(&self, _header: &mut NetMessageHeader) {}
79}
80
81/// A message with associated kind
82pub trait NetMessage: EncodableMessage {
83    type KindEnum: MsgKindEnum;
84    const KIND: Self::KindEnum;
85    const IS_PROTOBUF: bool = false;
86}
87
88#[derive(Debug, BinRead)]
89#[brw(little)]
90pub(crate) struct ChannelEncryptRequest {
91    pub protocol: u32,
92    #[allow(dead_code)]
93    pub universe: u32,
94    pub nonce: [u8; 16],
95}
96
97impl EncodableMessage for ChannelEncryptRequest {
98    fn read_body(data: BytesMut, _header: &NetMessageHeader) -> Result<Self, MalformedBody> {
99        trace!("reading body of {:?} message", Self::KIND);
100        let mut reader = Cursor::new(data);
101        ChannelEncryptRequest::read(&mut reader).map_err(|e| MalformedBody::new(Self::KIND, e))
102    }
103}
104
105impl NetMessage for ChannelEncryptRequest {
106    type KindEnum = EMsg;
107    const KIND: Self::KindEnum = EMsg::k_EMsgChannelEncryptRequest;
108}
109
110#[derive(Debug, BinRead)]
111#[brw(little)]
112pub(crate) struct ChannelEncryptResult {
113    pub result: u32,
114}
115
116impl EncodableMessage for ChannelEncryptResult {
117    fn read_body(data: BytesMut, _header: &NetMessageHeader) -> Result<Self, MalformedBody> {
118        trace!("reading body of {:?} message", Self::KIND);
119        let mut reader = Cursor::new(data);
120        ChannelEncryptResult::read(&mut reader).map_err(|e| MalformedBody::new(Self::KIND, e))
121    }
122}
123
124impl NetMessage for ChannelEncryptResult {
125    type KindEnum = EMsg;
126    const KIND: Self::KindEnum = EMsg::k_EMsgChannelEncryptResult;
127}
128
129#[derive(Debug)]
130pub(crate) struct ClientEncryptResponse {
131    pub protocol: u32,
132    pub encrypted_key: Vec<u8>,
133}
134
135const CRC: Crc<u32> = Crc::<u32>::new(&CRC_32_ISO_HDLC);
136
137impl EncodableMessage for ClientEncryptResponse {
138    fn write_body<W: Write>(&self, mut writer: W) -> Result<(), std::io::Error> {
139        trace!("writing body of {:?} message", Self::KIND);
140        writer.write_u64::<LittleEndian>(u64::MAX)?;
141        writer.write_u64::<LittleEndian>(u64::MAX)?;
142        writer.write_u32::<LittleEndian>(self.protocol)?;
143        writer.write_u32::<LittleEndian>(self.encrypted_key.len() as u32)?;
144        writer.write_all(&self.encrypted_key)?;
145
146        let mut digest = CRC.digest();
147        digest.update(&self.encrypted_key);
148        writer.write_u32::<LittleEndian>(digest.finalize())?;
149        writer.write_u32::<LittleEndian>(0)?;
150        Ok(())
151    }
152
153    fn encode_size(&self) -> usize {
154        8 + 8 + 4 + 4 + self.encrypted_key.len() + 4 + 4
155    }
156}
157
158impl NetMessage for ClientEncryptResponse {
159    type KindEnum = EMsg;
160    const KIND: Self::KindEnum = EMsg::k_EMsgChannelEncryptResponse;
161}
162
163enum MaybeZipReader {
164    Raw(Cursor<Vec<u8>>),
165    Zipped(Box<GzDecoder<Cursor<Vec<u8>>>>),
166}
167
168impl Read for MaybeZipReader {
169    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
170        match self {
171            MaybeZipReader::Raw(raw) => raw.read(buf),
172            MaybeZipReader::Zipped(zipped) => zipped.read(buf),
173        }
174    }
175}
176
177/// Flatten any "multi" messages in a stream of raw messages
178pub(crate) fn flatten_multi<S: Stream<Item = Result<RawNetMessage, NetworkError>>>(
179    source: S,
180) -> impl Stream<Item = Result<RawNetMessage, NetworkError>> {
181    source.flat_map(|res| match res {
182        Ok(next) if next.kind == EMsg::k_EMsgMulti => {
183            let reader = Cursor::new(next.data);
184            let multi = match MultiBodyIter::new(reader) {
185                Err(e) => return once(ready(Err(e.into()))).right_stream(),
186                Ok(iter) => iter,
187            };
188            iter(multi).left_stream()
189        }
190        res => once(ready(res)).right_stream(),
191    })
192}
193
194struct MultiBodyIter<R> {
195    reader: R,
196}
197
198impl MultiBodyIter<MaybeZipReader> {
199    pub fn new<R: Read>(mut reader: R) -> Result<Self, MalformedBody> {
200        let mut multi = CMsgMulti::parse_from_reader(&mut reader)
201            .map_err(|e| MalformedBody(EMsg::k_EMsgMulti.into(), e.into()))?;
202
203        let data = match multi.size_unzipped() {
204            0 => MaybeZipReader::Raw(Cursor::new(multi.take_message_body())),
205            _ => MaybeZipReader::Zipped(Box::new(GzDecoder::new(Cursor::new(
206                multi.take_message_body(),
207            )))),
208        };
209
210        Ok(MultiBodyIter { reader: data })
211    }
212}
213
214impl<R: Read> Iterator for MultiBodyIter<R> {
215    type Item = Result<RawNetMessage, NetworkError>;
216
217    fn next(&mut self) -> Option<Self::Item> {
218        let size = match self.reader.read_u32::<LittleEndian>() {
219            Ok(size) => size,
220            Err(_) => return None,
221        };
222
223        let mut msg_data = BytesMut::with_capacity(size as usize);
224        msg_data.resize(size as usize, 0);
225        if let Err(e) = self.reader.read_exact(&mut msg_data) {
226            return Some(Err(NetworkError::IO(e)));
227        }
228        let raw = match RawNetMessage::read(msg_data) {
229            Ok(raw) => raw,
230            Err(e) => return Some(Err(e)),
231        };
232
233        debug!("Reading child message {:?}", raw.kind);
234
235        Some(Ok(raw))
236    }
237}
238
239#[derive(Debug)]
240pub(crate) struct ServiceMethodMessage<Request: Debug>(pub Request);
241
242impl<Request: ServiceMethodRequest + Debug> EncodableMessage for ServiceMethodMessage<Request> {
243    fn read_body(data: BytesMut, _header: &NetMessageHeader) -> Result<Self, MalformedBody> {
244        trace!("reading body of protobuf message {:?}", Self::KIND);
245        Request::parse(&mut data.reader())
246            .map_err(|e| MalformedBody::new(Self::KIND, e))
247            .map(ServiceMethodMessage)
248    }
249
250    fn write_body<W: Write>(&self, mut writer: W) -> Result<(), std::io::Error> {
251        trace!("writing body of protobuf message {:?}", Self::KIND);
252        self.0
253            .write(&mut writer)
254            .map_err(|_| std::io::Error::from(std::io::ErrorKind::InvalidData))
255    }
256
257    fn encode_size(&self) -> usize {
258        self.0.compute_size() as usize
259    }
260
261    fn process_header(&self, header: &mut NetMessageHeader) {
262        header.target_job_name = Some(Request::REQ_NAME.into())
263    }
264}
265
266impl<Request: ServiceMethodRequest + Debug> NetMessage for ServiceMethodMessage<Request> {
267    type KindEnum = EMsg;
268    const KIND: Self::KindEnum = EMsg::k_EMsgServiceMethodCallFromClient;
269    const IS_PROTOBUF: bool = true;
270}
271
272#[derive(Debug)]
273pub(crate) struct ServiceMethodResponseMessage {
274    job_name: String,
275    body: BytesMut,
276}
277
278impl ServiceMethodResponseMessage {
279    pub fn into_response<Request: ServiceMethodRequest>(
280        self,
281    ) -> Result<Request::Response, NetworkError> {
282        if self.job_name == Request::REQ_NAME {
283            Ok(Request::Response::parse(&mut self.body.reader())
284                .map_err(|e| MalformedBody::new(Self::KIND, e))?)
285        } else {
286            Err(NetworkError::DifferentServiceMethod(
287                Request::REQ_NAME,
288                self.job_name,
289            ))
290        }
291    }
292}
293
294impl EncodableMessage for ServiceMethodResponseMessage {
295    fn read_body(data: BytesMut, header: &NetMessageHeader) -> Result<Self, MalformedBody> {
296        trace!("reading body of protobuf message {:?}", Self::KIND);
297        Ok(ServiceMethodResponseMessage {
298            job_name: header
299                .target_job_name
300                .as_deref()
301                .unwrap_or_default()
302                .to_string(),
303            body: data,
304        })
305    }
306}
307
308impl NetMessage for ServiceMethodResponseMessage {
309    type KindEnum = EMsg;
310    const KIND: Self::KindEnum = EMsg::k_EMsgServiceMethodResponse;
311    const IS_PROTOBUF: bool = true;
312}
313
314#[derive(Debug, Clone)]
315pub(crate) struct ServiceMethodNotification {
316    pub(crate) job_name: String,
317    body: BytesMut,
318}
319
320impl ServiceMethodNotification {
321    pub fn into_notification<Request: ServiceMethodRequest>(self) -> Result<Request, NetworkError> {
322        if self.job_name == Request::REQ_NAME {
323            Ok(Request::parse(&mut self.body.reader())
324                .map_err(|e| MalformedBody::new(Self::KIND, e))?)
325        } else {
326            Err(NetworkError::DifferentServiceMethod(
327                Request::REQ_NAME,
328                self.job_name,
329            ))
330        }
331    }
332}
333
334impl EncodableMessage for ServiceMethodNotification {
335    fn read_body(data: BytesMut, header: &NetMessageHeader) -> Result<Self, MalformedBody> {
336        trace!("reading body of protobuf message {:?}", Self::KIND);
337        Ok(ServiceMethodNotification {
338            job_name: header
339                .target_job_name
340                .as_deref()
341                .unwrap_or_default()
342                .to_string(),
343            body: data,
344        })
345    }
346}
347
348impl NetMessage for ServiceMethodNotification {
349    type KindEnum = EMsg;
350    const KIND: Self::KindEnum = EMsg::k_EMsgServiceMethod;
351    const IS_PROTOBUF: bool = true;
352}
353
354impl<ProtoMsg: RpcMessageWithKind + Send> EncodableMessage for ProtoMsg {
355    fn read_body(data: BytesMut, _header: &NetMessageHeader) -> Result<Self, MalformedBody> {
356        trace!("reading body of protobuf message {:?}", Self::KIND);
357        Self::parse(&mut data.reader()).map_err(|e| MalformedBody::new(Self::KIND, e))
358    }
359
360    fn write_body<W: Write>(&self, mut writer: W) -> Result<(), std::io::Error> {
361        trace!("writing body of protobuf message {:?}", Self::KIND);
362        self.write(&mut writer)
363            .map_err(|_| std::io::Error::from(std::io::ErrorKind::InvalidData))
364    }
365
366    fn encode_size(&self) -> usize {
367        <Self as RpcMessage>::encode_size(self)
368    }
369}
370
371impl<ProtoMsg: RpcMessageWithKind + Send> NetMessage for ProtoMsg {
372    type KindEnum = ProtoMsg::KindEnum;
373    const KIND: Self::KindEnum = <ProtoMsg as RpcMessageWithKind>::KIND;
374    const IS_PROTOBUF: bool = true;
375}