tfserver/client/
target_router.rs1use crate::client::wait_for_data;
2use crate::structures::s_type;
3use crate::structures::s_type::{HandlerMetaAns, HandlerMetaReq, SystemSType};
4use crate::structures::traffic_proc::TrafficProcessorHolder;
5use crate::structures::transport::Transport;
6use futures_util::SinkExt;
7use std::collections::HashMap;
8use std::io;
9use tokio_util::bytes::{Bytes, BytesMut};
10use tokio_util::codec::{Decoder, Encoder, Framed};
11use crate::codec::codec_trait::TfCodec;
12
/// Failures reported while resolving a route name to its id.
///
/// Implements [`std::fmt::Display`] and [`std::error::Error`] so the error can
/// be printed for users and propagated through `Box<dyn Error>`-style APIs.
#[derive(Debug)]
pub enum RouterError {
    /// An underlying I/O operation failed; the original error is preserved.
    Io(io::Error),
    /// A protocol-level step failed; the payload is a human-readable description.
    Protocol(String),
    /// A codec-level failure; the payload is a human-readable description.
    /// (Not constructed anywhere in this file.)
    Codec(String),
    /// No response could be obtained from the connection.
    ConnectionClosed,
}

impl std::fmt::Display for RouterError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Io(err) => write!(f, "I/O error: {}", err),
            Self::Protocol(msg) => write!(f, "protocol error: {}", msg),
            Self::Codec(msg) => write!(f, "codec error: {}", msg),
            Self::ConnectionClosed => f.write_str("connection closed"),
        }
    }
}

impl std::error::Error for RouterError {
    /// Exposes the wrapped I/O error as the cause; other variants have none.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            Self::Io(err) => Some(err),
            Self::Protocol(_) | Self::Codec(_) | Self::ConnectionClosed => None,
        }
    }
}
20
21impl From<io::Error> for RouterError {
22 fn from(e: io::Error) -> Self {
23 RouterError::Io(e)
24 }
25}
26
/// Remembers which id belongs to each route name so a route only has to be
/// requested from the server once.
///
/// `Default` yields the same empty router as `TargetRouter::new()`.
#[derive(Debug, Default)]
pub struct TargetRouter {
    /// Ids already obtained, keyed by route name.
    known_routes: HashMap<String, u64>,
}
31
32impl TargetRouter {
33 pub fn new() -> Self {
34 Self {
35 known_routes: HashMap::new(),
36 }
37 }
38 pub fn lookup_route(&self, route: &str) -> Option<u64> {
40 self.known_routes.get(route).cloned()
41 }
42 pub async fn request_route<
44 C: Encoder<Bytes, Error = io::Error>
45 + Decoder<Item = BytesMut, Error = io::Error>
46 + Clone
47 + Send
48 + Sync
49 + 'static
50 +TfCodec,
51 >(
52 &mut self,
53 route: &str,
54 stream: &mut Framed<Transport, C>,
55 processor: &mut TrafficProcessorHolder<C>,
56 ) -> Result<u64, RouterError> {
57 if let Some(id) = self.lookup_route(route) {
58 return Ok(id);
59 }
60
61 let id = Self::request_route_from_server(route, stream, processor).await?;
62 self.known_routes.insert(route.to_string(), id);
63 Ok(id)
64 }
65
66 async fn request_route_from_server<
67 C: Encoder<Bytes, Error = io::Error>
68 + Decoder<Item = BytesMut, Error = io::Error>
69 + Clone
70 + Send
71 + Sync
72 + 'static
73 +TfCodec,
74 >(
75 name: &str,
76 stream: &mut Framed<Transport, C>,
77 processor: &mut TrafficProcessorHolder<C>,
78 ) -> Result<u64, RouterError> {
79 let meta_req = HandlerMetaReq {
80 s_type: SystemSType::HandlerMetaReq,
81 handler_name: name.to_string(),
82 };
83
84 let serialized = s_type::to_vec(&meta_req)
85 .ok_or(RouterError::Protocol("serialization failed".into()))?;
86
87 let processed = processor.post_process_traffic(serialized).await;
88
89 stream.send(processed.into()).await?;
90
91 let response = wait_for_data(stream)
92 .await
93 .map_err(|_| RouterError::ConnectionClosed)?;
94
95 let mut response = processor.pre_process_traffic(response).await;
96
97 let meta_ans = s_type::from_slice::<HandlerMetaAns>(response.as_mut())
98 .map_err(|e| RouterError::Protocol(e.to_string()))?;
99
100 Ok(meta_ans.id)
101 }
102}