1use std::fmt;
2use std::io;
3use std::net::SocketAddr;
4use std::path::PathBuf;
5use std::pin::Pin;
6use std::task::{Poll, Context};
7
8use async_std::io::{Read, Write, IoSlice, IoSliceMut};
9use async_std::net::{TcpStream, Shutdown};
10#[cfg(unix)] use async_std::os::unix::net::UnixStream;
11
12use crate::backpressure::Token;
13
14
15#[derive(Debug, Clone)]
16enum Stream {
17 Tcp(TcpStream),
18 #[cfg(unix)]
19 Unix(UnixStream),
20}
21
/// Address of the remote side of a connection, as produced by
/// `ByteStream::peer_addr`.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum PeerAddr {
    /// Socket address of a TCP peer.
    Tcp(SocketAddr),
    /// Filesystem path of a Unix socket peer, or `None` when the peer's
    /// address has no pathname.
    Unix(Option<PathBuf>),
}
38
39#[derive(Debug, Clone)]
59pub struct ByteStream {
60 stream: Stream,
61 token: Option<Token>,
62}
63
64trait Assert: Read + Write + Send + Unpin + 'static { }
65impl Assert for ByteStream {}
66
67impl fmt::Display for PeerAddr {
68 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
69 match self {
70 PeerAddr::Tcp(s) => s.fmt(f),
71 PeerAddr::Unix(None) => "<unnamed>".fmt(f),
72 PeerAddr::Unix(Some(s)) => s.display().fmt(f),
73 }
74 }
75}
76
77impl ByteStream {
78 pub fn new_tcp(token: Token, stream: TcpStream) -> ByteStream {
80 ByteStream {
81 stream: Stream::Tcp(stream),
82 token: Some(token),
83 }
84 }
85
86 pub fn new_tcp_detached(stream: TcpStream) -> ByteStream {
93 ByteStream {
94 stream: Stream::Tcp(stream),
95 token: None,
96 }
97 }
98
99 #[cfg(unix)]
101 pub fn new_unix(token: Token, stream: UnixStream) -> ByteStream {
102 ByteStream {
103 stream: Stream::Unix(stream),
104 token: Some(token),
105 }
106 }
107
108 #[cfg(unix)]
115 pub fn new_unix_detached(stream: UnixStream) -> ByteStream {
116 ByteStream {
117 stream: Stream::Unix(stream),
118 token: None,
119 }
120 }
121
122 pub fn peer_addr(&self) -> io::Result<PeerAddr> {
139 match &self.stream {
140 Stream::Tcp(s) => s.peer_addr().map(PeerAddr::Tcp),
141 #[cfg(unix)]
142 Stream::Unix(s) => {
143 s.peer_addr()
144 .map(|a| a.as_pathname().map(|p| p.to_owned()))
145 .map(PeerAddr::Unix)
146 }
147 }
148 }
149
150 pub fn nodelay(&self) -> io::Result<bool> {
159 match &self.stream {
160 Stream::Tcp(s) => s.nodelay(),
161 #[cfg(unix)]
162 Stream::Unix(_) => Ok(true),
163 }
164 }
165
166 pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> {
177 match &self.stream {
178 Stream::Tcp(s) => s.set_nodelay(nodelay),
179 #[cfg(unix)]
180 Stream::Unix(_) => Ok(()),
181 }
182 }
183
184 pub fn shutdown(&self, how: Shutdown) -> Result<(), io::Error> {
190 match &self.stream {
191 Stream::Tcp(s) => s.shutdown(how),
192 #[cfg(unix)]
193 Stream::Unix(s) => s.shutdown(how),
194 }
195 }
196}
197
198impl From<(Token, TcpStream)> for ByteStream {
199 fn from((token, stream): (Token, TcpStream)) -> ByteStream {
200 ByteStream::new_tcp(token, stream)
201 }
202}
203
204#[cfg(unix)]
205impl From<(Token, UnixStream)> for ByteStream {
206 fn from((token, stream): (Token, UnixStream)) -> ByteStream {
207 ByteStream::new_unix(token, stream)
208 }
209}
210
211impl Read for ByteStream {
212
213 fn poll_read(self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8])
214 -> Poll<Result<usize, io::Error>>
215 {
216 match self.stream {
217 Stream::Tcp(ref s) => {
218 Pin::new(&mut &*s).poll_read(cx, buf)
219 }
220 #[cfg(unix)]
221 Stream::Unix(ref s) => {
222 Pin::new(&mut &*s).poll_read(cx, buf)
223 }
224 }
225 }
226
227 fn poll_read_vectored(self: Pin<&mut Self>, cx: &mut Context,
228 bufs: &mut [IoSliceMut])
229 -> Poll<Result<usize, io::Error>>
230 {
231 match self.stream {
232 Stream::Tcp(ref s) => {
233 Pin::new(&mut &*s).poll_read_vectored(cx, bufs)
234 }
235 #[cfg(unix)]
236 Stream::Unix(ref s) => {
237 Pin::new(&mut &*s).poll_read_vectored(cx, bufs)
238 }
239 }
240 }
241}
242
243impl Read for &ByteStream {
244 fn poll_read(self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8])
245 -> Poll<Result<usize, io::Error>>
246 {
247 match self.stream {
248 Stream::Tcp(ref s) => {
249 Pin::new(&mut &*s).poll_read(cx, buf)
250 }
251 #[cfg(unix)]
252 Stream::Unix(ref s) => {
253 Pin::new(&mut &*s).poll_read(cx, buf)
254 }
255 }
256 }
257 fn poll_read_vectored(self: Pin<&mut Self>, cx: &mut Context,
258 bufs: &mut [IoSliceMut])
259 -> Poll<Result<usize, io::Error>>
260 {
261 match self.stream {
262 Stream::Tcp(ref s) => {
263 Pin::new(&mut &*s).poll_read_vectored(cx, bufs)
264 }
265 #[cfg(unix)]
266 Stream::Unix(ref s) => {
267 Pin::new(&mut &*s).poll_read_vectored(cx, bufs)
268 }
269 }
270 }
271}
272
273impl Write for ByteStream {
274 fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8])
275 -> Poll<Result<usize, io::Error>>
276 {
277 match self.stream {
278 Stream::Tcp(ref s) => {
279 Pin::new(&mut &*s).poll_write(cx, buf)
280 }
281 #[cfg(unix)]
282 Stream::Unix(ref s) => {
283 Pin::new(&mut &*s).poll_write(cx, buf)
284 }
285 }
286 }
287 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context)
288 -> Poll<Result<(), io::Error>>
289 {
290 match self.stream {
291 Stream::Tcp(ref s) => {
292 Pin::new(&mut &*s).poll_flush(cx)
293 }
294 #[cfg(unix)]
295 Stream::Unix(ref s) => {
296 Pin::new(&mut &*s).poll_flush(cx)
297 }
298 }
299 }
300 fn poll_close(self: Pin<&mut Self>, cx: &mut Context)
301 -> Poll<Result<(), io::Error>>
302 {
303 match self.stream {
304 Stream::Tcp(ref s) => {
305 Pin::new(&mut &*s).poll_close(cx)
306 }
307 #[cfg(unix)]
308 Stream::Unix(ref s) => {
309 Pin::new(&mut &*s).poll_close(cx)
310 }
311 }
312 }
313 fn poll_write_vectored(self: Pin<&mut Self>, cx: &mut Context,
314 bufs: &[IoSlice])
315 -> Poll<Result<usize, io::Error>>
316 {
317 match self.stream {
318 Stream::Tcp(ref s) => {
319 Pin::new(&mut &*s).poll_write_vectored(cx, bufs)
320 }
321 #[cfg(unix)]
322 Stream::Unix(ref s) => {
323 Pin::new(&mut &*s).poll_write_vectored(cx, bufs)
324 }
325 }
326 }
327}
328
329impl Write for &ByteStream {
330 fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8])
331 -> Poll<Result<usize, io::Error>>
332 {
333 match self.stream {
334 Stream::Tcp(ref s) => {
335 Pin::new(&mut &*s).poll_write(cx, buf)
336 }
337 #[cfg(unix)]
338 Stream::Unix(ref s) => {
339 Pin::new(&mut &*s).poll_write(cx, buf)
340 }
341 }
342 }
343 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context)
344 -> Poll<Result<(), io::Error>>
345 {
346 match self.stream {
347 Stream::Tcp(ref s) => {
348 Pin::new(&mut &*s).poll_flush(cx)
349 }
350 #[cfg(unix)]
351 Stream::Unix(ref s) => {
352 Pin::new(&mut &*s).poll_flush(cx)
353 }
354 }
355 }
356 fn poll_close(self: Pin<&mut Self>, cx: &mut Context)
357 -> Poll<Result<(), io::Error>>
358 {
359 match self.stream {
360 Stream::Tcp(ref s) => {
361 Pin::new(&mut &*s).poll_close(cx)
362 }
363 #[cfg(unix)]
364 Stream::Unix(ref s) => {
365 Pin::new(&mut &*s).poll_close(cx)
366 }
367 }
368 }
369 fn poll_write_vectored(self: Pin<&mut Self>, cx: &mut Context,
370 bufs: &[IoSlice])
371 -> Poll<Result<usize, io::Error>>
372 {
373 match self.stream {
374 Stream::Tcp(ref s) => {
375 Pin::new(&mut &*s).poll_write_vectored(cx, bufs)
376 }
377 #[cfg(unix)]
378 Stream::Unix(ref s) => {
379 Pin::new(&mut &*s).poll_write_vectored(cx, bufs)
380 }
381 }
382 }
383}