1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
use crate::protocol::BNF_LF;
use crate::protocol::{ClientCommand, Frame, FrameParser, ParseError, ServerCommand, StompMessage};
use log::debug;
use std::error::Error;
use std::fmt::{Display, Formatter};
use std::sync::Arc;
use tokio::io::{AsyncReadExt, AsyncWriteExt, ErrorKind};
use tokio::net::TcpStream;
use tokio::sync::mpsc::error::SendError;
use tokio::sync::mpsc::{channel, Sender};
use tokio::sync::Mutex;

/// Why a [`Connection`] is being torn down.
#[derive(Debug)]
pub enum ClosingReason {
    /// The frame parser rejected bytes read from the TCP stream.
    ParseError(ParseError),
    /// Reading from the TCP stream failed with an I/O error.
    ConnectionError(std::io::Error),
    /// The shutdown was requested locally through `Connection::close`.
    Shutdown,
}

/// Error value carried on the server-message channel in place of a message.
#[derive(Debug)]
pub enum ConnectionError {
    /// The connection is closing; the payload says why.
    Closing(ClosingReason),
}

impl Display for ConnectionError {
    /// Renders the fixed text `Client error`; the closing reason is not part
    /// of the output (use the `Debug` representation to see it).
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.write_str("Client error")
    }
}

// Marker impl: relies on the `Debug` and `Display` impls above and keeps the
// default `Error` methods (no `source` is exposed).
impl Error for ConnectionError {}

/// Handle to a STOMP connection whose socket is driven by a background task
/// spawned in `Connection::new`.
pub struct Connection {
    /// Sending half of the outbound queue; the background task drains the
    /// receiving half and writes each message to the TCP stream.
    client_sender: Sender<StompMessage<ClientCommand>>,
    /// Clone of the sender handed to `Connection::new`; `close` uses it for the
    /// `ClosingReason::Shutdown` notification.
    // NOTE(review): the name is not snake_case (`server_sender` expected);
    // renaming requires updating every use in the `impl` block as well.
    server_Sender: Sender<Result<StompMessage<ServerCommand>, ConnectionError>>,
    /// Signals the background task to shut the TCP stream down and stop.
    close_sender: Sender<()>,
    /// Set to `true` by the background task once it has shut the stream down.
    is_closed: Arc<Mutex<bool>>,
}

impl Connection {
    pub async fn new(
        mut tcp_stream: TcpStream,
        server_sender: Sender<Result<StompMessage<ServerCommand>, ConnectionError>>,
    ) -> Self {
        let (sender, mut receiver) = channel(5);

        let (close_sender, mut close_receiver) = channel(1);
        let inner_close_sender = close_sender.clone();
        let is_closed = Arc::new(Mutex::new(false));
        let inner_is_closed = Arc::clone(&is_closed);

        let connection = Self {
            client_sender: sender,
            server_Sender: server_sender.clone(),
            close_sender,
            is_closed,
        };

        tokio::spawn(async move {
            let mut msg = vec![0; 8096];
            let mut parser: FrameParser<ServerCommand> = FrameParser::new();
            let mut closing = false;

            loop {
                tokio::select! {
                    frame = receiver.recv(), if !closing => {
                         if let Some(message) = frame {
                            match message {
                                StompMessage::Frame(frame) => tcp_stream.write_all(&frame.to_bytes()).await.unwrap(),
                                StompMessage::Ping => tcp_stream.write_u8(BNF_LF).await.unwrap()
                            }

                            tcp_stream.flush().await.unwrap();
                        }
                    },
                    read = tcp_stream.read(&mut msg), if !closing => {
                        match read {
                            Ok(n) => {
                                match parser.parse(&msg[..n]) {
                                    Ok(messages) => {
                                        for message in messages {
                                            debug!("Message received {:?}", message.clone());
                                            server_sender.send(Ok(message)).await.unwrap();
                                        }
                                    }
                                    Err(e) => {
                                        debug!("Parsing error, closing {:?}", e);
                                        server_sender.send(Err(ConnectionError::Closing(ClosingReason::ParseError(e))));
                                        inner_close_sender.send(()).await.unwrap();
                                        closing = true;
                                    }
                                }
                            }
                            Err(ref e) if e.kind() == ErrorKind::WouldBlock => {}
                            Err(e) => {
                                debug!("Connection error, closing {:?}", e);
                                server_sender.send(Err(ConnectionError::Closing(ClosingReason::ConnectionError(e))));
                                inner_close_sender.send(()).await.unwrap();
                                closing = true;
                            }
                        }
                    }
                    _ = close_receiver.recv() => {
                        debug!("Closing connection");
                        tcp_stream.shutdown()
                            .await
                            .unwrap();

                        receiver.close();

                        let mut guard = inner_is_closed.lock().await;
                        *guard = true;
                        break;
                    }
                };
            }
        });

        connection
    }

    pub async fn is_closed(&self) -> bool {
        *self.is_closed.lock().await
    }
    pub async fn emit<T: Into<Frame<ClientCommand>>>(
        &self,
        frame: T,
    ) -> Result<(), SendError<StompMessage<ClientCommand>>> {
        self.client_sender
            .send(StompMessage::Frame(frame.into()))
            .await
    }

    pub async fn heartbeat(&self) -> Result<(), SendError<StompMessage<ClientCommand>>> {
        self.client_sender.send(StompMessage::Ping).await
    }

    pub async fn close(&self) {
        self.server_Sender
            .send(Err(ConnectionError::Closing(ClosingReason::Shutdown)));
        self.close_sender.send(()).await;
    }
}