// digitalis_server/server.rs

1use async_trait::async_trait;
2use digitalis_core::{client::ClientMessage, Control};
3use futures_util::{SinkExt, StreamExt};
4use tokio::{
5    net::{TcpListener, TcpStream},
6    sync::mpsc::channel,
7};
8use tokio_tungstenite::{accept_hdr_async, tungstenite::Message};
9
10use crate::{
11    accept::Accept,
12    client::{Client, ClientId},
13    DigitalisResult,
14};
15
/// Network endpoint settings for the server's listening socket.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ServerOption {
    /// Host name or IP address to bind to.
    pub host: String,
    /// TCP port to bind to.
    pub port: u16,
}
21
22impl ServerOption {
23    pub async fn tcp_connect(&self) -> DigitalisResult<TcpListener> {
24        let addr = format!("{}:{}", self.host, self.port);
25        let listner = TcpListener::bind(&addr).await?;
26        Ok(listner)
27    }
28}
29
30impl Default for ServerOption {
31    fn default() -> Self {
32        Self {
33            host: "localhost".into(),
34            port: 8765,
35        }
36    }
37}
38
39#[async_trait]
40pub trait DigitalisServer {
41    async fn accept_connection(&self, client_id: ClientId, stream: TcpStream) {
42        log::info!(
43            "Try to accept connection: {:?}",
44            stream.peer_addr().expect("Fail to get peer addr")
45        );
46
47        let (mut sender, mut receiver) = accept_hdr_async(stream, Accept {})
48            .await
49            .expect("Fail to accept")
50            .split();
51
52        let (msg_queue_tx, mut msg_queue_rx) = channel::<Message>(100);
53
54        let client = Client::new(client_id, msg_queue_tx);
55
56        self.on_open(&client).unwrap();
57
58        loop {
59            tokio::select! {
60                Some(msg) = msg_queue_rx.recv() => {
61                    if let Err(e) = sender.send(msg).await {
62                        log::error!("Fail to send message to client {}: {e:?}", client_id);
63                    }
64                }
65
66                Some(msg) = receiver.next() => {
67                    let msg = msg.expect("Fail to get message");
68                    log::debug!("Received message from client {}: {:?}", client.id(), msg);
69
70                    let msg = match ClientMessage::from_ws_message(msg) {
71                        Ok(msg) => msg,
72                        Err(e) => {
73                            log::error!("Fail to parse message from client {}: {e:?}", client.id());
74                            continue;
75                        }
76                    };
77
78                    match self.handle_message(&client, msg) {
79                        Ok(Control::Exit) => return,
80                        Ok(Control::Continue) => continue,
81                        Err(e) => {
82                            log::error!("Fail to handle message from client {}: {e:?}", client.id());
83                        }
84                    }
85                }
86            }
87        }
88    }
89
90    fn handle_message(&self, client: &Client, msg: ClientMessage) -> DigitalisResult<Control>;
91
92    fn on_open(&self, client: &Client) -> DigitalisResult<()>;
93}