// Shared helpers used by the server (e.g. `spawn_check_heart`).
mod common_api;
// Server configuration types (`LynnServerConfig` and its builder).
mod lynn_server_config;
// Client record type (`LynnUser`) stored in the clients map.
mod lynn_server_user;
// Handler routing (`LynnRouter`).
mod router;
// TCP reactor (`TcpReactor`) and its `ReactorEvent` API.
mod tcp_reactor;
use std::{
net::{SocketAddr, ToSocketAddrs},
sync::Arc,
};
use common_api::spawn_check_heart;
use crossbeam_deque::Injector;
use dashmap::DashMap;
use lynn_server_config::LynnServerConfig;
use lynn_server_user::LynnUser;
use tokio::net::TcpListener;
use tracing::{Level, error, info, warn};
use tracing_subscriber::fmt;
#[cfg(feature = "server")]
use crate::app::{router::LynnRouter, tcp_reactor::TcpReactor};
use crate::{
app::tcp_reactor::event_api::ReactorEvent,
const_config::{SERVER_MESSAGE_HEADER_MARK, SERVER_MESSAGE_TAIL_MARK},
handler::{HandlerContext, IHandler, IntoSystem},
};
pub mod lynn_config_api {
pub use super::lynn_server_config::LynnServerConfig;
pub use super::lynn_server_config::LynnServerConfigBuilder;
}
/// Crate-internal facade that glob re-exports the items of `tcp_reactor`,
/// so other modules of the crate can reach them through `app::event_api`.
pub(crate) mod event_api {
pub(crate) use super::tcp_reactor::*;
}
/// TCP server handle bundling the client map, the handler router, the
/// configuration and the TCP reactor.
///
/// Only compiled when the `server` feature is enabled.
#[cfg(feature = "server")]
pub struct LynnServer<'a> {
/// Shared map of clients keyed by their socket address.
clients: ClientsStruct,
/// Router that `add_router` registers handlers on; a clone of the `Arc`
/// is handed to the reactor in `run`.
lynn_router: Arc<LynnRouter>,
/// Server settings (listen address, heartbeat timing, message marks, limits).
lynn_config: LynnServerConfig<'a>,
/// Reactor that `run` starts with the bound listener.
reactor: TcpReactor,
}
/// Reference-counted concurrent map from a client's socket address to its
/// `LynnUser` record.
pub(crate) type ClientsStructType = Arc<DashMap<SocketAddr, LynnUser>>;
/// Cloneable newtype around [`ClientsStructType`]; cloning only clones the
/// inner `Arc`, so all clones refer to the same map.
#[derive(Clone)]
pub(crate) struct ClientsStruct(pub(crate) ClientsStructType);
/// Boxed, dynamically dispatched handler.
pub(crate) type AsyncFunc = Box<dyn IHandler>;
/// Tuple of (shared handler, handler context, clients map).
// NOTE(review): not referenced in the visible part of this file; presumably the
// payload of a task channel — confirm against its users.
type TaskBodyOutChannel = (Arc<AsyncFunc>, HandlerContext, ClientsStructType);
/// Shared `crossbeam_deque::Injector` queue carrying `ReactorEvent`s.
pub(crate) type ReactorEventSender = Arc<Injector<ReactorEvent>>;
/// Constructors, route registration and start-up logic for [`LynnServer`].
///
/// Gated on the `server` feature to match the struct definition, which only
/// exists when that feature is enabled.
#[cfg(feature = "server")]
impl<'a> LynnServer<'a> {
    /// Creates a server with the default [`LynnServerConfig`], a fresh client
    /// map, a new router and a new reactor.
    pub async fn new() -> Self {
        let lynn_config = LynnServerConfig::default();
        Self {
            clients: ClientsStruct(Arc::new(DashMap::new())),
            lynn_router: Arc::new(LynnRouter::new()),
            lynn_config,
            reactor: TcpReactor::new(),
        }
    }

    /// Creates a default server listening on the address described by `ipv4`.
    ///
    /// Kept for backward compatibility; it simply forwards to
    /// [`LynnServer::new_with_addr`].
    ///
    /// # Panics
    ///
    /// Panics under the same conditions as [`LynnServer::new_with_addr`].
    #[deprecated(note = "use `new_with_addr`", since = "1.1.7")]
    pub async fn new_with_ipv4(ipv4: &'a str) -> Self {
        Self::new_with_addr(ipv4).await
    }

