logo
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
//! UnixListener module
use std::io::{Error as IoError, Result as IoResult};
use std::path::Path;
use std::pin::Pin;
use std::task::{Context, Poll};

use hyper::server::accept::Accept;
pub use hyper::Server;
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};

use super::Listener;
use crate::addr::SocketAddr;
use crate::transport::Transport;

/// Unix domain socket listener.
///
/// Wraps a [`tokio::net::UnixListener`]. The [`Accept`] implementation for this type
/// yields every accepted connection as a [`UnixStream`].
///
/// Only compiled on Unix targets.
#[cfg(unix)]
pub struct UnixListener {
    /// Underlying tokio listener that connections are accepted from.
    incoming: tokio::net::UnixListener,
}
#[cfg(unix)]
impl UnixListener {
    /// Binds a new `UnixListener` to the given socket path.
    ///
    /// Convenience wrapper around [`UnixListener::try_bind`] for callers that treat a
    /// failed bind as fatal.
    ///
    /// # Panics
    ///
    /// Panics if [`UnixListener::try_bind`] returns an error, and in every situation in
    /// which `try_bind` itself panics.
    #[inline]
    pub fn bind(path: impl AsRef<Path>) -> UnixListener {
        let bound: IoResult<UnixListener> = Self::try_bind(path);
        bound.unwrap()
    }

    /// Tries to bind a new `UnixListener` to the given socket path.
    ///
    /// # Errors
    ///
    /// Returns the I/O error produced by `tokio::net::UnixListener::bind` when the
    /// socket cannot be bound.
    ///
    /// # Panics
    ///
    /// This function panics if thread-local runtime is not set.
    ///
    /// The runtime is usually set implicitly when this function is called
    /// from a future driven by a tokio runtime.
    #[inline]
    pub fn try_bind(path: impl AsRef<Path>) -> IoResult<UnixListener> {
        let incoming = tokio::net::UnixListener::bind(path)?;
        Ok(Self { incoming })
    }
}

// Empty impl: `UnixListener` opts into `Listener` without overriding any of its items.
#[cfg(unix)]
impl Listener for UnixListener {}
#[cfg(unix)]
impl Accept for UnixListener {
    type Conn = UnixStream;
    type Error = IoError;

    /// Polls the wrapped tokio listener for the next incoming connection.
    ///
    /// An accepted socket is wrapped in a [`UnixStream`] together with its peer address
    /// (converted into the crate's [`SocketAddr`]). An accept error is forwarded as
    /// `Some(Err(_))`. This implementation never yields `None`.
    #[inline]
    fn poll_accept(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<Self::Conn, Self::Error>>> {
        self.incoming.poll_accept(cx).map(|accepted| {
            Some(accepted.map(|(stream, peer)| UnixStream::new(stream, peer.into())))
        })
    }
}

/// A connection accepted by [`UnixListener`].
///
/// Pairs the underlying [`tokio::net::UnixStream`] with the peer address supplied when
/// the stream was constructed. All reads and writes are delegated to the wrapped stream.
///
/// Only compiled on Unix targets, because the wrapped tokio type only exists there.
#[cfg(unix)]
pub struct UnixStream {
    /// The tokio stream that performs the actual I/O.
    inner_stream: tokio::net::UnixStream,
    /// Peer address recorded at construction time.
    remote_addr: SocketAddr,
}

#[cfg(unix)]
impl Transport for UnixStream {
    /// Returns a clone of the stored peer address; this is always `Some`.
    #[inline]
    fn remote_addr(&self) -> Option<SocketAddr> {
        Some(self.remote_addr.clone())
    }
}

#[cfg(unix)]
impl UnixStream {
    /// Wraps an already-connected tokio stream together with its peer address.
    #[inline]
    fn new(inner_stream: tokio::net::UnixStream, remote_addr: SocketAddr) -> Self {
        UnixStream {
            inner_stream,
            remote_addr,
        }
    }
}

#[cfg(unix)]
impl AsyncRead for UnixStream {
    /// Forwards the read to the wrapped tokio stream.
    #[inline]
    fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<IoResult<()>> {
        Pin::new(&mut self.get_mut().inner_stream).poll_read(cx, buf)
    }
}

#[cfg(unix)]
impl AsyncWrite for UnixStream {
    /// Forwards the write to the wrapped tokio stream.
    #[inline]
    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<IoResult<usize>> {
        Pin::new(&mut self.get_mut().inner_stream).poll_write(cx, buf)
    }

    /// Forwards the flush to the wrapped tokio stream.
    #[inline]
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        Pin::new(&mut self.get_mut().inner_stream).poll_flush(cx)
    }

    /// Forwards the shutdown to the wrapped tokio stream.
    #[inline]
    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        Pin::new(&mut self.get_mut().inner_stream).poll_shutdown(cx)
    }
}

// Gated on `unix` as well as `test`: `UnixListener` only exists on Unix targets.
#[cfg(all(test, unix))]
mod tests {
    use futures_util::{Stream, StreamExt};
    use tokio::io::{AsyncReadExt, AsyncWriteExt};

    use super::*;

    // Test-only adapter so the listener can be driven with `StreamExt::next`.
    impl Stream for UnixListener {
        type Item = Result<UnixStream, IoError>;

        fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
            self.poll_accept(cx)
        }
    }

    /// Round-trips one `i32` from a client task to a stream accepted by `UnixListener`.
    #[tokio::test]
    async fn test_unix_listener() {
        // Per-process path in the platform temp dir, so concurrent runs do not collide.
        let sock_file = std::env::temp_dir().join(format!("test-salvo-{}.sock", std::process::id()));
        // A previously aborted run may have left the socket file behind, which would make
        // `bind` fail with `AddrInUse`; ignore the error when there is nothing to remove.
        let _ = std::fs::remove_file(&sock_file);
        let mut listener = UnixListener::bind(&sock_file);

        let client_path = sock_file.clone();
        tokio::spawn(async move {
            let mut stream = tokio::net::UnixStream::connect(client_path).await.unwrap();
            stream.write_i32(518).await.unwrap();
        });

        let mut stream = listener.next().await.unwrap().unwrap();
        let received = stream.read_i32().await;
        // Remove the socket file before asserting so a failed assertion does not leak it.
        let removed = std::fs::remove_file(&sock_file);
        assert_eq!(received.unwrap(), 518);
        removed.unwrap();
    }
}