Skip to main content

aetheris_server/matchmaking/
mod.rs

1use crate::auth::AuthServiceImpl;
2use aetheris_protocol::auth::v1::auth_service_server::AuthService;
3use aetheris_protocol::matchmaking::v1::{
4    CancelQueueRequest, CancelQueueResponse, HeartbeatRequest, HeartbeatResponse,
5    ListServersRequest, ListServersResponse, MatchFoundStatus, QueueRequest, QueueUpdate,
6    QueuedStatus, RegisterInstanceRequest, RegisterInstanceResponse, ServerInstance,
7    matchmaking_service_server::MatchmakingService, queue_update::Status as UpdateStatus,
8};
9use async_trait::async_trait;
10use dashmap::DashMap;
11use std::sync::Arc;
12use tokio::sync::mpsc;
13use tokio_stream::wrappers::ReceiverStream;
14use tonic::{Request, Response, Status};
15
16#[derive(Debug, Clone)]
17pub struct RegisteredServer {
18    pub info: ServerInstance,
19    pub last_heartbeat: std::time::Instant,
20}
21
22#[derive(Clone)]
23pub struct MatchmakingServiceImpl {
24    servers: Arc<DashMap<String, RegisteredServer>>,
25    authorizer: Arc<AuthServiceImpl>,
26}
27
28// Default removed as we need an authorizer now.
29
30impl MatchmakingServiceImpl {
31    #[must_use]
32    pub fn new(authorizer: Arc<AuthServiceImpl>) -> Self {
33        Self {
34            servers: Arc::new(DashMap::new()),
35            authorizer,
36        }
37    }
38}
39
40#[async_trait]
41impl MatchmakingService for MatchmakingServiceImpl {
42    type JoinQueueStream = ReceiverStream<Result<QueueUpdate, Status>>;
43
44    async fn join_queue(
45        &self,
46        request: Request<QueueRequest>,
47    ) -> Result<Response<Self::JoinQueueStream>, Status> {
48        let req = request.into_inner();
49
50        if !self.authorizer.is_authorized(&req.session_token) {
51            return Err(Status::unauthenticated("Invalid session"));
52        }
53
54        let session_token = req.session_token.clone();
55        let (tx, rx) = mpsc::channel(10);
56
57        // Immediate Queued status
58        let tx_queued = tx.clone();
59        tokio::spawn(async move {
60            let _ = tx_queued
61                .send(Ok(QueueUpdate {
62                    status: Some(UpdateStatus::Queued(QueuedStatus {
63                        position: 1,
64                        estimated_wait_seconds: 1,
65                    })),
66                }))
67                .await;
68        });
69
70        let servers = self.servers.clone();
71        let auth_clone = self.authorizer.clone();
72        tokio::spawn(async move {
73            tokio::time::sleep(std::time::Duration::from_secs(1)).await;
74
75            let optimal = servers
76                .iter()
77                .filter(|s| s.last_heartbeat.elapsed().as_secs() < 30)
78                .filter(|s| s.info.players + s.info.reserved < s.info.max_players)
79                .max_by_key(|s| s.info.max_players - (s.info.players + s.info.reserved))
80                .map(|s| s.info.clone());
81
82            if let Some(server) = optimal {
83                use aetheris_protocol::auth::v1::ConnectTokenRequest;
84                let connect_req = Request::new(ConnectTokenRequest {
85                    session_token,
86                    server_address: server.addr.clone(),
87                });
88
89                let auth = auth_clone.clone();
90                match auth.issue_connect_token(connect_req).await {
91                    Ok(resp) => {
92                        let inner_resp = resp.into_inner();
93                        let _ = tx
94                            .send(Ok(QueueUpdate {
95                                status: Some(UpdateStatus::Matched(MatchFoundStatus {
96                                    quic_token: inner_resp.token,
97                                    server_address: server.addr,
98                                    world_instance_id: server.instance_id,
99                                })),
100                            }))
101                            .await;
102                    }
103                    Err(e) => {
104                        let _ = tx
105                            .send(Err(Status::internal(format!(
106                                "Failed to issue connect token: {e}"
107                            ))))
108                            .await;
109                    }
110                }
111            } else {
112                let _ = tx
113                    .send(Err(Status::resource_exhausted("No servers available")))
114                    .await;
115            }
116        });
117
118        Ok(Response::new(ReceiverStream::new(rx)))
119    }
120
121    async fn cancel_queue(
122        &self,
123        _request: Request<CancelQueueRequest>,
124    ) -> Result<Response<CancelQueueResponse>, Status> {
125        Ok(Response::new(CancelQueueResponse { success: true }))
126    }
127
128    async fn list_servers(
129        &self,
130        _request: Request<ListServersRequest>,
131    ) -> Result<Response<ListServersResponse>, Status> {
132        let instances = self.servers.iter().map(|s| s.info.clone()).collect();
133        Ok(Response::new(ListServersResponse { instances }))
134    }
135
136    async fn register_instance(
137        &self,
138        request: Request<RegisterInstanceRequest>,
139    ) -> Result<Response<RegisterInstanceResponse>, Status> {
140        let req = request.into_inner();
141        let Some(instance) = req.instance else {
142            return Err(Status::invalid_argument("Missing instance info"));
143        };
144
145        self.servers.insert(
146            instance.instance_id.clone(),
147            RegisteredServer {
148                info: instance,
149                last_heartbeat: std::time::Instant::now(),
150            },
151        );
152
153        Ok(Response::new(RegisterInstanceResponse { success: true }))
154    }
155
156    async fn heartbeat(
157        &self,
158        request: Request<HeartbeatRequest>,
159    ) -> Result<Response<HeartbeatResponse>, Status> {
160        let req = request.into_inner();
161        if let Some(mut server) = self.servers.get_mut(&req.instance_id) {
162            server.info.players = req.players;
163            server.last_heartbeat = std::time::Instant::now();
164            Ok(Response::new(HeartbeatResponse { ok: true }))
165        } else {
166            Err(Status::not_found("Instance not registered"))
167        }
168    }
169}