Skip to main content

tfserver/server/
server_router.rs

1use std::collections::{HashMap, HashSet};
2use std::net::{SocketAddr};
3use std::ops::Deref;
4use std::panic::AssertUnwindSafe;
5use std::sync::{Arc};
6use futures_util::FutureExt;
7use tokio::io;
8use tokio::sync::{RwLock};
9use tokio::sync::oneshot::Sender;
10use tokio_util::bytes::{Bytes, BytesMut};
11use tokio_util::codec::{Decoder, Encoder};
12use crate::codec::codec_trait::TfCodec;
13use crate::server::handler::Handler;
14use crate::structures::s_type;
15use crate::structures::s_type::{HandlerMetaAns, HandlerMetaReq, PacketMeta, ServerError, ServerErrorEn, StructureType, SystemSType, TypeContainer, TypeTuple};
16use crate::structures::s_type::ServerErrorEn::InternalError;
17
18
19///Tcp server router.
20///Handles the every data route destination
21pub struct TcpServerRouter<C>
22where
23    C:  Encoder<Bytes, Error = io::Error> + Decoder<Item = BytesMut, Error = io::Error> + Clone + Send  + Sync+ 'static +TfCodec {
24    routes: Arc<HashMap<TypeTuple, Arc<RwLock<dyn Handler<Codec = C>>>>>,
25    routes_text_names: Arc<HashMap<String, u64>>,
26    routes_to_add: Vec<(TypeTuple, (Arc<RwLock<dyn Handler<Codec = C>>>, String))>,
27    router_incremental: u64,
28    routes_commited: bool,
29    user_s_type: Box<dyn StructureType>,
30}
31
32impl<C> TcpServerRouter<C>
33where
34    C: Encoder<Bytes, Error = io::Error> + Decoder<Item = BytesMut, Error = io::Error> + Clone + Send  + Sync + 'static +TfCodec {
35    
36    ///Returns the new instance of router
37    ///
38    /// 'user_s_type' random enum value of current project defined structure_type
39    pub fn new(user_s_type: Box<dyn StructureType>) -> Self {
40        Self {
41            routes: Arc::new(HashMap::new()),
42            routes_text_names: Arc::new(HashMap::new()),
43            routes_to_add: Vec::new(),
44            router_incremental: 0,
45            routes_commited: false,
46            user_s_type,
47        }
48    }
49
50    ///Registers the new handler, for selected structure types
51    ///
52    /// 'handler_name' must be the same on the client site, used for initial identification. When client sends request to server.
53    /// 's_type' handled structure types by current handler.
54    pub fn add_route(
55        &mut self,
56        handler: Arc<RwLock<dyn Handler<Codec = C>>>,
57        handler_name: String,
58        mut s_types: Vec<Box<dyn StructureType>>,
59    ) {
60        if self.routes_commited {
61            return;
62        }
63        let mut s_typess: HashSet<TypeContainer> = HashSet::new();
64        while !s_types.is_empty(){
65            s_typess.insert(TypeContainer::new(s_types.pop().unwrap()));
66        }
67        let types_tupple = TypeTuple {
68            s_types: s_typess,
69            handler_id: self.router_incremental,
70        };
71
72        self.routes_to_add.push((types_tupple, (handler, handler_name)));
73        self.router_incremental += 1;
74    }
75
76    ///Commits the registered handlers. Making the current router is final and ready to be passed to the server
77    pub fn commit_routes(&mut self) {
78        if self.routes_commited || self.routes_to_add.is_empty() {
79            return;
80        }
81
82        let mut routes = HashMap::new();
83        let mut names = HashMap::new();
84
85        for (types, (handler, name)) in self.routes_to_add.drain(..) {
86            routes.insert(types.clone(), handler);
87            names.insert(name, types.handler_id);
88        }
89
90        self.routes = Arc::new(routes);
91        self.routes_text_names = Arc::new(names);
92        self.routes_commited = true;
93    }
94
95
96    pub fn get_routes(&self) -> Arc<HashMap<TypeTuple, Arc<RwLock<dyn Handler<Codec = C>>>>> {
97        self.routes.clone()
98    }
99
100    ///Called from server connection task
101    pub async fn serve_packet(
102        &self,
103        meta: BytesMut,
104        payload: BytesMut,
105        client_meta: (SocketAddr,  &mut Option<Sender<Arc<RwLock<dyn Handler<Codec = C>>>>>),
106    ) -> Result<Vec<u8>, ServerError> {
107        // Try to deserialize normal PacketMeta
108        if let Ok(meta_pack) = s_type::from_slice::<PacketMeta>(&meta) {
109            let s_type = self.user_s_type.get_deserialize_function().deref()(meta_pack.s_type_req);
110            let key = TypeTuple {
111                s_types: HashSet::from([TypeContainer::new(s_type.clone_unique())]),
112                handler_id: meta_pack.handler_id,
113            };
114
115            let handler = self.routes.get(&key).ok_or(ServerError::new(ServerErrorEn::NoSuchHandler(None)))?;
116            let mut handler_lock = handler.write().await;
117            let res = AssertUnwindSafe(
118                handler_lock.serve_route(client_meta, s_type, payload)
119            )
120                .catch_unwind().await;
121            return match res {
122                Ok(data) => match data{
123                    Ok(data) => Ok(data),
124                    Err(err) => {Err(ServerError::new(ServerErrorEn::InternalError(Some(err.to_vec()))))}
125                },
126                Err(_) => Err(ServerError::new(InternalError(Some("handler died :(".as_bytes().to_vec())))),
127            };
128        }
129
130        // Try to handle as HandlerMetaReq
131        if let Ok(meta_req) = s_type::from_slice::<HandlerMetaReq>(&meta) {
132            if let Some(route_id) = self.routes_text_names.get(&meta_req.handler_name) {
133                let meta_ans = HandlerMetaAns {
134                    s_type: SystemSType::HandlerMetaAns,
135                    id: *route_id,
136                };
137                return Ok(s_type::to_vec(&meta_ans).unwrap());
138            } else {
139                return Err(ServerError::new(ServerErrorEn::NoSuchHandler(None)));
140            }
141        }
142
143        Err(ServerError::new(ServerErrorEn::MalformedMetaInfo(None)))
144    }
145}