steam_vent/connection/
unauthenticated.rs

1use super::raw::RawConnection;
2use super::{ReadonlyConnection, Result};
3use crate::auth::{AuthConfirmationHandler, GuardDataStore, begin_password_auth};
4use crate::message::{ServiceMethodMessage, ServiceMethodResponseMessage};
5use crate::net::{NetMessageHeader, RawNetMessage};
6use crate::service_method::ServiceMethodRequest;
7use crate::session::{anonymous, login};
8use crate::{Connection, ConnectionError, LoginError, NetMessage, NetworkError, ServerList};
9use base64::Engine;
10use base64::prelude::BASE64_URL_SAFE_NO_PAD;
11use bytes::BytesMut;
12use futures_util::Stream;
13use futures_util::future::{Either, select};
14use futures_util::{FutureExt, Sink};
15use serde::Deserialize;
16use std::future::Future;
17use std::pin::pin;
18use steam_vent_proto::enums_clientserver::EMsg;
19use steamid_ng::{AccountType, SteamID};
20use thiserror::Error;
21use tokio::time::timeout;
22use tokio_stream::StreamExt;
23use tokio_stream::wrappers::BroadcastStream;
24use tracing::{debug, error};
25
26/// JWT access token payload descriptor.
27#[derive(Deserialize)]
28#[non_exhaustive]
29pub struct AccessToken {
30    pub iss: String,
31    pub sub: String,
32    #[allow(dead_code)]
33    pub exp: u64,
34    // ..extra unused fields
35}
36
37#[derive(Debug, Error)]
38pub enum AccessTokenError {
39    #[error("expired")]
40    Expired,
41    #[error("malformed token supplied")]
42    Malformed,
43    #[error("invalid issuer")]
44    InvalidIssuer,
45    #[error("{0:#}")]
46    Base64(#[from] base64::DecodeError),
47    #[error("{0:#}")]
48    Json(#[from] serde_json::Error),
49}
50
51/// A Connection that hasn't been authentication yet
52pub struct UnAuthenticatedConnection(RawConnection);
53
54impl UnAuthenticatedConnection {
55    /// Create a connection from a sender, receiver pair.
56    ///
57    /// This allows customizing the transport used by the connection. For example to customize the
58    /// TLS configuration, use an existing websocket client or use a proxy.
59    pub async fn from_sender_receiver<
60        Sender: Sink<BytesMut, Error = NetworkError> + Send + 'static,
61        Receiver: Stream<Item = Result<BytesMut>> + Send + 'static,
62    >(
63        sender: Sender,
64        receiver: Receiver,
65    ) -> Result<Self, ConnectionError> {
66        Ok(UnAuthenticatedConnection(
67            RawConnection::from_sender_receiver(sender, receiver).await?,
68        ))
69    }
70
71    /// Connect to a server from the server list using the default websocket transport
72    pub async fn connect(server_list: &ServerList) -> Result<Self, ConnectionError> {
73        Ok(UnAuthenticatedConnection(
74            RawConnection::connect(server_list).await?,
75        ))
76    }
77
78    /// Start an anonymous client session with this connection
79    pub async fn anonymous(self) -> Result<Connection, ConnectionError> {
80        let mut raw = self.0;
81        raw.session = anonymous(&raw, AccountType::AnonUser).await?;
82        raw.setup_heartbeat();
83        let connection = Connection::new(raw);
84
85        Ok(connection)
86    }
87
88    /// Start an anonymous server session with this connection
89    pub async fn anonymous_server(self) -> Result<Connection, ConnectionError> {
90        let mut raw = self.0;
91        raw.session = anonymous(&raw, AccountType::AnonGameServer).await?;
92        raw.setup_heartbeat();
93        let connection = Connection::new(raw);
94
95        Ok(connection)
96    }
97
98    /// Start a client session with this connection
99    pub async fn login<H: AuthConfirmationHandler, G: GuardDataStore>(
100        self,
101        account: &str,
102        password: &str,
103        mut guard_data_store: G,
104        confirmation_handler: H,
105    ) -> Result<Connection, ConnectionError> {
106        let mut raw = self.0;
107        let guard_data = guard_data_store.load(account).await.unwrap_or_else(|e| {
108            error!(error = ?e, "failed to retrieve guard data");
109            None
110        });
111        if guard_data.is_some() {
112            debug!(account, "found stored guard data");
113        }
114        let begin = begin_password_auth(&mut raw, account, password, guard_data.as_deref()).await?;
115        let steam_id = SteamID::from_steam64(begin.steam_id()).map_err(LoginError::from)?;
116
117        let allowed_confirmations = begin.allowed_confirmations();
118
119        let tokens = match select(
120            pin!(confirmation_handler.handle_confirmation(&allowed_confirmations)),
121            pin!(begin.poll().wait_for_tokens(&raw)),
122        )
123        .await
124        {
125            Either::Left((confirmation_action, tokens_fut)) => {
126                if let Some(confirmation_action) = confirmation_action {
127                    begin.submit_confirmation(&raw, confirmation_action).await?;
128                    tokens_fut.await?
129                } else if begin.action_required() {
130                    return Err(ConnectionError::UnsupportedConfirmationAction(
131                        allowed_confirmations.clone(),
132                    ));
133                } else {
134                    tokens_fut.await?
135                }
136            }
137            Either::Right((tokens, _)) => tokens?,
138        };
139
140        if let Some(guard_data) = tokens.new_guard_data {
141            if let Err(e) = guard_data_store.store(account, guard_data).await {
142                error!(error = ?e, "failed to store guard data");
143            }
144        }
145
146        raw.session = login(
147            &mut raw,
148            account,
149            steam_id,
150            // yes we send the refresh token as access token, yes it makes no sense, yes this is actually required
151            tokens.refresh_token.as_ref(),
152        )
153        .await?;
154        raw.setup_heartbeat();
155        let connection = Connection::new(raw);
156
157        Ok(connection)
158    }
159
160    /// Start a client session with this connection using access token.
161    pub async fn access(
162        self,
163        account: &str,
164        access_token: &str,
165    ) -> Result<Connection, ConnectionError> {
166        let mut raw = self.0;
167
168        let access_token_payload = access_token
169            .split('.')
170            .nth(1)
171            .ok_or_else(|| ConnectionError::AccessToken(AccessTokenError::Malformed))
172            .and_then(|base64| {
173                BASE64_URL_SAFE_NO_PAD
174                    .decode(base64)
175                    .map_err(|error| ConnectionError::AccessToken(AccessTokenError::Base64(error)))
176            })
177            .and_then(|json| {
178                serde_json::from_slice::<AccessToken>(&json)
179                    .map_err(|error| ConnectionError::AccessToken(AccessTokenError::Json(error)))
180            })?;
181
182        if access_token_payload.iss != "steam" {
183            return Err(ConnectionError::AccessToken(
184                AccessTokenError::InvalidIssuer,
185            ));
186        }
187
188        // TODO: Consider adding `AccessToken.exp` check (expiration UNIX timestamp in seconds).
189
190        raw.session = login(
191            &mut raw,
192            account,
193            SteamID::from_steam64(
194                access_token_payload
195                    .sub
196                    .parse()
197                    .map_err(|_| ConnectionError::LoginError(LoginError::InvalidSteamId))?,
198            )
199            .map_err(|_| ConnectionError::LoginError(LoginError::InvalidSteamId))?,
200            access_token,
201        )
202        .await?;
203        raw.setup_heartbeat();
204        Ok(Connection::new(raw))
205    }
206}
207
208/// Listen for messages before starting authentication
209impl ReadonlyConnection for UnAuthenticatedConnection {
210    fn on_notification<T: ServiceMethodRequest>(&self) -> impl Stream<Item = Result<T>> + 'static {
211        BroadcastStream::new(self.0.filter.on_notification(T::REQ_NAME))
212            .filter_map(|res| res.ok())
213            .map(|raw| raw.into_notification())
214    }
215
216    fn one_with_header<T: NetMessage + 'static>(
217        &self,
218    ) -> impl Future<Output = Result<(NetMessageHeader, T)>> + 'static {
219        // async block instead of async fn, so we don't have to tie the lifetime of the returned future
220        // to the lifetime of &self
221        let fut = self.0.filter.one_kind(T::KIND);
222        async move {
223            let raw = fut.await.map_err(|_| NetworkError::EOF)?;
224            raw.into_header_and_message()
225        }
226    }
227
228    fn one<T: NetMessage + 'static>(&self) -> impl Future<Output = Result<T>> + 'static {
229        self.one_with_header::<T>()
230            .map(|res| res.map(|(_, msg)| msg))
231    }
232
233    fn on_with_header<T: NetMessage + 'static>(
234        &self,
235    ) -> impl Stream<Item = Result<(NetMessageHeader, T)>> + 'static {
236        BroadcastStream::new(self.0.filter.on_kind(T::KIND)).map(|raw| {
237            let raw = raw.map_err(|_| NetworkError::EOF)?;
238            raw.into_header_and_message()
239        })
240    }
241
242    fn on<T: NetMessage + 'static>(&self) -> impl Stream<Item = Result<T>> + 'static {
243        self.on_with_header::<T>()
244            .map(|res| res.map(|(_, msg)| msg))
245    }
246}
247
248pub(crate) async fn service_method_un_authenticated<Msg: ServiceMethodRequest>(
249    connection: &RawConnection,
250    msg: Msg,
251) -> Result<Msg::Response> {
252    let header = connection.session.header(true);
253    let recv = connection.filter.on_job_id(header.source_job_id);
254    let msg = RawNetMessage::from_message_with_kind(
255        header,
256        ServiceMethodMessage(msg),
257        EMsg::k_EMsgServiceMethodCallFromClientNonAuthed,
258        true,
259    )?;
260    connection.sender.send_raw(msg).await?;
261    let message = timeout(connection.timeout, recv)
262        .await
263        .map_err(|_| NetworkError::Timeout)?
264        .map_err(|_| NetworkError::Timeout)?
265        .into_message::<ServiceMethodResponseMessage>()?;
266    message.into_response::<Msg>()
267}