lynn_tcp 1.2.5

Lightweight asynchronous TCP framework
Documentation
use std::{
    net::SocketAddr,
    ops::{Deref, DerefMut},
    sync::Arc,
    time::{Duration, SystemTime},
};

use bytes::BytesMut;
use tokio::{
    io::{AsyncReadExt, ReadHalf, split},
    net::TcpStream,
    sync::{RwLock, Semaphore},
    time::interval,
};
use tracing::{error, info, warn};

use crate::{
    app::{LynnRouter, ReactorEventSender, event_api::event_api::ReactorEvent},
    const_config::{
        DEFAULT_MAX_RECEIVE_BYTES_SIZE, DEFAULT_MESSAGE_HEADER_MARK, DEFAULT_MESSAGE_TAIL_MARK,
        SERVER_MESSAGE_HEADER_MARK, SERVER_MESSAGE_TAIL_MARK,
    },
    handler::{ClientsContext, HandlerContext},
    lynn_tcp_dependents::{HandlerResult, InputBufVO},
    vo_factory::big_buf::BigBufReader,
};

use super::{AsyncFunc, ClientsStruct, ClientsStructType, lynn_server_user::LynnUser};

use crate::vo_factory::InputBufVOTrait;

#[inline(always)]
pub(super) fn spawn_check_heart(
    server_check_heart_interval: u64,
    server_check_heart_timeout_time: u64,
    clients: ClientsStructType,
) {
    tokio::spawn(async move {
        let mut interval = interval(Duration::from_secs(server_check_heart_interval));
        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

        info!(
            "Server - [check heart] start success!!! with [server_check_heart_interval:{}s] [server_check_heart_timeout_time:{}s]",
            server_check_heart_interval, server_check_heart_timeout_time
        );

        let timeout_duration = Duration::from_secs(server_check_heart_timeout_time);

        loop {
            interval.tick().await;

            let mut remove_list = Vec::with_capacity(clients.len() / 10); // Assuming that up to 10% of clients may timeout
            let current_time = SystemTime::now();

            for entry in clients.iter() {
                let addr = *entry.key();
                let last_communicate_time = entry.value().get_last_communicate_time();

                if let Ok(last_time_guard) = last_communicate_time.try_read() {
                    let time_old = *last_time_guard;

                    if let Ok(duration) = current_time.duration_since(time_old) {
                        if duration > timeout_duration {
                            remove_list.push(addr);
                        }
                    }
                }
                // If unable to obtain a read lock, skip the client to avoid blocking
            }

            for addr in remove_list {
                if let Some((_, user)) = clients.remove(&addr) {
                    info!(
                        "Clean up addr:{}, that have not sent messages for a long time",
                        addr
                    );
                }
            }

            info!("Server check online socket count:{}", clients.len());
        }
    });
}

/// A function for checking and sending a HandlerResult instance.
///
/// This function checks the HandlerResult instance and sends it through a channel if the send flag is set to true.
#[inline(always)]
pub(crate) async fn check_handler_result(
    mut handler_result: HandlerResult,
    clients: ClientsStructType,
) {
    // If the send flag of the HandlerResult instance is set to true, send the instance through the channel.
    if handler_result.get_is_send() {
        let response = handler_result.get_response_data();
        if response.is_some() && handler_result.get_addrs().is_some() {
            if !handler_result.is_with_mark() {
                handler_result.set_marks(
                    *SERVER_MESSAGE_HEADER_MARK
                        .get()
                        .unwrap_or(&DEFAULT_MESSAGE_HEADER_MARK),
                    *SERVER_MESSAGE_TAIL_MARK
                        .get()
                        .unwrap_or(&DEFAULT_MESSAGE_TAIL_MARK),
                );
            }
            let response = response.unwrap();
            {
                if let Some(addrs) = handler_result.get_addrs() {
                    if let Some(delay_socket) = send_response(&response, &addrs, &clients).await {
                        if !delay_socket.is_empty() {
                            delay_socket.iter().for_each(|addr|{
                                warn!("Failed to find the client correctly, message sending is invalid , target-addr:{}",addr);
                            });
                        }
                    }
                }
            }
        }
    }
}

