use std::{
sync::Arc,
time::{Duration, SystemTime},
};
use bytes::BytesMut;
use tokio::{
io::{AsyncWriteExt, WriteHalf},
net::TcpStream,
sync::{
RwLock,
mpsc::{Sender, channel},
},
task::JoinHandle,
};
use tracing::error;
use crate::const_config::DEFAULT_SYSTEM_CHANNEL_SIZE;
/// Messages sent over the channel to the background task spawned by
/// `LynnUser::new`.
pub(crate) enum LynnUserSignal {
/// Bytes that the background task writes to the user's socket.
SendResponse(BytesMut),
}
/// Handle for one connected user: owns the channel to, and the join handle
/// of, the background task that writes responses to the user's socket.
pub(crate) struct LynnUser {
/// Shared timestamp handed in by the creator of this user.
/// NOTE(review): presumably refreshed elsewhere whenever the peer is heard
/// from — this file only stores and hands out the handle; confirm at caller.
last_communicate_time: Arc<RwLock<SystemTime>>,
/// Sending side of the channel that feeds the writer task.
sender: Sender<LynnUserSignal>,
/// Handle of the spawned writer task; aborted when the `LynnUser` is dropped.
main_join_handle: JoinHandle<()>,
}
impl LynnUser {
pub(crate) fn new(
write_half: WriteHalf<TcpStream>,
last_communicate_time: Arc<RwLock<SystemTime>>,
) -> Self {
let (tx, mut rx) = channel(DEFAULT_SYSTEM_CHANNEL_SIZE);
let main_join_handle = tokio::spawn(async move {
let mut write_half = write_half;
let mut buffer = Vec::with_capacity(4096);
loop {
match rx.recv().await {
Some(LynnUserSignal::SendResponse(response)) => {
if let Err(e) = write_half.write_all(&response).await {
error!("Failed to write to socket: {}", e);
break;
} else {
buffer.extend_from_slice(&response);
if buffer.len() >= 4096 {
if let Err(e) = write_half.flush().await {
error!("Failed to flush socket: {}", e);
break;
}
buffer.clear();
}
}
}
None => break,
}
}
if !buffer.is_empty() {
let _ = write_half.flush().await;
}
});
Self {
last_communicate_time,
sender: tx,
main_join_handle,
}
}
#[inline(always)]
pub(crate) fn get_last_communicate_time(&self) -> Arc<RwLock<SystemTime>> {
self.last_communicate_time.clone()
}
#[inline(always)]
pub(crate) async fn send_response(&self, response: &BytesMut) {
if let Err(e) = self
.sender
.send(LynnUserSignal::SendResponse(response.clone()))
.await
{
error!("Send response error:{}", e);
}
}
}
/// Stops the writer task when the user handle goes away.
impl Drop for LynnUser {
fn drop(&mut self) {
// Abort rather than wait for the task to finish.
// NOTE(review): responses still queued in the channel at this point may
// never be written — confirm that is acceptable for callers.
self.main_join_handle.abort();
}
}