1pub mod target_router;
2
3use crate::client::target_router::TargetRouter;
4use crate::log_macros::{tf_debug, tf_info, tf_warn};
5use crate::structures::s_type;
6use crate::structures::s_type::{PacketMeta, StructureType, SystemSType};
7use crate::structures::traffic_proc::TrafficProcessorHolder;
8use crate::structures::transport::Transport;
9use futures_util::SinkExt;
10use std::io;
11use std::sync::Arc;
12use tokio::net::TcpStream;
13use tokio::sync::mpsc::{Receiver, Sender};
14use tokio::sync::mpsc;
15use tokio_rustls::TlsConnector;
16use tokio_rustls::rustls::ClientConfig;
17use tokio_util::bytes::{Bytes, BytesMut};
18use tokio_util::codec::Framed;
19use crate::codec::codec_trait::TfCodec;
20
/// Selects how `ClientConnect` reaches the server.
#[derive(Clone)]
pub enum ClientMode {
    /// Connect over a TCP socket. When `client_config` is `Some`, the socket
    /// is wrapped in a TLS session built from that configuration; when it is
    /// `None`, the plain socket is used as the transport.
    Tcp { client_config: Option<ClientConfig> },
    /// Connect by handing `url` to `Transport::connect`.
    WebSocket { url: String },
}
29
/// Errors reported while establishing or driving a client connection.
///
/// Implements [`std::fmt::Display`] and [`std::error::Error`] so it can be
/// propagated through `Box<dyn Error>`-style error handling and printed with
/// `{}`.
#[derive(Debug)]
pub enum ClientError {
    /// An I/O failure; this is the variant produced by `From<io::Error>`,
    /// and therefore by `?` on any `io::Result`.
    Io(io::Error),
    /// A TLS-level failure, carried as a message (for example an invalid
    /// server name or a failed handshake).
    Tls(String),
    /// An error yielded by the framed stream while waiting for data.
    Codec(io::Error),
    /// A failure while resolving a handler name to a route id.
    Router(String),
    /// The request channel to the connection task is closed.
    ChannelClosed,
    /// A protocol-level problem, carried as a message.
    Protocol(String),
    /// Connection setup could not be completed, carried as a message.
    Setup(String),
}

impl std::fmt::Display for ClientError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            ClientError::Io(e) => write!(f, "I/O error: {}", e),
            ClientError::Tls(msg) => write!(f, "TLS error: {}", msg),
            ClientError::Codec(e) => write!(f, "codec error: {}", e),
            ClientError::Router(msg) => write!(f, "router error: {}", msg),
            ClientError::ChannelClosed => f.write_str("client request channel closed"),
            ClientError::Protocol(msg) => write!(f, "protocol error: {}", msg),
            ClientError::Setup(msg) => write!(f, "setup error: {}", msg),
        }
    }
}

impl std::error::Error for ClientError {
    /// Exposes the wrapped `io::Error` for the variants that carry one.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            ClientError::Io(e) | ClientError::Codec(e) => Some(e),
            ClientError::Tls(_)
            | ClientError::Router(_)
            | ClientError::ChannelClosed
            | ClientError::Protocol(_)
            | ClientError::Setup(_) => None,
        }
    }
}

impl From<io::Error> for ClientError {
    /// Wraps an `io::Error` as [`ClientError::Io`].
    fn from(e: io::Error) -> Self {
        ClientError::Io(e)
    }
}
47
/// Handle to a client connection that is driven by a background task.
///
/// Requests are submitted through `dispatch_request`, which forwards them
/// over `tx` to the task spawned when the connection was created.
pub struct ClientConnect {
    // Sending half of the request channel; the receiving half is owned by
    // the spawned connection task.
    tx: Sender<ClientRequest>,
}
51
/// Addresses a remote handler either by numeric id or by name.
///
/// The provided constructors populate exactly one of the two fields and
/// leave the other as `None`.
#[derive(Clone)]
pub struct HandlerInfo {
    id: Option<u64>,
    named: Option<String>,
}

impl HandlerInfo {
    /// Builds a handler reference that carries only a name.
    pub fn new_named(name: String) -> Self {
        let named = Some(name);
        HandlerInfo { id: None, named }
    }

    /// Builds a handler reference that carries only a numeric id.
    pub fn new_id(id: u64) -> Self {
        let id = Some(id);
        HandlerInfo { id, named: None }
    }

    /// Returns the numeric id, if this reference was built from one.
    pub fn id(&self) -> Option<u64> {
        let HandlerInfo { id, .. } = self;
        *id
    }

