1pub mod target_router;
2
3use crate::client::target_router::TargetRouter;
4use crate::log_macros::{tf_debug, tf_info, tf_warn};
5use crate::structures::s_type;
6use crate::structures::s_type::{PacketMeta, StructureType, SystemSType};
7use crate::structures::traffic_proc::TrafficProcessorHolder;
8use crate::structures::transport::Transport;
9use futures_util::SinkExt;
10use std::io;
11#[cfg(not(target_arch = "wasm32"))]
12use std::sync::Arc;
13#[cfg(not(target_arch = "wasm32"))]
14use tokio::net::TcpStream;
15use tokio::sync::mpsc::{Receiver, Sender};
16use tokio::sync::mpsc;
17#[cfg(not(target_arch = "wasm32"))]
18use tokio_rustls::TlsConnector;
19#[cfg(not(target_arch = "wasm32"))]
20use tokio_rustls::rustls::ClientConfig;
21use tokio_util::bytes::{Bytes, BytesMut};
22use tokio_util::codec::Framed;
23use crate::codec::codec_trait::TfCodec;
24
25#[derive(Clone)]
26pub enum ClientMode {
27 #[cfg(not(target_arch = "wasm32"))]
29 Tcp { client_config: Option<ClientConfig> },
30 WebSocket { url: String },
33}
34
35#[derive(Debug)]
36pub enum ClientError {
37 Io(io::Error),
38 Tls(String),
39 Codec(io::Error),
40 Router(String),
41 ChannelClosed,
42 Protocol(String),
43 Setup(String),
45}
46
47impl From<io::Error> for ClientError {
48 fn from(e: io::Error) -> Self {
49 ClientError::Io(e)
50 }
51}
52
53pub struct ClientConnect {
54 tx: Sender<ClientRequest>,
55}
56
57#[derive(Clone)]
58pub struct HandlerInfo {
60 id: Option<u64>,
61 named: Option<String>,
62}
63
64impl HandlerInfo {
65 pub fn new_named(name: String) -> Self {
67 Self {
68 id: None,
69 named: Some(name),
70 }
71 }
72 pub fn new_id(id: u64) -> Self {
74 Self {
75 id: Some(id),
76 named: None,
77 }
78 }
79
80 pub fn id(&self) -> Option<u64> {
81 self.id
82 }
83
84 pub fn named(&self) -> &Option<String> {
85 &self.named
86 }
87}
88
89pub struct DataRequest {
93 pub handler_info: HandlerInfo,
94 pub data: Vec<u8>,
95 pub s_type: Box<dyn StructureType>,
96}
97
98pub struct ClientRequest {
100 pub req: DataRequest,
101 pub consumer: tokio::sync::oneshot::Sender<BytesMut>,
102}
103
104impl ClientConnect {
105 pub async fn new<C: TfCodec>(
111 server_name: String,
112 connection_dest: String,
113 processor: Option<TrafficProcessorHolder<C>>,
114 mut codec: C,
115 mode: ClientMode,
116 max_request_in_time: usize,
117 ) -> Result<Self, ClientError> {
118 tf_info!("Connecting to {}", connection_dest);
119 let mut transport = Self::connect(server_name, connection_dest.clone(), &mode).await?;
120
121 if !codec.initial_setup(&mut transport).await {
122 tf_warn!("Codec initial_setup failed for {}", connection_dest);
123 return Err(ClientError::Setup(
124 "codec initial_setup failed".into(),
125 ));
126 }
127 tf_debug!("Connected to {}", connection_dest);
128
129 let framed = Framed::new(transport, codec);
130 let (tx, rx) = mpsc::channel(max_request_in_time);
131 Self::connection_main(framed, processor, rx);
132
133 Ok(Self { tx })
134 }
135
136 async fn connect(
137 server_name: String,
138 connection_dest: String,
139 mode: &ClientMode,
140 ) -> Result<Transport, ClientError> {
141 match mode {
142 #[cfg(not(target_arch = "wasm32"))]
143 ClientMode::Tcp { client_config } => {
144 let socket = TcpStream::connect(&connection_dest).await?;
145 socket.set_nodelay(true)?;
146
147 if let Some(cfg) = client_config {
148 let connector = TlsConnector::from(Arc::new(cfg.clone()));
149 let domain = server_name
150 .try_into()
151 .map_err(|_| ClientError::Tls("Invalid server name".into()))?;
152 let tls = connector
153 .connect(domain, socket)
154 .await
155 .map_err(|e| ClientError::Tls(e.to_string()))?;
156 Ok(Transport::tls_client(tls))
157 } else {
158 Ok(Transport::plain(socket))
159 }
160 }
161
162 ClientMode::WebSocket { url } => {
163 Transport::connect(url).await.map_err(|e| ClientError::Tls(e.to_string()))
164 }
165 }
166 }
167
168 pub async fn dispatch_request(&self, request: ClientRequest) -> Result<(), ClientError> {
170 self.tx
171 .send(request)
172 .await
173 .map_err(|_| ClientError::ChannelClosed)
174 }
175
176 fn connection_main<C: TfCodec>(
177 mut socket: Framed<Transport, C>,
178 processor: Option<TrafficProcessorHolder<C>>,
179 mut rx: Receiver<ClientRequest>,
180 ) {
181 let mut processor = processor.unwrap_or_else(TrafficProcessorHolder::new);
182 let mut router = TargetRouter::new();
183
184 tokio::spawn(async move {
185 while let Some(request) = rx.recv().await {
186 if let Err(err) =
187 Self::process_request(request, &mut socket, &mut processor, &mut router).await
188 {
189 tf_warn!("Client request failed: {:?}", err);
190 }
191 }
192 tf_debug!("Client connection loop ended");
193 });
194 }
195
196 async fn process_request<C: TfCodec>(
197 request: ClientRequest,
198 socket: &mut Framed<Transport, C>,
199 processor: &mut TrafficProcessorHolder<C>,
200 target_router: &mut TargetRouter,
201 ) -> Result<(), ClientError> {
202 let handler_id = match request.req.handler_info.id() {
203 Some(id) => id,
204 None => {
205 let name = request
206 .req
207 .handler_info
208 .named
209 .ok_or_else(|| ClientError::Protocol("Missing handler name".into()))?;
210
211 tf_debug!("Requesting route id for handler '{}'", name);
212 target_router
213 .request_route(name.as_str(), socket, processor)
214 .await
215 .map_err(|e| ClientError::Router(format!("{:?}", e)))?
216 }
217 };
218
219 let meta = PacketMeta {
220 s_type: SystemSType::PacketMeta,
221 s_type_req: request.req.s_type.get_serialize_function()(request.req.s_type),
222 handler_id,
223 has_payload: !request.req.data.is_empty(),
224 };
225
226 let meta_vec = s_type::to_vec(&meta)
227 .ok_or_else(|| ClientError::Protocol("PacketMeta serialization failed".into()))?;
228
229 let meta_bytes = processor.post_process_traffic(meta_vec).await;
230 let payload = processor.post_process_traffic(request.req.data).await;
231
232 socket.send(Bytes::from(meta_bytes)).await?;
233 socket.send(Bytes::from(payload)).await?;
234
235 let response = wait_for_data(socket).await?;
236 let response = processor.pre_process_traffic(response).await;
237
238 let _ = request.consumer.send(response);
239
240 Ok(())
241 }
242}
243
244pub async fn wait_for_data<C: TfCodec>(
245 socket: &mut Framed<Transport, C>,
246) -> Result<BytesMut, ClientError> {
247 use futures_util::StreamExt;
248
249 match socket.next().await {
250 Some(Ok(data)) => Ok(data),
251 Some(Err(e)) => Err(ClientError::Codec(e)),
252 None => Err(ClientError::Protocol("Connection closed".into())),
253 }
254}