Skip to main content

tfserver/client/
mod.rs

1pub mod target_router;
2
3use crate::client::target_router::TargetRouter;
4use crate::log_macros::{tf_debug, tf_info, tf_warn};
5use crate::structures::s_type;
6use crate::structures::s_type::{PacketMeta, StructureType, SystemSType};
7use crate::structures::traffic_proc::TrafficProcessorHolder;
8use crate::structures::transport::Transport;
9use futures_util::SinkExt;
10use std::io;
11#[cfg(not(target_arch = "wasm32"))]
12use std::sync::Arc;
13#[cfg(not(target_arch = "wasm32"))]
14use tokio::net::TcpStream;
15use tokio::sync::mpsc::{Receiver, Sender};
16use tokio::sync::mpsc;
17#[cfg(not(target_arch = "wasm32"))]
18use tokio_rustls::TlsConnector;
19#[cfg(not(target_arch = "wasm32"))]
20use tokio_rustls::rustls::ClientConfig;
21use tokio_util::bytes::{Bytes, BytesMut};
22use tokio_util::codec::Framed;
23use crate::codec::codec_trait::TfCodec;
24
25#[derive(Clone)]
26pub enum ClientMode {
27    /// Raw TCP, optionally wrapped in TLS
28    #[cfg(not(target_arch = "wasm32"))]
29    Tcp { client_config: Option<ClientConfig> },
30    /// WebSocket — for environments without raw TCP access (e.g. WASM)
31    /// `url` is the full ws:// or wss:// URL, e.g. "wss://example.com:9000/ws"
32    WebSocket { url: String },
33}
34
35#[derive(Debug)]
36pub enum ClientError {
37    Io(io::Error),
38    Tls(String),
39    Codec(io::Error),
40    Router(String),
41    ChannelClosed,
42    Protocol(String),
43    /// Codec `initial_setup` failed during connection establishment.
44    Setup(String),
45}
46
47impl From<io::Error> for ClientError {
48    fn from(e: io::Error) -> Self {
49        ClientError::Io(e)
50    }
51}
52
53pub struct ClientConnect {
54    tx: Sender<ClientRequest>,
55}
56
57#[derive(Clone)]
58/// Describes the target handler on the server.
59pub struct HandlerInfo {
60    id: Option<u64>,
61    named: Option<String>,
62}
63
64impl HandlerInfo {
65    /// Creates handler info by handler name.
66    pub fn new_named(name: String) -> Self {
67        Self {
68            id: None,
69            named: Some(name),
70        }
71    }
72    /// Creates handler info by handler id.
73    pub fn new_id(id: u64) -> Self {
74        Self {
75            id: Some(id),
76            named: None,
77        }
78    }
79
80    pub fn id(&self) -> Option<u64> {
81        self.id
82    }
83
84    pub fn named(&self) -> &Option<String> {
85        &self.named
86    }
87}
88
89/// `handler_info` — info about the target handler.
90/// `data` — the request payload (serialized structure).
91/// `s_type` — identifies what data is sent and how the server handler processes it.
92pub struct DataRequest {
93    pub handler_info: HandlerInfo,
94    pub data: Vec<u8>,
95    pub s_type: Box<dyn StructureType>,
96}
97
98/// Wraps a data request and the channel that receives the server response.
99pub struct ClientRequest {
100    pub req: DataRequest,
101    pub consumer: tokio::sync::oneshot::Sender<BytesMut>,
102}
103
104impl ClientConnect {
105    /// Creates a client and connects to the server.
106    ///
107    /// - `server_name`: used for TLS SNI; may be empty when not using TLS.
108    /// - `connection_dest`: `host:port`, e.g. `"65.88.95.127:9090"`.
109    /// - `max_request_in_time`: capacity of the in-flight request channel.
110    pub async fn new<C: TfCodec>(
111        server_name: String,
112        connection_dest: String,
113        processor: Option<TrafficProcessorHolder<C>>,
114        mut codec: C,
115        mode: ClientMode,
116        max_request_in_time: usize,
117    ) -> Result<Self, ClientError> {
118        tf_info!("Connecting to {}", connection_dest);
119        let mut transport = Self::connect(server_name, connection_dest.clone(), &mode).await?;
120
121        if !codec.initial_setup(&mut transport).await {
122            tf_warn!("Codec initial_setup failed for {}", connection_dest);
123            return Err(ClientError::Setup(
124                "codec initial_setup failed".into(),
125            ));
126        }
127        tf_debug!("Connected to {}", connection_dest);
128
129        let framed = Framed::new(transport, codec);
130        let (tx, rx) = mpsc::channel(max_request_in_time);
131        Self::connection_main(framed, processor, rx);
132
133        Ok(Self { tx })
134    }
135
136    async fn connect(
137        server_name: String,
138        connection_dest: String,
139        mode: &ClientMode,
140    ) -> Result<Transport, ClientError> {
141        match mode {
142            #[cfg(not(target_arch = "wasm32"))]
143            ClientMode::Tcp { client_config } => {
144                let socket = TcpStream::connect(&connection_dest).await?;
145                socket.set_nodelay(true)?;
146
147                if let Some(cfg) = client_config {
148                    let connector = TlsConnector::from(Arc::new(cfg.clone()));
149                    let domain = server_name
150                        .try_into()
151                        .map_err(|_| ClientError::Tls("Invalid server name".into()))?;
152                    let tls = connector
153                        .connect(domain, socket)
154                        .await
155                        .map_err(|e| ClientError::Tls(e.to_string()))?;
156                    Ok(Transport::tls_client(tls))
157                } else {
158                    Ok(Transport::plain(socket))
159                }
160            }
161
162            ClientMode::WebSocket { url } => {
163                Transport::connect(url).await.map_err(|e| ClientError::Tls(e.to_string()))
164            }
165        }
166    }
167
168    /// Dispatches a request to the server.
169    pub async fn dispatch_request(&self, request: ClientRequest) -> Result<(), ClientError> {
170        self.tx
171            .send(request)
172            .await
173            .map_err(|_| ClientError::ChannelClosed)
174    }
175
176    fn connection_main<C: TfCodec>(
177        mut socket: Framed<Transport, C>,
178        processor: Option<TrafficProcessorHolder<C>>,
179        mut rx: Receiver<ClientRequest>,
180    ) {
181        let mut processor = processor.unwrap_or_else(TrafficProcessorHolder::new);
182        let mut router = TargetRouter::new();
183
184        tokio::spawn(async move {
185            while let Some(request) = rx.recv().await {
186                if let Err(err) =
187                    Self::process_request(request, &mut socket, &mut processor, &mut router).await
188                {
189                    tf_warn!("Client request failed: {:?}", err);
190                }
191            }
192            tf_debug!("Client connection loop ended");
193        });
194    }
195
196    async fn process_request<C: TfCodec>(
197        request: ClientRequest,
198        socket: &mut Framed<Transport, C>,
199        processor: &mut TrafficProcessorHolder<C>,
200        target_router: &mut TargetRouter,
201    ) -> Result<(), ClientError> {
202        let handler_id = match request.req.handler_info.id() {
203            Some(id) => id,
204            None => {
205                let name = request
206                    .req
207                    .handler_info
208                    .named
209                    .ok_or_else(|| ClientError::Protocol("Missing handler name".into()))?;
210
211                tf_debug!("Requesting route id for handler '{}'", name);
212                target_router
213                    .request_route(name.as_str(), socket, processor)
214                    .await
215                    .map_err(|e| ClientError::Router(format!("{:?}", e)))?
216            }
217        };
218
219        let meta = PacketMeta {
220            s_type: SystemSType::PacketMeta,
221            s_type_req: request.req.s_type.get_serialize_function()(request.req.s_type),
222            handler_id,
223            has_payload: !request.req.data.is_empty(),
224        };
225
226        let meta_vec = s_type::to_vec(&meta)
227            .ok_or_else(|| ClientError::Protocol("PacketMeta serialization failed".into()))?;
228
229        let meta_bytes = processor.post_process_traffic(meta_vec).await;
230        let payload = processor.post_process_traffic(request.req.data).await;
231
232        socket.send(Bytes::from(meta_bytes)).await?;
233        socket.send(Bytes::from(payload)).await?;
234
235        let response = wait_for_data(socket).await?;
236        let response = processor.pre_process_traffic(response).await;
237
238        let _ = request.consumer.send(response);
239
240        Ok(())
241    }
242}
243
244pub async fn wait_for_data<C: TfCodec>(
245    socket: &mut Framed<Transport, C>,
246) -> Result<BytesMut, ClientError> {
247    use futures_util::StreamExt;
248
249    match socket.next().await {
250        Some(Ok(data)) => Ok(data),
251        Some(Err(e)) => Err(ClientError::Codec(e)),
252        None => Err(ClientError::Protocol("Connection closed".into())),
253    }
254}