Skip to main content

comet/server/
server.rs

1use std::fmt::Debug;
2use std::sync::Arc;
3
4use std::net::SocketAddr;
5
6use tokio::sync::RwLock;
7
8use axum::{
9    extract::{
10        ws::{WebSocket, WebSocketUpgrade},
11        ConnectInfo,
12    },
13    response::Response,
14    routing::get,
15    Extension, Router,
16};
17use axum_extra::routing::SpaRouter;
18
19use crate::core::prelude::ProtoTrait;
20
21use serde::de::DeserializeOwned;
22use serde::Serialize;
23
24use super::{client::Client, universe::Universe};
25
26use futures::stream::StreamExt;
27use log::*;
28
29async fn handler<P: ProtoTrait + Send + 'static + Serialize + DeserializeOwned + Debug>(
30    ws: WebSocketUpgrade,
31    ConnectInfo(addr): ConnectInfo<SocketAddr>,
32    Extension(universe): Extension<Universe>,
33) -> Response
34where
35    <P as ProtoTrait>::Client: Send,
36    P: ProtoTrait<Client = Client>,
37{
38    debug!("New connection from {}", addr);
39    ws.on_upgrade(move |socket| handle_socket::<P>(socket, universe, addr))
40}
41
42async fn handle_socket<P: ProtoTrait + Send + 'static + Serialize + DeserializeOwned + Debug>(
43    socket: WebSocket,
44    universe: Universe,
45    addr: SocketAddr,
46) where
47    <P as ProtoTrait>::Client: Send,
48    P: ProtoTrait<Client = Client>,
49{
50    let (tx, mut rx) = socket.split();
51
52    let tx = Arc::new(RwLock::new(tx));
53
54    let session_id = universe
55        .write()
56        .await
57        .new_client(Client::new(tx.clone(), universe.clone()));
58
59    debug!("Session id: {} for {}", session_id, addr);
60
61    while let Some(msg) = rx.next().await {
62        let msg = if let Ok(msg) = msg {
63            msg
64        } else {
65            break;
66        };
67
68        let client = universe.read().await.get_client(session_id);
69
70        client.handle_msg::<P>(msg.into()).await;
71    }
72
73    debug!("{}: Client disconnected", session_id);
74
75    // client disconnected
76    universe.write().await.remove_client(session_id).await;
77}
78
79pub async fn run<P: ProtoTrait + Send + 'static + Serialize + DeserializeOwned + Debug>()
80where
81    <P as ProtoTrait>::Client: Send,
82    P: ProtoTrait<Client = Client>,
83{
84    let app = Router::new()
85        .route("/ws", get(handler::<P>))
86        .layer(Extension(crate::UNIVERSE.clone()))
87        .merge(SpaRouter::new("/assets", "dist"));
88
89    let addr = "0.0.0.0:8080";
90
91    info!("Listening on {}", addr);
92
93    axum::Server::bind(&addr.parse().unwrap())
94        .serve(app.into_make_service_with_connect_info::<SocketAddr>())
95        .await
96        .unwrap();
97}