use super::Packet;
use futures_util::{future, pin_mut, SinkExt, StreamExt};
use log::{error, info};
use serde_json::Result;
use std::str::FromStr;
use tokio::sync::mpsc;
use tokio_tungstenite::connect_async;
use tokio_tungstenite::tungstenite::Message;
/// Instructions accepted by the client's outgoing (write) side.
///
/// Values are delivered through the sender returned by
/// `Client::command_channel` and consumed by the writer task that
/// `Client::serve` spawns.
pub enum Command {
/// Serialize the boxed packet to JSON and send it as a text WebSocket message.
SendPacket(Box<Packet>),
/// Stop the writer loop, which in turn lets `Client::serve` return.
Close,
}
/// WebSocket client that exchanges JSON-encoded `Packet`s with a server.
pub struct Client {
/// Host part of the server address, interpolated into the `ws://` URL.
ip: String,
/// TCP port of the server, interpolated into the `ws://` URL.
port: u16,
/// Receiving end of the command channel; `serve` moves it into the writer task.
cmd_receiver: mpsc::Receiver<Command>,
/// Sending end of the command channel; clones are handed out by `command_channel`.
cmd_sender: mpsc::Sender<Command>,
/// Channel on which packets decoded from incoming messages are forwarded.
packet_out: mpsc::Sender<Packet>,
}
/// Creates a [`Client`] for the server at `addr`.
///
/// `addr` must have the form `ip:port`, i.e. contain exactly one `:`.
/// Packets decoded from the server are later forwarded on `packet_out`.
/// The internal command channel is created with a capacity of 100.
///
/// # Errors
///
/// Returns an [`std::io::ErrorKind::InvalidInput`] error when `addr` does not
/// contain exactly one `:` or when the port is not a valid `u16`.
pub fn new(addr: &str, packet_out: mpsc::Sender<Packet>) -> std::io::Result<Client> {
    // Exactly two `:`-separated parts are accepted; anything else is rejected.
    let mut parts = addr.split(':');
    let (ip, port) = match (parts.next(), parts.next(), parts.next()) {
        (Some(ip), Some(port), None) => (ip, port),
        _ => {
            return Err(std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                "addr is not in format ip:port",
            ))
        }
    };

    // The port comes from caller input, so report a bad value instead of panicking.
    let port = u16::from_str(port).map_err(|err| {
        std::io::Error::new(
            std::io::ErrorKind::InvalidInput,
            format!("could not parse port: {}", err),
        )
    })?;

    let (cmd_sender, cmd_receiver) = mpsc::channel(100);
    Ok(Client {
        ip: ip.to_string(),
        port,
        cmd_receiver,
        cmd_sender,
        packet_out,
    })
}
impl Client {
pub async fn serve(&mut self) -> anyhow::Result<()> {
let (ws_stream, _) =
connect_async(format!("ws://{}:{}/ws/erouting", self.ip, self.port)).await?;
info!("WebSocket handshake has been successfully completed");
let (mut write, read) = ws_stream.split();
let mut cmd_receiver = std::mem::replace(&mut self.cmd_receiver, mpsc::channel(1).1);
let to_ws = tokio::spawn(async move {
while let Some(command) = cmd_receiver.recv().await {
match command {
Command::SendPacket(packet) => {
let data = serde_json::to_string(&packet);
if write.send(Message::Text(data.unwrap())).await.is_err() {
error!("Error while sending packet");
}
}
Command::Close => {
break;
}
}
}
});
let from_ws = {
read.for_each(|message| async {
if message.is_err() {
return;
}
let data = message.unwrap().into_text();
let packet: Result<Packet> = serde_json::from_str(data.unwrap().as_str());
if let Ok(packet) = packet {
if let Err(err) = self.packet_out.send(packet).await {
error!("Error while sending packet to channel: {}", err);
}
}
})
};
pin_mut!(from_ws);
future::select(to_ws, from_ws).await;
Ok(())
}
pub fn command_channel(&self) -> mpsc::Sender<Command> {
self.cmd_sender.clone()
}
}