tfserver/server/
server_router.rs1use std::collections::{HashMap, HashSet};
2use std::net::{SocketAddr};
3use std::ops::Deref;
4use std::panic::AssertUnwindSafe;
5use std::sync::{Arc};
6use futures_util::FutureExt;
7use tokio::sync::{RwLock};
8use tokio::sync::oneshot::Sender;
9use tokio_util::bytes::{BytesMut};
10use crate::codec::codec_trait::TfCodec;
11use crate::server::handler::Handler;
12use crate::structures::s_type;
13use crate::structures::s_type::{HandlerMetaAns, HandlerMetaReq, PacketMeta, ServerError, ServerErrorEn, StructureType, SystemSType, TypeContainer, TypeTuple};
14use crate::structures::s_type::ServerErrorEn::InternalError;
15
16
17pub struct TfServerRouter<C>
20where
21 C: TfCodec {
22 routes: Arc<HashMap<TypeTuple, Arc<RwLock<dyn Handler<Codec = C>>>>>,
23 routes_text_names: Arc<HashMap<String, u64>>,
24 routes_to_add: Vec<(TypeTuple, (Arc<RwLock<dyn Handler<Codec = C>>>, String))>,
25 router_incremental: u64,
26 routes_commited: bool,
27 user_s_type: Box<dyn StructureType>,
28}
29
30impl<C> TfServerRouter<C>
31where
32 C: TfCodec {
33
34 pub fn new(user_s_type: Box<dyn StructureType>) -> Self {
38 Self {
39 routes: Arc::new(HashMap::new()),
40 routes_text_names: Arc::new(HashMap::new()),
41 routes_to_add: Vec::new(),
42 router_incremental: 0,
43 routes_commited: false,
44 user_s_type,
45 }
46 }
47
48 pub fn add_route(
53 &mut self,
54 handler: Arc<RwLock<dyn Handler<Codec = C>>>,
55 handler_name: String,
56 mut s_types: Vec<Box<dyn StructureType>>,
57 ) {
58 if self.routes_commited {
59 return;
60 }
61 let mut s_typess: HashSet<TypeContainer> = HashSet::new();
62 while !s_types.is_empty(){
63 s_typess.insert(TypeContainer::new(s_types.pop().unwrap()));
64 }
65 let types_tupple = TypeTuple {
66 s_types: s_typess,
67 handler_id: self.router_incremental,
68 };
69
70 self.routes_to_add.push((types_tupple, (handler, handler_name)));
71 self.router_incremental += 1;
72 }
73
74 pub fn commit_routes(&mut self) {
76 if self.routes_commited || self.routes_to_add.is_empty() {
77 return;
78 }
79
80 let mut routes = HashMap::new();
81 let mut names = HashMap::new();
82
83 for (types, (handler, name)) in self.routes_to_add.drain(..) {
84 routes.insert(types.clone(), handler);
85 names.insert(name, types.handler_id);
86 }
87
88 self.routes = Arc::new(routes);
89 self.routes_text_names = Arc::new(names);
90 self.routes_commited = true;
91 }
92
93
94 pub fn get_routes(&self) -> Arc<HashMap<TypeTuple, Arc<RwLock<dyn Handler<Codec = C>>>>> {
95 self.routes.clone()
96 }
97
98 pub async fn serve_packet(
100 &self,
101 meta: BytesMut,
102 payload: BytesMut,
103 client_meta: (SocketAddr, &mut Option<Sender<Arc<RwLock<dyn Handler<Codec = C>>>>>),
104 ) -> Result<Vec<u8>, ServerError> {
105 if let Ok(meta_pack) = s_type::from_slice::<PacketMeta>(&meta) {
107 let s_type = self.user_s_type.get_deserialize_function().deref()(meta_pack.s_type_req);
108 let key = TypeTuple {
109 s_types: HashSet::from([TypeContainer::new(s_type.clone_unique())]),
110 handler_id: meta_pack.handler_id,
111 };
112
113 let handler = self.routes.get(&key).ok_or(ServerError::new(ServerErrorEn::NoSuchHandler(None)))?;
114 let mut handler_lock = handler.write().await;
115 let res = AssertUnwindSafe(
116 handler_lock.serve_route(client_meta, s_type, payload)
117 )
118 .catch_unwind().await;
119 return match res {
120 Ok(data) => match data{
121 Ok(data) => Ok(data),
122 Err(err) => {Err(ServerError::new(ServerErrorEn::InternalError(Some(err.to_vec()))))}
123 },
124 Err(_) => Err(ServerError::new(InternalError(Some("handler died :(".as_bytes().to_vec())))),
125 };
126 }
127
128 if let Ok(meta_req) = s_type::from_slice::<HandlerMetaReq>(&meta) {
130 if let Some(route_id) = self.routes_text_names.get(&meta_req.handler_name) {
131 let meta_ans = HandlerMetaAns {
132 s_type: SystemSType::HandlerMetaAns,
133 id: *route_id,
134 };
135 return Ok(s_type::to_vec(&meta_ans).unwrap());
136 } else {
137 return Err(ServerError::new(ServerErrorEn::NoSuchHandler(None)));
138 }
139 }
140
141 Err(ServerError::new(ServerErrorEn::MalformedMetaInfo(None)))
142 }
143}