Skip to main content

tfserver/client/
mod.rs

1pub mod target_router;
2
3use crate::client::target_router::TargetRouter;
4use crate::log_macros::{tf_debug, tf_info, tf_warn};
5use crate::structures::s_type;
6use crate::structures::s_type::{PacketMeta, StructureType, SystemSType};
7use crate::structures::traffic_proc::TrafficProcessorHolder;
8use crate::structures::transport::Transport;
9use futures_util::SinkExt;
10use std::io;
11use std::sync::Arc;
12use tokio::net::TcpStream;
13use tokio::sync::mpsc::{Receiver, Sender};
14use tokio::sync::mpsc;
15use tokio_rustls::TlsConnector;
16use tokio_rustls::rustls::ClientConfig;
17use tokio_util::bytes::{Bytes, BytesMut};
18use tokio_util::codec::Framed;
19use crate::codec::codec_trait::TfCodec;
20
21#[derive(Clone)]
22pub enum ClientMode {
23    /// Raw TCP, optionally wrapped in TLS
24    Tcp { client_config: Option<ClientConfig> },
25    /// WebSocket — for environments without raw TCP access (e.g. WASM)
26    /// `url` is the full ws:// or wss:// URL, e.g. "wss://example.com:9000/ws"
27    WebSocket { url: String },
28}
29
/// Errors produced while connecting to the server or exchanging requests with it.
///
/// Implements [`std::fmt::Display`] and [`std::error::Error`] so it can be
/// propagated through `Box<dyn Error>` and similar generic error plumbing.
#[derive(Debug)]
pub enum ClientError {
    /// An I/O failure; this is the variant produced by `From<io::Error>`.
    Io(io::Error),
    /// TLS-related failure (invalid server name or handshake error).
    Tls(String),
    /// The framed stream reported an error while reading a frame.
    Codec(io::Error),
    /// Resolving a route id for a named handler failed.
    Router(String),
    /// The request channel to the background connection task is closed.
    ChannelClosed,
    /// The exchange violated what the client expects (e.g. missing handler
    /// name, failed metadata serialization, connection closed mid-request).
    Protocol(String),
    /// Codec `initial_setup` failed during connection establishment.
    Setup(String),
}

impl std::fmt::Display for ClientError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            ClientError::Io(err) => write!(f, "I/O error: {}", err),
            ClientError::Tls(msg) => write!(f, "TLS error: {}", msg),
            ClientError::Codec(err) => write!(f, "codec error: {}", err),
            ClientError::Router(msg) => write!(f, "router error: {}", msg),
            ClientError::ChannelClosed => f.write_str("client request channel closed"),
            ClientError::Protocol(msg) => write!(f, "protocol error: {}", msg),
            ClientError::Setup(msg) => write!(f, "setup error: {}", msg),
        }
    }
}

impl std::error::Error for ClientError {
    /// Exposes the wrapped `io::Error` for the variants that carry one.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            ClientError::Io(err) | ClientError::Codec(err) => Some(err),
            ClientError::Tls(_)
            | ClientError::Router(_)
            | ClientError::ChannelClosed
            | ClientError::Protocol(_)
            | ClientError::Setup(_) => None,
        }
    }
}

impl From<io::Error> for ClientError {
    /// Wraps a raw I/O error so `?` can be used on `io::Result` values.
    fn from(e: io::Error) -> Self {
        ClientError::Io(e)
    }
}
47
48pub struct ClientConnect {
49    tx: Sender<ClientRequest>,
50}
51
/// Describes the target handler on the server.
///
/// A handler is addressed either by numeric id or by name; each constructor
/// fills exactly one of the two fields and leaves the other `None`.
#[derive(Clone)]
pub struct HandlerInfo {
    id: Option<u64>,
    named: Option<String>,
}

impl HandlerInfo {
    /// Creates handler info by handler name.
    pub fn new_named(name: String) -> Self {
        let named = Some(name);
        Self { id: None, named }
    }

    /// Creates handler info by handler id.
    pub fn new_id(id: u64) -> Self {
        let id = Some(id);
        Self { id, named: None }
    }

    /// Returns the handler id, or `None` when the info was created by name.
    pub fn id(&self) -> Option<u64> {
        self.id
    }

