1use std::collections::HashMap;
2use std::sync::Arc;
3
4use tokio::{
5 sync::{Mutex, mpsc},
6 time::{Duration, sleep, timeout},
7};
8
9use tokio::sync::mpsc::UnboundedReceiver;
10use tokio::sync::mpsc::UnboundedSender;
11
12use crate::{
13 achievements,
14 auth::{credentials, qr},
15 chat,
16 connection::Connection,
17 emsg::EMsg,
18 error::{Error, Result},
19 friends::{self, PersonaState},
20 library, pics,
21 protobuf::{
22 CAuthenticationDeviceDetails, CMsgClientHello, CMsgClientLicenseList, CMsgClientLogon,
23 CMsgClientLogonResponse, CMsgProtoBufHeader, EAuthTokenPlatformType,
24 },
25 serverlist::ServerListCache,
26 token::steamid_from_refresh_token,
27};
28
/// Upper bound on the `library::get_last_played_times` request made while loading the library.
const PLAYTIME_TIMEOUT: Duration = Duration::from_secs(10);
/// Upper bound on a single `achievements::get_player_achievements` request.
const ACHIEVEMENTS_TIMEOUT: Duration = Duration::from_secs(10);
/// How long `wait_for_package_ids` waits for the license list before giving up.
const LICENSE_LIST_TIMEOUT: Duration = Duration::from_secs(30);
/// Upper bound on `chat::send_message` and `chat::send_typing` requests.
const CHAT_SEND_TIMEOUT: Duration = Duration::from_secs(10);
/// Upper bound on a `chat::get_recent_messages` request.
const CHAT_HISTORY_TIMEOUT: Duration = Duration::from_secs(10);
41
42#[derive(Debug)]
43pub enum RunCommand {
44 SetPersonaState(PersonaState),
45 RequestFriendData(Vec<u64>),
46 GetLibrary,
47 GetPlayerAchievements(u32),
48 SendMessage { steamid: u64, message: String },
49 SendTyping { steamid: u64 },
50 GetRecentMessages { steamid: u64 },
51}
52
/// Protocol version advertised in `CMsgClientHello` and `CMsgClientLogon`.
const PROTOCOL_VERSION: u32 = 65580;
/// Value sent as `client_language` in `CMsgClientLogon`.
const CLIENT_LANGUAGE: &str = "english";
/// OS type code sent at logon and in the authentication device details.
// NOTE(review): which operating system the code 20 denotes is not visible in this file — confirm.
const CLIENT_OS_TYPE: u32 = 20;
/// Device name passed to the auth flows and used as `device_friendly_name`.
const DEFAULT_DEVICE_NAME: &str = "Vapour";
/// Website id passed to `qr::begin` and `credentials::begin`.
const DEFAULT_WEBSITE_ID: &str = "Unknown";
/// Gaming device type code sent at logon and in the authentication device details.
// NOTE(review): the meaning of the code 1 is not visible in this file — confirm.
const DEFAULT_GAMING_DEVICE_TYPE: u32 = 1;
59
/// How [`SteamClient::begin_auth`] should authenticate.
///
/// `Debug` is implemented by hand so that the password and the refresh token
/// never end up in logs or panic messages.
#[derive(Clone, PartialEq, Eq)]
pub enum AuthMethod {
    /// Sign in by scanning a QR challenge.
    Qr,
    /// Sign in with an account name and password.
    Credentials { account: String, password: String },
    /// Sign in with a previously obtained refresh token.
    RefreshToken(String),
}

impl std::fmt::Debug for AuthMethod {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Qr => f.write_str("Qr"),
            Self::Credentials { account, .. } => f
                .debug_struct("Credentials")
                .field("account", account)
                .field("password", &format_args!("<redacted>"))
                .finish(),
            Self::RefreshToken(_) => f
                .debug_tuple("RefreshToken")
                .field(&format_args!("<redacted>"))
                .finish(),
        }
    }
}
66
/// The kind of additional confirmation a credentials login is waiting for.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum GuardKind {
    /// A code must be submitted by the user (handled together with `DeviceCode`).
    EmailCode,
    /// A code must be submitted by the user (handled together with `EmailCode`).
    DeviceCode,
    /// No code is submitted; the login flow just keeps polling.
    DeviceConfirmation,
}
73
/// Result of a successful logon.
///
/// `Debug` is implemented by hand so that the refresh token is never printed,
/// including when this value is formatted as part of a larger type.
#[derive(Clone, PartialEq, Eq)]
pub struct LoggedOn {
    /// Steam id extracted from the refresh token.
    pub steamid: u64,
    /// Account name reported by the auth flow, or empty when none was known.
    pub account_name: String,
    /// Refresh token the session was logged on with.
    pub refresh_token: String,
}

