Skip to main content

tfserver/client/
mod.rs

1pub mod target_router;
2
3use crate::client::target_router::TargetRouter;
4use crate::structures::s_type;
5use crate::structures::s_type::{PacketMeta, StructureType, SystemSType};
6use crate::structures::traffic_proc::TrafficProcessorHolder;
7use crate::structures::transport::Transport;
8use futures_util::SinkExt;
9use std::io;
10use std::sync::Arc;
11use tokio::net::TcpStream;
12use tokio::sync::mpsc::{Receiver, Sender};
13use tokio::sync::{mpsc};
14use tokio_rustls::TlsConnector;
15use tokio_rustls::rustls::ClientConfig;
16use tokio_util::bytes::{Bytes, BytesMut};
17use tokio_util::codec::{ Framed};
18use crate::codec::codec_trait::TfCodec;
19
20#[derive(Clone)]
21pub enum ClientMode {
22    /// Raw TCP, optionally wrapped in TLS
23    Tcp { client_config: Option<ClientConfig> },
24    /// WebSocket — for environments without raw TCP access (e.g. WASM)
25    /// 'url' full ws:// or wss:// URL, e.g. "wss://example.com:9000/ws"
26    WebSocket { url: String },
27}
28
29#[derive(Debug)]
30pub enum ClientError {
31    Io(io::Error),
32    Tls(String),
33    Codec(io::Error),
34    Router(String),
35    ChannelClosed,
36    Protocol(String),
37}
38
39impl From<io::Error> for ClientError {
40    fn from(e: io::Error) -> Self {
41        ClientError::Io(e)
42    }
43}
44
45pub struct ClientConnect {
46    tx: Sender<ClientRequest>,
47}
48
49#[derive( Clone)]
50///The structure that describes target handler
51pub struct HandlerInfo {
52    id: Option<u64>,
53    named: Option<String>,
54}
55
56impl HandlerInfo {
57    ///Creates handler info by handler name
58    pub fn new_named(name: String) -> Self {
59        Self {
60            id: None,
61            named: Some(name),
62        }
63    }
64    ///Creates handler info by handler id
65    pub fn new_id(id: u64) -> Self {
66        Self {
67            id: Some(id),
68            named: None,
69        }
70    }
71
72    pub fn id(&self) -> Option<u64> {
73        self.id
74    }
75
76    pub fn named(&self) -> &Option<String> {
77        &self.named
78    }
79}
80/// 'handler_info' info about target handler
81/// 'data' the request payload. E.g structure that will be deserialized on server side.
82/// 's_type' structure type indetifiers what data is send and how handler on server side will process this data.
83pub struct DataRequest {
84    pub handler_info: HandlerInfo,
85    pub data: Vec<u8>,
86    pub s_type: Box<dyn StructureType>,
87}
88///The request wrapper struct.
89/// 'req' data request
90/// 'consumer' the signal that will be called by connection, when the response arrives
91 
92pub struct ClientRequest {
93    pub req: DataRequest,
94    pub consumer: tokio::sync::oneshot::Sender<BytesMut>,
95}
96
97impl ClientConnect {
98    ///Creates and connect to the designated server address
99    /// 'server_name' used for tls mode. You need to pass domain name of the server. If there is no tls, you can pass random data or empty
100    /// 'connection_dest' the (server address/domain name):port. E.g temp_domain.com:443, or 65.88.95.127:9090.
101    /// 'processor' the traffic processor, must be symmetric to the server one processor.
102    /// 'codec' the connection codec. Recommended base LengthDelimitedCodec from module codec.
103    /// 'client_config' the tls config.
104    /// 'max_request_in_time' max amount of requests that can be dispatched in the same time.
105    pub async fn new<C: TfCodec>(
106        server_name: String,
107        connection_dest: String, 
108        processor: Option<TrafficProcessorHolder<C>>,
109        mut codec: C,
110        mode: ClientMode,          // ← replaces client_config
111        max_request_in_time: usize,
112    ) -> Result<Self, ClientError> {
113        let mut transport = Self::connect(server_name, connection_dest, &mode).await?;
114
115        if !codec.initial_setup(&mut transport).await {
116            panic!("Failed to initial setup transport");
117        }
118
119        let framed = Framed::new(transport, codec);
120        let (tx, rx) = mpsc::channel(max_request_in_time);
121        Self::connection_main(framed, processor, rx);
122
123        Ok(Self { tx })
124    }
125    async fn connect(
126        server_name: String,
127        connection_dest: String,
128        mode: &ClientMode,
129    ) -> Result<Transport, ClientError> {
130        match mode {
131            ClientMode::Tcp { client_config } => {
132                let socket = TcpStream::connect(&connection_dest).await?;
133                socket.set_nodelay(true)?;
134
135                if let Some(cfg) = client_config {
136                    let connector = TlsConnector::from(Arc::new(cfg.clone()));
137                    let domain = server_name
138                        .try_into()
139                        .map_err(|_| ClientError::Tls("Invalid server name".into()))?;
140                    let tls = connector
141                        .connect(domain, socket)
142                        .await
143                        .map_err(|e| ClientError::Tls(e.to_string()))?;
144                    Ok(Transport::tls_client(tls))
145                } else {
146                    Ok(Transport::plain(socket))
147                }
148            }
149
150            ClientMode::WebSocket { url } => {
151                Transport::connect(url).await.map_err(|e| ClientError::Tls(e.to_string()))
152            }
153        }
154    }
155
156    ///Dispatches the request.
157    pub async fn dispatch_request(&self, request: ClientRequest) -> Result<(), ClientError> {
158        self.tx
159            .send(request)
160            .await
161            .map_err(|_| ClientError::ChannelClosed)
162    }
163    
164    fn connection_main<
165        C: TfCodec,
166    >(
167        mut socket: Framed<Transport, C>,
168        processor: Option<TrafficProcessorHolder<C>>,
169        mut rx: Receiver<ClientRequest>,
170    ) {
171        let mut processor = processor.unwrap_or_else(TrafficProcessorHolder::new);
172        let mut router = TargetRouter::new();
173
174        tokio::spawn(async move {
175            while let Some(request) = rx.recv().await {
176                if let Err(err) =
177                    Self::process_request(request, &mut socket, &mut processor, &mut router).await
178                {
179                    eprintln!("Client request failed: {:?}", err);
180                }
181            }
182        });
183    }
184
185    async fn process_request<
186        C: TfCodec,
187    >(
188        request: ClientRequest,
189        socket: &mut Framed<Transport, C>,
190        processor: &mut TrafficProcessorHolder<C>,
191        target_router: &mut TargetRouter,
192    ) -> Result<(), ClientError> {
193        let handler_id = match request.req.handler_info.id() {
194            Some(id) => id,
195            None => {
196                let name = request
197                    .req
198                    .handler_info
199                    .named
200                    .ok_or_else(|| ClientError::Protocol("Missing handler name".into()))?;
201
202                target_router
203                    .request_route(name.as_str(), socket, processor)
204                    .await
205                    .map_err(|e| ClientError::Router(format!("{:?}", e)))?
206            }
207        };
208
209        let meta = PacketMeta {
210            s_type: SystemSType::PacketMeta,
211            s_type_req: request.req.s_type.get_serialize_function()(request.req.s_type),
212            handler_id,
213            has_payload: !request.req.data.is_empty(),
214        };
215
216        let meta_vec = s_type::to_vec(&meta)
217            .ok_or_else(|| ClientError::Protocol("PacketMeta serialization failed".into()))?;
218
219        let meta_bytes = processor.post_process_traffic(meta_vec).await;
220        let payload = processor.post_process_traffic(request.req.data).await;
221
222        socket.send(Bytes::from(meta_bytes)).await?;
223        socket.send(Bytes::from(payload)).await?;
224
225        let response = wait_for_data(socket).await?;
226        let response = processor.pre_process_traffic(response).await;
227
228        let _ = request
229            .consumer
230            .send(response);
231
232        Ok(())
233    }
234}
235
236pub async fn wait_for_data<
237    C: TfCodec,
238>(
239    socket: &mut Framed<Transport, C>,
240) -> Result<BytesMut, ClientError> {
241    use futures_util::StreamExt;
242
243    match socket.next().await {
244        Some(Ok(data)) => Ok(data),
245        Some(Err(e)) => Err(ClientError::Codec(e)),
246        None => Err(ClientError::Protocol("Connection closed".into())),
247    }
248}