    /// Returns a reference to the optional handler name.
    pub fn named(&self) -> &Option<String> {
        let HandlerInfo { named, .. } = self;
        named
    }
}
83
/// The payload-carrying part of a client request.
pub struct DataRequest {
    /// Which remote handler the request is addressed to.
    pub handler_info: HandlerInfo,
    /// Request body bytes; an empty vector is reported to the peer as
    /// "no payload" in the packet metadata.
    pub data: Vec<u8>,
    /// Structure type descriptor; it is serialized into the packet metadata
    /// through its own `get_serialize_function()`.
    pub s_type: Box<dyn StructureType>,
}
92
/// A request queued for the connection task together with the channel on
/// which its response is delivered.
pub struct ClientRequest {
    /// The request to send.
    pub req: DataRequest,
    /// One-shot sender that receives the response bytes once the request
    /// has been processed successfully.
    pub consumer: tokio::sync::oneshot::Sender<BytesMut>,
}
98
99impl ClientConnect {
100 pub async fn new<C: TfCodec>(
106 server_name: String,
107 connection_dest: String,
108 processor: Option<TrafficProcessorHolder<C>>,
109 mut codec: C,
110 mode: ClientMode,
111 max_request_in_time: usize,
112 ) -> Result<Self, ClientError> {
113 tf_info!("Connecting to {}", connection_dest);
114 let mut transport = Self::connect(server_name, connection_dest.clone(), &mode).await?;
115
116 if !codec.initial_setup(&mut transport).await {
117 tf_warn!("Codec initial_setup failed for {}", connection_dest);
118 return Err(ClientError::Setup(
119 "codec initial_setup failed".into(),
120 ));
121 }
122 tf_debug!("Connected to {}", connection_dest);
123
124 let framed = Framed::new(transport, codec);
125 let (tx, rx) = mpsc::channel(max_request_in_time);
126 Self::connection_main(framed, processor, rx);
127
128 Ok(Self { tx })
129 }
130
131 async fn connect(
132 server_name: String,
133 connection_dest: String,
134 mode: &ClientMode,
135 ) -> Result<Transport, ClientError> {
136 match mode {
137 ClientMode::Tcp { client_config } => {
138 let socket = TcpStream::connect(&connection_dest).await?;
139 socket.set_nodelay(true)?;
140
141 if let Some(cfg) = client_config {
142 let connector = TlsConnector::from(Arc::new(cfg.clone()));
143 let domain = server_name
144 .try_into()
145 .map_err(|_| ClientError::Tls("Invalid server name".into()))?;
146 let tls = connector
147 .connect(domain, socket)
148 .await
149 .map_err(|e| ClientError::Tls(e.to_string()))?;
150 Ok(Transport::tls_client(tls))
151 } else {
152 Ok(Transport::plain(socket))
153 }
154 }
155
156 ClientMode::WebSocket { url } => {
157 Transport::connect(url).await.map_err(|e| ClientError::Tls(e.to_string()))
158 }
159 }
160 }
161
162 pub async fn dispatch_request(&self, request: ClientRequest) -> Result<(), ClientError> {
164 self.tx
165 .send(request)
166 .await
167 .map_err(|_| ClientError::ChannelClosed)
168 }
169
170 fn connection_main<C: TfCodec>(
171 mut socket: Framed<Transport, C>,
172 processor: Option<TrafficProcessorHolder<C>>,
173 mut rx: Receiver<ClientRequest>,
174 ) {
175 let mut processor = processor.unwrap_or_else(TrafficProcessorHolder::new);
176 let mut router = TargetRouter::new();
177
178 tokio::spawn(async move {
179 while let Some(request) = rx.recv().await {
180 if let Err(err) =
181 Self::process_request(request, &mut socket, &mut processor, &mut router).await
182 {
183 tf_warn!("Client request failed: {:?}", err);
184 }
185 }
186 tf_debug!("Client connection loop ended");
187 });
188 }
189
190 async fn process_request<C: TfCodec>(
191 request: ClientRequest,
192 socket: &mut Framed<Transport, C>,
193 processor: &mut TrafficProcessorHolder<C>,
194 target_router: &mut TargetRouter,
195 ) -> Result<(), ClientError> {
196 let handler_id = match request.req.handler_info.id() {
197 Some(id) => id,
198 None => {
199 let name = request
200 .req
201 .handler_info
202 .named
203 .ok_or_else(|| ClientError::Protocol("Missing handler name".into()))?;
204
205 tf_debug!("Requesting route id for handler '{}'", name);
206 target_router
207 .request_route(name.as_str(), socket, processor)
208 .await
209 .map_err(|e| ClientError::Router(format!("{:?}", e)))?
210 }
211 };
212
213 let meta = PacketMeta {
214 s_type: SystemSType::PacketMeta,
215 s_type_req: request.req.s_type.get_serialize_function()(request.req.s_type),
216 handler_id,
217 has_payload: !request.req.data.is_empty(),
218 };
219
220 let meta_vec = s_type::to_vec(&meta)
221 .ok_or_else(|| ClientError::Protocol("PacketMeta serialization failed".into()))?;
222
223 let meta_bytes = processor.post_process_traffic(meta_vec).await;
224 let payload = processor.post_process_traffic(request.req.data).await;
225
226 socket.send(Bytes::from(meta_bytes)).await?;
227 socket.send(Bytes::from(payload)).await?;
228
229 let response = wait_for_data(socket).await?;
230 let response = processor.pre_process_traffic(response).await;
231
232 let _ = request.consumer.send(response);
233
234 Ok(())
235 }
236}
237
238pub async fn wait_for_data<C: TfCodec>(
239 socket: &mut Framed<Transport, C>,
240) -> Result<BytesMut, ClientError> {
241 use futures_util::StreamExt;
242
243 match socket.next().await {
244 Some(Ok(data)) => Ok(data),
245 Some(Err(e)) => Err(ClientError::Codec(e)),
246 None => Err(ClientError::Protocol("Connection closed".into())),
247 }
248}