use std::{
net::SocketAddr,
ops::{Deref, DerefMut},
sync::Arc,
time::{Duration, SystemTime},
};
use bytes::BytesMut;
use tokio::{
io::{AsyncReadExt, ReadHalf, split},
net::TcpStream,
sync::{RwLock, Semaphore},
time::interval,
};
use tracing::{error, info, warn};
use crate::{
app::{LynnRouter, ReactorEventSender, event_api::event_api::ReactorEvent},
const_config::{
DEFAULT_MAX_RECEIVE_BYTES_SIZE, DEFAULT_MESSAGE_HEADER_MARK, DEFAULT_MESSAGE_TAIL_MARK,
SERVER_MESSAGE_HEADER_MARK, SERVER_MESSAGE_TAIL_MARK,
},
handler::{ClientsContext, HandlerContext},
lynn_tcp_dependents::{HandlerResult, InputBufVO},
vo_factory::big_buf::BigBufReader,
};
use super::{AsyncFunc, ClientsStruct, ClientsStructType, lynn_server_user::LynnUser};
use crate::vo_factory::InputBufVOTrait;
/// Spawns a detached background task that periodically evicts idle clients.
///
/// On every tick the task scans `clients`, collects each address whose last
/// communication time is older than `server_check_heart_timeout_time` seconds,
/// removes those entries, and logs the remaining client count.
///
/// # Parameters
/// - `server_check_heart_interval`: seconds between scans. A value of `0` is
///   treated as `1`, because `tokio::time::interval` panics on a zero period
///   and that panic would silently kill this task.
/// - `server_check_heart_timeout_time`: idle time in seconds after which a
///   client is removed.
/// - `clients`: the shared client registry to scan and prune.
#[inline(always)]
pub(super) fn spawn_check_heart(
    server_check_heart_interval: u64,
    server_check_heart_timeout_time: u64,
    clients: ClientsStructType,
) {
    tokio::spawn(async move {
        // Clamp to at least one second so a misconfigured zero interval cannot
        // panic inside the spawned task.
        let tick_secs = server_check_heart_interval.max(1);
        let mut ticker = interval(Duration::from_secs(tick_secs));
        ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
        info!(
            "Server - [check heart] start success!!! with [server_check_heart_interval:{}s] [server_check_heart_timeout_time:{}s]",
            tick_secs, server_check_heart_timeout_time
        );
        let timeout_duration = Duration::from_secs(server_check_heart_timeout_time);
        loop {
            ticker.tick().await;
            let mut remove_list = Vec::with_capacity(clients.len() / 10);
            let current_time = SystemTime::now();
            // Phase 1: collect expired addresses without mutating the registry
            // while it is being iterated.
            for entry in clients.iter() {
                let last_communicate_time = entry.value().get_last_communicate_time();
                // Best effort: if the timestamp lock is busy right now, the
                // client is skipped and re-examined on the next tick.
                if let Ok(last_time_guard) = last_communicate_time.try_read() {
                    // `duration_since` fails when the stored time is later than
                    // `current_time`; such a client is not considered idle.
                    if let Ok(idle) = current_time.duration_since(*last_time_guard) {
                        if idle > timeout_duration {
                            remove_list.push(*entry.key());
                        }
                    }
                }
            }
            // Phase 2: evict everything collected above.
            for addr in remove_list {
                if clients.remove(&addr).is_some() {
                    info!(
                        "Clean up addr:{}, that have not sent messages for a long time",
                        addr
                    );
                }
            }
            info!("Server check online socket count:{}", clients.len());
        }
    });
}
/// Delivers the response carried by a handler result to its target clients.
///
/// Nothing is sent unless the result is flagged for sending and has both
/// response data and target addresses. If the result does not already carry
/// marks, the server-wide header/tail marks are applied, falling back to the
/// defaults when the server-wide values are unset. Every target address that
/// has no entry in `clients` is reported with a warning.
///
/// # Parameters
/// - `handler_result`: the result produced by a handler.
/// - `clients`: the shared client registry used to look up recipients.
#[inline(always)]
pub(crate) async fn check_handler_result(
    mut handler_result: HandlerResult,
    clients: ClientsStructType,
) {
    if !handler_result.get_is_send() {
        return;
    }
    // NOTE(review): the response data is fetched before the marks are applied
    // below; this ordering is kept as-is — confirm the payload does not depend
    // on the marks.
    let payload = match handler_result.get_response_data() {
        Some(data) => data,
        None => return,
    };
    if handler_result.get_addrs().is_none() {
        return;
    }
    if !handler_result.is_with_mark() {
        let header_mark = *SERVER_MESSAGE_HEADER_MARK
            .get()
            .unwrap_or(&DEFAULT_MESSAGE_HEADER_MARK);
        let tail_mark = *SERVER_MESSAGE_TAIL_MARK
            .get()
            .unwrap_or(&DEFAULT_MESSAGE_TAIL_MARK);
        handler_result.set_marks(header_mark, tail_mark);
    }
    if let Some(addrs) = handler_result.get_addrs() {
        if let Some(missing) = send_response(&payload, &addrs, &clients).await {
            // Iterating an empty list is a no-op, so no emptiness check is needed.
            for addr in missing.iter() {
                warn!("Failed to find the client correctly, message sending is invalid , target-addr:{}",addr);
            }
        }
    }
}
/// Sends `response` to every address in `addrs` that has an entry in `clients`.
///
/// Addresses are processed one after another, in order.
///
/// # Returns
/// `Some(list)` with the addresses that had no entry in `clients`, or `None`
/// when `addrs` is empty or every address was found.
#[inline(always)]
async fn send_response(
    response: &BytesMut,
    addrs: &[SocketAddr],
    clients: &ClientsStructType,
) -> Option<Vec<SocketAddr>> {
    if addrs.is_empty() {
        return None;
    }
    let mut not_found = Vec::with_capacity(addrs.len());
    for target in addrs {
        // NOTE(review): the value returned by `clients.get` stays alive across
        // the `.await` below; if it is a lock guard, confirm this cannot stall
        // other users of the registry.
        match clients.get(target) {
            Some(client) => {
                client.send_response(response).await;
            }
            None => not_found.push(*target),
        }
    }
    (!not_found.is_empty()).then_some(not_found)
}
/// Queues a handler invocation for one decoded message.
///
/// Tries to take a permit from `process_permit` without waiting. On success an
/// execute-task event carrying the handler, its context and the client
/// registry is pushed to `reactor_event_sender`. On failure the message is
/// dropped and a warning is logged.
///
/// # Parameters
/// - `addr`: peer address, used only in the warning log.
/// - `input_buf_vo`: the decoded message handed to the handler context.
/// - `process_permit`: semaphore gating how many events may be queued.
/// - `clients`: the shared client registry, given to both the context and the event.
/// - `handler_method`: the handler to run.
/// - `reactor_event_sender`: destination for the execute-task event.
#[inline(always)]
pub(crate) async fn input_dto_build(
    addr: SocketAddr,
    input_buf_vo: InputBufVO,
    process_permit: Arc<Semaphore>,
    clients: ClientsStructType,
    handler_method: Arc<AsyncFunc>,
    reactor_event_sender: ReactorEventSender,
) {
    // NOTE(review): the permit is released when this function returns, i.e.
    // right after the event is queued — it does not travel with the event.
    // Confirm that bounding only the enqueue step is intended.
    let _permit = match process_permit.try_acquire() {
        Ok(permit) => permit,
        Err(_) => {
            warn!("addr:{} PROCESS_PERMIT_SIZE is full", addr);
            return;
        }
    };
    let context = HandlerContext::new(
        input_buf_vo,
        ClientsContext::new(ClientsStruct(clients.clone())),
    );
    reactor_event_sender.push(ReactorEvent::crate_excute_task_event((
        handler_method,
        context,
        clients,
    )));
}
/// Registers a newly accepted connection and returns its read side.
///
/// The stream is split in two: the write half is wrapped in a `LynnUser`
/// together with `last_communicate_time` and stored in `clients` under `addr`,
/// while the read half is handed back to the caller.
///
/// # Returns
/// The read half of `socket`.
#[inline(always)]
pub(crate) async fn add_client(
    clients: ClientsStructType,
    socket: TcpStream,
    addr: SocketAddr,
    last_communicate_time: Arc<RwLock<SystemTime>>,
) -> ReadHalf<TcpStream> {
    let (reader, writer) = split(socket);
    clients.insert(addr, LynnUser::new(writer, last_communicate_time));
    reader
}
#[inline(always)]
pub(crate) async fn push_read_half(
mut read_half: ReadHalf<TcpStream>,
process_permit: Arc<Semaphore>,
addr: SocketAddr,
clients: ClientsStructType,
message_header_mark: u16,
message_tail_mark: u16,
lynn_router: Arc<LynnRouter>,
reactor_event_sender: ReactorEventSender,
last_communicate_time: Arc<RwLock<SystemTime>>,
) {
tokio::spawn(async move {
let mut buf = [0; DEFAULT_MAX_RECEIVE_BYTES_SIZE];
let mut big_buf = BigBufReader::new(message_header_mark, message_tail_mark);
loop {
let result = read_half.read(&mut buf).await;
match result {
Ok(n) if n <= 0 => break,
Ok(n) => {
big_buf.extend_from_slice(&buf[..n]);
while big_buf.is_complete() {
let mut input_buf_vo = InputBufVO::new(big_buf.get_data(), addr);
if let Some(constructor_id) = input_buf_vo.get_constructor_id() {
match constructor_id {
2 => {
if let Ok(mut time_guard) = last_communicate_time.try_write() {
*time_guard = SystemTime::now();
}
continue;
}
1 => {
if let Some(method_id) = input_buf_vo.get_method_id() {
if let Some(handler_method) =
lynn_router.get_handler_by_method_id(&method_id)
{
input_dto_build(
addr,
input_buf_vo,
process_permit.clone(),
clients.clone(),
handler_method.clone(),
reactor_event_sender.clone(),
)
.await;
} else {
warn!("router_map_async no method match,{}", method_id);
}
} else {
warn!("router_map_async input_buf_vo no method_id");
}
}
_ => {
warn!("Unknown constructor_id: {}", constructor_id);
}
}
}
}
}
Err(e) => {
error!("Failed to read from socket: {}", e.to_string());
break;
}
}
}
});
}