1mod filter;
2pub(crate) mod raw;
3pub(crate) mod unauthenticated;
4
5use crate::auth::{AuthConfirmationHandler, GuardDataStore};
6use crate::message::{
7 EncodableMessage, NetMessage, ServiceMethodMessage, ServiceMethodResponseMessage,
8};
9use crate::net::{NetMessageHeader, NetworkError, RawNetMessage};
10use crate::serverlist::ServerList;
11use crate::service_method::ServiceMethodRequest;
12use crate::session::{ConnectionError, Session};
13use crate::GameCoordinator;
14use async_stream::try_stream;
15pub(crate) use filter::MessageFilter;
16use futures_util::{FutureExt, Sink, SinkExt};
17use raw::RawConnection;
18use std::fmt::{Debug, Formatter};
19use std::future::Future;
20use std::net::IpAddr;
21use std::sync::Arc;
22use std::time::Duration;
23use steam_vent_proto::{GCHandshake, JobMultiple, MsgKindEnum};
24use steamid_ng::SteamID;
25use tokio::sync::Mutex;
26use tokio::time::timeout;
27use tokio_stream::wrappers::BroadcastStream;
28use tokio_stream::{Stream, StreamExt};
29use tracing::instrument;
30pub use unauthenticated::UnAuthenticatedConnection;
31
/// Crate-local `Result` alias whose error type defaults to [`NetworkError`].
///
/// A different error type can still be supplied as the second type argument.
pub(crate) type Result<T, E = NetworkError> = std::result::Result<T, E>;
33
/// Shared handle to the outgoing half of the transport: a type-erased [`Sink`] of
/// [`RawNetMessage`]s behind an async [`Mutex`], reference counted with [`Arc`].
type TransportWriter = Arc<Mutex<dyn Sink<RawNetMessage, Error = NetworkError> + Unpin + Send>>;
35
/// Cloneable handle for pushing raw messages into the shared transport sink.
#[derive(Clone)]
pub(crate) struct MessageSender {
    /// Shared writer; it is an `Arc`, so every clone of the sender feeds the same sink.
    write: TransportWriter,
}
41
42impl MessageSender {
43 pub async fn send_raw(&self, raw_message: RawNetMessage) -> Result<()> {
44 self.write.lock().await.send(raw_message).await?;
45 Ok(())
46 }
47}
48
/// Public connection handle; a cloneable newtype around [`RawConnection`].
#[derive(Clone)]
pub struct Connection(RawConnection);
52
53impl Debug for Connection {
54 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
55 f.debug_struct("Connection").finish_non_exhaustive()
56 }
57}
58
59impl Connection {
60 pub(self) fn new(raw: RawConnection) -> Self {
61 Self(raw)
62 }
63
64 pub async fn anonymous(server_list: &ServerList) -> Result<Self, ConnectionError> {
66 UnAuthenticatedConnection::connect(server_list)
67 .await?
68 .anonymous()
69 .await
70 }
71
72 pub async fn anonymous_server(server_list: &ServerList) -> Result<Self, ConnectionError> {
74 UnAuthenticatedConnection::connect(server_list)
75 .await?
76 .anonymous_server()
77 .await
78 }
79
80 pub async fn login<H: AuthConfirmationHandler, G: GuardDataStore>(
82 server_list: &ServerList,
83 account: &str,
84 password: &str,
85 guard_data_store: G,
86 confirmation_handler: H,
87 ) -> Result<Self, ConnectionError> {
88 UnAuthenticatedConnection::connect(server_list)
89 .await?
90 .login(account, password, guard_data_store, confirmation_handler)
91 .await
92 }
93
94 pub async fn access(
95 server_list: &ServerList,
96 account: &str,
97 access_token: &str,
98 ) -> Result<Self, ConnectionError> {
99 UnAuthenticatedConnection::connect(server_list)
100 .await?
101 .access(account, access_token)
102 .await
103 }
104
105 pub fn access_token(&self) -> Option<&str> {
106 self.session().access_token.as_deref()
107 }
108
109 pub fn steam_id(&self) -> SteamID {
110 self.session().steam_id
111 }
112
113 pub fn session_id(&self) -> i32 {
114 self.session().session_id
115 }
116
117 pub fn cell_id(&self) -> u32 {
118 self.session().cell_id
119 }
120
121 pub fn public_ip(&self) -> Option<IpAddr> {
122 self.session().public_ip
123 }
124
125 pub fn ip_country_code(&self) -> Option<String> {
126 self.session().ip_country_code.clone()
127 }
128
129 pub fn set_timeout(&mut self, timeout: Duration) {
130 self.0.timeout = timeout;
131 }
132
133 pub(crate) fn sender(&self) -> &MessageSender {
134 &self.0.sender
135 }
136
137 pub fn take_unprocessed(&self) -> Vec<RawNetMessage> {
142 self.0.filter.unprocessed()
143 }
144}
145
146impl Connection {
147 pub async fn game_coordinator<Handshake: GCHandshake>(
149 &self,
150 handshake: &Handshake,
151 ) -> Result<(GameCoordinator, Handshake::Welcome), NetworkError> {
152 GameCoordinator::with_handshake(self, handshake).await
153 }
154}
155
/// Crate-internal building blocks a connection type has to supply.
///
/// Every `C: ConnectionImpl` receives [`ConnectionTrait`] through the blanket implementation
/// in this module.
pub(crate) trait ConnectionImpl: Sync + Debug {
    /// How long the request/response helpers wait for a reply before reporting a timeout.
    fn timeout(&self) -> Duration;
    /// The filter used to subscribe to incoming messages.
    fn filter(&self) -> &MessageFilter;
    /// The session used to build headers for outgoing messages.
    fn session(&self) -> &Session;

