aetheris_server/matchmaking/
mod.rs1use crate::auth::AuthServiceImpl;
2use aetheris_protocol::auth::v1::auth_service_server::AuthService;
3use aetheris_protocol::matchmaking::v1::{
4 CancelQueueRequest, CancelQueueResponse, HeartbeatRequest, HeartbeatResponse,
5 ListServersRequest, ListServersResponse, MatchFoundStatus, QueueRequest, QueueUpdate,
6 QueuedStatus, RegisterInstanceRequest, RegisterInstanceResponse, ServerInstance,
7 matchmaking_service_server::MatchmakingService, queue_update::Status as UpdateStatus,
8};
9use async_trait::async_trait;
10use dashmap::DashMap;
11use std::sync::Arc;
12use tokio::sync::{mpsc, oneshot};
13use tokio_stream::wrappers::ReceiverStream;
14use tonic::{Request, Response, Status};
15
16#[derive(Debug, Clone)]
17pub struct RegisteredServer {
18 pub info: ServerInstance,
19 pub last_heartbeat: std::time::Instant,
20}
21
22#[derive(Clone)]
23pub struct MatchmakingServiceImpl {
24 servers: Arc<DashMap<String, RegisteredServer>>,
25 authorizer: Arc<AuthServiceImpl>,
26 active_queues: Arc<DashMap<String, oneshot::Sender<()>>>,
28}
29
30impl MatchmakingServiceImpl {
33 #[must_use]
34 pub fn new(authorizer: Arc<AuthServiceImpl>) -> Self {
35 Self {
36 servers: Arc::new(DashMap::new()),
37 authorizer,
38 active_queues: Arc::new(DashMap::new()),
39 }
40 }
41}
42
43#[async_trait]
44impl MatchmakingService for MatchmakingServiceImpl {
45 type JoinQueueStream = ReceiverStream<Result<QueueUpdate, Status>>;
46
47 async fn join_queue(
48 &self,
49 request: Request<QueueRequest>,
50 ) -> Result<Response<Self::JoinQueueStream>, Status> {
51 let req = request.into_inner();
52
53 if !self.authorizer.is_authorized(&req.session_token) {
54 return Err(Status::unauthenticated("Invalid session"));
55 }
56
57 let session_token = req.session_token.clone();
58 let (tx, rx) = mpsc::channel(10);
59
60 let tx_queued = tx.clone();
62 tokio::spawn(async move {
63 let _ = tx_queued
64 .send(Ok(QueueUpdate {
65 status: Some(UpdateStatus::Queued(QueuedStatus {
66 position: 1,
67 estimated_wait_seconds: 1,
68 })),
69 }))
70 .await;
71 });
72
73 let servers = self.servers.clone();
74 let auth_clone = self.authorizer.clone();
75 let active_queues = self.active_queues.clone();
76 let (cancel_tx, cancel_rx) = oneshot::channel();
77
78 active_queues.insert(session_token.clone(), cancel_tx);
79
80 tokio::spawn(async move {
81 tokio::select! {
82 _ = cancel_rx => {
83 let _ = tx.send(Err(Status::cancelled("Matchmaking cancelled by user"))).await;
84 }
85 () = tokio::time::sleep(std::time::Duration::from_secs(1)) => {
86 let optimal = servers
87 .iter()
88 .filter(|s| s.last_heartbeat.elapsed().as_secs() < 30)
89 .filter(|s| s.info.players + s.info.reserved < s.info.max_players)
90 .max_by_key(|s| s.info.max_players - (s.info.players + s.info.reserved))
91 .map(|s| s.info.clone());
92
93 if let Some(server) = optimal {
94 use aetheris_protocol::auth::v1::ConnectTokenRequest;
95 let connect_req = Request::new(ConnectTokenRequest {
96 session_token: session_token.clone(),
97 server_address: server.addr.clone(),
98 });
99
100 let auth = auth_clone.clone();
101 match auth.issue_connect_token(connect_req).await {
102 Ok(resp) => {
103 let inner_resp = resp.into_inner();
104 let _ = tx
105 .send(Ok(QueueUpdate {
106 status: Some(UpdateStatus::Matched(MatchFoundStatus {
107 quic_token: inner_resp.token,
108 server_address: server.addr,
109 world_instance_id: server.instance_id,
110 })),
111 }))
112 .await;
113 }
114 Err(e) => {
115 let _ = tx
116 .send(Err(Status::internal(format!(
117 "Failed to issue connect token: {e}"
118 ))))
119 .await;
120 }
121 }
122 } else {
123 let _ = tx
124 .send(Err(Status::resource_exhausted("No servers available")))
125 .await;
126 }
127 }
128 }
129 active_queues.remove(&session_token);
130 });
131
132 Ok(Response::new(ReceiverStream::new(rx)))
133 }
134
135 async fn cancel_queue(
136 &self,
137 request: Request<CancelQueueRequest>,
138 ) -> Result<Response<CancelQueueResponse>, Status> {
139 let req = request.into_inner();
140 let success = if let Some((_, cancel_tx)) = self.active_queues.remove(&req.session_token) {
141 let _ = cancel_tx.send(());
142 true
143 } else {
144 false
145 };
146 Ok(Response::new(CancelQueueResponse { success }))
147 }
148
149 async fn list_servers(
150 &self,
151 _request: Request<ListServersRequest>,
152 ) -> Result<Response<ListServersResponse>, Status> {
153 let instances = self.servers.iter().map(|s| s.info.clone()).collect();
154 Ok(Response::new(ListServersResponse { instances }))
155 }
156
157 async fn register_instance(
158 &self,
159 request: Request<RegisterInstanceRequest>,
160 ) -> Result<Response<RegisterInstanceResponse>, Status> {
161 let req = request.into_inner();
162 let Some(instance) = req.instance else {
163 return Err(Status::invalid_argument("Missing instance info"));
164 };
165
166 self.servers.insert(
167 instance.instance_id.clone(),
168 RegisteredServer {
169 info: instance,
170 last_heartbeat: std::time::Instant::now(),
171 },
172 );
173
174 Ok(Response::new(RegisterInstanceResponse { success: true }))
175 }
176
177 async fn heartbeat(
178 &self,
179 request: Request<HeartbeatRequest>,
180 ) -> Result<Response<HeartbeatResponse>, Status> {
181 let req = request.into_inner();
182 if let Some(mut server) = self.servers.get_mut(&req.instance_id) {
183 server.info.players = req.players;
184 server.last_heartbeat = std::time::Instant::now();
185 Ok(Response::new(HeartbeatResponse { ok: true }))
186 } else {
187 Err(Status::not_found("Instance not registered"))
188 }
189 }
190}