aetheris_server/matchmaking/
mod.rs1use crate::auth::AuthServiceImpl;
2use aetheris_protocol::auth::v1::auth_service_server::AuthService;
3use aetheris_protocol::matchmaking::v1::{
4 CancelQueueRequest, CancelQueueResponse, HeartbeatRequest, HeartbeatResponse,
5 ListServersRequest, ListServersResponse, MatchFoundStatus, QueueRequest, QueueUpdate,
6 QueuedStatus, RegisterInstanceRequest, RegisterInstanceResponse, ServerInstance,
7 matchmaking_service_server::MatchmakingService, queue_update::Status as UpdateStatus,
8};
9use async_trait::async_trait;
10use dashmap::DashMap;
11use std::sync::Arc;
12use tokio::sync::mpsc;
13use tokio_stream::wrappers::ReceiverStream;
14use tonic::{Request, Response, Status};
15
16#[derive(Debug, Clone)]
17pub struct RegisteredServer {
18 pub info: ServerInstance,
19 pub last_heartbeat: std::time::Instant,
20}
21
22#[derive(Clone)]
23pub struct MatchmakingServiceImpl {
24 servers: Arc<DashMap<String, RegisteredServer>>,
25 authorizer: Arc<AuthServiceImpl>,
26}
27
28impl MatchmakingServiceImpl {
31 #[must_use]
32 pub fn new(authorizer: Arc<AuthServiceImpl>) -> Self {
33 Self {
34 servers: Arc::new(DashMap::new()),
35 authorizer,
36 }
37 }
38}
39
40#[async_trait]
41impl MatchmakingService for MatchmakingServiceImpl {
42 type JoinQueueStream = ReceiverStream<Result<QueueUpdate, Status>>;
43
44 async fn join_queue(
45 &self,
46 request: Request<QueueRequest>,
47 ) -> Result<Response<Self::JoinQueueStream>, Status> {
48 let req = request.into_inner();
49
50 if !self.authorizer.is_authorized(&req.session_token) {
51 return Err(Status::unauthenticated("Invalid session"));
52 }
53
54 let session_token = req.session_token.clone();
55 let (tx, rx) = mpsc::channel(10);
56
57 let tx_queued = tx.clone();
59 tokio::spawn(async move {
60 let _ = tx_queued
61 .send(Ok(QueueUpdate {
62 status: Some(UpdateStatus::Queued(QueuedStatus {
63 position: 1,
64 estimated_wait_seconds: 1,
65 })),
66 }))
67 .await;
68 });
69
70 let servers = self.servers.clone();
71 let auth_clone = self.authorizer.clone();
72 tokio::spawn(async move {
73 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
74
75 let optimal = servers
76 .iter()
77 .filter(|s| s.last_heartbeat.elapsed().as_secs() < 30)
78 .filter(|s| s.info.players + s.info.reserved < s.info.max_players)
79 .max_by_key(|s| s.info.max_players - (s.info.players + s.info.reserved))
80 .map(|s| s.info.clone());
81
82 if let Some(server) = optimal {
83 use aetheris_protocol::auth::v1::ConnectTokenRequest;
84 let connect_req = Request::new(ConnectTokenRequest {
85 session_token,
86 server_address: server.addr.clone(),
87 });
88
89 let auth = auth_clone.clone();
90 match auth.issue_connect_token(connect_req).await {
91 Ok(resp) => {
92 let inner_resp = resp.into_inner();
93 let _ = tx
94 .send(Ok(QueueUpdate {
95 status: Some(UpdateStatus::Matched(MatchFoundStatus {
96 quic_token: inner_resp.token,
97 server_address: server.addr,
98 world_instance_id: server.instance_id,
99 })),
100 }))
101 .await;
102 }
103 Err(e) => {
104 let _ = tx
105 .send(Err(Status::internal(format!(
106 "Failed to issue connect token: {e}"
107 ))))
108 .await;
109 }
110 }
111 } else {
112 let _ = tx
113 .send(Err(Status::resource_exhausted("No servers available")))
114 .await;
115 }
116 });
117
118 Ok(Response::new(ReceiverStream::new(rx)))
119 }
120
121 async fn cancel_queue(
122 &self,
123 _request: Request<CancelQueueRequest>,
124 ) -> Result<Response<CancelQueueResponse>, Status> {
125 Ok(Response::new(CancelQueueResponse { success: true }))
126 }
127
128 async fn list_servers(
129 &self,
130 _request: Request<ListServersRequest>,
131 ) -> Result<Response<ListServersResponse>, Status> {
132 let instances = self.servers.iter().map(|s| s.info.clone()).collect();
133 Ok(Response::new(ListServersResponse { instances }))
134 }
135
136 async fn register_instance(
137 &self,
138 request: Request<RegisterInstanceRequest>,
139 ) -> Result<Response<RegisterInstanceResponse>, Status> {
140 let req = request.into_inner();
141 let Some(instance) = req.instance else {
142 return Err(Status::invalid_argument("Missing instance info"));
143 };
144
145 self.servers.insert(
146 instance.instance_id.clone(),
147 RegisteredServer {
148 info: instance,
149 last_heartbeat: std::time::Instant::now(),
150 },
151 );
152
153 Ok(Response::new(RegisterInstanceResponse { success: true }))
154 }
155
156 async fn heartbeat(
157 &self,
158 request: Request<HeartbeatRequest>,
159 ) -> Result<Response<HeartbeatResponse>, Status> {
160 let req = request.into_inner();
161 if let Some(mut server) = self.servers.get_mut(&req.instance_id) {
162 server.info.players = req.players;
163 server.last_heartbeat = std::time::Instant::now();
164 Ok(Response::new(HeartbeatResponse { ok: true }))
165 } else {
166 Err(Status::not_found("Instance not registered"))
167 }
168 }
169}