steam_vent/connection/
mod.rs

1mod filter;
2pub(crate) mod raw;
3pub(crate) mod unauthenticated;
4
5use crate::auth::{AuthConfirmationHandler, GuardDataStore};
6use crate::message::{
7    EncodableMessage, NetMessage, ServiceMethodMessage, ServiceMethodResponseMessage,
8};
9use crate::net::{NetMessageHeader, NetworkError, RawNetMessage};
10use crate::serverlist::ServerList;
11use crate::service_method::ServiceMethodRequest;
12use crate::session::{ConnectionError, Session};
13use crate::GameCoordinator;
14use async_stream::try_stream;
15pub(crate) use filter::MessageFilter;
16use futures_util::{FutureExt, Sink, SinkExt};
17use raw::RawConnection;
18use std::fmt::{Debug, Formatter};
19use std::future::Future;
20use std::net::IpAddr;
21use std::sync::Arc;
22use std::time::Duration;
23use steam_vent_proto::{GCHandshake, JobMultiple, MsgKindEnum};
24use steamid_ng::SteamID;
25use tokio::sync::Mutex;
26use tokio::time::timeout;
27use tokio_stream::wrappers::BroadcastStream;
28use tokio_stream::{Stream, StreamExt};
29use tracing::instrument;
30pub use unauthenticated::UnAuthenticatedConnection;
31
/// Result type used by the connection code; the error type defaults to [`NetworkError`].
pub(crate) type Result<T, E = NetworkError> = std::result::Result<T, E>;

/// Shared, mutex-guarded sink that accepts [`RawNetMessage`]s and fails with [`NetworkError`].
type TransportWriter = Arc<Mutex<dyn Sink<RawNetMessage, Error = NetworkError> + Unpin + Send>>;
35
36/// Send raw messages to steam
37#[derive(Clone)]
38pub(crate) struct MessageSender {
39    write: TransportWriter,
40}
41
42impl MessageSender {
43    pub async fn send_raw(&self, raw_message: RawNetMessage) -> Result<()> {
44        self.write.lock().await.send(raw_message).await?;
45        Ok(())
46    }
47}
48
49/// A connection to the steam server
50#[derive(Clone)]
51pub struct Connection(RawConnection);
52
53impl Debug for Connection {
54    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
55        f.debug_struct("Connection").finish_non_exhaustive()
56    }
57}
58
59impl Connection {
60    pub(self) fn new(raw: RawConnection) -> Self {
61        Self(raw)
62    }
63
64    /// Start an anonymous client session on a new connection
65    pub async fn anonymous(server_list: &ServerList) -> Result<Self, ConnectionError> {
66        UnAuthenticatedConnection::connect(server_list)
67            .await?
68            .anonymous()
69            .await
70    }
71
72    /// Start an anonymous server session on a new connection
73    pub async fn anonymous_server(server_list: &ServerList) -> Result<Self, ConnectionError> {
74        UnAuthenticatedConnection::connect(server_list)
75            .await?
76            .anonymous_server()
77            .await
78    }
79
80    /// Start a client session on a new connection
81    pub async fn login<H: AuthConfirmationHandler, G: GuardDataStore>(
82        server_list: &ServerList,
83        account: &str,
84        password: &str,
85        guard_data_store: G,
86        confirmation_handler: H,
87    ) -> Result<Self, ConnectionError> {
88        UnAuthenticatedConnection::connect(server_list)
89            .await?
90            .login(account, password, guard_data_store, confirmation_handler)
91            .await
92    }
93
94    pub async fn access(
95        server_list: &ServerList,
96        account: &str,
97        access_token: &str,
98    ) -> Result<Self, ConnectionError> {
99        UnAuthenticatedConnection::connect(server_list)
100            .await?
101            .access(account, access_token)
102            .await
103    }
104
105    pub fn access_token(&self) -> Option<&str> {
106        self.session().access_token.as_deref()
107    }
108
109    pub fn steam_id(&self) -> SteamID {
110        self.session().steam_id
111    }
112
113    pub fn session_id(&self) -> i32 {
114        self.session().session_id
115    }
116
117    pub fn cell_id(&self) -> u32 {
118        self.session().cell_id
119    }
120
121    pub fn public_ip(&self) -> Option<IpAddr> {
122        self.session().public_ip
123    }
124
125    pub fn ip_country_code(&self) -> Option<String> {
126        self.session().ip_country_code.clone()
127    }
128
129    pub fn set_timeout(&mut self, timeout: Duration) {
130        self.0.timeout = timeout;
131    }
132
133    pub(crate) fn sender(&self) -> &MessageSender {
134        &self.0.sender
135    }
136
137    /// Get all messages that haven't been filtered by any of the filters
138    ///
139    /// Note that at most 32 unprocessed connections are stored and calling
140    /// this method clears the buffer
141    pub fn take_unprocessed(&self) -> Vec<RawNetMessage> {
142        self.0.filter.unprocessed()
143    }
144}
145
146impl Connection {
147    /// Create new `GameCoordinator` instance using this connection
148    pub async fn game_coordinator<Handshake: GCHandshake>(
149        &self,
150        handshake: &Handshake,
151    ) -> Result<(GameCoordinator, Handshake::Welcome), NetworkError> {
152        GameCoordinator::with_handshake(self, handshake).await
153    }
154}
155
/// Internal building blocks a connection type has to provide
///
/// Every type implementing this trait receives [`ConnectionTrait`] through a blanket
/// implementation.
pub(crate) trait ConnectionImpl: Sync + Debug {
    /// Timeout applied while waiting for the response to a job or service method
    fn timeout(&self) -> Duration;
    /// Filter used to subscribe to incoming messages
    fn filter(&self) -> &MessageFilter;
    /// Session that produces the headers for outgoing messages
    fn session(&self) -> &Session;

