1mod filter;
2pub(crate) mod raw;
3pub(crate) mod unauthenticated;
4
5use crate::auth::{AuthConfirmationHandler, GuardDataStore};
6use crate::message::{
7 EncodableMessage, NetMessage, ServiceMethodMessage, ServiceMethodResponseMessage,
8};
9use crate::net::{NetMessageHeader, NetworkError, RawNetMessage};
10use crate::serverlist::ServerList;
11use crate::service_method::ServiceMethodRequest;
12use crate::session::{ConnectionError, Session};
13use async_stream::try_stream;
14pub(crate) use filter::MessageFilter;
15use futures_util::{FutureExt, Sink, SinkExt};
16use raw::RawConnection;
17use std::fmt::{Debug, Formatter};
18use std::future::Future;
19use std::net::IpAddr;
20use std::sync::Arc;
21use std::time::Duration;
22use steam_vent_proto::{JobMultiple, MsgKindEnum};
23use steamid_ng::SteamID;
24use tokio::sync::Mutex;
25use tokio::time::timeout;
26use tokio_stream::wrappers::BroadcastStream;
27use tokio_stream::{Stream, StreamExt};
28use tracing::instrument;
29pub use unauthenticated::UnAuthenticatedConnection;
30
31pub(crate) type Result<T, E = NetworkError> = std::result::Result<T, E>;
32
33type TransportWriter = Arc<Mutex<dyn Sink<RawNetMessage, Error = NetworkError> + Unpin + Send>>;
34
35#[derive(Clone)]
37pub(crate) struct MessageSender {
38 write: TransportWriter,
39}
40
41impl MessageSender {
42 pub async fn send_raw(&self, raw_message: RawNetMessage) -> Result<()> {
43 self.write.lock().await.send(raw_message).await?;
44 Ok(())
45 }
46}
47
48#[derive(Clone)]
50pub struct Connection(RawConnection);
51
52impl Debug for Connection {
53 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
54 f.debug_struct("Connection").finish_non_exhaustive()
55 }
56}
57
58impl Connection {
59 pub(self) fn new(raw: RawConnection) -> Self {
60 Self(raw)
61 }
62
63 pub async fn anonymous(server_list: &ServerList) -> Result<Self, ConnectionError> {
64 UnAuthenticatedConnection::connect(server_list)
65 .await?
66 .anonymous()
67 .await
68 }
69
70 pub async fn anonymous_server(server_list: &ServerList) -> Result<Self, ConnectionError> {
71 UnAuthenticatedConnection::connect(server_list)
72 .await?
73 .anonymous_server()
74 .await
75 }
76
77 pub async fn login<H: AuthConfirmationHandler, G: GuardDataStore>(
78 server_list: &ServerList,
79 account: &str,
80 password: &str,
81 guard_data_store: G,
82 confirmation_handler: H,
83 ) -> Result<Self, ConnectionError> {
84 UnAuthenticatedConnection::connect(server_list)
85 .await?
86 .login(account, password, guard_data_store, confirmation_handler)
87 .await
88 }
89
90 pub fn steam_id(&self) -> SteamID {
91 self.session().steam_id
92 }
93
94 pub fn session_id(&self) -> i32 {
95 self.session().session_id
96 }
97
98 pub fn cell_id(&self) -> u32 {
99 self.session().cell_id
100 }
101
102 pub fn public_ip(&self) -> Option<IpAddr> {
103 self.session().public_ip
104 }
105
106 pub fn ip_country_code(&self) -> Option<String> {
107 self.session().ip_country_code.clone()
108 }
109
110 pub fn set_timeout(&mut self, timeout: Duration) {
111 self.0.timeout = timeout;
112 }
113
114 pub(crate) fn sender(&self) -> &MessageSender {
115 &self.0.sender
116 }
117
118 pub fn take_unprocessed(&self) -> Vec<RawNetMessage> {
123 self.0.filter.unprocessed()
124 }
125}
126
127pub(crate) trait ConnectionImpl: Sync + Debug {
128 fn timeout(&self) -> Duration;
129 fn filter(&self) -> &MessageFilter;
130 fn session(&self) -> &Session;
131
132 fn raw_send_with_kind<Msg: EncodableMessage, K: MsgKindEnum>(
133 &self,
134 header: NetMessageHeader,
135 msg: Msg,
136 kind: K,
137 is_protobuf: bool,
138 ) -> impl Future<Output = Result<()>> + Send;
139}
140
141pub trait ReadonlyConnection {
143 fn on_notification<T: ServiceMethodRequest>(&self) -> impl Stream<Item = Result<T>> + 'static;
144
145 fn one_with_header<T: NetMessage + 'static>(
147 &self,
148 ) -> impl Future<Output = Result<(NetMessageHeader, T)>> + 'static;
149
150 fn one<T: NetMessage + 'static>(&self) -> impl Future<Output = Result<T>> + 'static;
152
153 fn on_with_header<T: NetMessage + 'static>(
155 &self,
156 ) -> impl Stream<Item = Result<(NetMessageHeader, T)>> + 'static;
157
158 fn on<T: NetMessage + 'static>(&self) -> impl Stream<Item = Result<T>> + 'static;
160}
161
162pub trait ConnectionTrait {
164 fn on_notification<T: ServiceMethodRequest>(&self) -> impl Stream<Item = Result<T>> + 'static;
165
166 fn one_with_header<T: NetMessage + 'static>(
168 &self,
169 ) -> impl Future<Output = Result<(NetMessageHeader, T)>> + 'static;
170
171 fn one<T: NetMessage + 'static>(&self) -> impl Future<Output = Result<T>> + 'static;
173
174 fn on_with_header<T: NetMessage + 'static>(
176 &self,
177 ) -> impl Stream<Item = Result<(NetMessageHeader, T)>> + 'static;
178
179 fn on<T: NetMessage + 'static>(&self) -> impl Stream<Item = Result<T>> + 'static;
181
182 fn service_method<Msg: ServiceMethodRequest>(
184 &self,
185 msg: Msg,
186 ) -> impl Future<Output = Result<Msg::Response>> + Send;
187
188 fn job<Msg: NetMessage, Rsp: NetMessage>(
190 &self,
191 msg: Msg,
192 ) -> impl Future<Output = Result<Rsp>> + Send;
193
194 fn job_multi<Msg: NetMessage, Rsp: NetMessage + JobMultiple>(
196 &self,
197 msg: Msg,
198 ) -> impl Stream<Item = Result<Rsp>> + Send;
199
200 fn send<Msg: NetMessage>(&self, msg: Msg) -> impl Future<Output = Result<()>> + Send;
202
203 fn send_with_kind<Msg: NetMessage, K: MsgKindEnum>(
205 &self,
206 msg: Msg,
207 kind: K,
208 ) -> impl Future<Output = Result<()>> + Send;
209
210 fn raw_send<Msg: NetMessage>(
211 &self,
212 header: NetMessageHeader,
213 msg: Msg,
214 ) -> impl Future<Output = Result<()>> + Send;
215
216 fn raw_send_with_kind<Msg: EncodableMessage, K: MsgKindEnum>(
217 &self,
218 header: NetMessageHeader,
219 msg: Msg,
220 kind: K,
221 is_protobuf: bool,
222 ) -> impl Future<Output = Result<()>> + Send;
223}
224
225impl ConnectionImpl for Connection {
226 fn timeout(&self) -> Duration {
227 self.0.timeout()
228 }
229
230 fn filter(&self) -> &MessageFilter {
231 self.0.filter()
232 }
233
234 fn session(&self) -> &Session {
235 self.0.session()
236 }
237
238 async fn raw_send_with_kind<Msg: EncodableMessage, K: MsgKindEnum>(
239 &self,
240 header: NetMessageHeader,
241 msg: Msg,
242 kind: K,
243 is_protobuf: bool,
244 ) -> Result<()> {
245 <RawConnection as ConnectionImpl>::raw_send_with_kind(
246 &self.0,
247 header,
248 msg,
249 kind,
250 is_protobuf,
251 )
252 .await
253 }
254}
255
256impl<C: ConnectionImpl> ConnectionTrait for C {
257 fn on_notification<T: ServiceMethodRequest>(&self) -> impl Stream<Item = Result<T>> + 'static {
258 BroadcastStream::new(self.filter().on_notification(T::REQ_NAME))
259 .filter_map(|res| res.ok())
260 .map(|raw| raw.into_notification())
261 }
262
263 fn one_with_header<T: NetMessage + 'static>(
264 &self,
265 ) -> impl Future<Output = Result<(NetMessageHeader, T)>> + 'static {
266 let fut = self.filter().one_kind(T::KIND);
269 async move {
270 let raw = fut.await.map_err(|_| NetworkError::EOF)?;
271 raw.into_header_and_message()
272 }
273 }
274
275 fn one<T: NetMessage + 'static>(&self) -> impl Future<Output = Result<T>> + 'static {
276 self.one_with_header::<T>()
277 .map(|res| res.map(|(_, msg)| msg))
278 }
279
280 fn on_with_header<T: NetMessage + 'static>(
281 &self,
282 ) -> impl Stream<Item = Result<(NetMessageHeader, T)>> + 'static {
283 BroadcastStream::new(self.filter().on_kind(T::KIND)).map(|raw| {
284 let raw = raw.map_err(|_| NetworkError::EOF)?;
285 raw.into_header_and_message()
286 })
287 }
288
289 fn on<T: NetMessage + 'static>(&self) -> impl Stream<Item = Result<T>> + 'static {
290 self.on_with_header::<T>()
291 .map(|res| res.map(|(_, msg)| msg))
292 }
293
294 async fn service_method<Msg: ServiceMethodRequest>(&self, msg: Msg) -> Result<Msg::Response> {
295 let header = self.session().header(true);
296 let recv = self.filter().on_job_id(header.source_job_id);
297 self.raw_send(header, ServiceMethodMessage(msg)).await?;
298 let message = timeout(self.timeout(), recv)
299 .await
300 .map_err(|_| NetworkError::Timeout)?
301 .map_err(|_| NetworkError::EOF)?
302 .into_message::<ServiceMethodResponseMessage>()?;
303 message.into_response::<Msg>()
304 }
305
306 async fn job<Msg: NetMessage, Rsp: NetMessage>(&self, msg: Msg) -> Result<Rsp> {
307 let header = self.session().header(true);
308 let recv = self.filter().on_job_id(header.source_job_id);
309 self.raw_send(header, msg).await?;
310 timeout(self.timeout(), recv)
311 .await
312 .map_err(|_| NetworkError::Timeout)?
313 .map_err(|_| NetworkError::EOF)?
314 .into_message()
315 }
316
317 fn job_multi<Msg: NetMessage, Rsp: NetMessage + JobMultiple>(
318 &self,
319 msg: Msg,
320 ) -> impl Stream<Item = Result<Rsp>> + Send {
321 try_stream! {
322 let header = self.session().header(true);
323 let source_job_id = header.source_job_id;
324 let mut recv = self.filter().on_job_id_multi(source_job_id);
325 self.raw_send(header, msg).await?;
326 loop {
327 let msg: Rsp = timeout(self.timeout(), recv.recv())
328 .await
329 .map_err(|_| NetworkError::Timeout)?
330 .ok_or(NetworkError::EOF)?
331 .into_message()?;
332 let completed = msg.completed();
333 yield msg;
334 if completed {
335 break;
336 }
337 }
338 self.filter().complete_job_id_multi(source_job_id);
339 }
340 }
341
342 #[instrument(skip(msg), fields(kind = ?Msg::KIND))]
343 fn send<Msg: NetMessage>(&self, msg: Msg) -> impl Future<Output = Result<()>> + Send {
344 self.raw_send(self.session().header(false), msg)
345 }
346
347 #[instrument(skip(msg, kind), fields(kind = ?kind))]
348 fn send_with_kind<Msg: NetMessage, K: MsgKindEnum>(
349 &self,
350 msg: Msg,
351 kind: K,
352 ) -> impl Future<Output = Result<()>> + Send {
353 let header = self.session().header(false);
354 self.raw_send_with_kind(header, msg, kind, Msg::IS_PROTOBUF)
355 }
356
357 fn raw_send<Msg: NetMessage>(
358 &self,
359 header: NetMessageHeader,
360 msg: Msg,
361 ) -> impl Future<Output = Result<()>> + Send {
362 self.raw_send_with_kind(header, msg, Msg::KIND, Msg::IS_PROTOBUF)
363 }
364
365 fn raw_send_with_kind<Msg: EncodableMessage, K: MsgKindEnum>(
366 &self,
367 header: NetMessageHeader,
368 msg: Msg,
369 kind: K,
370 is_protobuf: bool,
371 ) -> impl Future<Output = Result<()>> + Send {
372 <Self as ConnectionImpl>::raw_send_with_kind(self, header, msg, kind, is_protobuf)
373 }
374}