use crate::{
config::{GateWayConfig, HandlerInfo, PathConfig},
route::{path::PathRoute, trie::Trie},
support::{Context, DisruptionStatus},
};
use common::{BaseHandler_, Handler, RequestHandler_, ResponseHandler_};
use default::DefaultBaseHandler;
use gateway_common::{
error::BoxError,
utils::{async_cache::AsyncCache, async_map::AsyncMap, date_util::get_now_date_time_as_millis},
};
use std::{collections::HashMap, fmt::Debug, sync::Arc};
use tokio::sync::mpsc::Receiver;
use tracing::{info, instrument};
pub mod common;
pub mod default;
/// The resolved handler chain for a single configured path.
///
/// One value is built per path configuration and stored in the routing trie.
/// Each handler is paired with an optional configuration string taken from
/// the matching `HandlerInfo` (its `get_config()` rendered with `to_string`).
pub struct HandlerResource {
/// Route URL, cloned from the path configuration's `url` field.
pub path: String,
/// The path configuration this resource was built from.
pub path_config: Arc<PathConfig>,
/// Request-phase handlers with their optional per-handler configuration.
pub request_handlers: Vec<(&'static dyn RequestHandler_, Option<String>)>,
/// Response-phase handlers with their optional per-handler configuration.
pub response_handlers: Vec<(&'static dyn ResponseHandler_, Option<String>)>,
/// The base handler and its optional configuration.
pub base_handler: (&'static dyn BaseHandler_, Option<String>),
}
impl core::fmt::Debug for HandlerResource {
    /// Formats the resource for diagnostics.
    ///
    /// `path` and `path_config` are printed in full; the handler fields hold
    /// trait objects and are shown as the placeholder `"..."`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Placeholder printed instead of the handler trait objects.
        const ELIDED: &str = "...";

        let mut repr = f.debug_struct("HandlerResource");
        repr.field("path", &self.path);
        repr.field("path_config", &self.path_config);
        repr.field("request_handlers", &ELIDED);
        repr.field("response_handlers", &ELIDED);
        repr.field("base_handler", &ELIDED);
        repr.finish()
    }
}
/// Dispatches a request `Context` through the handler chain configured for
/// the request's path.
pub struct GateWayHandler {
/// Cache holding the current route table. `GateWayHandler::new` stores the
/// initial table here and spawns a task that stores a rebuilt table for
/// every `GateWayConfig` received afterwards.
path_route: AsyncCache<Arc<PathRoute>>,
}
impl Debug for GateWayHandler {
    /// Formats the handler for diagnostics; the route cache is shown as the
    /// placeholder `"..."` rather than its contents.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut repr = f.debug_struct("GateWayHandler");
        repr.field("path_route", &"...");
        repr.finish()
    }
}
impl GateWayHandler {
    /// Builds a gateway handler from an initial configuration.
    ///
    /// * `config` - configuration used to build the initial route table.
    /// * `listener` - channel delivering replacement configurations; each one
    ///   received causes the route table to be rebuilt and stored.
    /// * `handlers` - the handlers that routes may refer to. A
    ///   `DefaultBaseHandler` registered under the name `"DefaultBaseHandler"`
    ///   is always appended to this list.
    ///
    /// The handlers are indexed by `Handler::get_key()`. A background task is
    /// spawned that owns this index and runs until `listener` yields `None`.
    ///
    /// # Errors
    ///
    /// The current implementation has no failing path and always returns
    /// `Ok`. The result of storing a route table in the cache is discarded.
    pub async fn new(
        config: GateWayConfig,
        mut listener: Receiver<GateWayConfig>,
        mut handlers: Vec<Handler>,
    ) -> Result<Self, BoxError> {
        let async_path_route = AsyncCache::new();

        // The default base handler is leaked on purpose to obtain the
        // `'static` reference the handler tables store.
        handlers.push(Handler::new(
            "DefaultBaseHandler".to_owned(),
            common::HandlerInvoker::BaseHandler(Box::leak(Box::<DefaultBaseHandler>::default())),
        ));

        let mut map = HashMap::new();
        for handler in handlers {
            map.insert(handler.get_key(), handler);
        }

        let path_route = get_path_route(&map, config);
        let _ = async_path_route.insert(path_route).await;

        // Rebuild the route table for every configuration pushed by the
        // listener. `map` is moved into the task.
        let async_path_route_clone = async_path_route.clone();
        tokio::spawn(async move {
            while let Some(config) = listener.recv().await {
                info!("listener gatewayconfig change");
                let _ = async_path_route_clone
                    .insert(get_path_route(&map, config))
                    .await;
                info!("listener gatewayconfig done");
            }
        });

        Ok(Self {
            path_route: async_path_route,
        })
    }