    /// Send `msg` with the given header, message kind and protobuf flag
    fn raw_send_with_kind<Msg: EncodableMessage, K: MsgKindEnum>(
        &self,
        header: NetMessageHeader,
        msg: Msg,
        kind: K,
        is_protobuf: bool,
    ) -> impl Future<Output = Result<()>> + Send;
}
169
/// A trait for connections that only allow listening for messages coming from steam
pub trait ReadonlyConnection {
    /// Listen for notification messages from steam
    fn on_notification<T: ServiceMethodRequest>(&self) -> impl Stream<Item = Result<T>> + 'static;

    /// Wait for one message of a specific kind, also returning the header
    fn one_with_header<T: NetMessage + 'static>(
        &self,
    ) -> impl Future<Output = Result<(NetMessageHeader, T)>> + 'static;

    /// Wait for one message of a specific kind
    fn one<T: NetMessage + 'static>(&self) -> impl Future<Output = Result<T>> + 'static;

    /// Listen to messages of a specific kind, also returning the header
    fn on_with_header<T: NetMessage + 'static>(
        &self,
    ) -> impl Stream<Item = Result<(NetMessageHeader, T)>> + 'static;

    /// Listen to messages of a specific kind
    fn on<T: NetMessage + 'static>(&self) -> impl Stream<Item = Result<T>> + 'static;
}
190
/// A trait for sending messages to steam and listening for messages coming from steam
pub trait ConnectionTrait {
    /// Listen for notification messages from steam
    fn on_notification<T: ServiceMethodRequest>(&self) -> impl Stream<Item = Result<T>> + 'static;

