1pub mod target_router;
2
3use crate::client::target_router::TargetRouter;
4use crate::structures::s_type;
5use crate::structures::s_type::{PacketMeta, StructureType, SystemSType};
6use crate::structures::traffic_proc::TrafficProcessorHolder;
7use crate::structures::transport::Transport;
8use futures_util::SinkExt;
9use std::io;
10use std::sync::Arc;
11use tokio::net::TcpStream;
12use tokio::sync::mpsc::{Receiver, Sender};
13use tokio::sync::{mpsc};
14use tokio_rustls::TlsConnector;
15use tokio_rustls::rustls::ClientConfig;
16use tokio_util::bytes::{Bytes, BytesMut};
17use tokio_util::codec::{ Framed};
18use crate::codec::codec_trait::TfCodec;
19
20#[derive(Clone)]
21pub enum ClientMode {
22 Tcp { client_config: Option<ClientConfig> },
24 WebSocket { url: String },
27}
28
29#[derive(Debug)]
30pub enum ClientError {
31 Io(io::Error),
32 Tls(String),
33 Codec(io::Error),
34 Router(String),
35 ChannelClosed,
36 Protocol(String),
37}
38
39impl From<io::Error> for ClientError {
40 fn from(e: io::Error) -> Self {
41 ClientError::Io(e)
42 }
43}
44
45pub struct ClientConnect {
46 tx: Sender<ClientRequest>,
47}
48
49#[derive( Clone)]
50pub struct HandlerInfo {
52 id: Option<u64>,
53 named: Option<String>,
54}
55
56impl HandlerInfo {
57 pub fn new_named(name: String) -> Self {
59 Self {
60 id: None,
61 named: Some(name),
62 }
63 }
64 pub fn new_id(id: u64) -> Self {
66 Self {
67 id: Some(id),
68 named: None,
69 }
70 }
71
72 pub fn id(&self) -> Option<u64> {
73 self.id
74 }
75
76 pub fn named(&self) -> &Option<String> {
77 &self.named
78 }
79}
80pub struct DataRequest {
84 pub handler_info: HandlerInfo,
85 pub data: Vec<u8>,
86 pub s_type: Box<dyn StructureType>,
87}
88pub struct ClientRequest {
93 pub req: DataRequest,
94 pub consumer: tokio::sync::oneshot::Sender<BytesMut>,
95}
96
97impl ClientConnect {
98 pub async fn new<C: TfCodec>(
106 server_name: String,
107 connection_dest: String,
108 processor: Option<TrafficProcessorHolder<C>>,
109 mut codec: C,
110 mode: ClientMode, max_request_in_time: usize,
112 ) -> Result<Self, ClientError> {
113 let mut transport = Self::connect(server_name, connection_dest, &mode).await?;
114
115 if !codec.initial_setup(&mut transport).await {
116 panic!("Failed to initial setup transport");
117 }
118
119 let framed = Framed::new(transport, codec);
120 let (tx, rx) = mpsc::channel(max_request_in_time);
121 Self::connection_main(framed, processor, rx);
122
123 Ok(Self { tx })
124 }
125 async fn connect(
126 server_name: String,
127 connection_dest: String,
128 mode: &ClientMode,
129 ) -> Result<Transport, ClientError> {
130 match mode {
131 ClientMode::Tcp { client_config } => {
132 let socket = TcpStream::connect(&connection_dest).await?;
133 socket.set_nodelay(true)?;
134
135 if let Some(cfg) = client_config {
136 let connector = TlsConnector::from(Arc::new(cfg.clone()));
137 let domain = server_name
138 .try_into()
139 .map_err(|_| ClientError::Tls("Invalid server name".into()))?;
140 let tls = connector
141 .connect(domain, socket)
142 .await
143 .map_err(|e| ClientError::Tls(e.to_string()))?;
144 Ok(Transport::tls_client(tls))
145 } else {
146 Ok(Transport::plain(socket))
147 }
148 }
149
150 ClientMode::WebSocket { url } => {
151 Transport::connect(url).await.map_err(|e| ClientError::Tls(e.to_string()))
152 }
153 }
154 }
155
156 pub async fn dispatch_request(&self, request: ClientRequest) -> Result<(), ClientError> {
158 self.tx
159 .send(request)
160 .await
161 .map_err(|_| ClientError::ChannelClosed)
162 }
163
164 fn connection_main<
165 C: TfCodec,
166 >(
167 mut socket: Framed<Transport, C>,
168 processor: Option<TrafficProcessorHolder<C>>,
169 mut rx: Receiver<ClientRequest>,
170 ) {
171 let mut processor = processor.unwrap_or_else(TrafficProcessorHolder::new);
172 let mut router = TargetRouter::new();
173
174 tokio::spawn(async move {
175 while let Some(request) = rx.recv().await {
176 if let Err(err) =
177 Self::process_request(request, &mut socket, &mut processor, &mut router).await
178 {
179 eprintln!("Client request failed: {:?}", err);
180 }
181 }
182 });
183 }
184
185 async fn process_request<
186 C: TfCodec,
187 >(
188 request: ClientRequest,
189 socket: &mut Framed<Transport, C>,
190 processor: &mut TrafficProcessorHolder<C>,
191 target_router: &mut TargetRouter,
192 ) -> Result<(), ClientError> {
193 let handler_id = match request.req.handler_info.id() {
194 Some(id) => id,
195 None => {
196 let name = request
197 .req
198 .handler_info
199 .named
200 .ok_or_else(|| ClientError::Protocol("Missing handler name".into()))?;
201
202 target_router
203 .request_route(name.as_str(), socket, processor)
204 .await
205 .map_err(|e| ClientError::Router(format!("{:?}", e)))?
206 }
207 };
208
209 let meta = PacketMeta {
210 s_type: SystemSType::PacketMeta,
211 s_type_req: request.req.s_type.get_serialize_function()(request.req.s_type),
212 handler_id,
213 has_payload: !request.req.data.is_empty(),
214 };
215
216 let meta_vec = s_type::to_vec(&meta)
217 .ok_or_else(|| ClientError::Protocol("PacketMeta serialization failed".into()))?;
218
219 let meta_bytes = processor.post_process_traffic(meta_vec).await;
220 let payload = processor.post_process_traffic(request.req.data).await;
221
222 socket.send(Bytes::from(meta_bytes)).await?;
223 socket.send(Bytes::from(payload)).await?;
224
225 let response = wait_for_data(socket).await?;
226 let response = processor.pre_process_traffic(response).await;
227
228 let _ = request
229 .consumer
230 .send(response);
231
232 Ok(())
233 }
234}
235
236pub async fn wait_for_data<
237 C: TfCodec,
238>(
239 socket: &mut Framed<Transport, C>,
240) -> Result<BytesMut, ClientError> {
241 use futures_util::StreamExt;
242
243 match socket.next().await {
244 Some(Ok(data)) => Ok(data),
245 Some(Err(e)) => Err(ClientError::Codec(e)),
246 None => Err(ClientError::Protocol("Connection closed".into())),
247 }
248}