romio/uds/listener.rs
1use super::UnixStream;
2
3use crate::raw::PollEvented;
4
5use async_ready::{AsyncReady, TakeError};
6use futures::{ready, Poll, Stream};
7use mio_uds;
8
9use std::fmt;
10use std::io;
11use std::os::unix::io::{AsRawFd, RawFd};
12use std::os::unix::net::{self, SocketAddr};
13use std::path::Path;
14use std::pin::Pin;
15use std::task::Context;
16
17/// A Unix socket which can accept connections from other Unix sockets.
18///
19/// # Examples
20///
21/// ```no_run
22/// use romio::uds::{UnixListener, UnixStream};
23/// use futures::prelude::*;
24///
25/// async fn say_hello(mut stream: UnixStream) {
26/// stream.write_all(b"Shall I hear more, or shall I speak at this?!").await;
27/// }
28///
29/// async fn listen() -> Result<(), Box<dyn std::error::Error + 'static>> {
30/// let listener = UnixListener::bind("/tmp/sock")?;
31/// let mut incoming = listener.incoming();
32///
33/// // accept connections and process them serially
34/// while let Some(stream) = incoming.next().await {
35/// say_hello(stream?).await;
36/// }
37/// Ok(())
38/// }
39/// ```
40pub struct UnixListener {
41 io: PollEvented<mio_uds::UnixListener>,
42}
43
44impl UnixListener {
45 /// Creates a new `UnixListener` bound to the specified path.
46 ///
47 /// # Examples
48 /// Create a Unix Domain Socket on `/tmp/sock`.
49 ///
50 /// ```rust,no_run
51 /// use romio::uds::UnixListener;
52 ///
53 /// # fn main () -> Result<(), Box<dyn std::error::Error + 'static>> {
54 /// let socket = UnixListener::bind("/tmp/sock")?;
55 /// # Ok(())}
56 /// ```
57 ///
58 pub fn bind(path: impl AsRef<Path>) -> io::Result<UnixListener> {
59 let listener = mio_uds::UnixListener::bind(path)?;
60 let io = PollEvented::new(listener);
61 Ok(UnixListener { io })
62 }
63
64 /// Returns the local socket address of this listener.
65 ///
66 /// # Examples
67 ///
68 /// ```rust,no_run
69 /// use romio::uds::UnixListener;
70 ///
71 /// # fn main () -> Result<(), Box<dyn std::error::Error + 'static>> {
72 /// let socket = UnixListener::bind("/tmp/sock")?;
73 /// let addr = socket.local_addr()?;
74 /// # Ok(())}
75 /// ```
76 pub fn local_addr(&self) -> io::Result<SocketAddr> {
77 self.io.get_ref().local_addr()
78 }
79
80 /// Consumes this listener, returning a stream of the sockets this listener
81 /// accepts.
82 ///
83 /// This method returns an implementation of the `Stream` trait which
84 /// resolves to the sockets the are accepted on this listener.
85 ///
86 ///
87 /// # Examples
88 ///
89 /// ```rust,no_run
90 /// use romio::uds::UnixListener;
91 /// use futures::prelude::*;
92 ///
93 /// # async fn run () -> Result<(), Box<dyn std::error::Error + 'static>> {
94 /// let listener = UnixListener::bind("/tmp/sock")?;
95 /// let mut incoming = listener.incoming();
96 ///
97 /// // accept connections and process them serially
98 /// while let Some(stream) = incoming.next().await {
99 /// match stream {
100 /// Ok(stream) => {
101 /// println!("new client!");
102 /// },
103 /// Err(e) => { /* connection failed */ }
104 /// }
105 /// }
106 /// # Ok(())}
107 /// ```
108 pub fn incoming(self) -> Incoming {
109 Incoming::new(self)
110 }
111
112 fn poll_accept_std(
113 mut self: Pin<&mut Self>,
114 cx: &mut Context<'_>,
115 ) -> Poll<io::Result<(net::UnixStream, SocketAddr)>> {
116 ready!(Pin::new(&mut self.io).poll_read_ready(cx)?);
117
118 match Pin::new(&mut self.io).get_ref().accept_std() {
119 Ok(Some((sock, addr))) => Poll::Ready(Ok((sock, addr))),
120 Ok(None) => {
121 Pin::new(&mut self.io).clear_read_ready(cx)?;
122 Poll::Pending
123 }
124 Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
125 Pin::new(&mut self.io).clear_read_ready(cx)?;
126 Poll::Pending
127 }
128 Err(err) => Poll::Ready(Err(err)),
129 }
130 }
131}
132
133impl AsyncReady for UnixListener {
134 type Ok = (UnixStream, SocketAddr);
135 type Err = std::io::Error;
136
137 /// Check if the stream can be read from.
138 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<Self::Ok, Self::Err>> {
139 let (io, addr) = ready!(self.poll_accept_std(cx)?);
140 let io = mio_uds::UnixStream::from_stream(io)?;
141 Poll::Ready(Ok((UnixStream::new(io), addr)))
142 }
143}
144
145impl TakeError for UnixListener {
146 type Ok = io::Error;
147 type Err = io::Error;
148
149 /// Returns the value of the `SO_ERROR` option.
150 ///
151 /// # Examples
152 ///
153 /// ```rust,no_run
154 /// use romio::uds::UnixListener;
155 /// use romio::raw::TakeError;
156 ///
157 /// # fn main () -> Result<(), Box<dyn std::error::Error + 'static>> {
158 /// let listener = UnixListener::bind("/tmp/sock")?;
159 /// if let Ok(Some(err)) = listener.take_error() {
160 /// println!("Got error: {:?}", err);
161 /// }
162 /// # Ok(())}
163 /// ```
164 fn take_error(&self) -> Result<Option<Self::Ok>, Self::Err> {
165 self.io.get_ref().take_error()
166 }
167}
168
169impl fmt::Debug for UnixListener {
170 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
171 self.io.get_ref().fmt(f)
172 }
173}
174
175impl AsRawFd for UnixListener {
176 fn as_raw_fd(&self) -> RawFd {
177 self.io.get_ref().as_raw_fd()
178 }
179}
180
181/// Stream of listeners
182#[derive(Debug)]
183pub struct Incoming {
184 inner: UnixListener,
185}
186
187impl Incoming {
188 pub(crate) fn new(listener: UnixListener) -> Incoming {
189 Incoming { inner: listener }
190 }
191}
192
193impl Stream for Incoming {
194 type Item = io::Result<UnixStream>;
195
196 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
197 let (socket, _) = ready!(Pin::new(&mut self.inner).poll_ready(cx)?);
198 Poll::Ready(Some(Ok(socket)))
199 }
200}