use hakuban::message::Message;
use tokio::{net::TcpListener, sync::{mpsc, oneshot}};
use tokio_util::compat::TokioAsyncReadCompatExt; extern crate hakuban;
use async_tungstenite::tungstenite;
use futures::{SinkExt, StreamExt};
/// Handle to a background websocket proxy created by `WebsocketMessageProxy::spawn`.
///
/// The proxy sits between one accepted websocket connection (the "source") and a
/// target websocket server, and reports every message it relays through the two
/// receivers below so that the traffic can be inspected.
pub struct WebsocketMessageProxy {
/// Yields each message decoded from the source connection, after it has been
/// re-encoded and sent on to the target. The channel buffers up to 1000 messages.
pub sent_to_target_rx: mpsc::Receiver<Message>,
/// Yields each message decoded from the target connection, after it has been
/// re-encoded and sent on to the source. The channel buffers up to 1000 messages.
pub sent_from_target_rx: mpsc::Receiver<Message>,
/// Signals the background task to stop: it is polled both while the proxy waits
/// for the incoming connection and between relayed messages.
pub stop_tx: oneshot::Sender<()>,
}
impl WebsocketMessageProxy {
pub async fn spawn(listen_at: String, target: String) -> WebsocketMessageProxy {
let (listening_tx, listening_rx) = oneshot::channel();
let (stop_tx, mut stop_rx) = oneshot::channel();
let (sent_to_target_tx, sent_to_target_rx) = mpsc::channel::<Message>(1000);
let (sent_from_target_tx, sent_from_target_rx) = mpsc::channel::<Message>(1000);
tokio::spawn(async move {
let listen_socket = TcpListener::bind(listen_at).await.expect("Couldn't bind listening socket");
listening_tx.send(()).unwrap();
let mut source_socket = None;
tokio::select! {
new_connection = listen_socket.accept() => match new_connection {
Ok((stream, _address)) => {
source_socket = Some(stream);
drop(listen_socket);
},
Err(_) => panic!(), },
_ = &mut stop_rx => { }
};
if let Some(source_socket) = source_socket {
let mut source_stream = async_tungstenite::accept_async(source_socket.compat())
.await
.expect("Error during the websocket handshake occurred");
let mut target_stream = match async_tungstenite::tokio::connect_async(target.clone()).await {
Ok((stream, _response)) => stream,
Err(error) => {
panic!("Failed to connect: {:?}", error);
}
};
loop {
tokio::select! {
message_option = source_stream.next() => match message_option {
Some(Ok(tungstenite::Message::Binary(message_msgpack))) => {
let message = rmp_serde::from_read_ref::<Vec<u8>,Message>(&message_msgpack).unwrap();
target_stream.send(tungstenite::Message::Binary(rmp_serde::to_vec(&message).unwrap())).await.unwrap();
sent_to_target_tx.send(message).await.unwrap();
},
_ => panic!()
},
message_option = target_stream.next() => match message_option {
Some(Ok(tungstenite::Message::Binary(message_msgpack))) => {
let message = rmp_serde::from_read_ref::<Vec<u8>,Message>(&message_msgpack).unwrap();
source_stream.send(tungstenite::Message::Binary(rmp_serde::to_vec(&message).unwrap())).await.unwrap();
sent_from_target_tx.send(message).await.unwrap();
},
_ => panic!()
},
_ = &mut stop_rx => { break; }
}
}
};
});
listening_rx.await.unwrap();
WebsocketMessageProxy {
sent_to_target_rx,
sent_from_target_rx,
stop_tx,
}
}
}