1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};

use async_std::{
    channel::Sender,
    io::{BufReader, ReadExt},
    net::TcpStream,
    task,
};

use crate::{
    messages::ResponseMessage,
    types::{ConnectionHandler, Error, DELIMITER},
};

/// A TCP connection that reads incoming text and forwards it as
/// [`ResponseMessage`]s over a channel.
///
/// Created with [`Socket::connect`]; incoming data is consumed by
/// [`Socket::recv_loop`].
pub struct Socket {
    /// Reference-counted handle to the underlying TCP stream.
    ///
    /// NOTE(review): public and wrapped in `Arc`, presumably so other tasks can
    /// write to the same connection while `recv_loop` reads — confirm against callers.
    pub stream: Arc<TcpStream>,
    /// Channel sender on which `recv_loop` delivers each `ResponseMessage` it builds.
    res_sender: Sender<ResponseMessage>,
    /// Text received from the stream that has not yet been handed off as a message.
    msg_buffer: String,
}
impl Socket {
    pub async fn connect(
        server: &str,
        port: u16,
        res_sender: Sender<ResponseMessage>,
    ) -> std::io::Result<Self> {
        let addr = format!("{}:{}", server, port);
        let stream = TcpStream::connect(addr).await?;
        Ok(Socket {
            stream: Arc::new(stream),
            res_sender,
            // res_msg: Arc::new(Mutex::new(VecDeque::new())),
            msg_buffer: String::new(),
        })
    }

    pub async fn recv_loop(
        &mut self,
        is_connected: Arc<AtomicBool>,
        handler: Option<Arc<dyn ConnectionHandler + Send + Sync>>,
    ) -> Result<(), Error> {
        let mut reader = BufReader::new(self.stream.as_ref());
        let mut buffer = vec![0u8; 4096];
        loop {
            let bytes_read = reader.read(&mut buffer).await?;
            if bytes_read == 0 {
                log::debug!("stream disconnected");
                break;
            }
            let res = String::from_utf8_lossy(&buffer[..bytes_read]);
            log::trace!("Received msg : {}", res);
            self.msg_buffer.push_str(res.as_ref());
            // println!("{:?}", self.msg_buffer);

            if self.msg_buffer.find(&format!("{}10=", DELIMITER)).is_some()
                && self.msg_buffer.ends_with(DELIMITER)
            {
                log::debug!("Handle response : {}", self.msg_buffer);
                let res = ResponseMessage::new(&self.msg_buffer, DELIMITER);
                if let Err(err) = self.res_sender.send(res).await {
                    log::error!("Failed to send ResponseMessage : {:?}", err);
                    break;
                }
                self.msg_buffer.clear();
            }
        }

        is_connected.store(false, Ordering::Relaxed);
        // notify disconnection
        if let Some(handler) = handler {
            task::spawn(async move {
                handler.on_disconnect().await;
            });
        }
        Ok(())
    }
}