    /// Wait for one message of a specific kind, also returning the header
    fn one_with_header<T: NetMessage + 'static>(
        &self,
    ) -> impl Future<Output = Result<(NetMessageHeader, T)>> + 'static;

    /// Wait for one message of a specific kind
    fn one<T: NetMessage + 'static>(&self) -> impl Future<Output = Result<T>> + 'static;

    /// Listen to messages of a specific kind, also returning the header
    fn on_with_header<T: NetMessage + 'static>(
        &self,
    ) -> impl Stream<Item = Result<(NetMessageHeader, T)>> + 'static;

    /// Listen to messages of a specific kind
    fn on<T: NetMessage + 'static>(&self) -> impl Stream<Item = Result<T>> + 'static;

    /// Send a rpc-request to steam, waiting for the matching rpc-response
    fn service_method<Msg: ServiceMethodRequest>(
        &self,
        msg: Msg,
    ) -> impl Future<Output = Result<Msg::Response>> + Send;

    /// Send a message to steam, waiting for a response with the same job id
    fn job<Msg: NetMessage, Rsp: NetMessage>(
        &self,
        msg: Msg,
    ) -> impl Future<Output = Result<Rsp>> + Send;

    /// Send a message to steam, receiving responses until the response marks that the response is complete
    fn job_multi<Msg: NetMessage, Rsp: NetMessage + JobMultiple>(
        &self,
        msg: Msg,
    ) -> impl Stream<Item = Result<Rsp>> + Send;

    /// Send a message to steam without waiting for a response
    fn send<Msg: NetMessage>(&self, msg: Msg) -> impl Future<Output = Result<()>> + Send;

    /// Send a message to steam without waiting for a response, overwriting the kind of the message
    fn send_with_kind<Msg: NetMessage, K: MsgKindEnum>(
        &self,
        msg: Msg,
        kind: K,
    ) -> impl Future<Output = Result<()>> + Send;

    /// Send a message to steam without waiting for a response, with a customized header
    fn raw_send<Msg: NetMessage>(
        &self,
        header: NetMessageHeader,
        msg: Msg,
    ) -> impl Future<Output = Result<()>> + Send;

    /// Send a message to steam without waiting for a response, with a customized header and overwriting the kind of the message
    fn raw_send_with_kind<Msg: EncodableMessage, K: MsgKindEnum>(
        &self,
        header: NetMessageHeader,
        msg: Msg,
        kind: K,
        is_protobuf: bool,
    ) -> impl Future<Output = Result<()>> + Send;
}
256
257impl ConnectionImpl for Connection {
258    fn timeout(&self) -> Duration {
259        self.0.timeout()
260    }
261
262    fn filter(&self) -> &MessageFilter {
263        self.0.filter()
264    }
265
266    fn session(&self) -> &Session {
267        self.0.session()
268    }
269
270    async fn raw_send_with_kind<Msg: EncodableMessage, K: MsgKindEnum>(
271        &self,
272        header: NetMessageHeader,
273        msg: Msg,
274        kind: K,
275        is_protobuf: bool,
276    ) -> Result<()> {
277        <RawConnection as ConnectionImpl>::raw_send_with_kind(
278            &self.0,
279            header,
280            msg,
281            kind,
282            is_protobuf,
283        )
284        .await
285    }
286}
287
288impl<C: ConnectionImpl> ConnectionTrait for C {
289    fn on_notification<T: ServiceMethodRequest>(&self) -> impl Stream<Item = Result<T>> + 'static {
290        BroadcastStream::new(self.filter().on_notification(T::REQ_NAME))
291            .filter_map(|res| res.ok())
292            .map(|raw| raw.into_notification())
293    }
294
295    fn one_with_header<T: NetMessage + 'static>(
296        &self,
297    ) -> impl Future<Output = Result<(NetMessageHeader, T)>> + 'static {
298        // async block instead of async fn, so we don't have to tie the lifetime of the returned future
299        // to the lifetime of &self
300        let fut = self.filter().one_kind(T::KIND);
301        async move {
302            let raw = fut.await.map_err(|_| NetworkError::EOF)?;
303            raw.into_header_and_message()
304        }
305    }
306
307    fn one<T: NetMessage + 'static>(&self) -> impl Future<Output = Result<T>> + 'static {
308        self.one_with_header::<T>()
309            .map(|res| res.map(|(_, msg)| msg))
310    }
311
312    fn on_with_header<T: NetMessage + 'static>(
313        &self,
314    ) -> impl Stream<Item = Result<(NetMessageHeader, T)>> + 'static {
315        BroadcastStream::new(self.filter().on_kind(T::KIND)).map(|raw| {
316            let raw = raw.map_err(|_| NetworkError::EOF)?;
317            raw.into_header_and_message()
318        })
319    }
320
321    fn on<T: NetMessage + 'static>(&self) -> impl Stream<Item = Result<T>> + 'static {
322        self.on_with_header::<T>()
323            .map(|res| res.map(|(_, msg)| msg))
324    }
325
326    async fn service_method<Msg: ServiceMethodRequest>(&self, msg: Msg) -> Result<Msg::Response> {
327        let header = self.session().header(true);
328        let recv = self.filter().on_job_id(header.source_job_id);
329        self.raw_send(header, ServiceMethodMessage(msg)).await?;
330        let message = timeout(self.timeout(), recv)
331            .await
332            .map_err(|_| NetworkError::Timeout)?
333            .map_err(|_| NetworkError::EOF)?
334            .into_message::<ServiceMethodResponseMessage>()?;
335        message.into_response::<Msg>()
336    }
337
338    async fn job<Msg: NetMessage, Rsp: NetMessage>(&self, msg: Msg) -> Result<Rsp> {
339        let header = self.session().header(true);
340        let recv = self.filter().on_job_id(header.source_job_id);
341        self.raw_send(header, msg).await?;
342        timeout(self.timeout(), recv)
343            .await
344            .map_err(|_| NetworkError::Timeout)?
345            .map_err(|_| NetworkError::EOF)?
346            .into_message()
347    }
348
349    fn job_multi<Msg: NetMessage, Rsp: NetMessage + JobMultiple>(
350        &self,
351        msg: Msg,
352    ) -> impl Stream<Item = Result<Rsp>> + Send {
353        try_stream! {
354            let header = self.session().header(true);
355            let source_job_id = header.source_job_id;
356            let mut recv = self.filter().on_job_id_multi(source_job_id);
357            self.raw_send(header, msg).await?;
358            loop {
359                let msg: Rsp = timeout(self.timeout(), recv.recv())
360                    .await
361                    .map_err(|_| NetworkError::Timeout)?
362                    .ok_or(NetworkError::EOF)?
363                    .into_message()?;
364                let completed = msg.completed();
365                yield msg;
366                if completed {
367                    break;
368                }
369            }
370            self.filter().complete_job_id_multi(source_job_id);
371        }
372    }
373
374    #[instrument(skip(msg), fields(kind = ?Msg::KIND))]
375    fn send<Msg: NetMessage>(&self, msg: Msg) -> impl Future<Output = Result<()>> + Send {
376        self.raw_send(self.session().header(false), msg)
377    }
378
379    #[instrument(skip(msg, kind), fields(kind = ?kind))]
380    fn send_with_kind<Msg: NetMessage, K: MsgKindEnum>(
381        &self,
382        msg: Msg,
383        kind: K,
384    ) -> impl Future<Output = Result<()>> + Send {
385        let header = self.session().header(false);
386        self.raw_send_with_kind(header, msg, kind, Msg::IS_PROTOBUF)
387    }
388
389    fn raw_send<Msg: NetMessage>(
390        &self,
391        header: NetMessageHeader,
392        msg: Msg,
393    ) -> impl Future<Output = Result<()>> + Send {
394        self.raw_send_with_kind(header, msg, Msg::KIND, Msg::IS_PROTOBUF)
395    }
396
397    fn raw_send_with_kind<Msg: EncodableMessage, K: MsgKindEnum>(
398        &self,
399        header: NetMessageHeader,
400        msg: Msg,
401        kind: K,
402        is_protobuf: bool,
403    ) -> impl Future<Output = Result<()>> + Send {
404        <Self as ConnectionImpl>::raw_send_with_kind(self, header, msg, kind, is_protobuf)
405    }
406}