Skip to main content

tfserver/server/
server_router.rs

1use std::collections::{HashMap, HashSet};
2use std::net::{SocketAddr};
3use std::ops::Deref;
4use std::panic::{catch_unwind, AssertUnwindSafe};
5use std::sync::{Arc};
6use tokio::io;
7use tokio::sync::{Mutex};
8use tokio::sync::oneshot::Sender;
9use tokio_util::bytes::{Bytes, BytesMut};
10use tokio_util::codec::{Decoder, Encoder};
11use crate::codec::codec_trait::TfCodec;
12use crate::server::handler::Handler;
13use crate::structures::s_type;
14use crate::structures::s_type::{HandlerMetaAns, HandlerMetaReq, PacketMeta, ServerError, ServerErrorEn, StructureType, SystemSType, TypeContainer, TypeTupple};
15use crate::structures::s_type::ServerErrorEn::InternalError;
16
17
18///Tcp server router.
19///Handles the every data route destination
20pub struct TcpServerRouter<C>
21where
22    C:  Encoder<Bytes, Error = io::Error> + Decoder<Item = BytesMut, Error = io::Error> + Clone + Send  + Sync+ 'static +TfCodec {
23    routes: Arc<HashMap<TypeTupple, Arc<Mutex<dyn Handler<Codec = C>>>>>,
24    routes_text_names: Arc<HashMap<String, u64>>,
25    routes_to_add: Vec<(TypeTupple, (Arc<Mutex<dyn Handler<Codec = C>>>, String))>,
26    router_incremental: u64,
27    routes_commited: bool,
28    user_s_type: Box<dyn StructureType>,
29}
30
31impl<C> TcpServerRouter<C>
32where
33    C: Encoder<Bytes, Error = io::Error> + Decoder<Item = BytesMut, Error = io::Error> + Clone + Send  + Sync + 'static +TfCodec {
34    
35    ///Returns the new instance of router
36    ///
37    /// 'user_s_type' random enum value of current project defined structure_type
38    pub fn new(user_s_type: Box<dyn StructureType>) -> Self {
39        Self {
40            routes: Arc::new(HashMap::new()),
41            routes_text_names: Arc::new(HashMap::new()),
42            routes_to_add: Vec::new(),
43            router_incremental: 0,
44            routes_commited: false,
45            user_s_type,
46        }
47    }
48
49    ///Registers the new handler, for selected structure types
50    ///
51    /// 'handler_name' must be the same on the client site, used for initial identification. When client sends request to server.
52    /// 's_type' handled structure types by current handler.
53    pub fn add_route(
54        &mut self,
55        handler: Arc<Mutex<dyn Handler<Codec = C>>>,
56        handler_name: String,
57        mut s_types: Vec<Box<dyn StructureType>>,
58    ) {
59        if self.routes_commited {
60            return;
61        }
62        let mut s_typess: HashSet<TypeContainer> = HashSet::new();
63        while !s_types.is_empty(){
64            s_typess.insert(TypeContainer::new(s_types.pop().unwrap()));
65        }
66        let types_tupple = TypeTupple {
67            s_types: s_typess,
68            handler_id: self.router_incremental,
69        };
70
71        self.routes_to_add.push((types_tupple, (handler, handler_name)));
72        self.router_incremental += 1;
73    }
74
75    ///Commits the registered handlers. Making the current router is final and ready to be passed to the server
76    pub fn commit_routes(&mut self) {
77        if self.routes_commited || self.routes_to_add.is_empty() {
78            return;
79        }
80
81        let mut routes = HashMap::new();
82        let mut names = HashMap::new();
83
84        for (types, (handler, name)) in self.routes_to_add.drain(..) {
85            routes.insert(types.clone(), handler);
86            names.insert(name, types.handler_id);
87        }
88
89        self.routes = Arc::new(routes);
90        self.routes_text_names = Arc::new(names);
91        self.routes_commited = true;
92    }
93
94
95    pub fn get_routes(&self) -> Arc<HashMap<TypeTupple, Arc<Mutex<dyn Handler<Codec = C>>>>> {
96        self.routes.clone()
97    }
98
99    ///Called from server connection task
100    pub async fn serve_packet(
101        &self,
102        meta: BytesMut,
103        payload: BytesMut,
104        client_meta: (SocketAddr,  &mut Option<Sender<Arc<Mutex<dyn Handler<Codec = C>>>>>),
105    ) -> Result<Vec<u8>, ServerError> {
106        // Try to deserialize normal PacketMeta
107        if let Ok(meta_pack) = s_type::from_slice::<PacketMeta>(&meta) {
108            let s_type = self.user_s_type.get_deserialize_function().deref()(meta_pack.s_type_req);
109            let key = TypeTupple {
110                s_types: HashSet::from([TypeContainer::new(s_type.clone_unique())]),
111                handler_id: meta_pack.handler_id,
112            };
113
114            let handler = self.routes.get(&key).ok_or(ServerError::new(ServerErrorEn::NoSuchHandler(None)))?;
115            let mut handler_lock = handler.lock().await;
116            let res = catch_unwind(AssertUnwindSafe(async || {
117                handler_lock.serve_route(client_meta, s_type, payload).await
118            }));
119
120            return match res {
121                Ok(data) => match data.await{
122                    Ok(data) => Ok(data),
123                    Err(err) => {Err(ServerError::new(ServerErrorEn::InternalError(Some(err.to_vec()))))}
124                },
125                Err(_) => Err(ServerError::new(InternalError(Some("handler died :(".as_bytes().to_vec())))),
126            };
127        }
128
129        // Try to handle as HandlerMetaReq
130        if let Ok(meta_req) = s_type::from_slice::<HandlerMetaReq>(&meta) {
131            if let Some(route_id) = self.routes_text_names.get(&meta_req.handler_name) {
132                let meta_ans = HandlerMetaAns {
133                    s_type: SystemSType::HandlerMetaAns,
134                    id: *route_id,
135                };
136                return Ok(s_type::to_vec(&meta_ans).unwrap());
137            } else {
138                return Err(ServerError::new(ServerErrorEn::NoSuchHandler(None)));
139            }
140        }
141
142        Err(ServerError::new(ServerErrorEn::MalformedMetaInfo(None)))
143    }
144}