1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};
use async_std::{
channel::Sender,
io::{BufReader, ReadExt},
net::TcpStream,
task,
};
use crate::{
messages::ResponseMessage,
types::{ConnectionHandler, Error, DELIMITER},
};
pub struct Socket {
pub stream: Arc<TcpStream>,
res_sender: Sender<ResponseMessage>,
msg_buffer: String,
}
impl Socket {
pub async fn connect(
server: &str,
port: u16,
res_sender: Sender<ResponseMessage>,
) -> std::io::Result<Self> {
let addr = format!("{}:{}", server, port);
let stream = TcpStream::connect(addr).await?;
Ok(Socket {
stream: Arc::new(stream),
res_sender,
// res_msg: Arc::new(Mutex::new(VecDeque::new())),
msg_buffer: String::new(),
})
}
pub async fn recv_loop(
&mut self,
is_connected: Arc<AtomicBool>,
handler: Option<Arc<dyn ConnectionHandler + Send + Sync>>,
) -> Result<(), Error> {
let mut reader = BufReader::new(self.stream.as_ref());
let mut buffer = vec![0u8; 4096];
loop {
let bytes_read = reader.read(&mut buffer).await?;
if bytes_read == 0 {
log::debug!("stream disconnected");
break;
}
let res = String::from_utf8_lossy(&buffer[..bytes_read]);
log::trace!("Received msg : {}", res);
self.msg_buffer.push_str(res.as_ref());
loop {
let mut pos = self
.msg_buffer
.find(&format!("{}10=", DELIMITER))
.unwrap_or(usize::MAX);
if pos == usize::MAX {
break;
}
pos += 8;
//
let mut rest = None;
let res = if pos >= self.msg_buffer.len() {
// send itself.
ResponseMessage::new(&self.msg_buffer, DELIMITER)
} else {
// } else if pos < self.msg_buffer.len() {
rest = Some(self.msg_buffer.split_off(pos));
ResponseMessage::new(&self.msg_buffer, DELIMITER)
};
// else {
// println!("\n{:?}\n", self.msg_buffer);
// break;
// };
log::debug!("Handle the response : {}", self.msg_buffer);
if let Err(err) = self.res_sender.send(res).await {
log::error!("Failed to send ResponseMessage : {:?}", err);
break;
}
if let Some(rest) = rest {
self.msg_buffer = rest;
} else {
self.msg_buffer.clear();
break;
}
}
// FIXME
// if self.msg_buffer.find(&format!("{}10=", DELIMITER)).is_some()
// && self.msg_buffer.ends_with(DELIMITER)
// {
// log::debug!("Handle response : {}", self.msg_buffer);
// let res = ResponseMessage::new(&self.msg_buffer, DELIMITER);
// self.msg_buffer.clear();
// }
}
is_connected.store(false, Ordering::Relaxed);
// notify disconnection
if let Some(handler) = handler {
task::spawn(async move {
handler.on_disconnect().await;
});
}
Ok(())
}
}