fluvio_future/net/
mod.rs

1#[cfg(not(target_arch = "wasm32"))]
2pub use async_net::*;
3
4#[cfg(not(target_arch = "wasm32"))]
5pub mod tcp_stream;
6
7pub use conn::*;
8
9#[cfg(not(target_arch = "wasm32"))]
10pub use unix_connector::DefaultTcpDomainConnector as DefaultDomainConnector;
11
12#[cfg(target_arch = "wasm32")]
13pub use wasm_connector::DefaultDomainWebsocketConnector as DefaultDomainConnector;
14
15mod conn {
16
17    use async_trait::async_trait;
18    use futures_lite::io::{AsyncRead, AsyncWrite};
19    use std::io::Error as IoError;
20
21    pub trait Connection: AsyncRead + AsyncWrite + Send + Sync + Unpin + SplitConnection {}
22    impl<T: AsyncRead + AsyncWrite + Send + Sync + Unpin + SplitConnection> Connection for T {}
23
24    pub trait ReadConnection: AsyncRead + Send + Sync + Unpin {}
25    impl<T: AsyncRead + Send + Sync + Unpin> ReadConnection for T {}
26
27    pub trait WriteConnection: AsyncWrite + Send + Sync + Unpin {}
28    impl<T: AsyncWrite + Send + Sync + Unpin> WriteConnection for T {}
29
30    pub type BoxConnection = Box<dyn Connection>;
31    pub type BoxReadConnection = Box<dyn ReadConnection>;
32    pub type BoxWriteConnection = Box<dyn WriteConnection>;
33
34    pub trait SplitConnection {
35        // split into write and read
36        fn split_connection(self) -> (BoxWriteConnection, BoxReadConnection);
37    }
38
39    cfg_if::cfg_if! {
40        if #[cfg(unix)] {
41            pub type ConnectionFd = std::os::unix::io::RawFd;
42            pub trait AsConnectionFd: std::os::unix::io::AsRawFd {
43                fn as_connection_fd(&self) -> ConnectionFd {
44                    self.as_raw_fd()
45                }
46            }
47            impl AsConnectionFd for async_net::TcpStream { }
48        } else if #[cfg(windows)] {
49            pub type ConnectionFd = std::os::windows::io::RawSocket;
50            pub trait AsConnectionFd: std::os::windows::io::AsRawSocket {
51                fn as_connection_fd(&self) -> ConnectionFd {
52                    self.as_raw_socket()
53                }
54            }
55            impl AsConnectionFd for async_net::TcpStream { }
56        } else {
57            pub type ConnectionFd = String;
58        }
59    }
60
61    pub type DomainConnector = Box<dyn TcpDomainConnector>;
62
63    impl Clone for DomainConnector {
64        fn clone(&self) -> DomainConnector {
65            self.clone_box()
66        }
67    }
68
69    pub trait AsyncConnector: Send + Sync {}
70
71    impl<T: Send + Sync> AsyncConnector for T {}
72
73    /// connect to domain and return connection
74    #[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
75    #[cfg_attr(not(target_arch = "wasm32"), async_trait)]
76    pub trait TcpDomainConnector: AsyncConnector {
77        async fn connect(
78            &self,
79            domain: &str,
80        ) -> Result<(BoxWriteConnection, BoxReadConnection, ConnectionFd), IoError>;
81
82        // create new version of my self with new domain
83        fn new_domain(&self, domain: String) -> DomainConnector;
84
85        fn domain(&self) -> &str;
86
87        fn clone_box(&self) -> DomainConnector;
88    }
89}
90
91#[cfg(not(target_arch = "wasm32"))]
92pub mod certs {
93
94    use std::fs::File;
95    use std::io::BufRead;
96    use std::io::BufReader;
97    use std::path::Path;
98
99    use anyhow::Result;
100    use tracing::debug;
101
102    pub trait CertBuilder: Sized {
103        fn new(bytes: Vec<u8>) -> Self;
104
105        fn from_reader(reader: &mut dyn BufRead) -> Result<Self> {
106            let mut bytes = vec![];
107            reader.read_to_end(&mut bytes)?;
108            Ok(Self::new(bytes))
109        }
110
111        fn from_path(path: impl AsRef<Path>) -> Result<Self> {
112            debug!("loading cert from: {}", path.as_ref().display());
113            let mut reader = BufReader::new(File::open(path)?);
114            Self::from_reader(&mut reader)
115        }
116    }
117}
118
119#[cfg(target_arch = "wasm32")]
120mod wasm_connector {
121    use super::*;
122    use async_trait::async_trait;
123    use futures_util::io::AsyncReadExt;
124    use std::io::Error as IoError;
125    use ws_stream_wasm::WsMeta;
126
127    #[derive(Clone, Default)]
128    pub struct DefaultDomainWebsocketConnector {}
129    impl DefaultDomainWebsocketConnector {
130        pub fn new() -> Self {
131            Self {}
132        }
133    }
134    #[async_trait(?Send)]
135    impl TcpDomainConnector for DefaultDomainWebsocketConnector {
136        async fn connect(
137            &self,
138            addr: &str,
139        ) -> Result<(BoxWriteConnection, BoxReadConnection, ConnectionFd), IoError> {
140            let (mut _ws, wsstream) = WsMeta::connect(addr, None)
141                .await
142                .map_err(|e| IoError::new(std::io::ErrorKind::Other, e))?;
143            let wsstream_io = wsstream.into_io();
144            let (stream, sink) = wsstream_io.split();
145            Ok((Box::new(sink), Box::new(stream), String::from(addr)))
146        }
147
148        fn new_domain(&self, _domain: String) -> DomainConnector {
149            Box::new(self.clone())
150        }
151
152        fn domain(&self) -> &str {
153            "localhost"
154        }
155
156        fn clone_box(&self) -> DomainConnector {
157            Box::new(self.clone())
158        }
159    }
160}
161
162#[cfg(not(target_arch = "wasm32"))]
163mod unix_connector {
164    use async_trait::async_trait;
165    use std::io::Error as IoError;
166    use tracing::debug;
167
168    use super::tcp_stream::stream;
169
170    use super::*;
171
172    impl SplitConnection for TcpStream {
173        fn split_connection(self) -> (BoxWriteConnection, BoxReadConnection) {
174            (Box::new(self.clone()), Box::new(self))
175        }
176    }
177
178    /// creates TcpStream connection
179    #[derive(Clone, Default)]
180    pub struct DefaultTcpDomainConnector {}
181
182    impl DefaultTcpDomainConnector {
183        pub fn new() -> Self {
184            Self {}
185        }
186    }
187
188    #[async_trait]
189    impl TcpDomainConnector for DefaultTcpDomainConnector {
190        async fn connect(
191            &self,
192            addr: &str,
193        ) -> Result<(BoxWriteConnection, BoxReadConnection, ConnectionFd), IoError> {
194            debug!("connect to tcp addr: {}", addr);
195            let tcp_stream = stream(addr).await?;
196
197            let fd = tcp_stream.as_connection_fd();
198            Ok((Box::new(tcp_stream.clone()), Box::new(tcp_stream), fd))
199        }
200
201        fn new_domain(&self, _domain: String) -> DomainConnector {
202            Box::new(self.clone())
203        }
204
205        fn domain(&self) -> &str {
206            "localhost"
207        }
208
209        fn clone_box(&self) -> DomainConnector {
210            Box::new(self.clone())
211        }
212    }
213}
214
215#[cfg(test)]
216#[cfg(not(target_arch = "wasm32"))]
217mod test {
218    use std::time;
219
220    use futures_lite::future::zip;
221    use futures_lite::stream::StreamExt;
222    use futures_util::AsyncReadExt;
223    use tracing::debug;
224
225    use crate::net::TcpListener;
226    use crate::net::tcp_stream::stream;
227    use crate::test_async;
228    use crate::timer::sleep;
229
230    use super::*;
231
232    #[test_async]
233    async fn test_clone() -> Result<(), ()> {
234        let addr = format!("127.0.0.1:{}", 39000)
235            .parse::<SocketAddr>()
236            .expect("parse");
237
238        let server_ft = async {
239            debug!("server: binding");
240            let listener = TcpListener::bind(&addr).await.expect("listener failed");
241            debug!("server: successfully binding. waiting for incoming");
242
243            let mut incoming = listener.incoming();
244            let incoming_stream = incoming.next().await.expect("incoming");
245
246            debug!("server: got connection from client");
247            let _ = incoming_stream.expect("no stream");
248
249            // sleep 1 seconds so we don't lost connection
250            sleep(time::Duration::from_secs(1)).await;
251
252            Ok(()) as Result<(), ()>
253        };
254
255        let client_ft = async {
256            sleep(time::Duration::from_millis(100)).await;
257            let tcp_stream = stream(&addr).await.expect("test");
258            let (_read, _write) = tcp_stream.split();
259        };
260
261        let _ = zip(client_ft, server_ft).await;
262
263        Ok(())
264    }
265}
266#[cfg(test)]
267#[cfg(target_arch = "wasm32")]
268mod test {
269    use super::*;
270    use futures_util::{AsyncReadExt, AsyncWriteExt};
271    use wasm_bindgen_test::*;
272
273    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
274    #[wasm_bindgen_test]
275    async fn test_connect() {
276        tracing_wasm::set_as_global_default();
277
278        let addr = "ws://127.0.0.1:1234";
279        let input_msg = "foobar".to_string();
280
281        let websocket_stream = DefaultDomainConnector::default();
282        let (mut writer, mut reader, _id) = websocket_stream.connect(addr).await.expect("test");
283
284        writer
285            .write(input_msg.as_bytes())
286            .await
287            .expect("Failed to write");
288
289        let mut output = vec![0; input_msg.len()];
290        let size = reader.read(&mut output).await.expect("Failed to read");
291        assert_eq!(output, input_msg.as_bytes());
292        assert_eq!(size, input_msg.len());
293    }
294}