1use std::io::ErrorKind;
27use std::net::{IpAddr, SocketAddr};
28use std::time::Duration;
29use core::fmt::Debug;
30
31use async_trait::async_trait;
32
33use tokio::io::{AsyncWriteExt, AsyncReadExt};
34use tokio::time::timeout;
35use tokio::net::{UdpSocket, TcpStream};
36
37use socket2::{Socket, Domain, Type, Protocol, SockAddr};
38
39
40use crate::{internal_error, internal_error_map};
41
42use crate::error::*;
43
/// Transport-independent interface of a socket that talks to one remote host.
///
/// In this file it is implemented for `NetworkTap<UdpSocket>` and
/// `NetworkTap<TcpStream>`.
#[async_trait]
pub trait SocketTap
{
    /// Opens the underlying socket and connects it to the remote address.
    ///
    /// The implementations in this file return `Ok(())` right away when a
    /// socket is already present.
    async fn connect(&mut self) -> CDnsResult<()>;

    /// Returns `true` when the transport is TCP and `false` otherwise.
    fn is_tcp(&self) -> bool;

    /// Reports whether the tap currently holds a socket.
    ///
    /// NOTE: the method name contains a typo ("conncected"); it is part of
    /// the interface, so it is kept as is.
    async fn is_conncected(&self) -> bool;

    /// Returns the address of the remote host this tap talks to.
    fn get_remote_addr(&self) -> &SocketAddr;

    /// Sends the bytes of `sndbuf` to the remote host and returns the number
    /// of bytes sent.
    async fn send(&mut self, sndbuf: &[u8]) -> CDnsResult<usize> ;

