mod common_api;
mod lynn_server_config;
mod lynn_server_user;
mod server_thread_pool;
use std::{
collections::HashMap,
net::{SocketAddr, ToSocketAddrs},
ops::{Deref, DerefMut},
sync::Arc,
time::SystemTime,
};
use bytes::Bytes;
use common_api::{spawn_check_heart, spawn_socket_server};
use lynn_server_config::{LynnServerConfig, LynnServerConfigBuilder};
use lynn_server_user::LynnUser;
use server_thread_pool::LynnServerThreadPool;
use tokio::{
io::AsyncWriteExt,
net::TcpListener,
sync::{mpsc, RwLock, Semaphore},
task::JoinHandle,
};
use tracing::{error, info, warn, Level};
use tracing_subscriber::fmt;
use crate::{
const_config::{SERVER_MESSAGE_HEADER_MARK, SERVER_MESSAGE_TAIL_MARK},
handler::{HandlerContext, IHandler, IntoSystem},
};
pub mod lynn_config_api {
pub use super::lynn_server_config::LynnServerConfig;
pub use super::lynn_server_config::LynnServerConfigBuilder;
}
#[cfg(feature = "server")]
pub struct LynnServer<'a> {
clients: ClientsStruct,
router_map_async: RouterMapAsyncStruct,
router_maps: RouterMapsStruct,
lynn_config: LynnServerConfig<'a>,
lynn_thread_pool: LynnServerThreadPool,
}
pub(crate) type ClientsStructType = Arc<RwLock<HashMap<SocketAddr, LynnUser>>>;
#[derive(Clone)]
pub(crate) struct ClientsStruct(pub(crate) ClientsStructType);
struct RouterMapAsyncStruct(Arc<Option<HashMap<u16, Arc<AsyncFunc>>>>);
struct RouterMapsStruct(Option<HashMap<u16, Arc<AsyncFunc>>>);
pub(crate) type AsyncFunc = Box<dyn IHandler>;
pub(crate) type TaskBody = mpsc::Sender<(Arc<AsyncFunc>, HandlerContext, ClientsStructType)>;
impl<'a> LynnServer<'a> {
pub async fn new() -> Self {
let lynn_config = LynnServerConfig::default();
let server_max_threadpool_size = lynn_config.get_server_max_threadpool_size();
let thread_pool = LynnServerThreadPool::new(server_max_threadpool_size).await;
let app = Self {
clients: ClientsStruct(Arc::new(RwLock::new(HashMap::new()))),
router_map_async: RouterMapAsyncStruct(Arc::new(None)),
router_maps: RouterMapsStruct(None),
lynn_config,
lynn_thread_pool: thread_pool,
};
app.log_server().await;
app
}
#[deprecated(note = "use `new_with_addr`", since = "1.1.7")]
pub async fn new_with_ipv4(ipv4: &'a str) -> Self {
let lynn_config = LynnServerConfigBuilder::new()
.with_server_ipv4(ipv4)
.build();
let server_max_threadpool_size = lynn_config.get_server_max_threadpool_size();
let thread_pool = LynnServerThreadPool::new(server_max_threadpool_size).await;
let app = Self {
clients: ClientsStruct(Arc::new(RwLock::new(HashMap::new()))),
router_map_async: RouterMapAsyncStruct(Arc::new(None)),
router_maps: RouterMapsStruct(None),
lynn_config,
lynn_thread_pool: thread_pool,
};
app.log_server().await;
app
}
pub async fn new_with_addr<T>(addr: T) -> Self
where
T: ToSocketAddrs,
{
let lynn_config = LynnServerConfigBuilder::new().with_addr(addr).build();
let server_max_threadpool_size = lynn_config.get_server_max_threadpool_size();
let thread_pool = LynnServerThreadPool::new(server_max_threadpool_size).await;
let app = Self {
clients: ClientsStruct(Arc::new(RwLock::new(HashMap::new()))),
router_map_async: RouterMapAsyncStruct(Arc::new(None)),
router_maps: RouterMapsStruct(None),
lynn_config,
lynn_thread_pool: thread_pool,
};
app.log_server().await;
app
}
pub async fn new_with_config(lynn_config: LynnServerConfig<'a>) -> Self {
let server_max_threadpool_size = lynn_config.get_server_max_threadpool_size();
let thread_pool = LynnServerThreadPool::new(server_max_threadpool_size).await;
let app = Self {
clients: ClientsStruct(Arc::new(RwLock::new(HashMap::new()))),
router_map_async: RouterMapAsyncStruct(Arc::new(None)),
router_maps: RouterMapsStruct(None),
lynn_config,
lynn_thread_pool: thread_pool,
};
app.log_server().await;
app
}
pub fn add_router<Param>(mut self, method_id: u16, handler: impl IntoSystem<Param>) -> Self {
if let Some(ref mut map) = self.router_maps.0 {
map.insert(method_id, Arc::new(Box::new(handler.to_system())));
} else {
let mut map: HashMap<u16, Arc<Box<dyn IHandler>>> = HashMap::new();
map.insert(method_id, Arc::new(Box::new(handler.to_system())));
self.router_maps.0 = Some(map);
}
self
}
async fn synchronous_router(&mut self) {
self.router_map_async.0 = Arc::new(self.router_maps.0.clone());
self.router_maps.0 = None;
}
async fn add_client(
&self,
sender: mpsc::Sender<Bytes>,
addr: SocketAddr,
process_permit: Arc<Semaphore>,
join_handle: JoinHandle<()>,
last_communicate_time: Arc<RwLock<SystemTime>>,
) {
let mut clients = self.clients.0.write().await;
let guard = clients.deref_mut();
let lynn_user = LynnUser::new(sender, process_permit, join_handle, last_communicate_time);
guard.insert(addr, lynn_user);
}
async fn remove_client(&mut self, addr: SocketAddr) {
let mut clients = self.clients.0.write().await;
let guard = clients.deref_mut();
if guard.contains_key(&addr) {
guard.remove(&addr);
}
}
async fn check_heart(&self) {
let clients = self.clients.0.clone();
let server_check_heart_interval =
self.lynn_config.get_server_check_heart_interval().clone();
let server_check_heart_timeout_time = self
.lynn_config
.get_server_check_heart_timeout_time()
.clone();
spawn_check_heart(
server_check_heart_interval,
server_check_heart_timeout_time,
clients,
);
}
pub async fn start(mut self: Self) {
self.synchronous_router().await;
self.init_marks().await;
let server_arc = Arc::new(self);
if let Err(e) = server_arc.run().await {
error!("{}", e);
}
}
async fn run(self: Arc<Self>) -> Result<(), Box<dyn std::error::Error>> {
let listener = TcpListener::bind(self.lynn_config.get_server_ipv4()).await?;
info!(
"Server - [Main-LynnServer] start success!!! with [server_ipv4:{}]",
self.lynn_config.get_server_ipv4()
);
self.check_heart().await;
loop {
let clinet_result = listener.accept().await;
if let Ok((mut socket, addr)) = clinet_result {
let mut socket_permit = true;
{
if let Some(max_connections) = self.lynn_config.get_server_max_connections() {
let clients = self.clients.0.write().await;
let guard = clients.deref();
if guard.len() < *max_connections {
socket_permit = true;
} else {
socket_permit = false;
}
}
}
if socket_permit {
info!("Accepted connection from: {}", addr);
let process_permit = Arc::new(Semaphore::new(
*self.lynn_config.get_server_single_processs_permit(),
));
let clients = self.clients.0.clone();
let router_map_async = self.router_map_async.0.clone();
let thread_pool_task_body_sender =
self.lynn_thread_pool.task_body_sender.0.clone();
let message_header_mark = self.lynn_config.get_message_header_mark().clone();
let message_tail_mark = self.lynn_config.get_message_tail_mark().clone();
spawn_socket_server(
addr,
process_permit,
message_header_mark,
message_tail_mark,
socket,
router_map_async,
clients,
thread_pool_task_body_sender,
);
} else {
let _ = socket.shutdown().await;
warn!("Server socket's count is more than MAX_CONNECTIONS ,can not accept new client:{}",addr);
}
} else {
if let Err(e) = clinet_result {
warn!(
"Failed to accept connection , server run next, e :{}",
e.to_string()
);
}
}
}
}
async fn init_marks(&self) {
SERVER_MESSAGE_HEADER_MARK.get_or_init(|| *self.lynn_config.get_message_header_mark());
SERVER_MESSAGE_TAIL_MARK.get_or_init(|| *self.lynn_config.get_message_tail_mark());
}
async fn log_server(&self) {
let subscriber = fmt::Subscriber::builder()
.with_max_level(Level::INFO)
.finish();
match tracing::subscriber::set_global_default(subscriber) {
Ok(_) => {
info!("Server - [log server] start sucess!!!")
}
Err(e) => {
warn!("set_global_default failed - e: {:?}", e.to_string())
}
}
}
}