cfix/
socket.rs

1use std::sync::{
2    atomic::{AtomicBool, Ordering},
3    Arc,
4};
5
6use async_std::{
7    channel::Sender,
8    io::{BufReader, ReadExt},
9    net::TcpStream,
10    task,
11};
12
13use crate::{
14    messages::ResponseMessage,
15    types::{ConnectionHandler, Error, DELIMITER},
16};
17
18pub struct Socket {
19    pub stream: Arc<TcpStream>,
20    res_sender: Sender<ResponseMessage>,
21    msg_buffer: String,
22}
23impl Socket {
24    pub async fn connect(
25        server: &str,
26        port: u16,
27        res_sender: Sender<ResponseMessage>,
28    ) -> std::io::Result<Self> {
29        let addr = format!("{}:{}", server, port);
30        let stream = TcpStream::connect(addr).await?;
31        Ok(Socket {
32            stream: Arc::new(stream),
33            res_sender,
34            // res_msg: Arc::new(Mutex::new(VecDeque::new())),
35            msg_buffer: String::new(),
36        })
37    }
38
39    pub async fn recv_loop(
40        &mut self,
41        is_connected: Arc<AtomicBool>,
42        handler: Option<Arc<dyn ConnectionHandler + Send + Sync>>,
43    ) -> Result<(), Error> {
44        let mut reader = BufReader::new(self.stream.as_ref());
45        let mut buffer = vec![0u8; 4096];
46        loop {
47            let bytes_read = reader.read(&mut buffer).await?;
48            if bytes_read == 0 {
49                log::debug!("stream disconnected");
50                break;
51            }
52            let res = String::from_utf8_lossy(&buffer[..bytes_read]);
53            log::trace!("Received msg : {}", res);
54            self.msg_buffer.push_str(res.as_ref());
55
56            loop {
57                let mut pos = self
58                    .msg_buffer
59                    .find(&format!("{}10=", DELIMITER))
60                    .unwrap_or(usize::MAX);
61
62                if pos == usize::MAX {
63                    break;
64                }
65                pos += 8;
66                //
67                let mut rest = None;
68                let res = if pos >= self.msg_buffer.len() {
69                    // send itself.
70                    ResponseMessage::new(&self.msg_buffer, DELIMITER)
71                } else {
72                    // } else if pos < self.msg_buffer.len() {
73                    rest = Some(self.msg_buffer.split_off(pos));
74                    ResponseMessage::new(&self.msg_buffer, DELIMITER)
75                };
76                // else {
77                //     println!("\n{:?}\n", self.msg_buffer);
78                //     break;
79                // };
80
81                log::debug!("Handle the response : {}", self.msg_buffer);
82                if let Err(err) = self.res_sender.send(res).await {
83                    log::error!("Failed to send ResponseMessage : {:?}", err);
84                    break;
85                }
86
87                if let Some(rest) = rest {
88                    self.msg_buffer = rest;
89                } else {
90                    self.msg_buffer.clear();
91                    break;
92                }
93            }
94            // FIXME
95            // if self.msg_buffer.find(&format!("{}10=", DELIMITER)).is_some()
96            //     && self.msg_buffer.ends_with(DELIMITER)
97            // {
98            //     log::debug!("Handle response : {}", self.msg_buffer);
99            //     let res = ResponseMessage::new(&self.msg_buffer, DELIMITER);
100            //     self.msg_buffer.clear();
101            // }
102        }
103
104        is_connected.store(false, Ordering::Relaxed);
105        // notify disconnection
106        if let Some(handler) = handler {
107            task::spawn(async move {
108                handler.on_disconnect().await;
109            });
110        }
111        Ok(())
112    }
113}