steam_vent/connection/
mod.rs

1mod filter;
2pub(crate) mod raw;
3pub(crate) mod unauthenticated;
4
5use crate::auth::{AuthConfirmationHandler, GuardDataStore};
6use crate::message::{
7    EncodableMessage, NetMessage, ServiceMethodMessage, ServiceMethodResponseMessage,
8};
9use crate::net::{NetMessageHeader, NetworkError, RawNetMessage};
10use crate::serverlist::ServerList;
11use crate::service_method::ServiceMethodRequest;
12use crate::session::{ConnectionError, Session};
13use async_stream::try_stream;
14pub(crate) use filter::MessageFilter;
15use futures_util::{FutureExt, Sink, SinkExt};
16use raw::RawConnection;
17use std::fmt::{Debug, Formatter};
18use std::future::Future;
19use std::net::IpAddr;
20use std::sync::Arc;
21use std::time::Duration;
22use steam_vent_proto::{JobMultiple, MsgKindEnum};
23use steamid_ng::SteamID;
24use tokio::sync::Mutex;
25use tokio::time::timeout;
26use tokio_stream::wrappers::BroadcastStream;
27use tokio_stream::{Stream, StreamExt};
28use tracing::instrument;
29pub use unauthenticated::UnAuthenticatedConnection;
30
31pub(crate) type Result<T, E = NetworkError> = std::result::Result<T, E>;
32
33type TransportWriter = Arc<Mutex<dyn Sink<RawNetMessage, Error = NetworkError> + Unpin + Send>>;
34
35/// Send raw messages to steam
36#[derive(Clone)]
37pub(crate) struct MessageSender {
38    write: TransportWriter,
39}
40
41impl MessageSender {
42    pub async fn send_raw(&self, raw_message: RawNetMessage) -> Result<()> {
43        self.write.lock().await.send(raw_message).await?;
44        Ok(())
45    }
46}
47
48/// A connection to the steam server
49#[derive(Clone)]
50pub struct Connection(RawConnection);
51
52impl Debug for Connection {
53    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
54        f.debug_struct("Connection").finish_non_exhaustive()
55    }
56}
57
58impl Connection {
59    pub(self) fn new(raw: RawConnection) -> Self {
60        Self(raw)
61    }
62
63    pub async fn anonymous(server_list: &ServerList) -> Result<Self, ConnectionError> {
64        UnAuthenticatedConnection::connect(server_list)
65            .await?
66            .anonymous()
67            .await
68    }
69
70    pub async fn anonymous_server(server_list: &ServerList) -> Result<Self, ConnectionError> {
71        UnAuthenticatedConnection::connect(server_list)
72            .await?
73            .anonymous_server()
74            .await
75    }
76
77    pub async fn login<H: AuthConfirmationHandler, G: GuardDataStore>(
78        server_list: &ServerList,
79        account: &str,
80        password: &str,
81        guard_data_store: G,
82        confirmation_handler: H,
83    ) -> Result<Self, ConnectionError> {
84        UnAuthenticatedConnection::connect(server_list)
85            .await?
86            .login(account, password, guard_data_store, confirmation_handler)
87            .await
88    }
89
90    pub fn steam_id(&self) -> SteamID {
91        self.session().steam_id
92    }
93
94    pub fn session_id(&self) -> i32 {
95        self.session().session_id
96    }
97
98    pub fn cell_id(&self) -> u32 {
99        self.session().cell_id
100    }
101
102    pub fn public_ip(&self) -> Option<IpAddr> {
103        self.session().public_ip
104    }
105
106    pub fn ip_country_code(&self) -> Option<String> {
107        self.session().ip_country_code.clone()
108    }
109
110    pub fn set_timeout(&mut self, timeout: Duration) {
111        self.0.timeout = timeout;
112    }
113
114    pub(crate) fn sender(&self) -> &MessageSender {
115        &self.0.sender
116    }
117
118    /// Get all messages that haven't been filtered by any of the filters
119    ///
120    /// Note that at most 32 unprocessed connections are stored and calling
121    /// this method clears the buffer
122    pub fn take_unprocessed(&self) -> Vec<RawNetMessage> {
123        self.0.filter.unprocessed()
124    }
125}
126
127pub(crate) trait ConnectionImpl: Sync + Debug {
128    fn timeout(&self) -> Duration;
129    fn filter(&self) -> &MessageFilter;
130    fn session(&self) -> &Session;
131
132    fn raw_send_with_kind<Msg: EncodableMessage, K: MsgKindEnum>(
133        &self,
134        header: NetMessageHeader,
135        msg: Msg,
136        kind: K,
137        is_protobuf: bool,
138    ) -> impl Future<Output = Result<()>> + Send;
139}
140
141/// A trait for connections that only allow listening for messages coming from steam
142pub trait ReadonlyConnection {
143    fn on_notification<T: ServiceMethodRequest>(&self) -> impl Stream<Item = Result<T>> + 'static;
144
145    /// Wait for one message of a specific kind, also returning the header
146    fn one_with_header<T: NetMessage + 'static>(
147        &self,
148    ) -> impl Future<Output = Result<(NetMessageHeader, T)>> + 'static;
149
150    /// Wait for one message of a specific kind
151    fn one<T: NetMessage + 'static>(&self) -> impl Future<Output = Result<T>> + 'static;
152
153    /// Listen to messages of a specific kind, also returning the header
154    fn on_with_header<T: NetMessage + 'static>(
155        &self,
156    ) -> impl Stream<Item = Result<(NetMessageHeader, T)>> + 'static;
157
158    /// Listen to messages of a specific kind
159    fn on<T: NetMessage + 'static>(&self) -> impl Stream<Item = Result<T>> + 'static;
160}
161
162/// A trait for sending messages to steam
163pub trait ConnectionTrait {
164    fn on_notification<T: ServiceMethodRequest>(&self) -> impl Stream<Item = Result<T>> + 'static;
165
166    /// Wait for one message of a specific kind, also returning the header
167    fn one_with_header<T: NetMessage + 'static>(
168        &self,
169    ) -> impl Future<Output = Result<(NetMessageHeader, T)>> + 'static;
170
171    /// Wait for one message of a specific kind
172    fn one<T: NetMessage + 'static>(&self) -> impl Future<Output = Result<T>> + 'static;
173
174    /// Listen to messages of a specific kind, also returning the header
175    fn on_with_header<T: NetMessage + 'static>(
176        &self,
177    ) -> impl Stream<Item = Result<(NetMessageHeader, T)>> + 'static;
178
179    /// Listen to messages of a specific kind
180    fn on<T: NetMessage + 'static>(&self) -> impl Stream<Item = Result<T>> + 'static;
181
182    /// Send a rpc-request to steam, waiting for the matching rpc-response
183    fn service_method<Msg: ServiceMethodRequest>(
184        &self,
185        msg: Msg,
186    ) -> impl Future<Output = Result<Msg::Response>> + Send;
187
188    /// Send a message to steam, waiting for a response with the same job id
189    fn job<Msg: NetMessage, Rsp: NetMessage>(
190        &self,
191        msg: Msg,
192    ) -> impl Future<Output = Result<Rsp>> + Send;
193
194    /// Send a message to steam, receiving responses until the response marks that the response is complete
195    fn job_multi<Msg: NetMessage, Rsp: NetMessage + JobMultiple>(
196        &self,
197        msg: Msg,
198    ) -> impl Stream<Item = Result<Rsp>> + Send;
199
200    /// Send a message to steam without waiting for a response
201    fn send<Msg: NetMessage>(&self, msg: Msg) -> impl Future<Output = Result<()>> + Send;
202
203    /// Send a message to steam without waiting for a response, overwriting the kind of the message
204    fn send_with_kind<Msg: NetMessage, K: MsgKindEnum>(
205        &self,
206        msg: Msg,
207        kind: K,
208    ) -> impl Future<Output = Result<()>> + Send;
209
210    fn raw_send<Msg: NetMessage>(
211        &self,
212        header: NetMessageHeader,
213        msg: Msg,
214    ) -> impl Future<Output = Result<()>> + Send;
215
216    fn raw_send_with_kind<Msg: EncodableMessage, K: MsgKindEnum>(
217        &self,
218        header: NetMessageHeader,
219        msg: Msg,
220        kind: K,
221        is_protobuf: bool,
222    ) -> impl Future<Output = Result<()>> + Send;
223}
224
225impl ConnectionImpl for Connection {
226    fn timeout(&self) -> Duration {
227        self.0.timeout()
228    }
229
230    fn filter(&self) -> &MessageFilter {
231        self.0.filter()
232    }
233
234    fn session(&self) -> &Session {
235        self.0.session()
236    }
237
238    async fn raw_send_with_kind<Msg: EncodableMessage, K: MsgKindEnum>(
239        &self,
240        header: NetMessageHeader,
241        msg: Msg,
242        kind: K,
243        is_protobuf: bool,
244    ) -> Result<()> {
245        <RawConnection as ConnectionImpl>::raw_send_with_kind(
246            &self.0,
247            header,
248            msg,
249            kind,
250            is_protobuf,
251        )
252        .await
253    }
254}
255
256impl<C: ConnectionImpl> ConnectionTrait for C {
257    fn on_notification<T: ServiceMethodRequest>(&self) -> impl Stream<Item = Result<T>> + 'static {
258        BroadcastStream::new(self.filter().on_notification(T::REQ_NAME))
259            .filter_map(|res| res.ok())
260            .map(|raw| raw.into_notification())
261    }
262
263    fn one_with_header<T: NetMessage + 'static>(
264        &self,
265    ) -> impl Future<Output = Result<(NetMessageHeader, T)>> + 'static {
266        // async block instead of async fn, so we don't have to tie the lifetime of the returned future
267        // to the lifetime of &self
268        let fut = self.filter().one_kind(T::KIND);
269        async move {
270            let raw = fut.await.map_err(|_| NetworkError::EOF)?;
271            raw.into_header_and_message()
272        }
273    }
274
275    fn one<T: NetMessage + 'static>(&self) -> impl Future<Output = Result<T>> + 'static {
276        self.one_with_header::<T>()
277            .map(|res| res.map(|(_, msg)| msg))
278    }
279
280    fn on_with_header<T: NetMessage + 'static>(
281        &self,
282    ) -> impl Stream<Item = Result<(NetMessageHeader, T)>> + 'static {
283        BroadcastStream::new(self.filter().on_kind(T::KIND)).map(|raw| {
284            let raw = raw.map_err(|_| NetworkError::EOF)?;
285            raw.into_header_and_message()
286        })
287    }
288
289    fn on<T: NetMessage + 'static>(&self) -> impl Stream<Item = Result<T>> + 'static {
290        self.on_with_header::<T>()
291            .map(|res| res.map(|(_, msg)| msg))
292    }
293
294    async fn service_method<Msg: ServiceMethodRequest>(&self, msg: Msg) -> Result<Msg::Response> {
295        let header = self.session().header(true);
296        let recv = self.filter().on_job_id(header.source_job_id);
297        self.raw_send(header, ServiceMethodMessage(msg)).await?;
298        let message = timeout(self.timeout(), recv)
299            .await
300            .map_err(|_| NetworkError::Timeout)?
301            .map_err(|_| NetworkError::EOF)?
302            .into_message::<ServiceMethodResponseMessage>()?;
303        message.into_response::<Msg>()
304    }
305
306    async fn job<Msg: NetMessage, Rsp: NetMessage>(&self, msg: Msg) -> Result<Rsp> {
307        let header = self.session().header(true);
308        let recv = self.filter().on_job_id(header.source_job_id);
309        self.raw_send(header, msg).await?;
310        timeout(self.timeout(), recv)
311            .await
312            .map_err(|_| NetworkError::Timeout)?
313            .map_err(|_| NetworkError::EOF)?
314            .into_message()
315    }
316
317    fn job_multi<Msg: NetMessage, Rsp: NetMessage + JobMultiple>(
318        &self,
319        msg: Msg,
320    ) -> impl Stream<Item = Result<Rsp>> + Send {
321        try_stream! {
322            let header = self.session().header(true);
323            let source_job_id = header.source_job_id;
324            let mut recv = self.filter().on_job_id_multi(source_job_id);
325            self.raw_send(header, msg).await?;
326            loop {
327                let msg: Rsp = timeout(self.timeout(), recv.recv())
328                    .await
329                    .map_err(|_| NetworkError::Timeout)?
330                    .ok_or(NetworkError::EOF)?
331                    .into_message()?;
332                let completed = msg.completed();
333                yield msg;
334                if completed {
335                    break;
336                }
337            }
338            self.filter().complete_job_id_multi(source_job_id);
339        }
340    }
341
342    #[instrument(skip(msg), fields(kind = ?Msg::KIND))]
343    fn send<Msg: NetMessage>(&self, msg: Msg) -> impl Future<Output = Result<()>> + Send {
344        self.raw_send(self.session().header(false), msg)
345    }
346
347    #[instrument(skip(msg, kind), fields(kind = ?kind))]
348    fn send_with_kind<Msg: NetMessage, K: MsgKindEnum>(
349        &self,
350        msg: Msg,
351        kind: K,
352    ) -> impl Future<Output = Result<()>> + Send {
353        let header = self.session().header(false);
354        self.raw_send_with_kind(header, msg, kind, Msg::IS_PROTOBUF)
355    }
356
357    fn raw_send<Msg: NetMessage>(
358        &self,
359        header: NetMessageHeader,
360        msg: Msg,
361    ) -> impl Future<Output = Result<()>> + Send {
362        self.raw_send_with_kind(header, msg, Msg::KIND, Msg::IS_PROTOBUF)
363    }
364
365    fn raw_send_with_kind<Msg: EncodableMessage, K: MsgKindEnum>(
366        &self,
367        header: NetMessageHeader,
368        msg: Msg,
369        kind: K,
370        is_protobuf: bool,
371    ) -> impl Future<Output = Result<()>> + Send {
372        <Self as ConnectionImpl>::raw_send_with_kind(self, header, msg, kind, is_protobuf)
373    }
374}