steam_vent/
game_coordinator.rs

1use crate::connection::{ConnectionImpl, ConnectionTrait, MessageFilter, MessageSender};
2use crate::message::EncodableMessage;
3use crate::net::{decode_kind, NetMessageHeader, RawNetMessage};
4use crate::session::Session;
5use crate::{Connection, NetworkError};
6use futures_util::future::select;
7use protobuf::Message;
8use std::fmt::{Debug, Formatter};
9use std::pin::pin;
10use std::time::Duration;
11use steam_vent_proto::enums_clientserver::EMsg;
12use steam_vent_proto::steammessages_clientserver::cmsg_client_games_played::GamePlayed;
13use steam_vent_proto::steammessages_clientserver::CMsgClientGamesPlayed;
14use steam_vent_proto::steammessages_clientserver_2::CMsgGCClient;
15use steam_vent_proto::steammessages_clientserver_login::CMsgClientHello;
16use steam_vent_proto::{MsgKindEnum, RpcMessage, RpcMessageWithKind};
17use tokio::spawn;
18use tokio::sync::mpsc::channel;
19use tokio::time::sleep;
20use tokio_stream::wrappers::ReceiverStream;
21use tokio_stream::StreamExt;
22use tracing::debug;
23
24pub struct GameCoordinator {
25    app_id: u32,
26    filter: MessageFilter,
27    sender: MessageSender,
28    session: Session,
29    timeout: Duration,
30}
31
32/// While these kinds are consistent between games, they are not defined in the generic steam protobufs.
33/// We define them here, so we can implement the game coordinator without requiring the protobufs from a game
34#[repr(i32)]
35#[allow(non_camel_case_types)]
36#[derive(Debug, Copy, Clone, Eq, PartialEq, Default)]
37pub enum GCMsgKind {
38    #[default]
39    Invalid = 0,
40    k_EMsgGCClientWelcome = 4004,
41    k_EMsgGCServerWelcome = 4005,
42    k_EMsgGCClientHello = 4006,
43    k_EMsgGCServerHello = 4007,
44}
45
46impl protobuf::Enum for GCMsgKind {
47    const NAME: &'static str = "GCMsgKind";
48
49    fn value(&self) -> i32 {
50        *self as i32
51    }
52
53    fn from_i32(v: i32) -> Option<Self> {
54        match v {
55            4004 => Some(Self::k_EMsgGCClientWelcome),
56            4005 => Some(Self::k_EMsgGCServerWelcome),
57            4006 => Some(Self::k_EMsgGCClientHello),
58            4007 => Some(Self::k_EMsgGCServerHello),
59            _ => None,
60        }
61    }
62
63    fn from_str(_s: &str) -> Option<Self> {
64        None
65    }
66}
67
68impl MsgKindEnum for GCMsgKind {}
69
70impl Debug for GameCoordinator {
71    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
72        f.debug_struct("GameCoordinator")
73            .field("app_id", &self.app_id)
74            .finish_non_exhaustive()
75    }
76}
77
78impl GameCoordinator {
79    pub async fn new(connection: &Connection, app_id: u32) -> Result<Self, NetworkError> {
80        let (tx, rx) = channel(10);
81        let filter = MessageFilter::new(ReceiverStream::new(rx));
82        let gc_messages = connection.on::<ClientFromGcMessage>();
83        spawn(async move {
84            let mut gc_messages = pin!(gc_messages);
85            while let Some(gc_message) = gc_messages.next().await {
86                if let Ok(mut message) = gc_message {
87                    let (kind, is_protobuf) = decode_kind(message.data.msgtype());
88                    debug!(kind = ?kind, is_protobuf, "received gc messages");
89
90                    let payload = message.data.take_payload();
91                    tx.send(RawNetMessage::read(payload)).await.ok();
92                }
93            }
94        });
95
96        let gc = GameCoordinator {
97            app_id,
98            filter,
99            sender: connection.sender().clone(),
100            session: connection.session().clone().with_app_id(app_id),
101            timeout: connection.timeout(),
102        };
103
104        connection
105            .send_with_kind(
106                CMsgClientGamesPlayed {
107                    games_played: vec![GamePlayed {
108                        game_id: Some(app_id as u64),
109                        ..Default::default()
110                    }],
111                    ..Default::default()
112                },
113                EMsg::k_EMsgClientGamesPlayedWithDataBlob,
114            )
115            .await?;
116
117        let welcome = gc.wait_welcome();
118        let hello_sender = async {
119            loop {
120                if let Err(e) = gc.send_hello().await {
121                    return Result::<(), _>::Err(e);
122                };
123                sleep(Duration::from_secs(5)).await;
124            }
125        };
126        select(pin!(welcome), pin!(hello_sender)).await;
127        Ok(gc)
128    }
129
130    async fn send_hello(&self) -> Result<(), NetworkError> {
131        if self.session.is_server() {
132            self.send_with_kind(CMsgClientHello::default(), GCMsgKind::k_EMsgGCServerHello)
133                .await?;
134        } else {
135            self.send_with_kind(CMsgClientHello::default(), GCMsgKind::k_EMsgGCClientHello)
136                .await?;
137        }
138        Ok(())
139    }
140
141    async fn wait_welcome(&self) -> Result<(), NetworkError> {
142        if self.session.is_server() {
143            self.filter.one_kind(GCMsgKind::k_EMsgGCServerWelcome)
144        } else {
145            self.filter.one_kind(GCMsgKind::k_EMsgGCClientWelcome)
146        }
147        .await
148        .map_err(|_| NetworkError::EOF)?;
149        Ok(())
150    }
151}
152
153impl ConnectionImpl for GameCoordinator {
154    fn timeout(&self) -> Duration {
155        self.timeout
156    }
157
158    fn filter(&self) -> &MessageFilter {
159        &self.filter
160    }
161
162    fn session(&self) -> &Session {
163        &self.session
164    }
165
166    async fn raw_send_with_kind<Msg: EncodableMessage, K: MsgKindEnum>(
167        &self,
168        header: NetMessageHeader,
169        msg: Msg,
170        kind: K,
171        is_protobuf: bool,
172    ) -> Result<(), NetworkError> {
173        let nested_header = NetMessageHeader::default();
174        let mut payload: Vec<u8> = Vec::with_capacity(
175            nested_header.encode_size(kind.into(), is_protobuf) + msg.encode_size(),
176        );
177
178        nested_header.write(&mut payload, kind, is_protobuf)?;
179        msg.write_body(&mut payload)?;
180        let data = CMsgGCClient {
181            appid: Some(self.app_id),
182            msgtype: Some(kind.encode_kind(is_protobuf)),
183            payload: Some(payload),
184            ..Default::default()
185        };
186
187        let msg = RawNetMessage::from_message(header, ClientToGcMessage { data })?;
188        self.sender.send_raw(msg).await
189    }
190}
191
192#[derive(Debug)]
193struct ClientToGcMessage {
194    data: CMsgGCClient,
195}
196
197impl RpcMessageWithKind for ClientToGcMessage {
198    type KindEnum = EMsg;
199    const KIND: Self::KindEnum = EMsg::k_EMsgClientToGC;
200}
201
202impl RpcMessage for ClientToGcMessage {
203    fn parse(reader: &mut dyn std::io::Read) -> protobuf::Result<Self> {
204        let data = <CMsgGCClient as Message>::parse_from_reader(reader)?;
205        Ok(ClientToGcMessage { data })
206    }
207    fn write(&self, writer: &mut dyn std::io::Write) -> protobuf::Result<()> {
208        self.data.write_to_writer(writer)
209    }
210    fn encode_size(&self) -> usize {
211        self.data.compute_size() as usize
212    }
213}
214
215#[derive(Debug)]
216struct ClientFromGcMessage {
217    data: CMsgGCClient,
218}
219
220impl RpcMessageWithKind for ClientFromGcMessage {
221    type KindEnum = EMsg;
222    const KIND: Self::KindEnum = EMsg::k_EMsgClientFromGC;
223}
224
225impl RpcMessage for ClientFromGcMessage {
226    fn parse(reader: &mut dyn std::io::Read) -> protobuf::Result<Self> {
227        let data = <CMsgGCClient as Message>::parse_from_reader(reader)?;
228        Ok(ClientFromGcMessage { data })
229    }
230    fn write(&self, writer: &mut dyn std::io::Write) -> protobuf::Result<()> {
231        self.data.write_to_writer(writer)
232    }
233    fn encode_size(&self) -> usize {
234        self.data.compute_size() as usize
235    }
236}