    /// Sends `msg` with an explicit `header` and message `kind`.
    ///
    /// NOTE(review): `is_protobuf` presumably selects the wire encoding; confirm against the
    /// implementations.
    fn raw_send_with_kind<Msg: EncodableMessage, K: MsgKindEnum>(
        &self,
        header: NetMessageHeader,
        msg: Msg,
        kind: K,
        is_protobuf: bool,
    ) -> impl Future<Output = Result<()>> + Send;
}
169
/// Listening-only connection interface.
///
/// The method signatures are identical to the listening methods of [`ConnectionTrait`]; this
/// trait has no sending methods. All returned futures and streams are `'static`, so they do
/// not borrow the connection.
pub trait ReadonlyConnection {
    /// Stream of incoming notifications decoded as `T`.
    fn on_notification<T: ServiceMethodRequest>(&self) -> impl Stream<Item = Result<T>> + 'static;

    /// Future resolving to a single incoming message of type `T` together with its header.
    fn one_with_header<T: NetMessage + 'static>(
        &self,
    ) -> impl Future<Output = Result<(NetMessageHeader, T)>> + 'static;

    /// Future resolving to a single incoming message of type `T`.
    fn one<T: NetMessage + 'static>(&self) -> impl Future<Output = Result<T>> + 'static;

    /// Stream of incoming messages of type `T`, each paired with its header.
    fn on_with_header<T: NetMessage + 'static>(
        &self,
    ) -> impl Stream<Item = Result<(NetMessageHeader, T)>> + 'static;

    /// Stream of incoming messages of type `T`.
    fn on<T: NetMessage + 'static>(&self) -> impl Stream<Item = Result<T>> + 'static;
}
190
/// Full connection interface: listening for incoming messages and sending outgoing ones.
///
/// Implemented for every [`ConnectionImpl`] by the blanket implementation in this module; the
/// behaviour notes below describe that implementation.
pub trait ConnectionTrait {
    /// Stream of incoming notifications named `T::REQ_NAME`, decoded as `T`.
    fn on_notification<T: ServiceMethodRequest>(&self) -> impl Stream<Item = Result<T>> + 'static;