    /// Runs `context` through the handler chain registered for its request
    /// path and returns the resulting context.
    ///
    /// Steps, in order:
    /// 1. Look up the request path in the current route table. If nothing
    ///    matches, log and return `context` without running any handler.
    /// 2. Attach the route's path configuration to the context and copy the
    ///    key/value pairs returned by the route search (presumably path
    ///    variables; confirm against `PathRoute::search`) into the request
    ///    parameters.
    /// 3. Run the request handlers in order, stopping at the first one that
    ///    leaves the context in a state other than `DisruptionStatus::Normal`.
    /// 4. Run the base handler only if the context is still `Normal`.
    /// 5. Run the response handlers in order, stopping as soon as the context
    ///    is in `DisruptionStatus::Flush`.
    ///
    /// # Panics
    ///
    /// Panics if either `unwrap` on the value returned by the route cache's
    /// `get()` fails.
    #[instrument]
    pub async fn handler(&self, mut context: Context) -> Context {
        let start_time = get_now_date_time_as_millis();
        info!(message = format!("start handler : {:?}", context));

        let handler_resource = self
            .path_route
            .get()
            .await
            .unwrap()
            .unwrap()
            .search(context.get_request().get_path())
            .await;
        let Some((handler_resource, fields)) = handler_resource else {
            info!("request not find : {:?}", context);
            return context;
        };

        context.insert_path_config(handler_resource.path_config.clone());

        // A `for` loop is required here: a lazy `.map(..)` adapter that is
        // never consumed would insert nothing.
        if let Some(fields) = fields {
            for (key, value) in fields {
                let _ = context
                    .get_mut_request()
                    .get_mut_parameters()
                    .insert(key, value);
            }
        }

        for (request_handler, config) in &handler_resource.request_handlers {
            context = request_handler.handler_(context, config.clone()).await;
            if !context.eq_disruption(DisruptionStatus::Normal) {
                info!("disruption request");
                break;
            }
        }

        if context.eq_disruption(DisruptionStatus::Normal) {
            let (base_handler, config) = &handler_resource.base_handler;
            context = base_handler.handler_(context, config.clone()).await;
        }

        for (response_handler, config) in &handler_resource.response_handlers {
            if context.eq_disruption(DisruptionStatus::Flush) {
                info!("disruption response");
                break;
            }
            context = response_handler.handler_(context, config.clone()).await;
        }

        info!(
            message = format!("end handler : {:?}", context),
            et = format!("{}", get_now_date_time_as_millis() - start_time)
        );
        context
    }
}
/// Resolves the request handlers named by `handler_infos`.
///
/// Each info is looked up in `map` under the key `"<id>:RequestHandler"`.
/// Infos with no matching entry, or whose entry exposes no request handler,
/// are skipped. The returned pairs keep the order of `handler_infos`; the
/// second element is the info's configuration rendered with `to_string`,
/// when it has one.
fn get_request_handler(
    handler_infos: Vec<HandlerInfo>,
    map: &HashMap<String, Handler>,
) -> Vec<(&'static dyn RequestHandler_, Option<String>)> {
    handler_infos
        .into_iter()
        .filter_map(|info| {
            let key = format!("{}:{}", info.get_id(), "RequestHandler");
            let invoker = map.get(&key)?.get_request_handler()?;
            Some((invoker, info.get_config().map(|cfg| cfg.to_string())))
        })
        .collect()
}
/// Resolves the response handlers named by `handler_infos`.
///
/// Each info is looked up in `map` under the key `"<id>:ResponseHandler"`.
/// Infos with no matching entry, or whose entry exposes no response handler,
/// are skipped. The returned pairs keep the order of `handler_infos`; the
/// second element is the info's configuration rendered with `to_string`,
/// when it has one.
fn get_response_handler(
    handler_infos: Vec<HandlerInfo>,
    map: &HashMap<String, Handler>,
) -> Vec<(&'static dyn ResponseHandler_, Option<String>)> {
    handler_infos
        .into_iter()
        .filter_map(|info| {
            let key = format!("{}:{}", info.get_id(), "ResponseHandler");
            let invoker = map.get(&key)?.get_response_handler()?;
            Some((invoker, info.get_config().map(|cfg| cfg.to_string())))
        })
        .collect()
}
/// Resolves the base handler named by `handler_info`.
///
/// The handler is looked up in `map` under the key `"<id>:BaseHandler"`.
/// When it is found and exposes a base handler, that handler is returned
/// together with the info's configuration rendered with `to_string`.
/// Otherwise the entry stored under `"DefaultBaseHandler:BaseHandler"` is
/// returned with no configuration.
///
/// # Panics
///
/// Panics if the fallback entry is missing from `map` or exposes no base
/// handler.
fn get_base_handler(
    handler_info: HandlerInfo,
    map: &HashMap<String, Handler>,
) -> (&'static dyn BaseHandler_, Option<String>) {
    let key = format!("{}:{}", handler_info.get_id(), "BaseHandler");
    let registered = map.get(&key).and_then(|entry| entry.get_base_handler());

    match registered {
        Some(invoker) => (
            invoker,
            handler_info.get_config().map(|cfg| cfg.to_string()),
        ),
        None => {
            let fallback = map
                .get("DefaultBaseHandler:BaseHandler")
                .unwrap()
                .get_base_handler()
                .unwrap();
            (fallback, None)
        }
    }
}
/// Builds a route table from a gateway configuration.
///
/// Every entry of `config.group` is expanded with `resolving()` into path
/// configurations. For each one, the request, response and base handlers are
/// resolved against `map` and the resulting `HandlerResource` is inserted
/// into a fresh trie. The returned `PathRoute` wraps that trie together with
/// a default-constructed `one_cache`.
fn get_path_route(map: &HashMap<String, Handler>, config: GateWayConfig) -> Arc<PathRoute> {
    let mut routes = Trie::default();

    for group in config.group {
        for path_config in group.resolving() {
            // Fields are evaluated in written order, so `path_config` is
            // cloned from before it is moved into the `Arc` in the last field.
            let resource = HandlerResource {
                path: path_config.url.clone(),
                request_handlers: get_request_handler(
                    path_config.request_handlers.clone(),
                    map,
                ),
                response_handlers: get_response_handler(
                    path_config.response_handlers.clone(),
                    map,
                ),
                base_handler: get_base_handler(path_config.base_handler.clone(), map),
                path_config: Arc::new(path_config),
            };
            routes.insert(Arc::new(resource));
        }
    }

    Arc::new(PathRoute {
        one_cache: AsyncMap::default(),
        trie: routes,
    })
}