1#[cfg(not(target_arch = "wasm32"))]
2pub use async_net::*;
3
4#[cfg(not(target_arch = "wasm32"))]
5pub mod tcp_stream;
6
7pub use conn::*;
8
9#[cfg(not(target_arch = "wasm32"))]
10pub use unix_connector::DefaultTcpDomainConnector as DefaultDomainConnector;
11
12#[cfg(target_arch = "wasm32")]
13pub use wasm_connector::DefaultDomainWebsocketConnector as DefaultDomainConnector;
14
15mod conn {
16
17 use async_trait::async_trait;
18 use futures_lite::io::{AsyncRead, AsyncWrite};
19 use std::io::Error as IoError;
20
21 pub trait Connection: AsyncRead + AsyncWrite + Send + Sync + Unpin + SplitConnection {}
22 impl<T: AsyncRead + AsyncWrite + Send + Sync + Unpin + SplitConnection> Connection for T {}
23
24 pub trait ReadConnection: AsyncRead + Send + Sync + Unpin {}
25 impl<T: AsyncRead + Send + Sync + Unpin> ReadConnection for T {}
26
27 pub trait WriteConnection: AsyncWrite + Send + Sync + Unpin {}
28 impl<T: AsyncWrite + Send + Sync + Unpin> WriteConnection for T {}
29
30 pub type BoxConnection = Box<dyn Connection>;
31 pub type BoxReadConnection = Box<dyn ReadConnection>;
32 pub type BoxWriteConnection = Box<dyn WriteConnection>;
33
34 pub trait SplitConnection {
35 fn split_connection(self) -> (BoxWriteConnection, BoxReadConnection);
37 }
38
39 cfg_if::cfg_if! {
40 if #[cfg(unix)] {
41 pub type ConnectionFd = std::os::unix::io::RawFd;
42 pub trait AsConnectionFd: std::os::unix::io::AsRawFd {
43 fn as_connection_fd(&self) -> ConnectionFd {
44 self.as_raw_fd()
45 }
46 }
47 impl AsConnectionFd for async_net::TcpStream { }
48 } else if #[cfg(windows)] {
49 pub type ConnectionFd = std::os::windows::io::RawSocket;
50 pub trait AsConnectionFd: std::os::windows::io::AsRawSocket {
51 fn as_connection_fd(&self) -> ConnectionFd {
52 self.as_raw_socket()
53 }
54 }
55 impl AsConnectionFd for async_net::TcpStream { }
56 } else {
57 pub type ConnectionFd = String;
58 }
59 }
60
61 pub type DomainConnector = Box<dyn TcpDomainConnector>;
62
63 impl Clone for DomainConnector {
64 fn clone(&self) -> DomainConnector {
65 self.clone_box()
66 }
67 }
68
69 pub trait AsyncConnector: Send + Sync {}
70
71 impl<T: Send + Sync> AsyncConnector for T {}
72
73 #[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
75 #[cfg_attr(not(target_arch = "wasm32"), async_trait)]
76 pub trait TcpDomainConnector: AsyncConnector {
77 async fn connect(
78 &self,
79 domain: &str,
80 ) -> Result<(BoxWriteConnection, BoxReadConnection, ConnectionFd), IoError>;
81
82 fn new_domain(&self, domain: String) -> DomainConnector;
84
85 fn domain(&self) -> &str;
86
87 fn clone_box(&self) -> DomainConnector;
88 }
89}
90
91#[cfg(not(target_arch = "wasm32"))]
92pub mod certs {
93
94 use std::fs::File;
95 use std::io::BufRead;
96 use std::io::BufReader;
97 use std::path::Path;
98
99 use anyhow::Result;
100 use tracing::debug;
101
102 pub trait CertBuilder: Sized {
103 fn new(bytes: Vec<u8>) -> Self;
104
105 fn from_reader(reader: &mut dyn BufRead) -> Result<Self> {
106 let mut bytes = vec![];
107 reader.read_to_end(&mut bytes)?;
108 Ok(Self::new(bytes))
109 }
110
111 fn from_path(path: impl AsRef<Path>) -> Result<Self> {
112 debug!("loading cert from: {}", path.as_ref().display());
113 let mut reader = BufReader::new(File::open(path)?);
114 Self::from_reader(&mut reader)
115 }
116 }
117}
118
#[cfg(target_arch = "wasm32")]
mod wasm_connector {
    use std::io::{Error as IoError, ErrorKind};

    use async_trait::async_trait;
    use futures_util::io::AsyncReadExt;
    use ws_stream_wasm::WsMeta;

    use super::*;

    /// Stateless connector that opens websocket connections on `wasm32`.
    #[derive(Clone, Default)]
    pub struct DefaultDomainWebsocketConnector {}

    impl DefaultDomainWebsocketConnector {
        /// Creates a connector; equivalent to `Default::default()`.
        pub fn new() -> Self {
            Self::default()
        }
    }

