// TCP socket wrapper: connects to the server and splits the incoming byte stream into response messages.
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};
use async_std::{
channel::Sender,
io::{BufReader, ReadExt},
net::TcpStream,
task,
};
use crate::{
messages::ResponseMessage,
types::{ConnectionHandler, Error, DELIMITER},
};
/// A TCP connection together with the state needed to turn the incoming
/// stream into `ResponseMessage`s.
pub struct Socket {
/// Shared handle to the underlying TCP stream; `recv_loop` reads from it.
pub stream: Arc<TcpStream>,
/// Channel on which every message split off by `recv_loop` is sent.
res_sender: Sender<ResponseMessage>,
/// Text received so far that has not yet been sent on as a message.
msg_buffer: String,
}
impl Socket {
    /// Opens a TCP connection to `server:port`.
    ///
    /// `res_sender` is the channel on which [`Socket::recv_loop`] later delivers
    /// every message it receives.
    ///
    /// # Errors
    ///
    /// Returns the I/O error reported by `TcpStream::connect` when the
    /// connection cannot be established.
    pub async fn connect(
        server: &str,
        port: u16,
        res_sender: Sender<ResponseMessage>,
    ) -> std::io::Result<Self> {
        let addr = format!("{}:{}", server, port);
        let stream = TcpStream::connect(addr).await?;
        Ok(Socket {
            stream: Arc::new(stream),
            res_sender,
            msg_buffer: String::new(),
        })
    }

    /// Reads from the stream until the peer closes it, forwarding every
    /// complete message to the response channel.
    ///
    /// Received bytes are decoded lossily as UTF-8 and appended to an internal
    /// buffer. A message is considered complete once its checksum trailer
    /// (`<DELIMITER>10=<value><DELIMITER>`) has been received in full; anything
    /// after that stays buffered until more data arrives.
    ///
    /// When the stream reports end-of-file, `is_connected` is set to `false`
    /// and, if a `handler` was supplied, its `on_disconnect` callback is run on
    /// a spawned task.
    ///
    /// # Errors
    ///
    /// Returns an error as soon as reading from the stream fails. In that case
    /// the function returns immediately: `is_connected` is left untouched and
    /// `handler` is not notified.
    pub async fn recv_loop(
        &mut self,
        is_connected: Arc<AtomicBool>,
        handler: Option<Arc<dyn ConnectionHandler + Send + Sync>>,
    ) -> Result<(), Error> {
        let mut reader = BufReader::new(self.stream.as_ref());
        let mut buffer = vec![0u8; 4096];

        // Built once instead of on every scan of the buffer. The delimiter is
        // recovered from the formatted tag so that the code only relies on
        // `DELIMITER` being `Display`.
        let checksum_tag = format!("{}10=", DELIMITER);
        let delimiter = &checksum_tag[..checksum_tag.len() - "10=".len()];

        loop {
            let bytes_read = reader.read(&mut buffer).await?;
            if bytes_read == 0 {
                log::debug!("stream disconnected");
                break;
            }

            // NOTE(review): a multi-byte UTF-8 sequence split across two reads is
            // replaced with U+FFFD here; harmless as long as the payload is ASCII.
            let res = String::from_utf8_lossy(&buffer[..bytes_read]);
            log::trace!("Received msg : {}", res);
            self.msg_buffer.push_str(res.as_ref());

            // Drain every complete message; a partially received one stays in
            // `msg_buffer` until the next read completes it.
            while let Some(message) =
                Self::take_message(&mut self.msg_buffer, &checksum_tag, delimiter)
            {
                log::debug!("Handle the response : {}", message);
                let res = ResponseMessage::new(&message, DELIMITER);
                if let Err(err) = self.res_sender.send(res).await {
                    log::error!("Failed to send ResponseMessage : {:?}", err);
                    // The undeliverable message is dropped; whatever is still
                    // buffered is kept intact for the next read.
                    break;
                }
            }
        }

        is_connected.store(false, Ordering::Relaxed);

        // notify disconnection
        if let Some(handler) = handler {
            task::spawn(async move {
                handler.on_disconnect().await;
            });
        }
        Ok(())
    }

    /// Removes and returns the first complete message held in `buffer`.
    ///
    /// A message ends with `checksum_tag` (delimiter followed by `10=`), the
    /// checksum value, and a closing `delimiter`. Returns `None`, leaving
    /// `buffer` unchanged, while that closing delimiter has not arrived yet.
    fn take_message(buffer: &mut String, checksum_tag: &str, delimiter: &str) -> Option<String> {
        let tag_start = buffer.find(checksum_tag)?;
        let value_start = tag_start + checksum_tag.len();
        // Both offsets come from `find` matches, so the split point below is
        // always on a char boundary and `split_off` cannot panic.
        let value_len = buffer[value_start..].find(delimiter)?;
        let message_end = value_start + value_len + delimiter.len();

        let rest = buffer.split_off(message_end);
        Some(std::mem::replace(buffer, rest))
    }
}