Skip to main content

tfserver/client/
mod.rs

1pub mod target_router;
2
3use crate::client::target_router::TargetRouter;
4use crate::structures::s_type;
5use crate::structures::s_type::{PacketMeta, StructureType, SystemSType};
6use crate::structures::traffic_proc::TrafficProcessorHolder;
7use crate::structures::transport::Transport;
8use futures_util::SinkExt;
9use std::io;
10use std::sync::Arc;
11use tokio::net::TcpStream;
12use tokio::sync::mpsc::{Receiver, Sender};
13use tokio::sync::{mpsc};
14use tokio_rustls::TlsConnector;
15use tokio_rustls::rustls::ClientConfig;
16use tokio_util::bytes::{Bytes, BytesMut};
17use tokio_util::codec::{Decoder, Encoder, Framed};
18use crate::codec::codec_trait::TfCodec;
19
20#[derive(Debug)]
21pub enum ClientError {
22    Io(io::Error),
23    Tls(String),
24    Codec(io::Error),
25    Router(String),
26    ChannelClosed,
27    Protocol(String),
28}
29
30impl From<io::Error> for ClientError {
31    fn from(e: io::Error) -> Self {
32        ClientError::Io(e)
33    }
34}
35
36pub struct ClientConnect {
37    tx: Sender<ClientRequest>,
38}
39
40#[derive( Clone)]
41///The structure that describes target handler
42pub struct HandlerInfo {
43    id: Option<u64>,
44    named: Option<String>,
45}
46
47impl HandlerInfo {
48    ///Creates handler info by handler name
49    pub fn new_named(name: String) -> Self {
50        Self {
51            id: None,
52            named: Some(name),
53        }
54    }
55    ///Creates handler info by handler id
56    pub fn new_id(id: u64) -> Self {
57        Self {
58            id: Some(id),
59            named: None,
60        }
61    }
62
63    pub fn id(&self) -> Option<u64> {
64        self.id
65    }
66
67    pub fn named(&self) -> &Option<String> {
68        &self.named
69    }
70}
71/// 'handler_info' info about target handler
72/// 'data' the request payload. E.g structure that will be deserialized on server side.
73/// 's_type' structure type indetifiers what data is send and how handler on server side will process this data.
74pub struct DataRequest {
75    pub handler_info: HandlerInfo,
76    pub data: Vec<u8>,
77    pub s_type: Box<dyn StructureType>,
78}
79///The request wrapper struct.
80/// 'req' data request
81/// 'consumer' the signal that will be called by connection, when the response arrives
82 
83pub struct ClientRequest {
84    pub req: DataRequest,
85    pub consumer: tokio::sync::oneshot::Sender<BytesMut>,
86}
87
88impl ClientConnect {
89    ///Creates and connect to the designated server address
90    /// 'server_name' used for tls mode. You need to pass domain name of the server. If there is no tls, you can pass random data or empty
91    /// 'connection_dest' the (server address/domain name):port. E.g temp_domain.com:443, or 65.88.95.127:9090.
92    /// 'processor' the traffic processor, must be symmetric to the server one processor.
93    /// 'codec' the connection codec. Recommended base LengthDelimitedCodec from module codec.
94    /// 'client_config' the tls config.
95    /// 'max_request_in_time' max amount of requests that can be dispatched in the same time.
96    pub async fn new<
97        C: Encoder<Bytes, Error = io::Error>
98            + Decoder<Item = BytesMut, Error = io::Error>
99            + Clone
100            + Send
101            + Sync
102            + 'static
103            + TfCodec,
104    >(
105        server_name: String,
106        connection_dest: String,
107        processor: Option<TrafficProcessorHolder<C>>,
108        mut codec: C,
109        client_config: Option<ClientConfig>,
110        max_request_in_time: usize,
111    ) -> Result<Self, ClientError> {
112        let socket = TcpStream::connect(connection_dest).await?;
113        socket.set_nodelay(true)?;
114
115        let mut transport = if let Some(client_config) = client_config {
116            let connector = TlsConnector::from(Arc::new(client_config));
117            let domain = server_name
118                .try_into()
119                .map_err(|_| ClientError::Tls("Invalid server name".into()))?;
120
121            let tls = connector
122                .connect(domain, socket)
123                .await
124                .map_err(|e| ClientError::Tls(e.to_string()))?;
125
126            Transport::tls_client(tls)
127        } else {
128            Transport::plain(socket)
129        };
130        if !codec.initial_setup(&mut transport).await {
131            panic!("Failed to initial setup transport");
132        }
133        let framed = Framed::new(transport, codec);
134        let (tx, rx) = mpsc::channel(max_request_in_time);
135
136        Self::connection_main(framed, processor, rx);
137
138        Ok(Self { tx })
139    }
140
141    ///Dispatches the request.
142    pub async fn dispatch_request(&self, request: ClientRequest) -> Result<(), ClientError> {
143        self.tx
144            .send(request)
145            .await
146            .map_err(|_| ClientError::ChannelClosed)
147    }
148    
149    fn connection_main<
150        C: Encoder<Bytes, Error = io::Error>
151            + Decoder<Item = BytesMut, Error = io::Error>
152            + Clone
153            + Send
154            + Sync
155            + 'static
156        +TfCodec,
157    >(
158        mut socket: Framed<Transport, C>,
159        processor: Option<TrafficProcessorHolder<C>>,
160        mut rx: Receiver<ClientRequest>,
161    ) {
162        let mut processor = processor.unwrap_or_else(TrafficProcessorHolder::new);
163        let mut router = TargetRouter::new();
164
165        tokio::spawn(async move {
166            while let Some(request) = rx.recv().await {
167                if let Err(err) =
168                    Self::process_request(request, &mut socket, &mut processor, &mut router).await
169                {
170                    eprintln!("Client request failed: {:?}", err);
171                }
172            }
173        });
174    }
175
176    async fn process_request<
177        C: Encoder<Bytes, Error = io::Error>
178            + Decoder<Item = BytesMut, Error = io::Error>
179            + Clone
180            + Send
181            + Sync
182            + 'static
183        +TfCodec,
184    >(
185        request: ClientRequest,
186        socket: &mut Framed<Transport, C>,
187        processor: &mut TrafficProcessorHolder<C>,
188        target_router: &mut TargetRouter,
189    ) -> Result<(), ClientError> {
190        let handler_id = match request.req.handler_info.id() {
191            Some(id) => id,
192            None => {
193                let name = request
194                    .req
195                    .handler_info
196                    .named
197                    .ok_or_else(|| ClientError::Protocol("Missing handler name".into()))?;
198
199                target_router
200                    .request_route(name.as_str(), socket, processor)
201                    .await
202                    .map_err(|e| ClientError::Router(format!("{:?}", e)))?
203            }
204        };
205
206        let meta = PacketMeta {
207            s_type: SystemSType::PacketMeta,
208            s_type_req: request.req.s_type.get_serialize_function()(request.req.s_type),
209            handler_id,
210            has_payload: !request.req.data.is_empty(),
211        };
212
213        let meta_vec = s_type::to_vec(&meta)
214            .ok_or_else(|| ClientError::Protocol("PacketMeta serialization failed".into()))?;
215
216        let meta_bytes = processor.post_process_traffic(meta_vec).await;
217        let payload = processor.post_process_traffic(request.req.data).await;
218
219        socket.send(Bytes::from(meta_bytes)).await?;
220        socket.send(Bytes::from(payload)).await?;
221
222        let response = wait_for_data(socket).await?;
223        let response = processor.pre_process_traffic(response).await;
224
225        let _ = request
226            .consumer
227            .send(response);
228
229        Ok(())
230    }
231}
232
233pub async fn wait_for_data<
234    C: Encoder<Bytes, Error = io::Error>
235        + Decoder<Item = BytesMut, Error = io::Error>
236        + Clone
237        + Send
238        + Sync
239        + 'static
240    +TfCodec,
241>(
242    socket: &mut Framed<Transport, C>,
243) -> Result<BytesMut, ClientError> {
244    use futures_util::StreamExt;
245
246    match socket.next().await {
247        Some(Ok(data)) => Ok(data),
248        Some(Err(e)) => Err(ClientError::Codec(e)),
249        None => Err(ClientError::Protocol("Connection closed".into())),
250    }
251}