// zero4rs 2.0.0
//
// zero4rs is a powerful, pragmatic, and extremely fast web framework for Rust.
// Documentation
use url::Url;

use futures_util::{future, pin_mut, StreamExt};

// use tokio::io::AsyncReadExt;
use tokio_tungstenite::connect_async;
use tokio_tungstenite::tungstenite::protocol::Message;

use crate::commons::bytes_to_string;

/// Handle to a WebSocket client whose I/O runs in background tasks.
///
/// Cloning the handle clones the channel senders, so every clone feeds the
/// same outgoing and incoming channels.
#[derive(Clone)]
pub struct WebSocketClient {
    /// Connection string (URL text) this client was created with.
    pub connection: String,
    /// Sender half of the outgoing channel: messages queued here are read by
    /// `run` and forwarded to the socket. Set to `Some` by `connect`.
    pub sender_tx: Option<futures_channel::mpsc::UnboundedSender<Message>>,
    /// Sender half of the incoming channel: `run` pushes received text here
    /// and `on_message` consumes it. Set to `Some` by `connect`.
    pub receiver_tx: Option<futures_channel::mpsc::UnboundedSender<String>>,
}

impl WebSocketClient {
    pub fn addr(&self) -> &str {
        &self.connection
    }

    pub fn receiver_tx(&self) -> futures_channel::mpsc::UnboundedSender<String> {
        self.receiver_tx.clone().unwrap()
    }

    pub fn sender_tx(&self) -> futures_channel::mpsc::UnboundedSender<Message> {
        self.sender_tx.clone().unwrap()
    }

    pub fn send_message(&self, data: &str) {
        log::info!("send_message: {}", data);
        self.sender_tx()
            .unbounded_send(Message::Text(data.to_string()))
            // .unbounded_send(Message::binary(data.as_bytes().to_vec()))
            .unwrap();
    }

    pub async fn on_message(mut rx: futures_channel::mpsc::UnboundedReceiver<String>) {
        loop {
            match rx.try_next() {
                Ok(m) => {
                    if let Some(msg) = m {
                        log::info!("{}", msg);
                    }
                }
                Err(_) => {
                    tokio::time::sleep(std::time::Duration::from_millis(300)).await;
                }
            }
        }
    }

    pub fn connect(connection: &str) -> Self {
        let (sender_tx, sender_rx) = futures_channel::mpsc::unbounded::<Message>();
        let (receiver_tx, receiver_rx) = futures_channel::mpsc::unbounded::<String>();

        let s = Self {
            connection: connection.to_string(),
            sender_tx: Some(sender_tx.clone()),
            receiver_tx: Some(receiver_tx.clone()),
        };

        tokio::spawn(Self::on_message(receiver_rx));
        // tokio::spawn(Self::send_message(sender_tx.clone()));

        let url = Url::parse(connection).expect("Failed to parse URL");

        tokio::spawn(Self::run(url, sender_rx, s.receiver_tx()));

        s
    }

    pub async fn listener(connection: &str) {
        let (_sender_tx, sender_rx) = futures_channel::mpsc::unbounded::<Message>();
        let (receiver_tx, receiver_rx) = futures_channel::mpsc::unbounded::<String>();

        tokio::spawn(Self::on_message(receiver_rx));
        // tokio::spawn(Self::send_message(sender_tx.clone()));

        let url = Url::parse(connection).expect("Failed to parse URL");

        Self::run(url, sender_rx, receiver_tx.clone()).await;
    }

    pub async fn run(
        connection: Url,
        sender_rx: futures_channel::mpsc::UnboundedReceiver<Message>,
        receiver_tx: futures_channel::mpsc::UnboundedSender<String>,
    ) {
        match connect_async(connection.clone()).await {
            Ok((ws_stream, _)) => {
                log::info!(
                    "WebSocket handshake has been successfully completed: connection={}",
                    &connection.as_str()
                );

                let (write, read) = ws_stream.split();

                let sender_to_ws = sender_rx.map(Ok).forward(write);

                let ws_to_stdout = {
                    read.for_each(|message| async {
                        match message {
                            Ok(line) => match line {
                                Message::Close(frame) => match frame {
                                    Some(f) => {
                                        log::info!("Close: code={}, reason={}", f.code, f.reason);
                                    }
                                    None => {
                                        log::info!("Close");
                                    }
                                },
                                Message::Text(msg) => {
                                    receiver_tx.unbounded_send(msg.to_string()).unwrap();
                                }
                                Message::Binary(data) => {
                                    let msg = match bytes_to_string(data) {
                                        Ok(msg) => msg,
                                        Err(e) => {
                                            log::error!("Binary: {:?}", e);
                                            "".into()
                                        }
                                    };

                                    if !msg.is_empty() {
                                        receiver_tx.unbounded_send(msg.to_string()).unwrap();
                                    }
                                }
                                Message::Ping(data) => match bytes_to_string(data) {
                                    Ok(msg) => {
                                        log::info!("Ping: {}", msg);
                                    }
                                    Err(e) => {
                                        log::error!("Ping: {:?}", e);
                                    }
                                },
                                Message::Pong(data) => match bytes_to_string(data) {
                                    Ok(msg) => {
                                        log::info!("Pong: {}", msg);
                                    }
                                    Err(e) => {
                                        log::error!("Pong: {:?}", e);
                                    }
                                },
                                Message::Frame(f) => {
                                    log::info!("Frame: {:?}", f);
                                }
                            },
                            Err(e) => {
                                // Protocol(ResetWithoutClosingHandshake)
                                log::error!("error={:?}", e);
                            }
                        }
                    })
                };

                pin_mut!(sender_to_ws, ws_to_stdout);

                future::select(sender_to_ws, ws_to_stdout).await;
            }
            Err(e) => {
                log::info!(
                    "WebSocket-Connect-failed: connection={}, error={:?}",
                    &connection.as_str(),
                    e
                );
            }
        }
    }
}