1use std::sync::{
2 atomic::{AtomicBool, Ordering},
3 Arc,
4};
5
6use async_std::{
7 channel::Sender,
8 io::{BufReader, ReadExt},
9 net::TcpStream,
10 task,
11};
12
13use crate::{
14 messages::ResponseMessage,
15 types::{ConnectionHandler, Error, DELIMITER},
16};
17
18pub struct Socket {
19 pub stream: Arc<TcpStream>,
20 res_sender: Sender<ResponseMessage>,
21 msg_buffer: String,
22}
23impl Socket {
24 pub async fn connect(
25 server: &str,
26 port: u16,
27 res_sender: Sender<ResponseMessage>,
28 ) -> std::io::Result<Self> {
29 let addr = format!("{}:{}", server, port);
30 let stream = TcpStream::connect(addr).await?;
31 Ok(Socket {
32 stream: Arc::new(stream),
33 res_sender,
34 msg_buffer: String::new(),
36 })
37 }
38
39 pub async fn recv_loop(
40 &mut self,
41 is_connected: Arc<AtomicBool>,
42 handler: Option<Arc<dyn ConnectionHandler + Send + Sync>>,
43 ) -> Result<(), Error> {
44 let mut reader = BufReader::new(self.stream.as_ref());
45 let mut buffer = vec![0u8; 4096];
46 loop {
47 let bytes_read = reader.read(&mut buffer).await?;
48 if bytes_read == 0 {
49 log::debug!("stream disconnected");
50 break;
51 }
52 let res = String::from_utf8_lossy(&buffer[..bytes_read]);
53 log::trace!("Received msg : {}", res);
54 self.msg_buffer.push_str(res.as_ref());
55
56 loop {
57 let mut pos = self
58 .msg_buffer
59 .find(&format!("{}10=", DELIMITER))
60 .unwrap_or(usize::MAX);
61
62 if pos == usize::MAX {
63 break;
64 }
65 pos += 8;
66 let mut rest = None;
68 let res = if pos >= self.msg_buffer.len() {
69 ResponseMessage::new(&self.msg_buffer, DELIMITER)
71 } else {
72 rest = Some(self.msg_buffer.split_off(pos));
74 ResponseMessage::new(&self.msg_buffer, DELIMITER)
75 };
76 log::debug!("Handle the response : {}", self.msg_buffer);
82 if let Err(err) = self.res_sender.send(res).await {
83 log::error!("Failed to send ResponseMessage : {:?}", err);
84 break;
85 }
86
87 if let Some(rest) = rest {
88 self.msg_buffer = rest;
89 } else {
90 self.msg_buffer.clear();
91 break;
92 }
93 }
94 }
103
104 is_connected.store(false, Ordering::Relaxed);
105 if let Some(handler) = handler {
107 task::spawn(async move {
108 handler.on_disconnect().await;
109 });
110 }
111 Ok(())
112 }
113}