#[inline(always)]
async fn send_response(
    response: &BytesMut,
    addrs: &[SocketAddr],
    clients: &ClientsStructType,
) -> Option<Vec<SocketAddr>> {
    if addrs.is_empty() {
        return None;
    }

    let mut delay_socket = Vec::with_capacity(addrs.len());

    for socket_addr in addrs {
        if let Some(socket) = clients.get(socket_addr) {
            socket.send_response(response).await;
        } else {
            delay_socket.push(*socket_addr);
        }
    }

    if delay_socket.is_empty() {
        None
    } else {
        Some(delay_socket)
    }
}

#[inline(always)]
pub(crate) async fn input_dto_build(
    addr: SocketAddr,
    input_buf_vo: InputBufVO,
    process_permit: Arc<Semaphore>,
    clients: ClientsStructType,
    handler_method: Arc<AsyncFunc>,
    reactor_event_sender: ReactorEventSender,
) {
    // Attempt to acquire a permit from the semaphore.
    match process_permit.try_acquire() {
        Ok(_permit) => {
            reactor_event_sender.push(ReactorEvent::crate_excute_task_event((
                handler_method,
                HandlerContext::new(
                    input_buf_vo,
                    ClientsContext::new(ClientsStruct(clients.clone())),
                ),
                clients,
            )));
        }
        Err(_) => {
            // If the permit cannot be acquired, log a warning.
            warn!("addr:{} PROCESS_PERMIT_SIZE is full", addr)
        }
    }
}

#[inline(always)]
pub(crate) async fn add_client(
    clients: ClientsStructType,
    socket: TcpStream,
    addr: SocketAddr,
    last_communicate_time: Arc<RwLock<SystemTime>>,
) -> ReadHalf<TcpStream> {
    let (read_half, write_half) = split(socket);
    let lynn_user = LynnUser::new(write_half, last_communicate_time);
    clients.insert(addr, lynn_user);
    read_half
}

#[inline(always)]
pub(crate) async fn push_read_half(
    mut read_half: ReadHalf<TcpStream>,
    process_permit: Arc<Semaphore>,
    addr: SocketAddr,
    clients: ClientsStructType,
    message_header_mark: u16,
    message_tail_mark: u16,
    lynn_router: Arc<LynnRouter>,
    reactor_event_sender: ReactorEventSender,
    last_communicate_time: Arc<RwLock<SystemTime>>,
) {
    tokio::spawn(async move {
        let mut buf = [0; DEFAULT_MAX_RECEIVE_BYTES_SIZE];
        let mut big_buf = BigBufReader::new(message_header_mark, message_tail_mark);

        loop {
            let result = read_half.read(&mut buf).await;
            match result {
                Ok(n) if n <= 0 => break,
                Ok(n) => {
                    big_buf.extend_from_slice(&buf[..n]);
                    while big_buf.is_complete() {
                        let mut input_buf_vo = InputBufVO::new(big_buf.get_data(), addr);
                        if let Some(constructor_id) = input_buf_vo.get_constructor_id() {
                            match constructor_id {
                                2 => {
                                    if let Ok(mut time_guard) = last_communicate_time.try_write() {
                                        *time_guard = SystemTime::now();
                                    }
                                    continue;
                                }
                                1 => {
                                    if let Some(method_id) = input_buf_vo.get_method_id() {
                                        if let Some(handler_method) =
                                            lynn_router.get_handler_by_method_id(&method_id)
                                        {
                                            input_dto_build(
                                                addr,
                                                input_buf_vo,
                                                process_permit.clone(),
                                                clients.clone(),
                                                handler_method.clone(),
                                                reactor_event_sender.clone(),
                                            )
                                            .await;
                                        } else {
                                            warn!("router_map_async no method match,{}", method_id);
                                        }
                                    } else {
                                        warn!("router_map_async input_buf_vo no method_id");
                                    }
                                }
                                _ => {
                                    warn!("Unknown constructor_id: {}", constructor_id);
                                }
                            }
                        }
                    }
                }
                Err(e) => {
                    error!("Failed to read from socket: {}", e.to_string());
                    break;
                }
            }
        }
    });
}