romio/uds/
listener.rs

1use super::UnixStream;
2
3use crate::raw::PollEvented;
4
5use async_ready::{AsyncReady, TakeError};
6use futures::{ready, Poll, Stream};
7use mio_uds;
8
9use std::fmt;
10use std::io;
11use std::os::unix::io::{AsRawFd, RawFd};
12use std::os::unix::net::{self, SocketAddr};
13use std::path::Path;
14use std::pin::Pin;
15use std::task::Context;
16
17/// A Unix socket which can accept connections from other Unix sockets.
18///
19/// # Examples
20///
21/// ```no_run
22/// use romio::uds::{UnixListener, UnixStream};
23/// use futures::prelude::*;
24///
25/// async fn say_hello(mut stream: UnixStream) {
26///     stream.write_all(b"Shall I hear more, or shall I speak at this?!").await;
27/// }
28///
29/// async fn listen() -> Result<(), Box<dyn std::error::Error + 'static>> {
30///     let listener = UnixListener::bind("/tmp/sock")?;
31///     let mut incoming = listener.incoming();
32///
33///     // accept connections and process them serially
34///     while let Some(stream) = incoming.next().await {
35///         say_hello(stream?).await;
36///     }
37///     Ok(())
38/// }
39/// ```
40pub struct UnixListener {
41    io: PollEvented<mio_uds::UnixListener>,
42}
43
44impl UnixListener {
45    /// Creates a new `UnixListener` bound to the specified path.
46    ///
47    /// # Examples
48    /// Create a Unix Domain Socket on `/tmp/sock`.
49    ///
50    /// ```rust,no_run
51    /// use romio::uds::UnixListener;
52    ///
53    /// # fn main () -> Result<(), Box<dyn std::error::Error + 'static>> {
54    /// let socket = UnixListener::bind("/tmp/sock")?;
55    /// # Ok(())}
56    /// ```
57    ///
58    pub fn bind(path: impl AsRef<Path>) -> io::Result<UnixListener> {
59        let listener = mio_uds::UnixListener::bind(path)?;
60        let io = PollEvented::new(listener);
61        Ok(UnixListener { io })
62    }
63
64    /// Returns the local socket address of this listener.
65    ///
66    /// # Examples
67    ///
68    /// ```rust,no_run
69    /// use romio::uds::UnixListener;
70    ///
71    /// # fn main () -> Result<(), Box<dyn std::error::Error + 'static>> {
72    /// let socket = UnixListener::bind("/tmp/sock")?;
73    /// let addr = socket.local_addr()?;
74    /// # Ok(())}
75    /// ```
76    pub fn local_addr(&self) -> io::Result<SocketAddr> {
77        self.io.get_ref().local_addr()
78    }
79
80    /// Consumes this listener, returning a stream of the sockets this listener
81    /// accepts.
82    ///
83    /// This method returns an implementation of the `Stream` trait which
84    /// resolves to the sockets the are accepted on this listener.
85    ///
86    ///
87    /// # Examples
88    ///
89    /// ```rust,no_run
90    /// use romio::uds::UnixListener;
91    /// use futures::prelude::*;
92    ///
93    /// # async fn run () -> Result<(), Box<dyn std::error::Error + 'static>> {
94    /// let listener = UnixListener::bind("/tmp/sock")?;
95    /// let mut incoming = listener.incoming();
96    ///
97    /// // accept connections and process them serially
98    /// while let Some(stream) = incoming.next().await {
99    ///     match stream {
100    ///         Ok(stream) => {
101    ///             println!("new client!");
102    ///         },
103    ///         Err(e) => { /* connection failed */ }
104    ///     }
105    /// }
106    /// # Ok(())}
107    /// ```
108    pub fn incoming(self) -> Incoming {
109        Incoming::new(self)
110    }
111
112    fn poll_accept_std(
113        mut self: Pin<&mut Self>,
114        cx: &mut Context<'_>,
115    ) -> Poll<io::Result<(net::UnixStream, SocketAddr)>> {
116        ready!(Pin::new(&mut self.io).poll_read_ready(cx)?);
117
118        match Pin::new(&mut self.io).get_ref().accept_std() {
119            Ok(Some((sock, addr))) => Poll::Ready(Ok((sock, addr))),
120            Ok(None) => {
121                Pin::new(&mut self.io).clear_read_ready(cx)?;
122                Poll::Pending
123            }
124            Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
125                Pin::new(&mut self.io).clear_read_ready(cx)?;
126                Poll::Pending
127            }
128            Err(err) => Poll::Ready(Err(err)),
129        }
130    }
131}
132
133impl AsyncReady for UnixListener {
134    type Ok = (UnixStream, SocketAddr);
135    type Err = std::io::Error;
136
137    /// Check if the stream can be read from.
138    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<Self::Ok, Self::Err>> {
139        let (io, addr) = ready!(self.poll_accept_std(cx)?);
140        let io = mio_uds::UnixStream::from_stream(io)?;
141        Poll::Ready(Ok((UnixStream::new(io), addr)))
142    }
143}
144
145impl TakeError for UnixListener {
146    type Ok = io::Error;
147    type Err = io::Error;
148
149    /// Returns the value of the `SO_ERROR` option.
150    ///
151    /// # Examples
152    ///
153    /// ```rust,no_run
154    /// use romio::uds::UnixListener;
155    /// use romio::raw::TakeError;
156    ///
157    /// # fn main () -> Result<(), Box<dyn std::error::Error + 'static>> {
158    /// let listener = UnixListener::bind("/tmp/sock")?;
159    /// if let Ok(Some(err)) = listener.take_error() {
160    ///     println!("Got error: {:?}", err);
161    /// }
162    /// # Ok(())}
163    /// ```
164    fn take_error(&self) -> Result<Option<Self::Ok>, Self::Err> {
165        self.io.get_ref().take_error()
166    }
167}
168
169impl fmt::Debug for UnixListener {
170    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
171        self.io.get_ref().fmt(f)
172    }
173}
174
175impl AsRawFd for UnixListener {
176    fn as_raw_fd(&self) -> RawFd {
177        self.io.get_ref().as_raw_fd()
178    }
179}
180
181/// Stream of listeners
182#[derive(Debug)]
183pub struct Incoming {
184    inner: UnixListener,
185}
186
187impl Incoming {
188    pub(crate) fn new(listener: UnixListener) -> Incoming {
189        Incoming { inner: listener }
190    }
191}
192
193impl Stream for Incoming {
194    type Item = io::Result<UnixStream>;
195
196    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
197        let (socket, _) = ready!(Pin::new(&mut self.inner).poll_ready(cx)?);
198        Poll::Ready(Some(Ok(socket)))
199    }
200}