digitalis_server/
server.rs1use async_trait::async_trait;
2use digitalis_core::{client::ClientMessage, Control};
3use futures_util::{SinkExt, StreamExt};
4use tokio::{
5 net::{TcpListener, TcpStream},
6 sync::mpsc::channel,
7};
8use tokio_tungstenite::{accept_hdr_async, tungstenite::Message};
9
10use crate::{
11 accept::Accept,
12 client::{Client, ClientId},
13 DigitalisResult,
14};
15
16#[derive(Debug, Clone, PartialEq, Eq)]
17pub struct ServerOption {
18 pub host: String,
19 pub port: u16,
20}
21
22impl ServerOption {
23 pub async fn tcp_connect(&self) -> DigitalisResult<TcpListener> {
24 let addr = format!("{}:{}", self.host, self.port);
25 let listner = TcpListener::bind(&addr).await?;
26 Ok(listner)
27 }
28}
29
30impl Default for ServerOption {
31 fn default() -> Self {
32 Self {
33 host: "localhost".into(),
34 port: 8765,
35 }
36 }
37}
38
39#[async_trait]
40pub trait DigitalisServer {
41 async fn accept_connection(&self, client_id: ClientId, stream: TcpStream) {
42 log::info!(
43 "Try to accept connection: {:?}",
44 stream.peer_addr().expect("Fail to get peer addr")
45 );
46
47 let (mut sender, mut receiver) = accept_hdr_async(stream, Accept {})
48 .await
49 .expect("Fail to accept")
50 .split();
51
52 let (msg_queue_tx, mut msg_queue_rx) = channel::<Message>(100);
53
54 let client = Client::new(client_id, msg_queue_tx);
55
56 self.on_open(&client).unwrap();
57
58 loop {
59 tokio::select! {
60 Some(msg) = msg_queue_rx.recv() => {
61 if let Err(e) = sender.send(msg).await {
62 log::error!("Fail to send message to client {}: {e:?}", client_id);
63 }
64 }
65
66 Some(msg) = receiver.next() => {
67 let msg = msg.expect("Fail to get message");
68 log::debug!("Received message from client {}: {:?}", client.id(), msg);
69
70 let msg = match ClientMessage::from_ws_message(msg) {
71 Ok(msg) => msg,
72 Err(e) => {
73 log::error!("Fail to parse message from client {}: {e:?}", client.id());
74 continue;
75 }
76 };
77
78 match self.handle_message(&client, msg) {
79 Ok(Control::Exit) => return,
80 Ok(Control::Continue) => continue,
81 Err(e) => {
82 log::error!("Fail to handle message from client {}: {e:?}", client.id());
83 }
84 }
85 }
86 }
87 }
88 }
89
90 fn handle_message(&self, client: &Client, msg: ClientMessage) -> DigitalisResult<Control>;
91
92 fn on_open(&self, client: &Client) -> DigitalisResult<()>;
93}