Skip to main content

tfserver/client/
target_router.rs

1use crate::client::wait_for_data;
2use crate::structures::s_type;
3use crate::structures::s_type::{HandlerMetaAns, HandlerMetaReq, SystemSType};
4use crate::structures::traffic_proc::TrafficProcessorHolder;
5use crate::structures::transport::Transport;
6use futures_util::SinkExt;
7use std::collections::HashMap;
8use std::io;
9use tokio_util::codec::{Framed};
10use crate::codec::codec_trait::TfCodec;
11
/// Errors that can occur while resolving a route name to a handler id.
#[derive(Debug)]
pub enum RouterError {
    /// An underlying I/O failure; produced by the `From<io::Error>` conversion.
    Io(io::Error),
    /// A request could not be serialized or a response could not be decoded.
    Protocol(String),
    /// A codec-level failure, described by the contained message.
    // NOTE(review): this variant is not constructed in this file — confirm
    // which callers are expected to produce it.
    Codec(String),
    /// No response could be obtained from the peer.
    ConnectionClosed,
}

impl std::fmt::Display for RouterError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            RouterError::Io(err) => write!(f, "I/O error: {}", err),
            RouterError::Protocol(msg) => write!(f, "protocol error: {}", msg),
            RouterError::Codec(msg) => write!(f, "codec error: {}", msg),
            RouterError::ConnectionClosed => f.write_str("connection closed"),
        }
    }
}

impl std::error::Error for RouterError {
    /// Exposes the wrapped `io::Error` as the cause; the other variants carry
    /// only a message and therefore have no source.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            RouterError::Io(err) => Some(err),
            RouterError::Protocol(_) | RouterError::Codec(_) | RouterError::ConnectionClosed => {
                None
            }
        }
    }
}

impl From<io::Error> for RouterError {
    /// Wraps an I/O error so that `?` can be used on `io::Result` values.
    fn from(e: io::Error) -> Self {
        RouterError::Io(e)
    }
}
25
/// Resolves handler route names to their numeric ids and remembers every
/// mapping that has already been resolved.
#[derive(Debug, Default)]
pub struct TargetRouter {
    /// Route name -> handler id, for routes that have already been resolved.
    known_routes: HashMap<String, u64>,
}
30
31impl TargetRouter {
32    pub fn new() -> Self {
33        Self {
34            known_routes: HashMap::new(),
35        }
36    }
37    ///Returns the id of handler, if the route was already requested from server
38    pub fn lookup_route(&self, route: &str) -> Option<u64> {
39        self.known_routes.get(route).cloned()
40    }
41    ///Returns the id of handler, but may request id from server side.
42    pub async fn request_route<
43        C: TfCodec,
44    >(
45        &mut self,
46        route: &str,
47        stream: &mut Framed<Transport, C>,
48        processor: &mut TrafficProcessorHolder<C>,
49    ) -> Result<u64, RouterError> {
50        if let Some(id) = self.lookup_route(route) {
51            return Ok(id);
52        }
53
54        let id = Self::request_route_from_server(route, stream, processor).await?;
55        self.known_routes.insert(route.to_string(), id);
56        Ok(id)
57    }
58
59    async fn request_route_from_server<
60        C: TfCodec,
61    >(
62        name: &str,
63        stream: &mut Framed<Transport, C>,
64        processor: &mut TrafficProcessorHolder<C>,
65    ) -> Result<u64, RouterError> {
66        let meta_req = HandlerMetaReq {
67            s_type: SystemSType::HandlerMetaReq,
68            handler_name: name.to_string(),
69        };
70
71        let serialized = s_type::to_vec(&meta_req)
72            .ok_or(RouterError::Protocol("serialization failed".into()))?;
73
74        let processed = processor.post_process_traffic(serialized).await;
75
76        stream.send(processed.into()).await?;
77
78        let response = wait_for_data(stream)
79            .await
80            .map_err(|_| RouterError::ConnectionClosed)?;
81
82        let mut response = processor.pre_process_traffic(response).await;
83
84        let meta_ans = s_type::from_slice::<HandlerMetaAns>(response.as_mut())
85            .map_err(|e| RouterError::Protocol(e.to_string()))?;
86
87        Ok(meta_ans.id)
88    }
89}