    /// Receives data into `rcvbuf` and returns the number of bytes received.
    ///
    /// The implementations in this file give up with a `RequestTimeout`
    /// error once the configured timeout has elapsed.
    async fn recv(&mut self, rcvbuf: &mut [u8]) -> CDnsResult<usize>;
}
69
70
71impl Debug for dyn SocketTap + Send + Sync
72{
73 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result
74 {
75 write!(f, "{:?}", self)
76 }
77}
78
/// Trait-object form of [`SocketTap`] that is additionally `Send + Sync`;
/// `new_udp` and `new_tcp` hand it out boxed.
pub type NetworkTapType = (dyn SocketTap + Send + Sync);
80
81
/// State of one network endpoint; `T` is the concrete socket type
/// (`UdpSocket` or `TcpStream` in this file).
#[derive(Debug)]
pub struct NetworkTap<T>
{
    /// The socket; `None` until `connect` has succeeded.
    sock: Option<T>,
    /// Address of the remote host the socket connects to.
    remote_addr: SocketAddr,
    /// Local address the socket is bound to before connecting.
    bind_addr: SocketAddr,
    /// Upper bound on how long a single `recv` call may wait.
    timeout: Duration,
}
95
96unsafe impl<T> Send for NetworkTap<T> {}
97unsafe impl<T> Sync for NetworkTap<T> {}
98
99#[async_trait]
100impl SocketTap for NetworkTap<UdpSocket>
101{
102 async
103 fn connect(&mut self) -> CDnsResult<()>
104 {
105 if self.sock.is_some() == true
106 {
107 return Ok(());
109 }
110
111 let socket =
112 UdpSocket::bind(self.bind_addr)
113 .await
114 .map_err(|e| internal_error_map!(CDnsErrorType::InternalError, "{}", e))?;
115
116 socket.connect(&self.remote_addr)
117 .await
118 .map_err(|e| internal_error_map!(CDnsErrorType::IoError, "{}", e))?;
119
120 self.sock = Some(socket);
121
122 return Ok(());
123 }
124
125 fn is_tcp(&self) -> bool
126 {
127 return false;
128 }
129
130 async
131 fn is_conncected(&self) -> bool
132 {
133 return self.sock.is_some();
134 }
135
136 fn get_remote_addr(&self) -> &SocketAddr
137 {
138 return &self.remote_addr;
139 }
140
141 async
142 fn send(&mut self, sndbuf: &[u8]) -> CDnsResult<usize>
143 {
144 return
145 self.sock.as_mut()
146 .unwrap()
147 .send(sndbuf)
148 .await
149 .map_err(|e| internal_error_map!(CDnsErrorType::IoError, "{}", e));
150 }
151
152 async
153 fn recv(&mut self, rcvbuf: &mut [u8]) -> CDnsResult<usize>
154 {
155 async
156 fn sub_recv(this: &mut NetworkTap<UdpSocket>, rcvbuf: &mut [u8]) -> CDnsResult<usize>
157 {
158 loop
159 {
160 match this.sock.as_mut().unwrap().recv_from(rcvbuf).await
161 {
162 Ok((rcv_len, rcv_src)) =>
163 {
164 if rcv_src != this.remote_addr
166 {
167 internal_error!(
168 CDnsErrorType::DnsResponse,
169 "received answer from unknown host: '{}' exp: '{}'",
170 this.remote_addr,
171 rcv_src
172 );
173 }
174
175 return Ok(rcv_len);
176 },
177 Err(ref e) if e.kind() == ErrorKind::WouldBlock =>
178 {
179 continue;
180 },
181 Err(ref e) if e.kind() == ErrorKind::Interrupted =>
182 {
183 continue;
184 },
185 Err(e) =>
186 {
187 internal_error!(CDnsErrorType::IoError, "{}", e);
188 }
189 } } }
193
194 match timeout(self.timeout, sub_recv(self, rcvbuf)).await
196 {
197 Ok(r) => return r,
198 Err(e) => internal_error!(CDnsErrorType::RequestTimeout, "{}", e)
199 }
200 }
201}
202
203#[async_trait]
204impl SocketTap for NetworkTap<TcpStream>
205{
206 async
207 fn connect(&mut self) -> CDnsResult<()>
208 {
209 if self.sock.is_some() == true
210 {
211 return Ok(());
213 }
214
215 let socket =
217 Socket::new(Domain::for_address(self.remote_addr), Type::STREAM, Some(Protocol::TCP))
218 .map_err(|e| internal_error_map!(CDnsErrorType::IoError, "{}", e))?;
219
220 socket
222 .bind(&SockAddr::from(self.bind_addr))
223 .map_err(|e| internal_error_map!(CDnsErrorType::IoError, "{}", e))?;
224
225 socket
227 .connect(&SockAddr::from(self.remote_addr))
228 .map_err(|e| internal_error_map!(CDnsErrorType::IoError, "{}", e))?;
229
230 let socket: TcpStream =
232 TcpStream::from_std(socket.into())
233 .map_err(|e| internal_error_map!(CDnsErrorType::IoError, "{}", e))?;
234
235 self.sock = Some(socket);
236
237 return Ok(());
238 }
239
240 fn is_tcp(&self) -> bool
241 {
242 return true;
243 }
244
245 async
246 fn is_conncected(&self) -> bool
247 {
248 return self.sock.is_some();
249 }
250
251 fn get_remote_addr(&self) -> &SocketAddr
252 {
253 return &self.remote_addr;
254 }
255
256 async
257 fn send(&mut self, sndbuf: &[u8]) -> CDnsResult<usize>
258 {
259 return
260 self
261 .sock
262 .as_mut()
263 .unwrap()
264 .write(sndbuf)
265 .await
266 .map_err(|e| internal_error_map!(CDnsErrorType::IoError, "{}", e));
267 }
268
269 async
270 fn recv(&mut self, rcvbuf: &mut [u8]) -> CDnsResult<usize>
271 {
272 async
273 fn sub_recv(this: &mut NetworkTap<TcpStream>, rcvbuf: &mut [u8]) -> CDnsResult<usize>
274 {
275 loop
276 {
277 match this.sock.as_mut().unwrap().read(rcvbuf).await
278 {
279 Ok(n) =>
280 {
281 return Ok(n);
282 },
283 Err(ref e) if e.kind() == ErrorKind::WouldBlock =>
284 {
285 continue;
286 },
287 Err(ref e) if e.kind() == ErrorKind::Interrupted =>
288 {
289 continue;
290 },
291 Err(e) =>
292 {
293 internal_error!(CDnsErrorType::IoError, "{}", e);
294 }
295 } } }
298
299 match timeout(self.timeout, sub_recv(self, rcvbuf)).await
301 {
302 Ok(r) => return r,
303 Err(e) => internal_error!(CDnsErrorType::RequestTimeout, "{}", e)
304 }
305 }
306}
307
// Place for inherent methods shared by every socket type; none are defined
// at present.
impl<T> NetworkTap<T>
{

}
312
313
314pub
331fn new_udp(
332 resolver_ip: &IpAddr,
333 resolver_port: u16,
334 bind_addr: &SocketAddr,
335 timeout: Duration
336) -> CDnsResult<Box<NetworkTapType>>
337{
338 let remote_dns_host = SocketAddr::from((resolver_ip.clone(), resolver_port));
340
341 let ret =
342 NetworkTap::<UdpSocket>
343 {
344 sock: None,
345 remote_addr: remote_dns_host,
346 bind_addr: bind_addr.clone(),
347 timeout: timeout
348 };
349
350 return Ok( Box::new(ret) );
351}
352
353pub
370fn new_tcp(
371 resolver_ip: &IpAddr,
372 resolver_port: u16,
373 bind_addr: &SocketAddr,
374 timeout: Duration
375) -> CDnsResult<Box<NetworkTapType>>
376{
377 let remote_dns_host = SocketAddr::from((resolver_ip.clone(), resolver_port));
379
380 let ret =
381 NetworkTap::<TcpStream>
382 {
383 sock: None,
384 remote_addr: remote_dns_host,
385 bind_addr: bind_addr.clone(),
386 timeout: timeout,
387 };
388
389 return Ok( Box::new(ret) );
390}
391
392
393
394#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
395async fn test_struct()
396{
397 use crate::a_sync::common::IPV4_BIND_ALL;
398
399 let ip0: IpAddr = "127.0.0.1".parse().unwrap();
400 let bind = SocketAddr::from((IPV4_BIND_ALL, 0));
401 let res = new_udp(&ip0, 53, &bind, Duration::from_secs(5));
402
403 assert_eq!(res.is_ok(), true, "{}", res.err().unwrap());
404
405 let _res = res.unwrap();
406}