// tfserver/server/server_router.rs
use std::collections::{HashMap, HashSet};
use std::future::{poll_fn, Future};
use std::net::SocketAddr;
use std::ops::Deref;
use std::panic::{catch_unwind, AssertUnwindSafe};
use std::pin::pin;
use std::sync::Arc;
use std::task::Poll;

use tokio::io;
use tokio::sync::oneshot::Sender;
use tokio::sync::Mutex;
use tokio_util::bytes::{Bytes, BytesMut};
use tokio_util::codec::{Decoder, Encoder};

use crate::codec::codec_trait::TfCodec;
use crate::server::handler::Handler;
use crate::structures::s_type;
use crate::structures::s_type::ServerErrorEn::InternalError;
use crate::structures::s_type::{
    HandlerMetaAns, HandlerMetaReq, PacketMeta, ServerError, ServerErrorEn, StructureType,
    SystemSType, TypeContainer, TypeTupple,
};
16
17
18pub struct TcpServerRouter<C>
21where
22 C: Encoder<Bytes, Error = io::Error> + Decoder<Item = BytesMut, Error = io::Error> + Clone + Send + Sync+ 'static +TfCodec {
23 routes: Arc<HashMap<TypeTupple, Arc<Mutex<dyn Handler<Codec = C>>>>>,
24 routes_text_names: Arc<HashMap<String, u64>>,
25 routes_to_add: Vec<(TypeTupple, (Arc<Mutex<dyn Handler<Codec = C>>>, String))>,
26 router_incremental: u64,
27 routes_commited: bool,
28 user_s_type: Box<dyn StructureType>,
29}
30
31impl<C> TcpServerRouter<C>
32where
33 C: Encoder<Bytes, Error = io::Error> + Decoder<Item = BytesMut, Error = io::Error> + Clone + Send + Sync + 'static +TfCodec {
34
35 pub fn new(user_s_type: Box<dyn StructureType>) -> Self {
39 Self {
40 routes: Arc::new(HashMap::new()),
41 routes_text_names: Arc::new(HashMap::new()),
42 routes_to_add: Vec::new(),
43 router_incremental: 0,
44 routes_commited: false,
45 user_s_type,
46 }
47 }
48
49 pub fn add_route(
54 &mut self,
55 handler: Arc<Mutex<dyn Handler<Codec = C>>>,
56 handler_name: String,
57 mut s_types: Vec<Box<dyn StructureType>>,
58 ) {
59 if self.routes_commited {
60 return;
61 }
62 let mut s_typess: HashSet<TypeContainer> = HashSet::new();
63 while !s_types.is_empty(){
64 s_typess.insert(TypeContainer::new(s_types.pop().unwrap()));
65 }
66 let types_tupple = TypeTupple {
67 s_types: s_typess,
68 handler_id: self.router_incremental,
69 };
70
71 self.routes_to_add.push((types_tupple, (handler, handler_name)));
72 self.router_incremental += 1;
73 }
74
75 pub fn commit_routes(&mut self) {
77 if self.routes_commited || self.routes_to_add.is_empty() {
78 return;
79 }
80
81 let mut routes = HashMap::new();
82 let mut names = HashMap::new();
83
84 for (types, (handler, name)) in self.routes_to_add.drain(..) {
85 routes.insert(types.clone(), handler);
86 names.insert(name, types.handler_id);
87 }
88
89 self.routes = Arc::new(routes);
90 self.routes_text_names = Arc::new(names);
91 self.routes_commited = true;
92 }
93
94
95 pub fn get_routes(&self) -> Arc<HashMap<TypeTupple, Arc<Mutex<dyn Handler<Codec = C>>>>> {
96 self.routes.clone()
97 }
98
99 pub async fn serve_packet(
101 &self,
102 meta: BytesMut,
103 payload: BytesMut,
104 client_meta: (SocketAddr, &mut Option<Sender<Arc<Mutex<dyn Handler<Codec = C>>>>>),
105 ) -> Result<Vec<u8>, ServerError> {
106 if let Ok(meta_pack) = s_type::from_slice::<PacketMeta>(&meta) {
108 let s_type = self.user_s_type.get_deserialize_function().deref()(meta_pack.s_type_req);
109 let key = TypeTupple {
110 s_types: HashSet::from([TypeContainer::new(s_type.clone_unique())]),
111 handler_id: meta_pack.handler_id,
112 };
113
114 let handler = self.routes.get(&key).ok_or(ServerError::new(ServerErrorEn::NoSuchHandler(None)))?;
115 let mut handler_lock = handler.lock().await;
116 let res = catch_unwind(AssertUnwindSafe(async || {
117 handler_lock.serve_route(client_meta, s_type, payload).await
118 }));
119
120 return match res {
121 Ok(data) => match data.await{
122 Ok(data) => Ok(data),
123 Err(err) => {Err(ServerError::new(ServerErrorEn::InternalError(Some(err.to_vec()))))}
124 },
125 Err(_) => Err(ServerError::new(InternalError(Some("handler died :(".as_bytes().to_vec())))),
126 };
127 }
128
129 if let Ok(meta_req) = s_type::from_slice::<HandlerMetaReq>(&meta) {
131 if let Some(route_id) = self.routes_text_names.get(&meta_req.handler_name) {
132 let meta_ans = HandlerMetaAns {
133 s_type: SystemSType::HandlerMetaAns,
134 id: *route_id,
135 };
136 return Ok(s_type::to_vec(&meta_ans).unwrap());
137 } else {
138 return Err(ServerError::new(ServerErrorEn::NoSuchHandler(None)));
139 }
140 }
141
142 Err(ServerError::new(ServerErrorEn::MalformedMetaInfo(None)))
143 }
144}