use async_stream::try_stream;
use bincode;
use futures_core::stream::Stream;
use serde::{Deserialize, Serialize};
use std::error::Error;
use std::sync::{Arc, Mutex};
use strum_macros::{Display, EnumIter, EnumString};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
use crate::config::Config;
use crate::library::*;
use crate::music::song::Song;
use color_eyre::Result;
use log::{debug, error, info, trace, warn};
pub struct Server {
config: Config,
}
impl Server {
pub async fn start(config: &Config) -> Result<()> {
let address = config.server_address.clone() + ":" + &config.server_port.to_string();
trace!("Starting TCP server on {:?}", &address);
let listener = TcpListener::bind(&address).await?;
trace!("Server bound to tcp port");
let stop_flag = Arc::new(Mutex::new(false));
loop {
let local_stop_flag = stop_flag.clone();
let (mut socket, _) = listener.accept().await?;
let client_address = socket.peer_addr()?;
let client_address = format!("{}", client_address);
debug!("New client: {}", client_address);
let config = config.clone();
let handle = tokio::spawn(async move {
let mut buf = [0u8; 8];
loop {
match socket.read(&mut buf).await {
Ok(n) if n == 0 => break,
Ok(n) => n,
Err(e) => {
error!("failed to read from socket; err = {:?}", e);
break;
}
};
let size = u64::from_ne_bytes(buf);
let mut payload = vec![0; size as usize];
let res = socket.read_exact(&mut payload[..]).await;
trace!("res from socket = {:?}", res);
let decoded_command = bincode::deserialize::<Command>(&payload);
match decoded_command {
Ok(command) => {
info!("{command} command received");
let res =
Server::reply(Reply::Received(command.to_string()), &mut socket)
.await;
trace!("Replied 'received': status: {:?}", res);
match command {
Command::Play(i) => (),
Command::Pause => (),
Command::Toggle => (),
Command::Next => (),
Command::Previous => (),
Command::Scan => scan(&config).await,
Command::List(i) => {
let list = list(&config, i).await;
match Server::reply(Reply::List(list), &mut socket).await {
Ok(_) => trace!("Replied 'list' successfully"),
Err(e) => {
warn!("Failed to send 'list' reply to client: {:?}", e)
}
}
}
Command::Ping => (),
Command::Restart => (),
Command::Stop => {
let mut flag = local_stop_flag.lock().unwrap();
*flag = true;
break;
}
};
match Server::reply(Reply::Done, &mut socket).await {
Ok(_) => trace!("Replied 'done' successfully"),
Err(e) => warn!("Failed to send 'done' to client: {:?}", e),
}
}
Err(e) => warn!("failed to decode message payload; err = {:?}", e),
};
}
Server::reply(Reply::Done, &mut socket).await; trace!("Terminating tokio thread");
});
trace!("Waiting on tokio thread join for shutdown...");
let res = tokio::join!(handle);
if *stop_flag.lock().unwrap() {
break Ok(());
}
}
}
pub async fn send_wait(
message: &Command,
address: &str,
) -> Result<(), Box<dyn Error + Send + Sync>> {
let encoded: Vec<u8> = message.prepare_query()?;
let mut stream = TcpStream::connect(address).await?;
stream.write_all(&encoded).await?;
let mut buf = [0u8; 8];
loop {
match stream.read(&mut buf).await {
Ok(n) if n == 0 => break,
Ok(n) => n,
Err(e) => {
error!("failed to read from socket; err = {:?}", e);
break;
}
};
let size = u64::from_ne_bytes(buf);
let mut payload = vec![0; size as usize];
let res = stream.read_exact(&mut payload[..]).await;
trace!("res from socket = {:?}", res);
let decoded_reply = bincode::deserialize::<Reply>(&payload);
if let Ok(Reply::Done) = decoded_reply {
break;
}
}
Ok(())
}
pub async fn send<'a>(
message: &'a Command,
address: &'a str,
) -> impl Stream<Item = Result<Reply, Box<dyn Error + Send + Sync>>> + 'a {
try_stream! {
let encoded: Vec<u8> = message.prepare_query()?;
let mut stream = TcpStream::connect(address).await?;
stream.write_all(&encoded).await?;
let mut buf = [0u8; 8];
loop {
match stream.read(&mut buf).await {
Ok(n) if n == 0 => break,
Ok(n) => n,
Err(e) => {
error!("failed to read from socket; err = {:?}", e);
break;
}
};
let size = u64::from_ne_bytes(buf);
let mut payload = vec![0; size as usize];
let res = stream.read_exact(&mut payload[..]).await;
trace!("res from socket = {:?}", res);
let decoded_reply = bincode::deserialize::<Reply>(&payload)?;
match decoded_reply {
Reply::Done => { yield Reply::Done; break },
r => yield r,
}
}
}
}
async fn reply(
reply: Reply,
stream: &mut TcpStream,
) -> Result<(), Box<dyn Error + Send + Sync>> {
let encoded_reply: Vec<u8> = reply.prepare_query()?;
stream.write_all(&encoded_reply).await?;
Ok(())
}
}
#[non_exhaustive]
#[derive(Display, Debug, Serialize, Deserialize, EnumString, EnumIter)]
pub enum Command {
Play(Option<String>),
Pause,
Toggle,
Next,
Previous,
Scan,
List(Option<String>),
Ping,
Restart,
Stop,
}
#[non_exhaustive]
#[derive(Display, Debug, Serialize, Deserialize, EnumString, EnumIter)]
pub enum Reply {
Received(String),
List(Vec<Song>),
Done,
}
impl Command {
fn prepare_query(&self) -> Result<Vec<u8>, Box<dyn Error + Send + Sync>> {
match bincode::serialized_size(self) {
Ok(size) => {
let mut message: Vec<u8> = (size as u64).to_ne_bytes().to_vec();
message.extend(bincode::serialize(self).unwrap());
Ok(message)
}
Err(e) => Err(Box::new(e)),
}
}
}
impl Reply {
fn prepare_query(&self) -> Result<Vec<u8>, Box<dyn Error + Send + Sync>> {
match bincode::serialized_size(self) {
Ok(size) => {
let mut message: Vec<u8> = (size as u64).to_ne_bytes().to_vec();
message.extend(bincode::serialize(self).unwrap());
Ok(message)
}
Err(e) => Err(Box::new(e)),
}
}
}