rokkett_logger/
server.rs

1use crate::protocol::*;
2use std::net::SocketAddr;
3use tokio::{
4	io::{AsyncReadExt, AsyncWriteExt},
5	net::{TcpListener, TcpStream},
6};
7
/// TCP log server parameterised over two user-supplied callbacks.
///
/// The callback bounds (`Fn(..)`, `Send`, `Sync`, `Clone`, `'static`) are
/// declared on the `impl` block that needs them rather than on the type
/// itself, so the struct definition does not repeat them and
/// `#[derive(Clone)]` adds only the `Clone` bound it requires.
#[derive(Clone)]
pub struct Server<A, B> {
	/// Invoked for a login request; its `bool` result decides whether the
	/// connection becomes authenticated.
	on_login: A,
	/// Invoked for a log request on an authenticated connection; its result
	/// is the status sent back to the client.
	on_log: B,
}
17
18impl<A, B> Server<A, B>
19where
20	A: Fn(AuthToken, SocketAddr) -> bool + Sync + Send + Clone + 'static,
21	B: Fn(LogMessage, SocketAddr) -> Status + Sync + Send + Clone + 'static,
22{
23	pub fn new(on_login: A, on_log: B) -> Self {
24		Server { on_login, on_log }
25	}
26
27	pub async fn listen(&self, host: &str, port: u16) {
28		let listener = TcpListener::bind((host, port)).await.unwrap();
29		println!("server listening on {host}:{port}");
30		loop {
31			let server = self.clone();
32			match listener.accept().await {
33				Err(err) => eprintln!("error on accept tcpstream. err = {}", err),
34				Ok((mut stream, addr)) => {
35					tokio::spawn(async move {
36						let mut authenticated = false;
37						loop {
38							let server = server.clone();
39							let package = next_package(&mut stream).await;
40							let package = match package {
41								Some(package) => package,
42								None => break,
43							};
44							match package.command {
45								Command::NoOp => {
46									send_status(&mut stream, Status::NoCmd).await;
47								}
48								Command::Login(token) => {
49									if !authenticated {
50										authenticated = (server.on_login)(token, addr);
51										if authenticated {
52											send_status(&mut stream, Status::Ok).await;
53										} else {
54											send_status(&mut stream, Status::AuthErr).await;
55										}
56									}
57								}
58								Command::Log(message) => {
59									if authenticated {
60										let status = (server.on_log)(message, addr);
61										send_status(&mut stream, status).await;
62									} else {
63										send_status(&mut stream, Status::AuthErr).await;
64									}
65								}
66							}
67						}
68						stream.shutdown().await.ok();
69					});
70				}
71			}
72		}
73	}
74}
75
76async fn next_package(stream: &mut TcpStream) -> Option<RequestPackage> {
77	loop {
78		let size: usize;
79		let mut header_bytes = vec![0; 8];
80		match stream.read_exact(&mut header_bytes).await {
81			Ok(n) => {
82				if n == 0 {
83					return None;
84				}
85				match HeaderPackage::try_parse(&header_bytes) {
86					Some(header) => size = header.bytes,
87					None => continue,
88				};
89			}
90			Err(_) => return None,
91		};
92
93		if size > 0 {
94			let mut bytes = vec![0; size];
95			match stream.read_exact(&mut bytes).await {
96				Ok(_) => return Some(bytes.into()),
97				Err(_) => return None
98			}
99		}
100	}
101}
102
103async fn send_status(stream: &mut TcpStream, status: Status) {
104	let package = ResponsePackage { status };
105	let bytes: Vec<u8> = package.into();
106
107	let header = HeaderPackage { bytes: bytes.len() };
108	let header_bytes: Vec<u8> = header.into();
109
110	if let Err(err) = stream.write_all(&[header_bytes, bytes].concat()[..]).await {
111		eprintln!("failed to write to tcpstream. err = {err}");
112	}
113}
114
115// #[cfg(test)]
116// mod tests {
117// 	use super::*;
118
119// 	#[test]
120// 	fn test() {
121// 		assert!(true);
122// 	}
123// }