1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};

use async_std::{
    channel::Sender,
    io::{BufReader, ReadExt},
    net::TcpStream,
    task,
};

use crate::{
    messages::ResponseMessage,
    types::{ConnectionHandler, Error, DELIMITER},
};

/// A TCP connection whose incoming text is split into messages and forwarded
/// over a channel as [`ResponseMessage`] values.
pub struct Socket {
    /// Shared handle to the underlying TCP stream; the receive loop reads from it.
    pub stream: Arc<TcpStream>,
    /// Channel end on which every parsed [`ResponseMessage`] is sent.
    res_sender: Sender<ResponseMessage>,
    /// Received text that has not yet been cut into a complete message.
    msg_buffer: String,
}
impl Socket {
    pub async fn connect(
        server: &str,
        port: u16,
        res_sender: Sender<ResponseMessage>,
    ) -> std::io::Result<Self> {
        let addr = format!("{}:{}", server, port);
        let stream = TcpStream::connect(addr).await?;
        Ok(Socket {
            stream: Arc::new(stream),
            res_sender,
            // res_msg: Arc::new(Mutex::new(VecDeque::new())),
            msg_buffer: String::new(),
        })
    }

    pub async fn recv_loop(
        &mut self,
        is_connected: Arc<AtomicBool>,
        handler: Option<Arc<dyn ConnectionHandler + Send + Sync>>,
    ) -> Result<(), Error> {
        let mut reader = BufReader::new(self.stream.as_ref());
        let mut buffer = vec![0u8; 4096];
        loop {
            let bytes_read = reader.read(&mut buffer).await?;
            if bytes_read == 0 {
                log::debug!("stream disconnected");
                break;
            }
            let res = String::from_utf8_lossy(&buffer[..bytes_read]);
            log::trace!("Received msg : {}", res);
            self.msg_buffer.push_str(res.as_ref());

            loop {
                let mut pos = self
                    .msg_buffer
                    .find(&format!("{}10=", DELIMITER))
                    .unwrap_or(usize::MAX);

                if pos == usize::MAX {
                    break;
                }
                pos += 8;
                //
                let mut rest = None;
                let res = if pos >= self.msg_buffer.len() {
                    // send itself.
                    ResponseMessage::new(&self.msg_buffer, DELIMITER)
                } else {
                    // } else if pos < self.msg_buffer.len() {
                    rest = Some(self.msg_buffer.split_off(pos));
                    ResponseMessage::new(&self.msg_buffer, DELIMITER)
                };
                // else {
                //     println!("\n{:?}\n", self.msg_buffer);
                //     break;
                // };

                log::debug!("Handle the response : {}", self.msg_buffer);
                if let Err(err) = self.res_sender.send(res).await {
                    log::error!("Failed to send ResponseMessage : {:?}", err);
                    break;
                }

                if let Some(rest) = rest {
                    self.msg_buffer = rest;
                } else {
                    self.msg_buffer.clear();
                    break;
                }
            }
            // FIXME
            // if self.msg_buffer.find(&format!("{}10=", DELIMITER)).is_some()
            //     && self.msg_buffer.ends_with(DELIMITER)
            // {
            //     log::debug!("Handle response : {}", self.msg_buffer);
            //     let res = ResponseMessage::new(&self.msg_buffer, DELIMITER);
            //     self.msg_buffer.clear();
            // }
        }

        is_connected.store(false, Ordering::Relaxed);
        // notify disconnection
        if let Some(handler) = handler {
            task::spawn(async move {
                handler.on_disconnect().await;
            });
        }
        Ok(())
    }
}