hakuban 0.7.2

Data-object sharing library
Documentation

use hakuban::message::Message;
use tokio::{net::TcpListener, sync::{mpsc, oneshot}};
use tokio_util::compat::TokioAsyncReadCompatExt; //TODO: remove
extern crate hakuban;

use async_tungstenite::tungstenite;
use futures::{SinkExt, StreamExt};


/// Handle to a background websocket proxy task that relays hakuban [`Message`]s
/// between one accepted connection and a target endpoint, and exposes a copy
/// of every relayed message through channels.
pub struct WebsocketMessageProxy {
	/// Yields each message after it has been forwarded from the accepted connection to the target.
	pub sent_to_target_rx: mpsc::Receiver<Message>,
	/// Yields each message after it has been forwarded from the target to the accepted connection.
	pub sent_from_target_rx: mpsc::Receiver<Message>,
	/// Stop signal for the proxy task; sending `()` or dropping this sender makes the task finish.
	pub stop_tx: oneshot::Sender<()>,
}

impl WebsocketMessageProxy {
	pub async fn spawn(listen_at: String, target: String) -> WebsocketMessageProxy {
		let (listening_tx, listening_rx) = oneshot::channel();
		let (stop_tx, mut stop_rx) = oneshot::channel();
		let (sent_to_target_tx, sent_to_target_rx) = mpsc::channel::<Message>(1000);
		let (sent_from_target_tx, sent_from_target_rx) = mpsc::channel::<Message>(1000);
		tokio::spawn(async move {
			let listen_socket = TcpListener::bind(listen_at).await.expect("Couldn't bind listening socket");
			listening_tx.send(()).unwrap();
			let mut source_socket = None;
			tokio::select! {
				new_connection = listen_socket.accept() => match new_connection {
					Ok((stream, _address)) => {
						source_socket = Some(stream);
						drop(listen_socket);
					},
					Err(_) => panic!(), //TODO what errors can happen here?
				},
				_ = &mut stop_rx => { }
			};
			if let Some(source_socket) = source_socket {
				let mut source_stream = async_tungstenite::accept_async(source_socket.compat())
					.await
					.expect("Error during the websocket handshake occurred");
				let mut target_stream = match async_tungstenite::tokio::connect_async(target.clone()).await {
					Ok((stream, _response)) => stream,
					Err(error) => {
						panic!("Failed to connect: {:?}", error);
					}
				};
				loop {
					tokio::select! {
						message_option = source_stream.next() => match message_option {
							Some(Ok(tungstenite::Message::Binary(message_msgpack))) => {
								let message = rmp_serde::from_read_ref::<Vec<u8>,Message>(&message_msgpack).unwrap();
								target_stream.send(tungstenite::Message::Binary(rmp_serde::to_vec(&message).unwrap())).await.unwrap();
								sent_to_target_tx.send(message).await.unwrap();
							},
							_ => panic!()
						},
						message_option = target_stream.next() => match message_option {
							Some(Ok(tungstenite::Message::Binary(message_msgpack))) => {
								let message = rmp_serde::from_read_ref::<Vec<u8>,Message>(&message_msgpack).unwrap();
								source_stream.send(tungstenite::Message::Binary(rmp_serde::to_vec(&message).unwrap())).await.unwrap();
								sent_from_target_tx.send(message).await.unwrap();
							},
							_ => panic!()
						},
						_ = &mut stop_rx => { break; }
					}
				}
			};
		});
		listening_rx.await.unwrap();
		WebsocketMessageProxy {
			sent_to_target_rx,
			sent_from_target_rx,
			stop_tx,
		}
	}
}