    /// Returns the handler name, or `None` when the info was created by id.
    pub fn named(&self) -> &Option<String> {
        &self.named
    }
}
83
84/// `handler_info` — info about the target handler.
85/// `data` — the request payload (serialized structure).
86/// `s_type` — identifies what data is sent and how the server handler processes it.
87pub struct DataRequest {
88    pub handler_info: HandlerInfo,
89    pub data: Vec<u8>,
90    pub s_type: Box<dyn StructureType>,
91}
92
93/// Wraps a data request and the channel that receives the server response.
94pub struct ClientRequest {
95    pub req: DataRequest,
96    pub consumer: tokio::sync::oneshot::Sender<BytesMut>,
97}
98
99impl ClientConnect {
100    /// Creates a client and connects to the server.
101    ///
102    /// - `server_name`: used for TLS SNI; may be empty when not using TLS.
103    /// - `connection_dest`: `host:port`, e.g. `"65.88.95.127:9090"`.
104    /// - `max_request_in_time`: capacity of the in-flight request channel.
105    pub async fn new<C: TfCodec>(
106        server_name: String,
107        connection_dest: String,
108        processor: Option<TrafficProcessorHolder<C>>,
109        mut codec: C,
110        mode: ClientMode,
111        max_request_in_time: usize,
112    ) -> Result<Self, ClientError> {
113        tf_info!("Connecting to {}", connection_dest);
114        let mut transport = Self::connect(server_name, connection_dest.clone(), &mode).await?;
115
116        if !codec.initial_setup(&mut transport).await {
117            tf_warn!("Codec initial_setup failed for {}", connection_dest);
118            return Err(ClientError::Setup(
119                "codec initial_setup failed".into(),
120            ));
121        }
122        tf_debug!("Connected to {}", connection_dest);
123
124        let framed = Framed::new(transport, codec);
125        let (tx, rx) = mpsc::channel(max_request_in_time);
126        Self::connection_main(framed, processor, rx);
127
128        Ok(Self { tx })
129    }
130
131    async fn connect(
132        server_name: String,
133        connection_dest: String,
134        mode: &ClientMode,
135    ) -> Result<Transport, ClientError> {
136        match mode {
137            ClientMode::Tcp { client_config } => {
138                let socket = TcpStream::connect(&connection_dest).await?;
139                socket.set_nodelay(true)?;
140
141                if let Some(cfg) = client_config {
142                    let connector = TlsConnector::from(Arc::new(cfg.clone()));
143                    let domain = server_name
144                        .try_into()
145                        .map_err(|_| ClientError::Tls("Invalid server name".into()))?;
146                    let tls = connector
147                        .connect(domain, socket)
148                        .await
149                        .map_err(|e| ClientError::Tls(e.to_string()))?;
150                    Ok(Transport::tls_client(tls))
151                } else {
152                    Ok(Transport::plain(socket))
153                }
154            }
155
156            ClientMode::WebSocket { url } => {
157                Transport::connect(url).await.map_err(|e| ClientError::Tls(e.to_string()))
158            }
159        }
160    }
161
162    /// Dispatches a request to the server.
163    pub async fn dispatch_request(&self, request: ClientRequest) -> Result<(), ClientError> {
164        self.tx
165            .send(request)
166            .await
167            .map_err(|_| ClientError::ChannelClosed)
168    }
169
170    fn connection_main<C: TfCodec>(
171        mut socket: Framed<Transport, C>,
172        processor: Option<TrafficProcessorHolder<C>>,
173        mut rx: Receiver<ClientRequest>,
174    ) {
175        let mut processor = processor.unwrap_or_else(TrafficProcessorHolder::new);
176        let mut router = TargetRouter::new();
177
178        tokio::spawn(async move {
179            while let Some(request) = rx.recv().await {
180                if let Err(err) =
181                    Self::process_request(request, &mut socket, &mut processor, &mut router).await
182                {
183                    tf_warn!("Client request failed: {:?}", err);
184                }
185            }
186            tf_debug!("Client connection loop ended");
187        });
188    }
189
190    async fn process_request<C: TfCodec>(
191        request: ClientRequest,
192        socket: &mut Framed<Transport, C>,
193        processor: &mut TrafficProcessorHolder<C>,
194        target_router: &mut TargetRouter,
195    ) -> Result<(), ClientError> {
196        let handler_id = match request.req.handler_info.id() {
197            Some(id) => id,
198            None => {
199                let name = request
200                    .req
201                    .handler_info
202                    .named
203                    .ok_or_else(|| ClientError::Protocol("Missing handler name".into()))?;
204
205                tf_debug!("Requesting route id for handler '{}'", name);
206                target_router
207                    .request_route(name.as_str(), socket, processor)
208                    .await
209                    .map_err(|e| ClientError::Router(format!("{:?}", e)))?
210            }
211        };
212
213        let meta = PacketMeta {
214            s_type: SystemSType::PacketMeta,
215            s_type_req: request.req.s_type.get_serialize_function()(request.req.s_type),
216            handler_id,
217            has_payload: !request.req.data.is_empty(),
218        };
219
220        let meta_vec = s_type::to_vec(&meta)
221            .ok_or_else(|| ClientError::Protocol("PacketMeta serialization failed".into()))?;
222
223        let meta_bytes = processor.post_process_traffic(meta_vec).await;
224        let payload = processor.post_process_traffic(request.req.data).await;
225
226        socket.send(Bytes::from(meta_bytes)).await?;
227        socket.send(Bytes::from(payload)).await?;
228
229        let response = wait_for_data(socket).await?;
230        let response = processor.pre_process_traffic(response).await;
231
232        let _ = request.consumer.send(response);
233
234        Ok(())
235    }
236}
237
238pub async fn wait_for_data<C: TfCodec>(
239    socket: &mut Framed<Transport, C>,
240) -> Result<BytesMut, ClientError> {
241    use futures_util::StreamExt;
242
243    match socket.next().await {
244        Some(Ok(data)) => Ok(data),
245        Some(Err(e)) => Err(ClientError::Codec(e)),
246        None => Err(ClientError::Protocol("Connection closed".into())),
247    }
248}