use anyhow::{Result, anyhow};
use log::{error, info, warn};
use serde::{Deserialize, Serialize};
use serde_repr::{Deserialize_repr, Serialize_repr};
use tokio::{
io::{AsyncBufReadExt, BufReader},
net::{TcpListener, TcpStream},
sync::mpsc,
};
use typeshare::typeshare;
use crate::search::Searchable;
pub type LogServerSender = mpsc::Sender<SocketMessage>;
#[typeshare]
#[derive(Eq, PartialEq, Clone, Debug, Serialize_repr, Deserialize_repr)]
#[serde(rename_all = "camelCase")]
#[repr(u8)]
#[allow(missing_docs)]
pub enum SocketMessageType {
Message = 0,
Error = 1,
Warning = 2,
Info = 3,
Success = 4,
Quit = 5,
Fatal = 6,
Debug = 7,
}
impl SocketMessageType {
pub fn parse(str: &str) -> Result<Self> {
match str {
"Message" => Ok(Self::Message),
"Error" => Ok(Self::Error),
"Warning" => Ok(Self::Warning),
"Info" => Ok(Self::Info),
"Success" => Ok(Self::Success),
"Quit" => Ok(Self::Quit),
"Fatal" => Ok(Self::Fatal),
"Debug" => Ok(Self::Debug),
_ => Err(anyhow!("Invalid Variant!")),
}
}
}
#[typeshare]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct SocketMessage {
pub sender_name: Option<String>,
pub sender_type: Option<String>,
pub message: String,
#[serde(alias = "type")]
pub message_type: SocketMessageType,
}
impl SocketMessage {
pub fn make_internal(message: &str, message_type: SocketMessageType) -> Self {
Self {
message: message.to_string(),
message_type,
sender_name: Some("Manager".to_string()),
sender_type: Some("LogServer".to_string()),
}
}
}
impl Searchable for SocketMessage {
fn get_values(&self) -> Vec<String> {
vec![
self.message.clone(),
self.sender_name.clone().unwrap_or("".to_string()),
self.sender_type.clone().unwrap_or("".to_string()),
]
}
}
pub struct LogServer {
pub port: u16,
listener: TcpListener,
}
impl LogServer {
const CLIENT_CONNECTED: &'static str = "====== Client Connected To Console ======";
const CLIENT_DISCONNECTED: &'static str = "====== Client Disconnected From Console ======";
pub async fn new(port: u16) -> Result<Self> {
let address = format!("127.0.0.1:{port}");
let listener = TcpListener::bind(&address).await?;
let port = listener.local_addr()?.port();
Ok(Self { port, listener })
}
async fn yield_log(tx: &LogServerSender, message: SocketMessage) {
if tx.capacity() == 0 {
warn!("Logs incoming too fast! Logs may be dropped!");
} else {
let res = tx.send(message).await;
if let Err(why) = res {
error!("Couldn't Yield Log: {why:?}")
}
}
}
async fn client_loop(mut stream: TcpStream, tx: &LogServerSender) -> bool {
let mut reader = BufReader::with_capacity(16000, &mut stream);
let mut body = String::new();
let mut flag = false;
while let Ok(bytes_read) = reader.read_line(&mut body).await {
if bytes_read == 0 {
break;
}
if body.trim() == "" {
continue;
}
let message: Result<SocketMessage, _> = serde_json::from_str(body.trim());
match message {
Ok(message) => {
match message.message_type {
SocketMessageType::Quit => {
flag = true;
break;
}
_ => {
Self::yield_log(tx, message).await;
}
};
}
Err(why) => {
Self::yield_log(
tx,
SocketMessage::make_internal(
&format!("Invalid Log From Game Received: {why:?}"),
SocketMessageType::Error,
),
)
.await;
}
}
body.clear();
}
flag
}
async fn server_loop(
&self,
tx: &LogServerSender,
shutdown_sender: mpsc::Sender<()>,
disconnect_on_quit: bool,
) {
loop {
let stream = self.listener.accept().await;
match stream {
Ok((stream, _)) => {
let tx2 = tx.clone();
let shutdown_sender2 = shutdown_sender.clone();
tokio::spawn(async move {
Self::yield_log(
&tx2,
SocketMessage::make_internal(
Self::CLIENT_CONNECTED,
SocketMessageType::Info,
),
)
.await;
let quit_received = Self::client_loop(stream, &tx2).await;
if quit_received && disconnect_on_quit {
shutdown_sender2.send(()).await.ok();
}
Self::yield_log(
&tx2,
SocketMessage::make_internal(
Self::CLIENT_DISCONNECTED,
SocketMessageType::Info,
),
)
.await;
});
}
Err(why) => {
Self::yield_log(
tx,
SocketMessage::make_internal(
&format!("Client Connection Failure! {why:?}"),
SocketMessageType::Error,
),
)
.await;
}
}
}
}
pub async fn listen(&self, tx: LogServerSender, disconnect_on_quit: bool) -> Result<()> {
Self::yield_log(
&tx,
SocketMessage::make_internal(
&format!("Ready to receive game logs on port {}!", self.port),
SocketMessageType::Info,
),
)
.await;
let (shutdown_sender, mut shutdown_receiver) = mpsc::channel::<()>(2);
tokio::select! {
_ = async {
let shutdown_sender = &shutdown_sender.clone();
self.server_loop(&tx, shutdown_sender.clone(), disconnect_on_quit).await;
} => {},
_ = shutdown_receiver.recv() => info!("Quit Message Received")
};
Ok(())
}
}
#[cfg(test)]
mod tests {
use tokio::{
io::{AsyncWriteExt, BufWriter},
net::TcpStream,
sync::mpsc,
};
use super::*;
struct TestLogServer {
pub server: LogServer,
pub port: u16,
pub logs: Vec<SocketMessage>,
}
impl TestLogServer {
pub async fn new() -> Self {
let server = LogServer::new(0).await.unwrap();
let port = server.port;
Self {
server,
port,
logs: vec![],
}
}
pub async fn listen(&mut self, disconnect_on_quit: bool) {
let (tx, mut rx) = mpsc::channel(32);
tokio::join!(self.server.listen(tx, disconnect_on_quit), async {
while let Some(msg) = rx.recv().await {
self.logs.push(msg);
}
})
.0
.unwrap();
}
pub fn assert_logs(&self, expected: Vec<SocketMessage>) {
assert_eq!(self.logs.len(), expected.len());
for (i, log) in self.logs.iter().enumerate() {
if log != &expected[i] {
panic!(
"Log {} doesn't match expected!\nExpected: {:?}\nActual: {:?}",
i, expected[i], log
);
}
}
}
}
struct MockGame {
pub stream: TcpStream,
}
impl MockGame {
pub async fn new(port: u16) -> Self {
let stream = TcpStream::connect(format!("127.0.0.1:{port}"))
.await
.unwrap();
Self { stream }
}
pub async fn send(&mut self, msg: SocketMessage) {
let mut writer = BufWriter::new(&mut self.stream);
let str = format!("{}\n", serde_json::to_string(&msg).unwrap());
writer.write_all(str.as_bytes()).await.unwrap();
writer.flush().await.unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}
pub fn make_test_msg(message: &str, message_type: SocketMessageType) -> SocketMessage {
let mut msg = SocketMessage::make_internal(message, message_type);
msg.sender_type = Some("TestClient".to_string());
msg
}
pub async fn send_test_msg(&mut self, message: &str, message_type: SocketMessageType) {
let msg = Self::make_test_msg(message, message_type);
self.send(msg).await;
}
}
#[test]
fn test_log_server() {
tokio_test::block_on(async {
let mut server = TestLogServer::new().await;
let port = server.port;
tokio::join!(server.listen(true), async move {
let mut game = MockGame::new(port).await;
game.send_test_msg("Test Message", SocketMessageType::Info)
.await;
game.send_test_msg("Success!", SocketMessageType::Success)
.await;
game.send_test_msg("", SocketMessageType::Quit).await;
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
});
let expected = vec![
SocketMessage::make_internal(
&format!("Ready to receive game logs on port {}!", server.port),
SocketMessageType::Info,
),
SocketMessage::make_internal(LogServer::CLIENT_CONNECTED, SocketMessageType::Info),
MockGame::make_test_msg("Test Message", SocketMessageType::Info),
MockGame::make_test_msg("Success!", SocketMessageType::Success),
SocketMessage::make_internal(
LogServer::CLIENT_DISCONNECTED,
SocketMessageType::Info,
),
];
server.assert_logs(expected);
});
}
#[test]
fn test_log_server_no_disconnect_on_quit() {
tokio_test::block_on(async {
let mut server = TestLogServer::new().await;
let port = server.port;
tokio::select!(_ = server.listen(false) => panic!("Other should've completed first!"), _ = async move {
let mut game = MockGame::new(port).await;
game.send_test_msg("Test Message", SocketMessageType::Info)
.await;
game.send_test_msg("Success!", SocketMessageType::Success)
.await;
game.send_test_msg("", SocketMessageType::Quit).await;
let mut game2 = MockGame::new(port).await;
game2
.send_test_msg("Test Message", SocketMessageType::Info)
.await;
game2
.send_test_msg("Warning!", SocketMessageType::Warning)
.await;
game2.send_test_msg("", SocketMessageType::Quit).await;
} => {});
let expected = vec![
SocketMessage::make_internal(
&format!("Ready to receive game logs on port {}!", server.port),
SocketMessageType::Info,
),
SocketMessage::make_internal(LogServer::CLIENT_CONNECTED, SocketMessageType::Info),
MockGame::make_test_msg("Test Message", SocketMessageType::Info),
MockGame::make_test_msg("Success!", SocketMessageType::Success),
SocketMessage::make_internal(
LogServer::CLIENT_DISCONNECTED,
SocketMessageType::Info,
),
SocketMessage::make_internal(LogServer::CLIENT_CONNECTED, SocketMessageType::Info),
MockGame::make_test_msg("Test Message", SocketMessageType::Info),
MockGame::make_test_msg("Warning!", SocketMessageType::Warning),
SocketMessage::make_internal(
LogServer::CLIENT_DISCONNECTED,
SocketMessageType::Info,
),
];
server.assert_logs(expected);
});
}
#[test]
fn test_log_server_multi_client() {
tokio_test::block_on(async {
let mut server = TestLogServer::new().await;
let port = server.port;
tokio::join!(server.listen(true), async move {
let mut game = MockGame::new(port).await;
game.send_test_msg("Test Message", SocketMessageType::Info)
.await;
game.send_test_msg("Success!", SocketMessageType::Success)
.await;
let mut game2 = MockGame::new(port).await;
game2
.send_test_msg("Test Message", SocketMessageType::Info)
.await;
game2
.send_test_msg("Warning!", SocketMessageType::Warning)
.await;
game.send_test_msg("Other Info", SocketMessageType::Info)
.await;
game2
.send_test_msg("Other Warning", SocketMessageType::Warning)
.await;
game2.send_test_msg("", SocketMessageType::Quit).await;
game.send_test_msg("", SocketMessageType::Quit).await;
});
let expected = vec![
SocketMessage::make_internal(
&format!("Ready to receive game logs on port {}!", server.port),
SocketMessageType::Info,
),
SocketMessage::make_internal(LogServer::CLIENT_CONNECTED, SocketMessageType::Info),
MockGame::make_test_msg("Test Message", SocketMessageType::Info),
MockGame::make_test_msg("Success!", SocketMessageType::Success),
SocketMessage::make_internal(LogServer::CLIENT_CONNECTED, SocketMessageType::Info),
MockGame::make_test_msg("Test Message", SocketMessageType::Info),
MockGame::make_test_msg("Warning!", SocketMessageType::Warning),
MockGame::make_test_msg("Other Info", SocketMessageType::Info),
MockGame::make_test_msg("Other Warning", SocketMessageType::Warning),
SocketMessage::make_internal(
LogServer::CLIENT_DISCONNECTED,
SocketMessageType::Info,
),
SocketMessage::make_internal(
LogServer::CLIENT_DISCONNECTED,
SocketMessageType::Info,
),
];
server.assert_logs(expected);
});
}
}