steam_vent/
game_coordinator.rs1use crate::connection::{ConnectionImpl, ConnectionTrait, MessageFilter, MessageSender};
2use crate::message::EncodableMessage;
3use crate::net::{decode_kind, NetMessageHeader, RawNetMessage};
4use crate::session::Session;
5use crate::{Connection, NetworkError};
6use futures_util::future::select;
7use protobuf::Message;
8use std::fmt::{Debug, Formatter};
9use std::pin::pin;
10use std::time::Duration;
11use steam_vent_proto::enums_clientserver::EMsg;
12use steam_vent_proto::steammessages_clientserver::cmsg_client_games_played::GamePlayed;
13use steam_vent_proto::steammessages_clientserver::CMsgClientGamesPlayed;
14use steam_vent_proto::steammessages_clientserver_2::CMsgGCClient;
15use steam_vent_proto::steammessages_clientserver_login::CMsgClientHello;
16use steam_vent_proto::{MsgKindEnum, RpcMessage, RpcMessageWithKind};
17use tokio::spawn;
18use tokio::sync::mpsc::channel;
19use tokio::time::sleep;
20use tokio_stream::wrappers::ReceiverStream;
21use tokio_stream::StreamExt;
22use tracing::debug;
23
24pub struct GameCoordinator {
25 app_id: u32,
26 filter: MessageFilter,
27 sender: MessageSender,
28 session: Session,
29 timeout: Duration,
30}
31
/// Message kinds used for the hello/welcome handshake with a game coordinator.
///
/// The discriminants are the numeric message type values that go on the wire.
#[repr(i32)]
// Variant names deliberately keep the `k_EMsg…` spelling instead of CamelCase.
#[allow(non_camel_case_types)]
#[derive(Debug, Copy, Clone, Eq, PartialEq, Default)]
pub enum GCMsgKind {
    /// Placeholder kind; this is what `Default` produces.
    #[default]
    Invalid = 0,
    /// Welcome awaited when the session is not a server session.
    k_EMsgGCClientWelcome = 4004,
    /// Welcome awaited when the session is a server session.
    k_EMsgGCServerWelcome = 4005,
    /// Hello sent when the session is not a server session.
    k_EMsgGCClientHello = 4006,
    /// Hello sent when the session is a server session.
    k_EMsgGCServerHello = 4007,
}
45
46impl protobuf::Enum for GCMsgKind {
47 const NAME: &'static str = "GCMsgKind";
48
49 fn value(&self) -> i32 {
50 *self as i32
51 }
52
53 fn from_i32(v: i32) -> Option<Self> {
54 match v {
55 4004 => Some(Self::k_EMsgGCClientWelcome),
56 4005 => Some(Self::k_EMsgGCServerWelcome),
57 4006 => Some(Self::k_EMsgGCClientHello),
58 4007 => Some(Self::k_EMsgGCServerHello),
59 _ => None,
60 }
61 }
62
63 fn from_str(_s: &str) -> Option<Self> {
64 None
65 }
66}
67
68impl MsgKindEnum for GCMsgKind {}
69
70impl Debug for GameCoordinator {
71 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
72 f.debug_struct("GameCoordinator")
73 .field("app_id", &self.app_id)
74 .finish_non_exhaustive()
75 }
76}
77
78impl GameCoordinator {
79 pub async fn new(connection: &Connection, app_id: u32) -> Result<Self, NetworkError> {
80 let (tx, rx) = channel(10);
81 let filter = MessageFilter::new(ReceiverStream::new(rx));
82 let gc_messages = connection.on::<ClientFromGcMessage>();
83 spawn(async move {
84 let mut gc_messages = pin!(gc_messages);
85 while let Some(gc_message) = gc_messages.next().await {
86 if let Ok(mut message) = gc_message {
87 let (kind, is_protobuf) = decode_kind(message.data.msgtype());
88 debug!(kind = ?kind, is_protobuf, "received gc messages");
89
90 let payload = message.data.take_payload();
91 tx.send(RawNetMessage::read(payload)).await.ok();
92 }
93 }
94 });
95
96 let gc = GameCoordinator {
97 app_id,
98 filter,
99 sender: connection.sender().clone(),
100 session: connection.session().clone().with_app_id(app_id),
101 timeout: connection.timeout(),
102 };
103
104 connection
105 .send_with_kind(
106 CMsgClientGamesPlayed {
107 games_played: vec![GamePlayed {
108 game_id: Some(app_id as u64),
109 ..Default::default()
110 }],
111 ..Default::default()
112 },
113 EMsg::k_EMsgClientGamesPlayedWithDataBlob,
114 )
115 .await?;
116
117 let welcome = gc.wait_welcome();
118 let hello_sender = async {
119 loop {
120 if let Err(e) = gc.send_hello().await {
121 return Result::<(), _>::Err(e);
122 };
123 sleep(Duration::from_secs(5)).await;
124 }
125 };
126 select(pin!(welcome), pin!(hello_sender)).await;
127 Ok(gc)
128 }
129
130 async fn send_hello(&self) -> Result<(), NetworkError> {
131 if self.session.is_server() {
132 self.send_with_kind(CMsgClientHello::default(), GCMsgKind::k_EMsgGCServerHello)
133 .await?;
134 } else {
135 self.send_with_kind(CMsgClientHello::default(), GCMsgKind::k_EMsgGCClientHello)
136 .await?;
137 }
138 Ok(())
139 }
140
141 async fn wait_welcome(&self) -> Result<(), NetworkError> {
142 if self.session.is_server() {
143 self.filter.one_kind(GCMsgKind::k_EMsgGCServerWelcome)
144 } else {
145 self.filter.one_kind(GCMsgKind::k_EMsgGCClientWelcome)
146 }
147 .await
148 .map_err(|_| NetworkError::EOF)?;
149 Ok(())
150 }
151}
152
153impl ConnectionImpl for GameCoordinator {
154 fn timeout(&self) -> Duration {
155 self.timeout
156 }
157
158 fn filter(&self) -> &MessageFilter {
159 &self.filter
160 }
161
162 fn session(&self) -> &Session {
163 &self.session
164 }
165
166 async fn raw_send_with_kind<Msg: EncodableMessage, K: MsgKindEnum>(
167 &self,
168 header: NetMessageHeader,
169 msg: Msg,
170 kind: K,
171 is_protobuf: bool,
172 ) -> Result<(), NetworkError> {
173 let nested_header = NetMessageHeader::default();
174 let mut payload: Vec<u8> = Vec::with_capacity(
175 nested_header.encode_size(kind.into(), is_protobuf) + msg.encode_size(),
176 );
177
178 nested_header.write(&mut payload, kind, is_protobuf)?;
179 msg.write_body(&mut payload)?;
180 let data = CMsgGCClient {
181 appid: Some(self.app_id),
182 msgtype: Some(kind.encode_kind(is_protobuf)),
183 payload: Some(payload),
184 ..Default::default()
185 };
186
187 let msg = RawNetMessage::from_message(header, ClientToGcMessage { data })?;
188 self.sender.send_raw(msg).await
189 }
190}
191
192#[derive(Debug)]
193struct ClientToGcMessage {
194 data: CMsgGCClient,
195}
196
197impl RpcMessageWithKind for ClientToGcMessage {
198 type KindEnum = EMsg;
199 const KIND: Self::KindEnum = EMsg::k_EMsgClientToGC;
200}
201
202impl RpcMessage for ClientToGcMessage {
203 fn parse(reader: &mut dyn std::io::Read) -> protobuf::Result<Self> {
204 let data = <CMsgGCClient as Message>::parse_from_reader(reader)?;
205 Ok(ClientToGcMessage { data })
206 }
207 fn write(&self, writer: &mut dyn std::io::Write) -> protobuf::Result<()> {
208 self.data.write_to_writer(writer)
209 }
210 fn encode_size(&self) -> usize {
211 self.data.compute_size() as usize
212 }
213}
214
215#[derive(Debug)]
216struct ClientFromGcMessage {
217 data: CMsgGCClient,
218}
219
220impl RpcMessageWithKind for ClientFromGcMessage {
221 type KindEnum = EMsg;
222 const KIND: Self::KindEnum = EMsg::k_EMsgClientFromGC;
223}
224
225impl RpcMessage for ClientFromGcMessage {
226 fn parse(reader: &mut dyn std::io::Read) -> protobuf::Result<Self> {
227 let data = <CMsgGCClient as Message>::parse_from_reader(reader)?;
228 Ok(ClientFromGcMessage { data })
229 }
230 fn write(&self, writer: &mut dyn std::io::Write) -> protobuf::Result<()> {
231 self.data.write_to_writer(writer)
232 }
233 fn encode_size(&self) -> usize {
234 self.data.compute_size() as usize
235 }
236}