    /// Creates a default server and overrides its listen address with the
    /// first socket address that `addr` resolves to.
    ///
    /// Resolution goes through [`ToSocketAddrs`], which is a synchronous call.
    ///
    /// # Panics
    ///
    /// Panics if `addr` cannot be resolved or resolves to no address at all.
    pub async fn new_with_addr<T>(addr: T) -> Self
    where
        T: ToSocketAddrs,
    {
        let mut app = Self::new().await;
        app.lynn_config.server_addr = addr
            .to_socket_addrs()
            .expect("LynnServer::new_with_addr: failed to resolve the server address")
            .next()
            .expect("LynnServer::new_with_addr: the server address resolved to no socket address");
        app
    }

    /// Creates a server that uses `lynn_config` instead of the default
    /// configuration.
    pub async fn new_with_config(lynn_config: LynnServerConfig<'a>) -> Self {
        let mut app = Self::new().await;
        app.lynn_config = lynn_config;
        app
    }

    /// Registers `handler` under `method_id` on the router and returns the
    /// server so calls can be chained builder-style.
    pub fn add_router<Param>(mut self, method_id: u16, handler: impl IntoSystem<Param>) -> Self {
        self.lynn_router.add_router(method_id, handler);
        self
    }

    /// Hands the client map and the configured heartbeat interval/timeout to
    /// `spawn_check_heart`.
    // NOTE(review): presumably this spawns a background task that evicts
    // clients whose heartbeat timed out — confirm in `common_api`.
    async fn check_heart(&self) {
        let clients = self.clients.0.clone();
        let interval = self.lynn_config.get_server_check_heart_interval().clone();
        let timeout_time = self
            .lynn_config
            .get_server_check_heart_timeout_time()
            .clone();
        spawn_check_heart(interval, timeout_time, clients);
    }

    /// Consumes the server, initialises the global message marks and runs the
    /// server until [`run`](Self::run) returns.
    ///
    /// Any error returned by `run` (e.g. a failed bind) is logged with
    /// `error!` rather than propagated.
    pub async fn start(self) {
        self.init_marks().await;
        let server_arc = Arc::new(self);
        if let Err(e) = server_arc.run().await {
            error!("{}", e);
        }
    }

    /// Binds the listener, starts the heartbeat check and then drives the
    /// reactor.
    ///
    /// # Errors
    ///
    /// Returns an error if binding the TCP listener to the configured server
    /// address fails.
    async fn run(self: Arc<Self>) -> Result<(), Box<dyn std::error::Error>> {
        let listener = TcpListener::bind(self.lynn_config.get_server_addr()).await?;
        info!(
            "Server - [Main-LynnServer] start success!!! with [server_addr:{}]",
            self.lynn_config.get_server_addr()
        );
        self.check_heart().await;
        self.reactor
            .start(
                self.clients.0.clone(),
                self.lynn_config.get_server_single_processs_permit(),
                *self.lynn_config.get_message_header_mark(),
                *self.lynn_config.get_message_tail_mark(),
                self.lynn_router.clone(),
                listener,
                self.lynn_config.get_server_max_connections(),
                self.lynn_config.get_server_max_reactor_taskpool_size(),
            )
            .await;
        Ok(())
    }

    /// Publishes the configured header/tail marks into the crate-wide
    /// `SERVER_MESSAGE_HEADER_MARK` / `SERVER_MESSAGE_TAIL_MARK` cells.
    ///
    /// `get_or_init` is used, so if a cell was already initialised its
    /// existing value is kept and this server's mark is not stored.
    async fn init_marks(&self) {
        SERVER_MESSAGE_HEADER_MARK.get_or_init(|| *self.lynn_config.get_message_header_mark());
        SERVER_MESSAGE_TAIL_MARK.get_or_init(|| *self.lynn_config.get_message_tail_mark());
    }

    /// Installs a `tracing` fmt subscriber capped at `INFO` as the global
    /// default.
    ///
    /// If a global default subscriber is already set, the failure is reported
    /// with `warn!` and the existing subscriber stays in place.
    pub fn log_server(&self) {
        let subscriber = fmt::Subscriber::builder()
            .with_max_level(Level::INFO)
            .finish();
        match tracing::subscriber::set_global_default(subscriber) {
            Ok(_) => {
                info!("Server - [log server] start success!!!")
            }
            Err(e) => {
                warn!("set_global_default failed - e: {:?}", e.to_string())
            }
        }
    }
}