use url::Url;
use futures_util::{future, pin_mut, StreamExt};
use tokio_tungstenite::connect_async;
use tokio_tungstenite::tungstenite::protocol::Message;
use crate::commons::bytes_to_string;
/// Handle to a WebSocket connection driven by background tasks.
///
/// Cloning the handle clones the channel senders, so every clone feeds the
/// same outbound queue and the same inbound text channel.
#[derive(Clone)]
pub struct WebSocketClient {
/// Connection string exactly as it was passed to `connect`; exposed via `addr`.
pub connection: String,
/// Sending half of the outbound queue. `send_message` pushes `Message::Text`
/// values here and `run` forwards the receiving half to the WebSocket sink.
/// The accessor `sender_tx()` unwraps this value, so it panics when it is `None`.
pub sender_tx: Option<futures_channel::mpsc::UnboundedSender<Message>>,
/// Sending half of the inbound text channel. `run` pushes received text and
/// decoded binary payloads here; `on_message` logs what arrives on the other half.
/// The accessor `receiver_tx()` unwraps this value, so it panics when it is `None`.
pub receiver_tx: Option<futures_channel::mpsc::UnboundedSender<String>>,
}
impl WebSocketClient {
pub fn addr(&self) -> &str {
&self.connection
}
pub fn receiver_tx(&self) -> futures_channel::mpsc::UnboundedSender<String> {
self.receiver_tx.clone().unwrap()
}
pub fn sender_tx(&self) -> futures_channel::mpsc::UnboundedSender<Message> {
self.sender_tx.clone().unwrap()
}
pub fn send_message(&self, data: &str) {
log::info!("send_message: {}", data);
self.sender_tx()
.unbounded_send(Message::Text(data.to_string()))
.unwrap();
}
pub async fn on_message(mut rx: futures_channel::mpsc::UnboundedReceiver<String>) {
loop {
match rx.try_next() {
Ok(m) => {
if let Some(msg) = m {
log::info!("{}", msg);
}
}
Err(_) => {
tokio::time::sleep(std::time::Duration::from_millis(300)).await;
}
}
}
}
pub fn connect(connection: &str) -> Self {
let (sender_tx, sender_rx) = futures_channel::mpsc::unbounded::<Message>();
let (receiver_tx, receiver_rx) = futures_channel::mpsc::unbounded::<String>();
let s = Self {
connection: connection.to_string(),
sender_tx: Some(sender_tx.clone()),
receiver_tx: Some(receiver_tx.clone()),
};
tokio::spawn(Self::on_message(receiver_rx));
let url = Url::parse(connection).expect("Failed to parse URL");
tokio::spawn(Self::run(url, sender_rx, s.receiver_tx()));
s
}
pub async fn listener(connection: &str) {
let (_sender_tx, sender_rx) = futures_channel::mpsc::unbounded::<Message>();
let (receiver_tx, receiver_rx) = futures_channel::mpsc::unbounded::<String>();
tokio::spawn(Self::on_message(receiver_rx));
let url = Url::parse(connection).expect("Failed to parse URL");
Self::run(url, sender_rx, receiver_tx.clone()).await;
}
pub async fn run(
connection: Url,
sender_rx: futures_channel::mpsc::UnboundedReceiver<Message>,
receiver_tx: futures_channel::mpsc::UnboundedSender<String>,
) {
match connect_async(connection.clone()).await {
Ok((ws_stream, _)) => {
log::info!(
"WebSocket handshake has been successfully completed: connection={}",
&connection.as_str()
);
let (write, read) = ws_stream.split();
let sender_to_ws = sender_rx.map(Ok).forward(write);
let ws_to_stdout = {
read.for_each(|message| async {
match message {
Ok(line) => match line {
Message::Close(frame) => match frame {
Some(f) => {
log::info!("Close: code={}, reason={}", f.code, f.reason);
}
None => {
log::info!("Close");
}
},
Message::Text(msg) => {
receiver_tx.unbounded_send(msg.to_string()).unwrap();
}
Message::Binary(data) => {
let msg = match bytes_to_string(data) {
Ok(msg) => msg,
Err(e) => {
log::error!("Binary: {:?}", e);
"".into()
}
};
if !msg.is_empty() {
receiver_tx.unbounded_send(msg.to_string()).unwrap();
}
}
Message::Ping(data) => match bytes_to_string(data) {
Ok(msg) => {
log::info!("Ping: {}", msg);
}
Err(e) => {
log::error!("Ping: {:?}", e);
}
},
Message::Pong(data) => match bytes_to_string(data) {
Ok(msg) => {
log::info!("Pong: {}", msg);
}
Err(e) => {
log::error!("Pong: {:?}", e);
}
},
Message::Frame(f) => {
log::info!("Frame: {:?}", f);
}
},
Err(e) => {
log::error!("error={:?}", e);
}
}
})
};
pin_mut!(sender_to_ws, ws_to_stdout);
future::select(sender_to_ws, ws_to_stdout).await;
}
Err(e) => {
log::info!(
"WebSocket-Connect-failed: connection={}, error={:?}",
&connection.as_str(),
e
);
}
}
}
}