mod common_api;
mod lynn_server_config;
mod lynn_server_user;
mod tcp_reactor;
use std::{
collections::HashMap,
net::{SocketAddr, ToSocketAddrs},
sync::Arc,
};
use common_api::spawn_check_heart;
use crossbeam_deque::Injector;
use dashmap::DashMap;
use lynn_server_config::LynnServerConfig;
use lynn_server_user::LynnUser;
use tokio::{net::TcpListener, sync::RwLock};
use tracing::{Level, error, info, warn};
use tracing_subscriber::fmt;
#[cfg(feature = "server")]
use crate::app::tcp_reactor::TcpReactor;
use crate::{
app::tcp_reactor::event_api::ReactorEvent,
const_config::{SERVER_MESSAGE_HEADER_MARK, SERVER_MESSAGE_TAIL_MARK},
handler::{HandlerContext, IHandler, IntoSystem},
};
pub mod lynn_config_api {
pub use super::lynn_server_config::LynnServerConfig;
pub use super::lynn_server_config::LynnServerConfigBuilder;
}
pub(crate) mod event_api {
pub(crate) use super::tcp_reactor::*;
}
#[cfg(feature = "server")]
pub struct LynnServer<'a> {
clients: ClientsStruct,
router_map_async: RouterMapAsyncStruct,
router_maps: RouterMapsStruct,
lynn_config: LynnServerConfig<'a>,
reactor: TcpReactor,
}
pub(crate) type ClientsStructType = Arc<DashMap<SocketAddr, LynnUser>>;
#[derive(Clone)]
pub(crate) struct ClientsStruct(pub(crate) ClientsStructType);
struct RouterMapAsyncStruct(Arc<Option<HashMap<u16, Arc<AsyncFunc>>>>);
struct RouterMapsStruct(Option<HashMap<u16, Arc<AsyncFunc>>>);
pub(crate) type AsyncFunc = Box<dyn IHandler>;
type TaskBodyOutChannel = (Arc<AsyncFunc>, HandlerContext, ClientsStructType);
pub(crate) type ReactorEventSender = Arc<Injector<ReactorEvent>>;
impl<'a> LynnServer<'a> {
pub async fn new() -> Self {
let lynn_config = LynnServerConfig::default();
let app = Self {
clients: ClientsStruct(Arc::new(DashMap::new())),
router_map_async: RouterMapAsyncStruct(Arc::new(None)),
router_maps: RouterMapsStruct(None),
lynn_config,
reactor: TcpReactor::new(),
};
app
}
#[deprecated(note = "use `new_with_addr`", since = "1.1.7")]
pub async fn new_with_ipv4(ipv4: &'a str) -> Self {
let mut app = Self::new().await;
app.lynn_config.server_addr = ipv4.to_socket_addrs().unwrap().next().unwrap();
app
}
pub async fn new_with_addr<T>(addr: T) -> Self
where
T: ToSocketAddrs,
{
let mut app = Self::new().await;
app.lynn_config.server_addr = addr.to_socket_addrs().unwrap().next().unwrap();
app
}
pub async fn new_with_config(lynn_config: LynnServerConfig<'a>) -> Self {
let mut app = Self::new().await;
app.lynn_config = lynn_config;
app
}
pub fn add_router<Param>(mut self, method_id: u16, handler: impl IntoSystem<Param>) -> Self {
if let Some(ref mut map) = self.router_maps.0 {
map.insert(method_id, Arc::new(Box::new(handler.to_system())));
} else {
let mut map: HashMap<u16, Arc<Box<dyn IHandler>>> = HashMap::new();
map.insert(method_id, Arc::new(Box::new(handler.to_system())));
self.router_maps.0 = Some(map);
}
self
}
async fn synchronous_router(&mut self) {
self.router_map_async.0 = Arc::new(self.router_maps.0.clone());
self.router_maps.0 = None;
}
async fn check_heart(&self) {
let clients = self.clients.0.clone();
let server_check_heart_interval =
self.lynn_config.get_server_check_heart_interval().clone();
let server_check_heart_timeout_time = self
.lynn_config
.get_server_check_heart_timeout_time()
.clone();
spawn_check_heart(
server_check_heart_interval,
server_check_heart_timeout_time,
clients,
);
}
pub async fn start(mut self: Self) {
self.synchronous_router().await;
self.init_marks().await;
let server_arc = Arc::new(self);
if let Err(e) = server_arc.run().await {
error!("{}", e);
}
}
async fn run(self: Arc<Self>) -> Result<(), Box<dyn std::error::Error>> {
let listener = TcpListener::bind(self.lynn_config.get_server_addr()).await?;
info!(
"Server - [Main-LynnServer] start success!!! with [server_addr:{}]",
self.lynn_config.get_server_addr()
);
self.check_heart().await;
self.reactor
.start(
self.clients.0.clone(),
self.lynn_config.get_server_single_processs_permit(),
*self.lynn_config.get_message_header_mark(),
*self.lynn_config.get_message_tail_mark(),
self.router_map_async.0.clone(),
listener,
self.lynn_config.get_server_max_connections(),
self.lynn_config.get_server_max_reactor_taskpool_size(),
)
.await;
Ok(())
}
async fn init_marks(&self) {
SERVER_MESSAGE_HEADER_MARK.get_or_init(|| *self.lynn_config.get_message_header_mark());
SERVER_MESSAGE_TAIL_MARK.get_or_init(|| *self.lynn_config.get_message_tail_mark());
}
#[cfg(feature = "server")]
pub fn log_server(&self) {
let subscriber = fmt::Subscriber::builder()
.with_max_level(Level::INFO)
.finish();
match tracing::subscriber::set_global_default(subscriber) {
Ok(_) => {
info!("Server - [log server] start sucess!!!")
}
Err(e) => {
warn!("set_global_default failed - e: {:?}", e.to_string())
}
}
}
}