tfserver/server/
server_router.rs1use std::collections::{HashMap, HashSet};
2use std::net::{SocketAddr};
3use std::ops::Deref;
4use std::panic::AssertUnwindSafe;
5use std::sync::{Arc};
6use futures_util::FutureExt;
7use tokio::io;
8use tokio::sync::{RwLock};
9use tokio::sync::oneshot::Sender;
10use tokio_util::bytes::{Bytes, BytesMut};
11use tokio_util::codec::{Decoder, Encoder};
12use crate::codec::codec_trait::TfCodec;
13use crate::server::handler::Handler;
14use crate::structures::s_type;
15use crate::structures::s_type::{HandlerMetaAns, HandlerMetaReq, PacketMeta, ServerError, ServerErrorEn, StructureType, SystemSType, TypeContainer, TypeTuple};
16use crate::structures::s_type::ServerErrorEn::InternalError;
17
18
19pub struct TcpServerRouter<C>
22where
23 C: Encoder<Bytes, Error = io::Error> + Decoder<Item = BytesMut, Error = io::Error> + Clone + Send + Sync+ 'static +TfCodec {
24 routes: Arc<HashMap<TypeTuple, Arc<RwLock<dyn Handler<Codec = C>>>>>,
25 routes_text_names: Arc<HashMap<String, u64>>,
26 routes_to_add: Vec<(TypeTuple, (Arc<RwLock<dyn Handler<Codec = C>>>, String))>,
27 router_incremental: u64,
28 routes_commited: bool,
29 user_s_type: Box<dyn StructureType>,
30}
31
32impl<C> TcpServerRouter<C>
33where
34 C: Encoder<Bytes, Error = io::Error> + Decoder<Item = BytesMut, Error = io::Error> + Clone + Send + Sync + 'static +TfCodec {
35
36 pub fn new(user_s_type: Box<dyn StructureType>) -> Self {
40 Self {
41 routes: Arc::new(HashMap::new()),
42 routes_text_names: Arc::new(HashMap::new()),
43 routes_to_add: Vec::new(),
44 router_incremental: 0,
45 routes_commited: false,
46 user_s_type,
47 }
48 }
49
50 pub fn add_route(
55 &mut self,
56 handler: Arc<RwLock<dyn Handler<Codec = C>>>,
57 handler_name: String,
58 mut s_types: Vec<Box<dyn StructureType>>,
59 ) {
60 if self.routes_commited {
61 return;
62 }
63 let mut s_typess: HashSet<TypeContainer> = HashSet::new();
64 while !s_types.is_empty(){
65 s_typess.insert(TypeContainer::new(s_types.pop().unwrap()));
66 }
67 let types_tupple = TypeTuple {
68 s_types: s_typess,
69 handler_id: self.router_incremental,
70 };
71
72 self.routes_to_add.push((types_tupple, (handler, handler_name)));
73 self.router_incremental += 1;
74 }
75
76 pub fn commit_routes(&mut self) {
78 if self.routes_commited || self.routes_to_add.is_empty() {
79 return;
80 }
81
82 let mut routes = HashMap::new();
83 let mut names = HashMap::new();
84
85 for (types, (handler, name)) in self.routes_to_add.drain(..) {
86 routes.insert(types.clone(), handler);
87 names.insert(name, types.handler_id);
88 }
89
90 self.routes = Arc::new(routes);
91 self.routes_text_names = Arc::new(names);
92 self.routes_commited = true;
93 }
94
95
96 pub fn get_routes(&self) -> Arc<HashMap<TypeTuple, Arc<RwLock<dyn Handler<Codec = C>>>>> {
97 self.routes.clone()
98 }
99
100 pub async fn serve_packet(
102 &self,
103 meta: BytesMut,
104 payload: BytesMut,
105 client_meta: (SocketAddr, &mut Option<Sender<Arc<RwLock<dyn Handler<Codec = C>>>>>),
106 ) -> Result<Vec<u8>, ServerError> {
107 if let Ok(meta_pack) = s_type::from_slice::<PacketMeta>(&meta) {
109 let s_type = self.user_s_type.get_deserialize_function().deref()(meta_pack.s_type_req);
110 let key = TypeTuple {
111 s_types: HashSet::from([TypeContainer::new(s_type.clone_unique())]),
112 handler_id: meta_pack.handler_id,
113 };
114
115 let handler = self.routes.get(&key).ok_or(ServerError::new(ServerErrorEn::NoSuchHandler(None)))?;
116 let mut handler_lock = handler.write().await;
117 let res = AssertUnwindSafe(
118 handler_lock.serve_route(client_meta, s_type, payload)
119 )
120 .catch_unwind().await;
121 return match res {
122 Ok(data) => match data{
123 Ok(data) => Ok(data),
124 Err(err) => {Err(ServerError::new(ServerErrorEn::InternalError(Some(err.to_vec()))))}
125 },
126 Err(_) => Err(ServerError::new(InternalError(Some("handler died :(".as_bytes().to_vec())))),
127 };
128 }
129
130 if let Ok(meta_req) = s_type::from_slice::<HandlerMetaReq>(&meta) {
132 if let Some(route_id) = self.routes_text_names.get(&meta_req.handler_name) {
133 let meta_ans = HandlerMetaAns {
134 s_type: SystemSType::HandlerMetaAns,
135 id: *route_id,
136 };
137 return Ok(s_type::to_vec(&meta_ans).unwrap());
138 } else {
139 return Err(ServerError::new(ServerErrorEn::NoSuchHandler(None)));
140 }
141 }
142
143 Err(ServerError::new(ServerErrorEn::MalformedMetaInfo(None)))
144 }
145}