    #[async_trait(?Send)]
    impl TcpDomainConnector for DefaultDomainWebsocketConnector {
        /// Opens a websocket to `addr` and returns its write half, its read
        /// half and `addr` itself as the connection identifier.
        ///
        /// # Errors
        ///
        /// A failed websocket connect is wrapped in an I/O error of kind
        /// `Other`.
        async fn connect(
            &self,
            addr: &str,
        ) -> Result<(BoxWriteConnection, BoxReadConnection, ConnectionFd), IoError> {
            // `_meta` is bound by name so it stays alive until the function
            // returns.
            let (_meta, ws_stream) = WsMeta::connect(addr, None)
                .await
                .map_err(|err| IoError::new(ErrorKind::Other, err))?;

            // `split` yields (read, write); the return tuple wants write first.
            let (read_half, write_half) = ws_stream.into_io().split();
            Ok((Box::new(write_half), Box::new(read_half), addr.to_owned()))
        }

        /// Returns a boxed copy of this connector; `_domain` is ignored.
        fn new_domain(&self, _domain: String) -> DomainConnector {
            self.clone_box()
        }

        /// Always returns `"localhost"`.
        fn domain(&self) -> &str {
            "localhost"
        }

        /// Returns a boxed copy of this connector.
        fn clone_box(&self) -> DomainConnector {
            Box::new(self.clone())
        }
    }
}
161
162#[cfg(not(target_arch = "wasm32"))]
163mod unix_connector {
164 use async_trait::async_trait;
165 use std::io::Error as IoError;
166 use tracing::debug;
167
168 use super::tcp_stream::stream;
169
170 use super::*;
171
172 impl SplitConnection for TcpStream {
173 fn split_connection(self) -> (BoxWriteConnection, BoxReadConnection) {
174 (Box::new(self.clone()), Box::new(self))
175 }
176 }
177
178 #[derive(Clone, Default)]
180 pub struct DefaultTcpDomainConnector {}
181
182 impl DefaultTcpDomainConnector {
183 pub fn new() -> Self {
184 Self {}
185 }
186 }
187
188 #[async_trait]
189 impl TcpDomainConnector for DefaultTcpDomainConnector {
190 async fn connect(
191 &self,
192 addr: &str,
193 ) -> Result<(BoxWriteConnection, BoxReadConnection, ConnectionFd), IoError> {
194 debug!("connect to tcp addr: {}", addr);
195 let tcp_stream = stream(addr).await?;
196
197 let fd = tcp_stream.as_connection_fd();
198 Ok((Box::new(tcp_stream.clone()), Box::new(tcp_stream), fd))
199 }
200
201 fn new_domain(&self, _domain: String) -> DomainConnector {
202 Box::new(self.clone())
203 }
204
205 fn domain(&self) -> &str {
206 "localhost"
207 }
208
209 fn clone_box(&self) -> DomainConnector {
210 Box::new(self.clone())
211 }
212 }
213}
214
#[cfg(test)]
#[cfg(not(target_arch = "wasm32"))]
mod test {
    use std::time::Duration;

    use futures_lite::future::zip;
    use futures_lite::stream::StreamExt;
    use futures_util::AsyncReadExt;
    use tracing::debug;

    use crate::net::tcp_stream::stream;
    use crate::net::TcpListener;
    use crate::test_async;
    use crate::timer::sleep;

    use super::*;

    /// Runs a one-shot server and a client side by side; the client connects
    /// and splits its stream into two halves.
    #[test_async]
    async fn test_clone() -> Result<(), ()> {
        let addr = format!("127.0.0.1:{}", 39000)
            .parse::<SocketAddr>()
            .expect("parse");

        let server_ft = async {
            debug!("server: binding");
            let listener = TcpListener::bind(&addr).await.expect("listener failed");
            debug!("server: successfully binding. waiting for incoming");

            // Accept exactly one connection.
            let accepted = listener.incoming().next().await.expect("incoming");

            debug!("server: got connection from client");
            let _ = accepted.expect("no stream");

            // Keep the server future running for a second after the accept.
            sleep(Duration::from_secs(1)).await;

            Ok::<(), ()>(())
        };

        let client_ft = async {
            // Wait briefly before connecting so the server can bind first.
            sleep(Duration::from_millis(100)).await;
            let tcp_stream = stream(&addr).await.expect("test");
            let (_read, _write) = tcp_stream.split();
        };

        let _ = zip(client_ft, server_ft).await;

        Ok(())
    }
}
#[cfg(test)]
#[cfg(target_arch = "wasm32")]
mod test {
    use super::*;
    use futures_util::{AsyncReadExt, AsyncWriteExt};
    use wasm_bindgen_test::*;

    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);

    /// Connects over a websocket, sends a short message and expects the very
    /// same bytes back.
    ///
    /// The assertions only hold if the peer at `ws://127.0.0.1:1234` echoes
    /// what it receives.
    #[wasm_bindgen_test]
    async fn test_connect() {
        tracing_wasm::set_as_global_default();

        let addr = "ws://127.0.0.1:1234";
        let input_msg = "foobar".to_string();

        let websocket_stream = DefaultDomainConnector::default();
        let (mut writer, mut reader, _id) = websocket_stream.connect(addr).await.expect("test");

        // `write` may accept only part of the buffer and its byte count was
        // being discarded; `write_all` keeps writing until the whole message
        // has been handed to the stream.
        writer
            .write_all(input_msg.as_bytes())
            .await
            .expect("Failed to write");

        let mut output = vec![0; input_msg.len()];
        let size = reader.read(&mut output).await.expect("Failed to read");
        assert_eq!(output, input_msg.as_bytes());
        assert_eq!(size, input_msg.len());
    }
}