    /// Future resolving to a single incoming message of kind `T::KIND` together with its header.
    fn one_with_header<T: NetMessage + 'static>(
        &self,
    ) -> impl Future<Output = Result<(NetMessageHeader, T)>> + 'static;

    /// Like [`ConnectionTrait::one_with_header`], with the header discarded.
    fn one<T: NetMessage + 'static>(&self) -> impl Future<Output = Result<T>> + 'static;

    /// Stream of incoming messages of kind `T::KIND`, each paired with its header.
    fn on_with_header<T: NetMessage + 'static>(
        &self,
    ) -> impl Stream<Item = Result<(NetMessageHeader, T)>> + 'static;

    /// Like [`ConnectionTrait::on_with_header`], with the headers discarded.
    fn on<T: NetMessage + 'static>(&self) -> impl Stream<Item = Result<T>> + 'static;

    /// Sends a service method request and waits for its response.
    ///
    /// The wait is bounded by the connection timeout.
    fn service_method<Msg: ServiceMethodRequest>(
        &self,
        msg: Msg,
    ) -> impl Future<Output = Result<Msg::Response>> + Send;

    /// Sends `msg` as a job and waits for the single reply matched on the job id.
    ///
    /// The wait is bounded by the connection timeout.
    fn job<Msg: NetMessage, Rsp: NetMessage>(
        &self,
        msg: Msg,
    ) -> impl Future<Output = Result<Rsp>> + Send;

    /// Sends `msg` as a job and streams replies until one reports itself as completed.
    ///
    /// Each individual reply is awaited for at most the connection timeout.
    fn job_multi<Msg: NetMessage, Rsp: NetMessage + JobMultiple>(
        &self,
        msg: Msg,
    ) -> impl Stream<Item = Result<Rsp>> + Send;

    /// Sends `msg` with a header built from the session, without waiting for a reply.
    fn send<Msg: NetMessage>(&self, msg: Msg) -> impl Future<Output = Result<()>> + Send;

    /// Like [`ConnectionTrait::send`], but with `kind` in place of `Msg::KIND`.
    fn send_with_kind<Msg: NetMessage, K: MsgKindEnum>(
        &self,
        msg: Msg,
        kind: K,
    ) -> impl Future<Output = Result<()>> + Send;

    /// Sends `msg` with a caller supplied `header`, using `Msg::KIND` and `Msg::IS_PROTOBUF`.
    fn raw_send<Msg: NetMessage>(
        &self,
        header: NetMessageHeader,
        msg: Msg,
    ) -> impl Future<Output = Result<()>> + Send;

    /// Sends `msg` with a caller supplied `header`, `kind` and `is_protobuf` flag.
    fn raw_send_with_kind<Msg: EncodableMessage, K: MsgKindEnum>(
        &self,
        header: NetMessageHeader,
        msg: Msg,
        kind: K,
        is_protobuf: bool,
    ) -> impl Future<Output = Result<()>> + Send;
}
256
257impl ConnectionImpl for Connection {
258 fn timeout(&self) -> Duration {
259 self.0.timeout()
260 }
261
262 fn filter(&self) -> &MessageFilter {
263 self.0.filter()
264 }
265
266 fn session(&self) -> &Session {
267 self.0.session()
268 }
269
270 async fn raw_send_with_kind<Msg: EncodableMessage, K: MsgKindEnum>(
271 &self,
272 header: NetMessageHeader,
273 msg: Msg,
274 kind: K,
275 is_protobuf: bool,
276 ) -> Result<()> {
277 <RawConnection as ConnectionImpl>::raw_send_with_kind(
278 &self.0,
279 header,
280 msg,
281 kind,
282 is_protobuf,
283 )
284 .await
285 }
286}
287
288impl<C: ConnectionImpl> ConnectionTrait for C {
289 fn on_notification<T: ServiceMethodRequest>(&self) -> impl Stream<Item = Result<T>> + 'static {
290 BroadcastStream::new(self.filter().on_notification(T::REQ_NAME))
291 .filter_map(|res| res.ok())
292 .map(|raw| raw.into_notification())
293 }
294
295 fn one_with_header<T: NetMessage + 'static>(
296 &self,
297 ) -> impl Future<Output = Result<(NetMessageHeader, T)>> + 'static {
298 let fut = self.filter().one_kind(T::KIND);
301 async move {
302 let raw = fut.await.map_err(|_| NetworkError::EOF)?;
303 raw.into_header_and_message()
304 }
305 }
306
307 fn one<T: NetMessage + 'static>(&self) -> impl Future<Output = Result<T>> + 'static {
308 self.one_with_header::<T>()
309 .map(|res| res.map(|(_, msg)| msg))
310 }
311
312 fn on_with_header<T: NetMessage + 'static>(
313 &self,
314 ) -> impl Stream<Item = Result<(NetMessageHeader, T)>> + 'static {
315 BroadcastStream::new(self.filter().on_kind(T::KIND)).map(|raw| {
316 let raw = raw.map_err(|_| NetworkError::EOF)?;
317 raw.into_header_and_message()
318 })
319 }
320
321 fn on<T: NetMessage + 'static>(&self) -> impl Stream<Item = Result<T>> + 'static {
322 self.on_with_header::<T>()
323 .map(|res| res.map(|(_, msg)| msg))
324 }
325
326 async fn service_method<Msg: ServiceMethodRequest>(&self, msg: Msg) -> Result<Msg::Response> {
327 let header = self.session().header(true);
328 let recv = self.filter().on_job_id(header.source_job_id);
329 self.raw_send(header, ServiceMethodMessage(msg)).await?;
330 let message = timeout(self.timeout(), recv)
331 .await
332 .map_err(|_| NetworkError::Timeout)?
333 .map_err(|_| NetworkError::EOF)?
334 .into_message::<ServiceMethodResponseMessage>()?;
335 message.into_response::<Msg>()
336 }
337
338 async fn job<Msg: NetMessage, Rsp: NetMessage>(&self, msg: Msg) -> Result<Rsp> {
339 let header = self.session().header(true);
340 let recv = self.filter().on_job_id(header.source_job_id);
341 self.raw_send(header, msg).await?;
342 timeout(self.timeout(), recv)
343 .await
344 .map_err(|_| NetworkError::Timeout)?
345 .map_err(|_| NetworkError::EOF)?
346 .into_message()
347 }
348
349 fn job_multi<Msg: NetMessage, Rsp: NetMessage + JobMultiple>(
350 &self,
351 msg: Msg,
352 ) -> impl Stream<Item = Result<Rsp>> + Send {
353 try_stream! {
354 let header = self.session().header(true);
355 let source_job_id = header.source_job_id;
356 let mut recv = self.filter().on_job_id_multi(source_job_id);
357 self.raw_send(header, msg).await?;
358 loop {
359 let msg: Rsp = timeout(self.timeout(), recv.recv())
360 .await
361 .map_err(|_| NetworkError::Timeout)?
362 .ok_or(NetworkError::EOF)?
363 .into_message()?;
364 let completed = msg.completed();
365 yield msg;
366 if completed {
367 break;
368 }
369 }
370 self.filter().complete_job_id_multi(source_job_id);
371 }
372 }
373
374 #[instrument(skip(msg), fields(kind = ?Msg::KIND))]
375 fn send<Msg: NetMessage>(&self, msg: Msg) -> impl Future<Output = Result<()>> + Send {
376 self.raw_send(self.session().header(false), msg)
377 }
378
379 #[instrument(skip(msg, kind), fields(kind = ?kind))]
380 fn send_with_kind<Msg: NetMessage, K: MsgKindEnum>(
381 &self,
382 msg: Msg,
383 kind: K,
384 ) -> impl Future<Output = Result<()>> + Send {
385 let header = self.session().header(false);
386 self.raw_send_with_kind(header, msg, kind, Msg::IS_PROTOBUF)
387 }
388
389 fn raw_send<Msg: NetMessage>(
390 &self,
391 header: NetMessageHeader,
392 msg: Msg,
393 ) -> impl Future<Output = Result<()>> + Send {
394 self.raw_send_with_kind(header, msg, Msg::KIND, Msg::IS_PROTOBUF)
395 }
396
397 fn raw_send_with_kind<Msg: EncodableMessage, K: MsgKindEnum>(
398 &self,
399 header: NetMessageHeader,
400 msg: Msg,
401 kind: K,
402 is_protobuf: bool,
403 ) -> impl Future<Output = Result<()>> + Send {
404 <Self as ConnectionImpl>::raw_send_with_kind(self, header, msg, kind, is_protobuf)
405 }
406}