Skip to main content

tfserver/client/
target_router.rs

1use crate::client::wait_for_data;
2use crate::structures::s_type;
3use crate::structures::s_type::{HandlerMetaAns, HandlerMetaReq, SystemSType};
4use crate::structures::traffic_proc::TrafficProcessorHolder;
5use crate::structures::transport::Transport;
6use futures_util::SinkExt;
7use std::collections::HashMap;
8use std::io;
9use tokio_util::bytes::{Bytes, BytesMut};
10use tokio_util::codec::{Decoder, Encoder, Framed};
11use crate::codec::codec_trait::TfCodec;
12
/// Errors produced while resolving a handler route name to its numeric id.
#[derive(Debug)]
pub enum RouterError {
    /// An I/O failure, wrapping the originating [`io::Error`].
    Io(io::Error),
    /// A protocol-level failure; the payload is a human-readable description.
    Protocol(String),
    /// A codec-level failure; the payload is a human-readable description.
    Codec(String),
    /// The connection could not deliver the expected data.
    ConnectionClosed,
}

impl std::fmt::Display for RouterError {
    /// Writes a short, user-facing description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Io(err) => write!(f, "I/O error: {}", err),
            Self::Protocol(msg) => write!(f, "protocol error: {}", msg),
            Self::Codec(msg) => write!(f, "codec error: {}", msg),
            Self::ConnectionClosed => f.write_str("connection closed"),
        }
    }
}

impl std::error::Error for RouterError {
    /// Exposes the wrapped [`io::Error`] as the underlying cause, if any.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            Self::Io(err) => Some(err),
            // These variants carry only a message (or nothing), so there is
            // no lower-level error to chain to.
            Self::Protocol(_) | Self::Codec(_) | Self::ConnectionClosed => None,
        }
    }
}
20
21impl From<io::Error> for RouterError {
22    fn from(e: io::Error) -> Self {
23        RouterError::Io(e)
24    }
25}
26
/// Maps handler route names to their numeric handler ids, remembering every
/// mapping it has already resolved.
#[derive(Debug, Default)]
pub struct TargetRouter {
    /// Route name -> handler id for each route stored so far.
    known_routes: HashMap<String, u64>,
}
31
32impl TargetRouter {
33    pub fn new() -> Self {
34        Self {
35            known_routes: HashMap::new(),
36        }
37    }
38    ///Returns the id of handler, if the route was already requested from server
39    pub fn lookup_route(&self, route: &str) -> Option<u64> {
40        self.known_routes.get(route).cloned()
41    }
42    ///Returns the id of handler, but may request id from server side.
43    pub async fn request_route<
44        C: Encoder<Bytes, Error = io::Error>
45        + Decoder<Item = BytesMut, Error = io::Error>
46        + Clone
47        + Send
48        + Sync
49        + 'static
50        +TfCodec,
51    >(
52        &mut self,
53        route: &str,
54        stream: &mut Framed<Transport, C>,
55        processor: &mut TrafficProcessorHolder<C>,
56    ) -> Result<u64, RouterError> {
57        if let Some(id) = self.lookup_route(route) {
58            return Ok(id);
59        }
60
61        let id = Self::request_route_from_server(route, stream, processor).await?;
62        self.known_routes.insert(route.to_string(), id);
63        Ok(id)
64    }
65
66    async fn request_route_from_server<
67        C: Encoder<Bytes, Error = io::Error>
68        + Decoder<Item = BytesMut, Error = io::Error>
69        + Clone
70        + Send
71        + Sync
72        + 'static
73        +TfCodec,
74    >(
75        name: &str,
76        stream: &mut Framed<Transport, C>,
77        processor: &mut TrafficProcessorHolder<C>,
78    ) -> Result<u64, RouterError> {
79        let meta_req = HandlerMetaReq {
80            s_type: SystemSType::HandlerMetaReq,
81            handler_name: name.to_string(),
82        };
83
84        let serialized = s_type::to_vec(&meta_req)
85            .ok_or(RouterError::Protocol("serialization failed".into()))?;
86
87        let processed = processor.post_process_traffic(serialized).await;
88
89        stream.send(processed.into()).await?;
90
91        let response = wait_for_data(stream)
92            .await
93            .map_err(|_| RouterError::ConnectionClosed)?;
94
95        let mut response = processor.pre_process_traffic(response).await;
96
97        let meta_ans = s_type::from_slice::<HandlerMetaAns>(response.as_mut())
98            .map_err(|e| RouterError::Protocol(e.to_string()))?;
99
100        Ok(meta_ans.id)
101    }
102}