Skip to main content

aetheris_server/matchmaking/
mod.rs

1use crate::auth::AuthServiceImpl;
2use aetheris_protocol::auth::v1::auth_service_server::AuthService;
3use aetheris_protocol::matchmaking::v1::{
4    CancelQueueRequest, CancelQueueResponse, HeartbeatRequest, HeartbeatResponse,
5    ListServersRequest, ListServersResponse, MatchFoundStatus, QueueRequest, QueueUpdate,
6    QueuedStatus, RegisterInstanceRequest, RegisterInstanceResponse, ServerInstance,
7    matchmaking_service_server::MatchmakingService, queue_update::Status as UpdateStatus,
8};
9use async_trait::async_trait;
10use dashmap::DashMap;
11use std::sync::Arc;
12use tokio::sync::{mpsc, oneshot};
13use tokio_stream::wrappers::ReceiverStream;
14use tonic::{Request, Response, Status};
15
16#[derive(Debug, Clone)]
17pub struct RegisteredServer {
18    pub info: ServerInstance,
19    pub last_heartbeat: std::time::Instant,
20}
21
22#[derive(Clone)]
23pub struct MatchmakingServiceImpl {
24    servers: Arc<DashMap<String, RegisteredServer>>,
25    authorizer: Arc<AuthServiceImpl>,
26    /// Tracks active matchmaking tasks by session token (using oneshot to signal cancellation)
27    active_queues: Arc<DashMap<String, oneshot::Sender<()>>>,
28}
29
// `Default` is intentionally not implemented: the service cannot be constructed without an authorizer.
31
32impl MatchmakingServiceImpl {
33    #[must_use]
34    pub fn new(authorizer: Arc<AuthServiceImpl>) -> Self {
35        Self {
36            servers: Arc::new(DashMap::new()),
37            authorizer,
38            active_queues: Arc::new(DashMap::new()),
39        }
40    }
41}
42
43#[async_trait]
44impl MatchmakingService for MatchmakingServiceImpl {
45    type JoinQueueStream = ReceiverStream<Result<QueueUpdate, Status>>;
46
47    async fn join_queue(
48        &self,
49        request: Request<QueueRequest>,
50    ) -> Result<Response<Self::JoinQueueStream>, Status> {
51        let req = request.into_inner();
52
53        if !self.authorizer.is_authorized(&req.session_token) {
54            return Err(Status::unauthenticated("Invalid session"));
55        }
56
57        let session_token = req.session_token.clone();
58        let (tx, rx) = mpsc::channel(10);
59
60        // Immediate Queued status
61        let tx_queued = tx.clone();
62        tokio::spawn(async move {
63            let _ = tx_queued
64                .send(Ok(QueueUpdate {
65                    status: Some(UpdateStatus::Queued(QueuedStatus {
66                        position: 1,
67                        estimated_wait_seconds: 1,
68                    })),
69                }))
70                .await;
71        });
72
73        let servers = self.servers.clone();
74        let auth_clone = self.authorizer.clone();
75        let active_queues = self.active_queues.clone();
76        let (cancel_tx, cancel_rx) = oneshot::channel();
77
78        active_queues.insert(session_token.clone(), cancel_tx);
79
80        tokio::spawn(async move {
81            tokio::select! {
82                _ = cancel_rx => {
83                    let _ = tx.send(Err(Status::cancelled("Matchmaking cancelled by user"))).await;
84                }
85                () = tokio::time::sleep(std::time::Duration::from_secs(1)) => {
86                    let optimal = servers
87                        .iter()
88                        .filter(|s| s.last_heartbeat.elapsed().as_secs() < 30)
89                        .filter(|s| s.info.players + s.info.reserved < s.info.max_players)
90                        .max_by_key(|s| s.info.max_players - (s.info.players + s.info.reserved))
91                        .map(|s| s.info.clone());
92
93                    if let Some(server) = optimal {
94                        use aetheris_protocol::auth::v1::ConnectTokenRequest;
95                        let connect_req = Request::new(ConnectTokenRequest {
96                            session_token: session_token.clone(),
97                            server_address: server.addr.clone(),
98                        });
99
100                        let auth = auth_clone.clone();
101                        match auth.issue_connect_token(connect_req).await {
102                            Ok(resp) => {
103                                let inner_resp = resp.into_inner();
104                                let _ = tx
105                                    .send(Ok(QueueUpdate {
106                                        status: Some(UpdateStatus::Matched(MatchFoundStatus {
107                                            quic_token: inner_resp.token,
108                                            server_address: server.addr,
109                                            world_instance_id: server.instance_id,
110                                        })),
111                                    }))
112                                    .await;
113                            }
114                            Err(e) => {
115                                let _ = tx
116                                    .send(Err(Status::internal(format!(
117                                        "Failed to issue connect token: {e}"
118                                    ))))
119                                    .await;
120                            }
121                        }
122                    } else {
123                        let _ = tx
124                            .send(Err(Status::resource_exhausted("No servers available")))
125                            .await;
126                    }
127                }
128            }
129            active_queues.remove(&session_token);
130        });
131
132        Ok(Response::new(ReceiverStream::new(rx)))
133    }
134
135    async fn cancel_queue(
136        &self,
137        request: Request<CancelQueueRequest>,
138    ) -> Result<Response<CancelQueueResponse>, Status> {
139        let req = request.into_inner();
140        let success = if let Some((_, cancel_tx)) = self.active_queues.remove(&req.session_token) {
141            let _ = cancel_tx.send(());
142            true
143        } else {
144            false
145        };
146        Ok(Response::new(CancelQueueResponse { success }))
147    }
148
149    async fn list_servers(
150        &self,
151        _request: Request<ListServersRequest>,
152    ) -> Result<Response<ListServersResponse>, Status> {
153        let instances = self.servers.iter().map(|s| s.info.clone()).collect();
154        Ok(Response::new(ListServersResponse { instances }))
155    }
156
157    async fn register_instance(
158        &self,
159        request: Request<RegisterInstanceRequest>,
160    ) -> Result<Response<RegisterInstanceResponse>, Status> {
161        let req = request.into_inner();
162        let Some(instance) = req.instance else {
163            return Err(Status::invalid_argument("Missing instance info"));
164        };
165
166        self.servers.insert(
167            instance.instance_id.clone(),
168            RegisteredServer {
169                info: instance,
170                last_heartbeat: std::time::Instant::now(),
171            },
172        );
173
174        Ok(Response::new(RegisterInstanceResponse { success: true }))
175    }
176
177    async fn heartbeat(
178        &self,
179        request: Request<HeartbeatRequest>,
180    ) -> Result<Response<HeartbeatResponse>, Status> {
181        let req = request.into_inner();
182        if let Some(mut server) = self.servers.get_mut(&req.instance_id) {
183            server.info.players = req.players;
184            server.last_heartbeat = std::time::Instant::now();
185            Ok(Response::new(HeartbeatResponse { ok: true }))
186        } else {
187            Err(Status::not_found("Instance not registered"))
188        }
189    }
190}