tfserver/client/
target_router.rs1use crate::client::wait_for_data;
2use crate::structures::s_type;
3use crate::structures::s_type::{HandlerMetaAns, HandlerMetaReq, SystemSType};
4use crate::structures::traffic_proc::TrafficProcessorHolder;
5use crate::structures::transport::Transport;
6use futures_util::SinkExt;
7use std::collections::HashMap;
8use std::io;
9use tokio_util::codec::{Framed};
10use crate::codec::codec_trait::TfCodec;
11
/// Errors produced while resolving a handler route.
#[derive(Debug)]
pub enum RouterError {
    /// An underlying I/O failure; created from `io::Error` by the `From` impl in this file.
    Io(io::Error),
    /// A protocol-level failure with a human-readable description. In this file it is
    /// raised when the request cannot be serialized or the answer cannot be parsed.
    Protocol(String),
    /// A codec failure with a human-readable description.
    // NOTE(review): not constructed anywhere in this file — presumably used by callers; confirm.
    Codec(String),
    /// No answer could be obtained; raised in this file when waiting for the
    /// server's response fails.
    ConnectionClosed,
}

impl std::fmt::Display for RouterError {
    /// Writes a short, user-facing description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Io(err) => write!(f, "I/O error: {}", err),
            Self::Protocol(msg) => write!(f, "protocol error: {}", msg),
            Self::Codec(msg) => write!(f, "codec error: {}", msg),
            Self::ConnectionClosed => f.write_str("connection closed"),
        }
    }
}

impl std::error::Error for RouterError {
    /// Exposes the wrapped `io::Error` as the cause for the `Io` variant; the other
    /// variants carry no underlying error.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            Self::Io(err) => Some(err),
            Self::Protocol(_) | Self::Codec(_) | Self::ConnectionClosed => None,
        }
    }
}
19
20impl From<io::Error> for RouterError {
21 fn from(e: io::Error) -> Self {
22 RouterError::Io(e)
23 }
24}
25
/// Cache of handler route names and the numeric ids they resolve to.
///
/// Entries are inserted by `request_route` after a successful server lookup and
/// read back by `lookup_route`.
#[derive(Debug, Default)]
pub struct TargetRouter {
    /// Route name mapped to the id returned for it.
    known_routes: HashMap<String, u64>,
}
30
31impl TargetRouter {
32 pub fn new() -> Self {
33 Self {
34 known_routes: HashMap::new(),
35 }
36 }
37 pub fn lookup_route(&self, route: &str) -> Option<u64> {
39 self.known_routes.get(route).cloned()
40 }
41 pub async fn request_route<
43 C: TfCodec,
44 >(
45 &mut self,
46 route: &str,
47 stream: &mut Framed<Transport, C>,
48 processor: &mut TrafficProcessorHolder<C>,
49 ) -> Result<u64, RouterError> {
50 if let Some(id) = self.lookup_route(route) {
51 return Ok(id);
52 }
53
54 let id = Self::request_route_from_server(route, stream, processor).await?;
55 self.known_routes.insert(route.to_string(), id);
56 Ok(id)
57 }
58
59 async fn request_route_from_server<
60 C: TfCodec,
61 >(
62 name: &str,
63 stream: &mut Framed<Transport, C>,
64 processor: &mut TrafficProcessorHolder<C>,
65 ) -> Result<u64, RouterError> {
66 let meta_req = HandlerMetaReq {
67 s_type: SystemSType::HandlerMetaReq,
68 handler_name: name.to_string(),
69 };
70
71 let serialized = s_type::to_vec(&meta_req)
72 .ok_or(RouterError::Protocol("serialization failed".into()))?;
73
74 let processed = processor.post_process_traffic(serialized).await;
75
76 stream.send(processed.into()).await?;
77
78 let response = wait_for_data(stream)
79 .await
80 .map_err(|_| RouterError::ConnectionClosed)?;
81
82 let mut response = processor.pre_process_traffic(response).await;
83
84 let meta_ans = s_type::from_slice::<HandlerMetaAns>(response.as_mut())
85 .map_err(|e| RouterError::Protocol(e.to_string()))?;
86
87 Ok(meta_ans.id)
88 }
89}