1pub mod target_router;
2
3use crate::client::target_router::TargetRouter;
4use crate::structures::s_type;
5use crate::structures::s_type::{PacketMeta, StructureType, SystemSType};
6use crate::structures::traffic_proc::TrafficProcessorHolder;
7use crate::structures::transport::Transport;
8use futures_util::SinkExt;
9use std::io;
10use std::sync::Arc;
11use tokio::net::TcpStream;
12use tokio::sync::mpsc::{Receiver, Sender};
13use tokio::sync::{mpsc};
14use tokio_rustls::TlsConnector;
15use tokio_rustls::rustls::ClientConfig;
16use tokio_util::bytes::{Bytes, BytesMut};
17use tokio_util::codec::{Decoder, Encoder, Framed};
18use crate::codec::codec_trait::TfCodec;
19
20#[derive(Debug)]
21pub enum ClientError {
22 Io(io::Error),
23 Tls(String),
24 Codec(io::Error),
25 Router(String),
26 ChannelClosed,
27 Protocol(String),
28}
29
30impl From<io::Error> for ClientError {
31 fn from(e: io::Error) -> Self {
32 ClientError::Io(e)
33 }
34}
35
36pub struct ClientConnect {
37 tx: Sender<ClientRequest>,
38}
39
40#[derive( Clone)]
41pub struct HandlerInfo {
43 id: Option<u64>,
44 named: Option<String>,
45}
46
47impl HandlerInfo {
48 pub fn new_named(name: String) -> Self {
50 Self {
51 id: None,
52 named: Some(name),
53 }
54 }
55 pub fn new_id(id: u64) -> Self {
57 Self {
58 id: Some(id),
59 named: None,
60 }
61 }
62
63 pub fn id(&self) -> Option<u64> {
64 self.id
65 }
66
67 pub fn named(&self) -> &Option<String> {
68 &self.named
69 }
70}
71pub struct DataRequest {
75 pub handler_info: HandlerInfo,
76 pub data: Vec<u8>,
77 pub s_type: Box<dyn StructureType>,
78}
79pub struct ClientRequest {
84 pub req: DataRequest,
85 pub consumer: tokio::sync::oneshot::Sender<BytesMut>,
86}
87
88impl ClientConnect {
89 pub async fn new<
97 C: Encoder<Bytes, Error = io::Error>
98 + Decoder<Item = BytesMut, Error = io::Error>
99 + Clone
100 + Send
101 + Sync
102 + 'static
103 + TfCodec,
104 >(
105 server_name: String,
106 connection_dest: String,
107 processor: Option<TrafficProcessorHolder<C>>,
108 mut codec: C,
109 client_config: Option<ClientConfig>,
110 max_request_in_time: usize,
111 ) -> Result<Self, ClientError> {
112 let socket = TcpStream::connect(connection_dest).await?;
113 socket.set_nodelay(true)?;
114
115 let mut transport = if let Some(client_config) = client_config {
116 let connector = TlsConnector::from(Arc::new(client_config));
117 let domain = server_name
118 .try_into()
119 .map_err(|_| ClientError::Tls("Invalid server name".into()))?;
120
121 let tls = connector
122 .connect(domain, socket)
123 .await
124 .map_err(|e| ClientError::Tls(e.to_string()))?;
125
126 Transport::tls_client(tls)
127 } else {
128 Transport::plain(socket)
129 };
130 if !codec.initial_setup(&mut transport).await {
131 panic!("Failed to initial setup transport");
132 }
133 let framed = Framed::new(transport, codec);
134 let (tx, rx) = mpsc::channel(max_request_in_time);
135
136 Self::connection_main(framed, processor, rx);
137
138 Ok(Self { tx })
139 }
140
141 pub async fn dispatch_request(&self, request: ClientRequest) -> Result<(), ClientError> {
143 self.tx
144 .send(request)
145 .await
146 .map_err(|_| ClientError::ChannelClosed)
147 }
148
149 fn connection_main<
150 C: Encoder<Bytes, Error = io::Error>
151 + Decoder<Item = BytesMut, Error = io::Error>
152 + Clone
153 + Send
154 + Sync
155 + 'static
156 +TfCodec,
157 >(
158 mut socket: Framed<Transport, C>,
159 processor: Option<TrafficProcessorHolder<C>>,
160 mut rx: Receiver<ClientRequest>,
161 ) {
162 let mut processor = processor.unwrap_or_else(TrafficProcessorHolder::new);
163 let mut router = TargetRouter::new();
164
165 tokio::spawn(async move {
166 while let Some(request) = rx.recv().await {
167 if let Err(err) =
168 Self::process_request(request, &mut socket, &mut processor, &mut router).await
169 {
170 eprintln!("Client request failed: {:?}", err);
171 }
172 }
173 });
174 }
175
176 async fn process_request<
177 C: Encoder<Bytes, Error = io::Error>
178 + Decoder<Item = BytesMut, Error = io::Error>
179 + Clone
180 + Send
181 + Sync
182 + 'static
183 +TfCodec,
184 >(
185 request: ClientRequest,
186 socket: &mut Framed<Transport, C>,
187 processor: &mut TrafficProcessorHolder<C>,
188 target_router: &mut TargetRouter,
189 ) -> Result<(), ClientError> {
190 let handler_id = match request.req.handler_info.id() {
191 Some(id) => id,
192 None => {
193 let name = request
194 .req
195 .handler_info
196 .named
197 .ok_or_else(|| ClientError::Protocol("Missing handler name".into()))?;
198
199 target_router
200 .request_route(name.as_str(), socket, processor)
201 .await
202 .map_err(|e| ClientError::Router(format!("{:?}", e)))?
203 }
204 };
205
206 let meta = PacketMeta {
207 s_type: SystemSType::PacketMeta,
208 s_type_req: request.req.s_type.get_serialize_function()(request.req.s_type),
209 handler_id,
210 has_payload: !request.req.data.is_empty(),
211 };
212
213 let meta_vec = s_type::to_vec(&meta)
214 .ok_or_else(|| ClientError::Protocol("PacketMeta serialization failed".into()))?;
215
216 let meta_bytes = processor.post_process_traffic(meta_vec).await;
217 let payload = processor.post_process_traffic(request.req.data).await;
218
219 socket.send(Bytes::from(meta_bytes)).await?;
220 socket.send(Bytes::from(payload)).await?;
221
222 let response = wait_for_data(socket).await?;
223 let response = processor.pre_process_traffic(response).await;
224
225 let _ = request
226 .consumer
227 .send(response);
228
229 Ok(())
230 }
231}
232
233pub async fn wait_for_data<
234 C: Encoder<Bytes, Error = io::Error>
235 + Decoder<Item = BytesMut, Error = io::Error>
236 + Clone
237 + Send
238 + Sync
239 + 'static
240 +TfCodec,
241>(
242 socket: &mut Framed<Transport, C>,
243) -> Result<BytesMut, ClientError> {
244 use futures_util::StreamExt;
245
246 match socket.next().await {
247 Some(Ok(data)) => Ok(data),
248 Some(Err(e)) => Err(ClientError::Codec(e)),
249 None => Err(ClientError::Protocol("Connection closed".into())),
250 }
251}