use std::sync::Arc;
use crate::{
errors::MessageError,
net::parser::{parse_request, Operation},
store::mem::MemStore,
};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::{TcpListener, TcpStream},
sync::Mutex,
};
use tracing::{debug, error, info};
use tracing_subscriber::FmtSubscriber;
static INIT_TRACING: std::sync::Once = std::sync::Once::new();
fn init_logger() {
INIT_TRACING.call_once(|| {
FmtSubscriber::builder().init();
})
}
async fn send_response(client: &mut TcpStream, code: Operation, msg: &str) {
let response = format!("{}::{}\n", code, msg);
client
.write_all(response.as_bytes())
.await
.expect("unable to response to client");
}
async fn read_from_client(client: &mut TcpStream) -> String {
let mut buffer = vec![0; 4096];
let n_bytes = client
.read(&mut buffer)
.await
.expect("unable to read from client");
if n_bytes == 0 {
return String::from("");
}
let msg = String::from_utf8_lossy(&buffer[..n_bytes]);
msg.trim_end().to_string()
}
async fn handler(mut client: TcpStream, store: Arc<Mutex<MemStore>>) {
let client_address = client
.peer_addr()
.expect("unable to get client address")
.to_string();
let msg = read_from_client(&mut client).await;
info!("{} -> {}", client_address, msg);
let message = match parse_request(&msg) {
Ok(msg) => msg,
Err(error) => {
match error {
MessageError::InvalidMessage(msg) | MessageError::InvalidFormat(msg) => {
error!("failed to parse message - {}", msg);
send_response(&mut client, Operation::Error, &msg).await
}
}
return;
}
};
let mut vault = store.lock().await;
match message.op {
Operation::StringSet => {
let key = &message.args[0];
let value = &message.args[1..].join(" ");
let _ = vault.insert_string(key, value);
send_response(&mut client, message.op, "OK").await;
info!("{} <- {}", client_address, "OK");
}
Operation::StringGet => {
let key = &message.args[0];
if let Ok(value) = vault.get_string(key) {
send_response(&mut client, message.op, &value).await;
info!("{} <- {}", client_address, &value);
}
}
Operation::StringRemove => {
let key = &message.args[0];
if let Ok(value) = vault.remove_string(key) {
send_response(&mut client, message.op, &value).await;
info!("{} <- {}", client_address, &value);
}
}
Operation::StringClear => {
if vault.clear_strings().is_ok() {
send_response(&mut client, message.op, "OK").await;
info!("{} <- {}", client_address, "OK");
}
}
Operation::Incr => {
let key = &message.args[0];
if let Ok(value) = vault.incr(key) {
send_response(&mut client, message.op, &value.to_string()).await;
info!("{} <- {}", client_address, &value);
}
}
Operation::Decr => {
let key = &message.args[0];
if let Ok(value) = vault.decr(key) {
send_response(&mut client, message.op, &value.to_string()).await;
info!("{} <- {}", client_address, &value);
}
}
Operation::Dump => {
let filepath = &message.args[0];
if let Err(e) = vault.dump_store(filepath) {
let err_message = format!("unable to save store: {}", e);
send_response(&mut client, message.op, &err_message).await;
error!("{} <- {}", client_address, &err_message);
} else {
send_response(&mut client, message.op, "OK").await;
info!("{} <- {}", client_address, "OK");
}
}
_ => {
send_response(&mut client, Operation::Noop, "nothing to do").await;
info!("{} <- noop", client_address);
}
}
}
/// Starts the server on `addr:port` and serves clients until an error occurs.
///
/// Initialises logging, creates an empty in-memory store shared by all
/// connections, binds a TCP listener and then accepts clients in a loop,
/// spawning one task running `handler` per connection.
///
/// # Errors
///
/// Returns the underlying I/O error if binding the listener fails or if
/// accepting a connection fails. The function does not return `Ok`.
pub async fn start(addr: &str, port: usize) -> std::io::Result<()> {
    init_logger();

    let store = Arc::new(Mutex::new(MemStore::new()));
    let addr = format!("{}:{}", addr, port);
    let listener = TcpListener::bind(&addr).await?;
    info!("Started Rubin server at {}", addr);

    loop {
        // `accept` already yields the peer address; querying it again via
        // `peer_addr()?` could fail for a client that disconnected right away
        // and would then shut the whole server down.
        let (client, client_addr) = listener.accept().await?;
        debug!("Accepted new client: {}", client_addr);

        let store = Arc::clone(&store);
        tokio::spawn(handler(client, store));
    }
}