1use crate::protocol::*;
2use std::net::SocketAddr;
3use tokio::{
4 io::{AsyncReadExt, AsyncWriteExt},
5 net::{TcpListener, TcpStream},
6};
7
8#[derive(Clone)]
9pub struct Server<A, B>
10where
11 A: Fn(AuthToken, SocketAddr) -> bool + Sync + Send + Clone + 'static,
12 B: Fn(LogMessage, SocketAddr) -> Status + Sync + Send + Clone + 'static,
13{
14 on_login: A,
15 on_log: B,
16}
17
18impl<A, B> Server<A, B>
19where
20 A: Fn(AuthToken, SocketAddr) -> bool + Sync + Send + Clone + 'static,
21 B: Fn(LogMessage, SocketAddr) -> Status + Sync + Send + Clone + 'static,
22{
23 pub fn new(on_login: A, on_log: B) -> Self {
24 Server { on_login, on_log }
25 }
26
27 pub async fn listen(&self, host: &str, port: u16) {
28 let listener = TcpListener::bind((host, port)).await.unwrap();
29 println!("server listening on {host}:{port}");
30 loop {
31 let server = self.clone();
32 match listener.accept().await {
33 Err(err) => eprintln!("error on accept tcpstream. err = {}", err),
34 Ok((mut stream, addr)) => {
35 tokio::spawn(async move {
36 let mut authenticated = false;
37 loop {
38 let server = server.clone();
39 let package = next_package(&mut stream).await;
40 let package = match package {
41 Some(package) => package,
42 None => break,
43 };
44 match package.command {
45 Command::NoOp => {
46 send_status(&mut stream, Status::NoCmd).await;
47 }
48 Command::Login(token) => {
49 if !authenticated {
50 authenticated = (server.on_login)(token, addr);
51 if authenticated {
52 send_status(&mut stream, Status::Ok).await;
53 } else {
54 send_status(&mut stream, Status::AuthErr).await;
55 }
56 }
57 }
58 Command::Log(message) => {
59 if authenticated {
60 let status = (server.on_log)(message, addr);
61 send_status(&mut stream, status).await;
62 } else {
63 send_status(&mut stream, Status::AuthErr).await;
64 }
65 }
66 }
67 }
68 stream.shutdown().await.ok();
69 });
70 }
71 }
72 }
73 }
74}
75
76async fn next_package(stream: &mut TcpStream) -> Option<RequestPackage> {
77 loop {
78 let size: usize;
79 let mut header_bytes = vec![0; 8];
80 match stream.read_exact(&mut header_bytes).await {
81 Ok(n) => {
82 if n == 0 {
83 return None;
84 }
85 match HeaderPackage::try_parse(&header_bytes) {
86 Some(header) => size = header.bytes,
87 None => continue,
88 };
89 }
90 Err(_) => return None,
91 };
92
93 if size > 0 {
94 let mut bytes = vec![0; size];
95 match stream.read_exact(&mut bytes).await {
96 Ok(_) => return Some(bytes.into()),
97 Err(_) => return None
98 }
99 }
100 }
101}
102
103async fn send_status(stream: &mut TcpStream, status: Status) {
104 let package = ResponsePackage { status };
105 let bytes: Vec<u8> = package.into();
106
107 let header = HeaderPackage { bytes: bytes.len() };
108 let header_bytes: Vec<u8> = header.into();
109
110 if let Err(err) = stream.write_all(&[header_bytes, bytes].concat()[..]).await {
111 eprintln!("failed to write to tcpstream. err = {err}");
112 }
113}
114
115