impl std::fmt::Debug for LoggedOn {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("LoggedOn")
            .field("steamid", &self.steamid)
            .field("account_name", &self.account_name)
            .field("refresh_token", &format_args!("<redacted>"))
            .finish()
    }
}
80
81#[derive(Debug)]
82pub enum AuthEvent {
83 QrChallenge(String),
84 GuardRequired(GuardKind),
85 Success(LoggedOn),
86 Failure(Error),
87}
88
/// Messages sent from [`SteamClient`] to a running credentials auth task.
#[derive(Debug)]
enum AuthCommand {
    /// A guard code supplied through `SteamClient::submit_guard_code`.
    GuardCode(String),
}
93
94#[derive(Debug, Default)]
95pub struct SteamClient {
96 servers: ServerListCache,
97 connection: Option<Arc<Mutex<Connection>>>,
98 auth_commands: Option<mpsc::UnboundedSender<AuthCommand>>,
99 account_name_hint: Option<String>,
100}
101
102impl SteamClient {
103 pub fn new() -> Self {
104 Self::default()
105 }
106
107 pub fn set_account_name_hint(&mut self, account_name: impl Into<String>) {
108 self.account_name_hint = Some(account_name.into());
109 }
110
111 pub async fn connect(&mut self) -> Result<()> {
112 if let Some(connection) = &self.connection {
113 if !connection.lock().await.is_closed().await {
114 return Ok(());
115 }
116 self.connection = None;
117 }
118
119 let mut last_error = None;
120 for force_refresh in [false, true] {
121 let servers = self.servers.list(force_refresh).await?;
122 for server in servers {
123 match Connection::connect(&server.websocket_url()).await {
124 Ok(connection) => {
125 connection
126 .send_message(
127 EMsg::ClientHello,
128 &CMsgProtoBufHeader::default(),
129 &CMsgClientHello {
130 protocol_version: Some(PROTOCOL_VERSION),
131 },
132 )
133 .await?;
134 self.connection = Some(Arc::new(Mutex::new(connection)));
135 return Ok(());
136 }
137 Err(error) => last_error = Some(error),
138 }
139 }
140 }
141
142 Err(last_error.unwrap_or_else(|| Error::Transport("no CM endpoints available".to_owned())))
143 }
144
145 pub async fn begin_auth(
146 &mut self,
147 method: AuthMethod,
148 ) -> Result<mpsc::UnboundedReceiver<AuthEvent>> {
149 self.connect().await?;
150
151 let connection = self
152 .connection
153 .as_ref()
154 .cloned()
155 .ok_or_else(|| Error::Transport("connection missing after connect".to_owned()))?;
156 let account_name_hint = self.account_name_hint.clone();
157 let (event_tx, event_rx) = mpsc::unbounded_channel();
158 let (command_tx, command_rx) = mpsc::unbounded_channel();
159 self.auth_commands = Some(command_tx);
160
161 tokio::spawn(async move {
162 let result = match method {
163 AuthMethod::Qr => run_qr_auth(connection, event_tx.clone()).await,
164 AuthMethod::Credentials { account, password } => {
165 run_credentials_auth(
166 connection,
167 event_tx.clone(),
168 command_rx,
169 account,
170 password,
171 )
172 .await
173 }
174 AuthMethod::RefreshToken(refresh_token) => {
175 run_refresh_token_auth(connection, refresh_token, account_name_hint).await
176 }
177 };
178
179 match result {
180 Ok(logged_on) => {
181 let _ = event_tx.send(AuthEvent::Success(logged_on));
182 }
183 Err(error) => {
184 let _ = event_tx.send(AuthEvent::Failure(error));
185 }
186 }
187 });
188
189 Ok(event_rx)
190 }
191
192 pub fn submit_guard_code(&self, code: impl Into<String>) -> Result<()> {
193 let sender = self
194 .auth_commands
195 .as_ref()
196 .ok_or_else(|| Error::Authentication("no guard flow is active".to_owned()))?;
197 sender
198 .send(AuthCommand::GuardCode(code.into()))
199 .map_err(|_| Error::Authentication("guard flow is no longer active".to_owned()))
200 }
201
202 pub async fn run(
203 &mut self,
204 mut commands: UnboundedReceiver<RunCommand>,
205 events: UnboundedSender<crate::friends::FriendsEvent>,
206 ) -> Result<()> {
207 let connection = self
208 .connection
209 .as_ref()
210 .cloned()
211 .ok_or_else(|| Error::Transport("no active connection".to_owned()))?;
212
213 let mut incoming = connection.lock().await.take_incoming();
216
217 {
219 let conn = connection.lock().await;
220 let state = conn.state_snapshot().await;
221 let (header, body) = friends::build_change_status(&state, PersonaState::Online);
222 conn.send_message(EMsg::ClientChangeStatus, &header, &body)
223 .await?;
224 }
225
226 loop {
227 tokio::select! {
228 packet = incoming.recv() => {
229 match packet {
230 Some(Ok(pkt)) => {
231 if pkt.emsg == EMsg::ClientLicenseList.raw()
232 && let Some(package_ids) = decode_license_list_packages(&pkt)
233 {
234 let conn = connection.lock().await;
235 conn.set_package_ids(package_ids).await;
236 }
237
238 if let Some(event) = friends::decode(&pkt) {
239 if let friends::FriendsEvent::FriendsList(ref friend_list) = event {
241 let ids: Vec<u64> = friend_list.iter().map(|f| f.steamid).collect();
242 if !ids.is_empty() {
243 let conn = connection.lock().await;
244 let state = conn.state_snapshot().await;
245 let (header, body) = friends::build_request_friend_data(&state, ids);
246 let _ = conn.send_message(EMsg::ClientRequestFriendData, &header, &body).await;
247 }
248 }
249 let _ = events.send(event);
250 } else if let Some(event) = chat::decode_incoming(&pkt) {
251 let _ = events.send(event);
253 }
254 }
255 Some(Err(e)) => return Err(e),
256 None => return Ok(()),
257 }
258 }
259 cmd = commands.recv() => {
260 match cmd {
261 Some(RunCommand::SetPersonaState(state)) => {
262 let conn = connection.lock().await;
263 let conn_state = conn.state_snapshot().await;
264 let (header, body) = friends::build_change_status(&conn_state, state);
265 conn.send_message(EMsg::ClientChangeStatus, &header, &body).await?;
266 }
267 Some(RunCommand::RequestFriendData(ids)) => {
268 let conn = connection.lock().await;
269 let conn_state = conn.state_snapshot().await;
270 let (header, body) = friends::build_request_friend_data(&conn_state, ids);
271 conn.send_message(EMsg::ClientRequestFriendData, &header, &body).await?;
272 }
273 Some(RunCommand::GetLibrary) => {
274 let connection_clone = Arc::clone(&connection);
275 let events_clone = events.clone();
276 tokio::spawn(async move {
277 match load_library(connection_clone, events_clone.clone()).await {
278 Ok(games) => {
279 let _ = events_clone
280 .send(friends::FriendsEvent::OwnedGames(games));
281 }
282 Err(error) => {
283 tracing::warn!("GetLibrary failed: {error}");
284 let _ = events_clone
285 .send(friends::FriendsEvent::OwnedGames(vec![]));
286 }
287 }
288 });
289 }
290 Some(RunCommand::GetPlayerAchievements(appid)) => {
291 let conn = connection.lock().await;
292 let state = conn.state_snapshot().await;
293 let achievements = match timeout(
297 ACHIEVEMENTS_TIMEOUT,
298 achievements::get_player_achievements(&conn, &state, appid),
299 )
300 .await
301 {
302 Ok(Ok(achievements)) => achievements,
303 Ok(Err(e)) => {
304 tracing::warn!("GetPlayerAchievements({appid}) failed: {e}");
305 vec![]
306 }
307 Err(_) => {
308 tracing::warn!("GetPlayerAchievements({appid}) timed out");
309 vec![]
310 }
311 };
312 let _ = events.send(friends::FriendsEvent::PlayerAchievements {
313 appid,
314 achievements,
315 });
316 }
317 Some(RunCommand::SendMessage { steamid, message }) => {
318 let conn = connection.lock().await;
321 let state = conn.state_snapshot().await;
322 match timeout(
323 CHAT_SEND_TIMEOUT,
324 chat::send_message(&conn, &state, steamid, message),
325 )
326 .await
327 {
328 Ok(Ok(sent)) => {
329 let _ = events.send(friends::FriendsEvent::MessageSent(sent));
330 }
331 Ok(Err(e)) => tracing::warn!("SendMessage({steamid}) failed: {e}"),
332 Err(_) => tracing::warn!("SendMessage({steamid}) timed out"),
333 }
334 }
335 Some(RunCommand::SendTyping { steamid }) => {
336 let connection_clone = Arc::clone(&connection);
340 tokio::spawn(async move {
341 let conn = connection_clone.lock().await;
342 let state = conn.state_snapshot().await;
343 match timeout(
344 CHAT_SEND_TIMEOUT,
345 chat::send_typing(&conn, &state, steamid),
346 )
347 .await
348 {
349 Ok(Ok(())) => {}
350 Ok(Err(e)) => tracing::debug!("SendTyping({steamid}) failed: {e}"),
351 Err(_) => tracing::debug!("SendTyping({steamid}) timed out"),
352 }
353 });
354 }
355 Some(RunCommand::GetRecentMessages { steamid }) => {
356 let connection_clone = Arc::clone(&connection);
359 let events_clone = events.clone();
360 tokio::spawn(async move {
361 let messages = {
362 let conn = connection_clone.lock().await;
363 let state = conn.state_snapshot().await;
364 match timeout(
365 CHAT_HISTORY_TIMEOUT,
366 chat::get_recent_messages(&conn, &state, steamid),
367 )
368 .await
369 {
370 Ok(Ok(messages)) => messages,
371 Ok(Err(e)) => {
372 tracing::warn!("GetRecentMessages({steamid}) failed: {e}");
373 vec![]
374 }
375 Err(_) => {
376 tracing::warn!("GetRecentMessages({steamid}) timed out");
377 vec![]
378 }
379 }
380 };
381 let _ = events_clone.send(friends::FriendsEvent::RecentMessages {
382 steamid,
383 messages,
384 });
385 });
386 }
387 None => return Ok(()),
388 }
389 }
390 }
391 }
392 }
393}
394
395fn decode_license_list_packages(packet: &crate::message::Packet) -> Option<Vec<u32>> {
396 let msg = match packet.decode_body::<CMsgClientLicenseList>() {
397 Ok(msg) => msg,
398 Err(error) => {
399 tracing::warn!("ClientLicenseList decode failed: {error}");
400 return None;
401 }
402 };
403
404 let package_ids: Vec<u32> = msg
405 .licenses
406 .iter()
407 .filter_map(|license| license.package_id)
408 .filter(|package_id| *package_id != 0)
409 .collect();
410
411 tracing::info!(
412 licenses = msg.licenses.len(),
413 package_ids = package_ids.len(),
414 "ClientLicenseList received"
415 );
416
417 Some(package_ids)
418}
419
420async fn load_library(
421 connection: Arc<Mutex<Connection>>,
422 events: UnboundedSender<crate::friends::FriendsEvent>,
423) -> Result<Vec<friends::ProtocolGame>> {
424 let package_ids = wait_for_package_ids(&connection).await?;
425
426 let playtimes = {
427 let conn = connection.lock().await;
428 let state = conn.state_snapshot().await;
429 match timeout(
432 PLAYTIME_TIMEOUT,
433 library::get_last_played_times(&conn, &state),
434 )
435 .await
436 {
437 Ok(Ok(playtimes)) => {
438 tracing::info!(
439 games = playtimes.len(),
440 "ClientGetLastPlayedTimes returned playtime data"
441 );
442 playtimes
443 }
444 Ok(Err(error)) => {
445 tracing::warn!("ClientGetLastPlayedTimes failed: {error}");
446 HashMap::new()
447 }
448 Err(_) => {
449 tracing::warn!(
450 "ClientGetLastPlayedTimes timed out; loading library without playtime"
451 );
452 HashMap::new()
453 }
454 }
455 };
456 let recently_played = library::recently_played_games(&playtimes);
457 let _ = events.send(friends::FriendsEvent::RecentlyPlayedGames(
458 recently_played.clone(),
459 ));
460
461 let catalog = {
462 let conn = connection.lock().await;
463 let state = conn.state_snapshot().await;
464 pics::load_owned_app_catalog(&conn, &state, package_ids).await?
465 };
466
467 let games = library::merge_catalog_and_playtimes(catalog, &playtimes);
468 tracing::info!(
469 games = games.len(),
470 recently_played = recently_played.len(),
471 "CM library pipeline completed"
472 );
473
474 Ok(games)
475}
476
477async fn wait_for_package_ids(connection: &Arc<Mutex<Connection>>) -> Result<Vec<u32>> {
478 let notify = {
479 let conn = connection.lock().await;
480 conn.license_notify()
481 };
482
483 let wait = async {
484 loop {
485 let notified = notify.notified();
489 tokio::pin!(notified);
490 notified.as_mut().enable();
491
492 {
493 let conn = connection.lock().await;
494 let state = conn.state_snapshot().await;
495 if let Some(reason) = state.close_reason {
496 return Err(Error::Transport(reason));
497 }
498 if state.license_list_received {
499 return Ok(state.package_ids);
500 }
501 }
502
503 notified.await;
504 }
505 };
506
507 match timeout(LICENSE_LIST_TIMEOUT, wait).await {
508 Ok(result) => result,
509 Err(_) => Err(Error::Transport(
510 "timed out waiting for ClientLicenseList".to_owned(),
511 )),
512 }
513}
514
515async fn run_qr_auth(
516 connection: Arc<Mutex<Connection>>,
517 event_tx: mpsc::UnboundedSender<AuthEvent>,
518) -> Result<LoggedOn> {
519 let mut challenge = {
520 let connection = connection.lock().await;
521 qr::begin(
522 &connection,
523 DEFAULT_DEVICE_NAME,
524 EAuthTokenPlatformType::KEAuthTokenPlatformTypeSteamClient as i32,
525 build_device_details(),
526 DEFAULT_WEBSITE_ID,
527 )
528 .await?
529 };
530
531 let _ = event_tx.send(AuthEvent::QrChallenge(challenge.challenge_url.clone()));
532
533 loop {
534 sleep(challenge.interval).await;
535 let poll_result = {
536 let connection = connection.lock().await;
537 qr::poll(&connection, &mut challenge).await?
538 };
539
540 match poll_result {
541 qr::PollState::Pending { challenge_changed } => {
542 if challenge_changed {
543 let _ = event_tx.send(AuthEvent::QrChallenge(challenge.challenge_url.clone()));
544 }
545 }
546 qr::PollState::Complete(completed) => {
547 let mut connection = connection.lock().await;
548 return log_on_with_token(
549 &mut connection,
550 &completed.refresh_token,
551 Some(completed.account_name),
552 )
553 .await;
554 }
555 }
556 }
557}
558
559async fn run_credentials_auth(
560 connection: Arc<Mutex<Connection>>,
561 event_tx: mpsc::UnboundedSender<AuthEvent>,
562 mut command_rx: mpsc::UnboundedReceiver<AuthCommand>,
563 account: String,
564 password: String,
565) -> Result<LoggedOn> {
566 let session = {
567 let connection = connection.lock().await;
568 credentials::begin(
569 &connection,
570 &account,
571 &password,
572 DEFAULT_DEVICE_NAME,
573 build_device_details(),
574 DEFAULT_WEBSITE_ID,
575 )
576 .await?
577 };
578
579 if let Some(kind) = session.preferred_guard_kind() {
580 let _ = event_tx.send(AuthEvent::GuardRequired(kind.clone()));
581 match kind {
582 GuardKind::EmailCode | GuardKind::DeviceCode => {
583 let AuthCommand::GuardCode(code) = command_rx
584 .recv()
585 .await
586 .ok_or_else(|| Error::Authentication("guard flow cancelled".to_owned()))?;
587 let connection = connection.lock().await;
588 credentials::submit_guard_code(&connection, &session, &code, kind).await?;
589 }
590 GuardKind::DeviceConfirmation => {}
591 }
592 }
593
594 loop {
595 sleep(session.interval).await;
596 let poll_result = {
597 let connection = connection.lock().await;
598 credentials::poll(&connection, &session).await?
599 };
600
601 if let Some(completed) = poll_result {
602 let mut connection = connection.lock().await;
603 return log_on_with_token(
604 &mut connection,
605 &completed.refresh_token,
606 Some(completed.account_name),
607 )
608 .await;
609 }
610 }
611}
612
613async fn run_refresh_token_auth(
614 connection: Arc<Mutex<Connection>>,
615 refresh_token: String,
616 account_name_hint: Option<String>,
617) -> Result<LoggedOn> {
618 let mut connection = connection.lock().await;
619 log_on_with_token(&mut connection, &refresh_token, account_name_hint).await
620}
621
622async fn log_on_with_token(
623 connection: &mut Connection,
624 refresh_token: &str,
625 account_name: Option<String>,
626) -> Result<LoggedOn> {
627 let steamid = steamid_from_refresh_token(refresh_token).ok_or_else(|| {
628 Error::Authentication("refresh token did not contain a valid steamid".to_owned())
629 })?;
630 let account_name = account_name.unwrap_or_default();
631
632 let header = CMsgProtoBufHeader {
633 steamid: Some(steamid),
634 ..Default::default()
635 };
636 let body = CMsgClientLogon {
637 protocol_version: Some(PROTOCOL_VERSION),
638 client_language: Some(CLIENT_LANGUAGE.to_owned()),
639 client_os_type: Some(CLIENT_OS_TYPE),
640 client_supplied_steam_id: Some(steamid),
641 machine_id: Some(machine_id()),
642 account_name: if account_name.is_empty() {
643 None
644 } else {
645 Some(account_name.clone())
646 },
647 should_remember_password: Some(true),
648 supports_rate_limit_response: Some(true),
649 access_token: Some(refresh_token.to_owned()),
650 gaming_device_type: Some(DEFAULT_GAMING_DEVICE_TYPE),
651 chat_mode: Some(2),
656 ..Default::default()
657 };
658
659 connection
660 .send_message(EMsg::ClientLogon, &header, &body)
661 .await?;
662
663 loop {
664 let packet = connection.next_event().await.ok_or(Error::Closed)??;
665 if packet.emsg != EMsg::ClientLogOnResponse.raw() {
666 continue;
667 }
668
669 let response = packet.decode_body::<CMsgClientLogonResponse>()?;
670 if response.eresult.unwrap_or_default() != 1 {
671 return Err(Error::Authentication(format!(
672 "ClientLogOn failed with eresult {}",
673 response.eresult.unwrap_or_default()
674 )));
675 }
676
677 let client_session_id = packet.header.client_sessionid.ok_or(Error::MissingField(
678 "ClientLogOnResponse proto header client_sessionid",
679 ))?;
680 let heartbeat_seconds = response
681 .heartbeat_seconds
682 .or(response.legacy_out_of_game_heartbeat_seconds)
683 .ok_or(Error::MissingField(
684 "CMsgClientLogonResponse.heartbeat_seconds",
685 ))?;
686
687 connection
688 .set_logged_on(steamid, client_session_id, heartbeat_seconds)
689 .await?;
690
691 return Ok(LoggedOn {
692 steamid,
693 account_name,
694 refresh_token: refresh_token.to_owned(),
695 });
696 }
697}
698
699fn build_device_details() -> CAuthenticationDeviceDetails {
700 CAuthenticationDeviceDetails {
701 device_friendly_name: Some(DEFAULT_DEVICE_NAME.to_owned()),
702 platform_type: Some(EAuthTokenPlatformType::KEAuthTokenPlatformTypeSteamClient as i32),
703 os_type: Some(CLIENT_OS_TYPE as i32),
704 gaming_device_type: Some(DEFAULT_GAMING_DEVICE_TYPE),
705 client_count: Some(1),
706 machine_id: Some(machine_id()),
707 app_type: None,
708 }
709}
710
/// Returns the fixed machine id bytes sent in the logon message and in the
/// authentication device details.
fn machine_id() -> Vec<u8> {
    Vec::from(*b"vapour")
}