1use super::raw::RawConnection;
2use super::{ReadonlyConnection, Result};
3use crate::auth::{AuthConfirmationHandler, GuardDataStore, begin_password_auth};
4use crate::message::{ServiceMethodMessage, ServiceMethodResponseMessage};
5use crate::net::{NetMessageHeader, RawNetMessage};
6use crate::service_method::ServiceMethodRequest;
7use crate::session::{anonymous, login};
8use crate::{Connection, ConnectionError, LoginError, NetMessage, NetworkError, ServerList};
9use base64::Engine;
10use base64::prelude::BASE64_URL_SAFE_NO_PAD;
11use bytes::BytesMut;
12use futures_util::Stream;
13use futures_util::future::{Either, select};
14use futures_util::{FutureExt, Sink};
15use serde::Deserialize;
16use std::future::Future;
17use std::pin::pin;
18use steam_vent_proto::enums_clientserver::EMsg;
19use steamid_ng::{AccountType, SteamID};
20use thiserror::Error;
21use tokio::time::timeout;
22use tokio_stream::StreamExt;
23use tokio_stream::wrappers::BroadcastStream;
24use tracing::{debug, error};
25
26#[derive(Deserialize)]
28#[non_exhaustive]
29pub struct AccessToken {
30 pub iss: String,
31 pub sub: String,
32 #[allow(dead_code)]
33 pub exp: u64,
34 }
36
37#[derive(Debug, Error)]
38pub enum AccessTokenError {
39 #[error("expired")]
40 Expired,
41 #[error("malformed token supplied")]
42 Malformed,
43 #[error("invalid issuer")]
44 InvalidIssuer,
45 #[error("{0:#}")]
46 Base64(#[from] base64::DecodeError),
47 #[error("{0:#}")]
48 Json(#[from] serde_json::Error),
49}
50
51pub struct UnAuthenticatedConnection(RawConnection);
53
54impl UnAuthenticatedConnection {
55 pub async fn from_sender_receiver<
60 Sender: Sink<BytesMut, Error = NetworkError> + Send + 'static,
61 Receiver: Stream<Item = Result<BytesMut>> + Send + 'static,
62 >(
63 sender: Sender,
64 receiver: Receiver,
65 ) -> Result<Self, ConnectionError> {
66 Ok(UnAuthenticatedConnection(
67 RawConnection::from_sender_receiver(sender, receiver).await?,
68 ))
69 }
70
71 pub async fn connect(server_list: &ServerList) -> Result<Self, ConnectionError> {
73 Ok(UnAuthenticatedConnection(
74 RawConnection::connect(server_list).await?,
75 ))
76 }
77
78 pub async fn anonymous(self) -> Result<Connection, ConnectionError> {
80 let mut raw = self.0;
81 raw.session = anonymous(&raw, AccountType::AnonUser).await?;
82 raw.setup_heartbeat();
83 let connection = Connection::new(raw);
84
85 Ok(connection)
86 }
87
88 pub async fn anonymous_server(self) -> Result<Connection, ConnectionError> {
90 let mut raw = self.0;
91 raw.session = anonymous(&raw, AccountType::AnonGameServer).await?;
92 raw.setup_heartbeat();
93 let connection = Connection::new(raw);
94
95 Ok(connection)
96 }
97
98 pub async fn login<H: AuthConfirmationHandler, G: GuardDataStore>(
100 self,
101 account: &str,
102 password: &str,
103 mut guard_data_store: G,
104 confirmation_handler: H,
105 ) -> Result<Connection, ConnectionError> {
106 let mut raw = self.0;
107 let guard_data = guard_data_store.load(account).await.unwrap_or_else(|e| {
108 error!(error = ?e, "failed to retrieve guard data");
109 None
110 });
111 if guard_data.is_some() {
112 debug!(account, "found stored guard data");
113 }
114 let begin = begin_password_auth(&mut raw, account, password, guard_data.as_deref()).await?;
115 let steam_id = SteamID::from_steam64(begin.steam_id()).map_err(LoginError::from)?;
116
117 let allowed_confirmations = begin.allowed_confirmations();
118
119 let tokens = match select(
120 pin!(confirmation_handler.handle_confirmation(&allowed_confirmations)),
121 pin!(begin.poll().wait_for_tokens(&raw)),
122 )
123 .await
124 {
125 Either::Left((confirmation_action, tokens_fut)) => {
126 if let Some(confirmation_action) = confirmation_action {
127 begin.submit_confirmation(&raw, confirmation_action).await?;
128 tokens_fut.await?
129 } else if begin.action_required() {
130 return Err(ConnectionError::UnsupportedConfirmationAction(
131 allowed_confirmations.clone(),
132 ));
133 } else {
134 tokens_fut.await?
135 }
136 }
137 Either::Right((tokens, _)) => tokens?,
138 };
139
140 if let Some(guard_data) = tokens.new_guard_data {
141 if let Err(e) = guard_data_store.store(account, guard_data).await {
142 error!(error = ?e, "failed to store guard data");
143 }
144 }
145
146 raw.session = login(
147 &mut raw,
148 account,
149 steam_id,
150 tokens.refresh_token.as_ref(),
152 )
153 .await?;
154 raw.setup_heartbeat();
155 let connection = Connection::new(raw);
156
157 Ok(connection)
158 }
159
160 pub async fn access(
162 self,
163 account: &str,
164 access_token: &str,
165 ) -> Result<Connection, ConnectionError> {
166 let mut raw = self.0;
167
168 let access_token_payload = access_token
169 .split('.')
170 .nth(1)
171 .ok_or_else(|| ConnectionError::AccessToken(AccessTokenError::Malformed))
172 .and_then(|base64| {
173 BASE64_URL_SAFE_NO_PAD
174 .decode(base64)
175 .map_err(|error| ConnectionError::AccessToken(AccessTokenError::Base64(error)))
176 })
177 .and_then(|json| {
178 serde_json::from_slice::<AccessToken>(&json)
179 .map_err(|error| ConnectionError::AccessToken(AccessTokenError::Json(error)))
180 })?;
181
182 if access_token_payload.iss != "steam" {
183 return Err(ConnectionError::AccessToken(
184 AccessTokenError::InvalidIssuer,
185 ));
186 }
187
188 raw.session = login(
191 &mut raw,
192 account,
193 SteamID::from_steam64(
194 access_token_payload
195 .sub
196 .parse()
197 .map_err(|_| ConnectionError::LoginError(LoginError::InvalidSteamId))?,
198 )
199 .map_err(|_| ConnectionError::LoginError(LoginError::InvalidSteamId))?,
200 access_token,
201 )
202 .await?;
203 raw.setup_heartbeat();
204 Ok(Connection::new(raw))
205 }
206}
207
208impl ReadonlyConnection for UnAuthenticatedConnection {
210 fn on_notification<T: ServiceMethodRequest>(&self) -> impl Stream<Item = Result<T>> + 'static {
211 BroadcastStream::new(self.0.filter.on_notification(T::REQ_NAME))
212 .filter_map(|res| res.ok())
213 .map(|raw| raw.into_notification())
214 }
215
216 fn one_with_header<T: NetMessage + 'static>(
217 &self,
218 ) -> impl Future<Output = Result<(NetMessageHeader, T)>> + 'static {
219 let fut = self.0.filter.one_kind(T::KIND);
222 async move {
223 let raw = fut.await.map_err(|_| NetworkError::EOF)?;
224 raw.into_header_and_message()
225 }
226 }
227
228 fn one<T: NetMessage + 'static>(&self) -> impl Future<Output = Result<T>> + 'static {
229 self.one_with_header::<T>()
230 .map(|res| res.map(|(_, msg)| msg))
231 }
232
233 fn on_with_header<T: NetMessage + 'static>(
234 &self,
235 ) -> impl Stream<Item = Result<(NetMessageHeader, T)>> + 'static {
236 BroadcastStream::new(self.0.filter.on_kind(T::KIND)).map(|raw| {
237 let raw = raw.map_err(|_| NetworkError::EOF)?;
238 raw.into_header_and_message()
239 })
240 }
241
242 fn on<T: NetMessage + 'static>(&self) -> impl Stream<Item = Result<T>> + 'static {
243 self.on_with_header::<T>()
244 .map(|res| res.map(|(_, msg)| msg))
245 }
246}
247
248pub(crate) async fn service_method_un_authenticated<Msg: ServiceMethodRequest>(
249 connection: &RawConnection,
250 msg: Msg,
251) -> Result<Msg::Response> {
252 let header = connection.session.header(true);
253 let recv = connection.filter.on_job_id(header.source_job_id);
254 let msg = RawNetMessage::from_message_with_kind(
255 header,
256 ServiceMethodMessage(msg),
257 EMsg::k_EMsgServiceMethodCallFromClientNonAuthed,
258 true,
259 )?;
260 connection.sender.send_raw(msg).await?;
261 let message = timeout(connection.timeout, recv)
262 .await
263 .map_err(|_| NetworkError::Timeout)?
264 .map_err(|_| NetworkError::Timeout)?
265 .into_message::<ServiceMethodResponseMessage>()?;
266 message.into_response::<Msg>()
267}