async_std/net/tcp/listener.rs
1use std::future::Future;
2use std::net::SocketAddr;
3use std::pin::Pin;
4use std::sync::Arc;
5
6use crate::future;
7use crate::io;
8use crate::net::driver::Watcher;
9use crate::net::{TcpStream, ToSocketAddrs};
10use crate::stream::Stream;
11use crate::task::{Context, Poll};
12
/// A TCP socket server, listening for connections.
///
/// After creating a `TcpListener` by [`bind`]ing it to a socket address, it listens for incoming
/// TCP connections. These can be accepted by awaiting elements from the async stream of
/// [`incoming`] connections.
///
/// The socket will be closed when the value is dropped.
///
/// The Transmission Control Protocol is specified in [IETF RFC 793].
///
/// This type is an async version of [`std::net::TcpListener`].
///
/// [`bind`]: #method.bind
/// [`incoming`]: #method.incoming
/// [IETF RFC 793]: https://tools.ietf.org/html/rfc793
/// [`std::net::TcpListener`]: https://doc.rust-lang.org/std/net/struct.TcpListener.html
///
/// # Examples
///
/// ```no_run
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
/// #
/// use async_std::io;
/// use async_std::net::TcpListener;
/// use async_std::prelude::*;
///
/// let listener = TcpListener::bind("127.0.0.1:8080").await?;
/// let mut incoming = listener.incoming();
///
/// // Echo server: copy everything read from each connection back into it.
/// while let Some(stream) = incoming.next().await {
///     let stream = stream?;
///     let (reader, writer) = &mut (&stream, &stream);
///     io::copy(reader, writer).await?;
/// }
/// #
/// # Ok(()) }) }
/// ```
#[derive(Debug)]
pub struct TcpListener {
    /// The underlying mio listener, wrapped in the crate's `Watcher`.
    watcher: Watcher<mio::net::TcpListener>,
}
54
55impl TcpListener {
56 /// Creates a new `TcpListener` which will be bound to the specified address.
57 ///
58 /// The returned listener is ready for accepting connections.
59 ///
60 /// Binding with a port number of 0 will request that the OS assigns a port to this listener.
61 /// The port allocated can be queried via the [`local_addr`] method.
62 ///
63 /// # Examples
64 /// Create a TCP listener bound to 127.0.0.1:0:
65 ///
66 /// ```no_run
67 /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
68 /// #
69 /// use async_std::net::TcpListener;
70 ///
71 /// let listener = TcpListener::bind("127.0.0.1:0").await?;
72 /// #
73 /// # Ok(()) }) }
74 /// ```
75 ///
76 /// [`local_addr`]: #method.local_addr
77 pub async fn bind<A: ToSocketAddrs>(addrs: A) -> io::Result<TcpListener> {
78 let mut last_err = None;
79 let addrs = addrs.to_socket_addrs().await?;
80
81 for addr in addrs {
82 match mio::net::TcpListener::bind(&addr) {
83 Ok(mio_listener) => {
84 return Ok(TcpListener {
85 watcher: Watcher::new(mio_listener),
86 });
87 }
88 Err(err) => last_err = Some(err),
89 }
90 }
91
92 Err(last_err.unwrap_or_else(|| {
93 io::Error::new(
94 io::ErrorKind::InvalidInput,
95 "could not resolve to any addresses",
96 )
97 }))
98 }
99
100 /// Accepts a new incoming connection to this listener.
101 ///
102 /// When a connection is established, the corresponding stream and address will be returned.
103 ///
104 /// ## Examples
105 ///
106 /// ```no_run
107 /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
108 /// #
109 /// use async_std::net::TcpListener;
110 ///
111 /// let listener = TcpListener::bind("127.0.0.1:0").await?;
112 /// let (stream, addr) = listener.accept().await?;
113 /// #
114 /// # Ok(()) }) }
115 /// ```
116 pub async fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> {
117 let (io, addr) =
118 future::poll_fn(|cx| self.watcher.poll_read_with(cx, |inner| inner.accept_std()))
119 .await?;
120
121 let mio_stream = mio::net::TcpStream::from_stream(io)?;
122 let stream = TcpStream {
123 watcher: Arc::new(Watcher::new(mio_stream)),
124 };
125 Ok((stream, addr))
126 }
127
128 /// Returns a stream of incoming connections.
129 ///
130 /// Iterating over this stream is equivalent to calling [`accept`] in a loop. The stream of
131 /// connections is infinite, i.e awaiting the next connection will never result in [`None`].
132 ///
133 /// [`accept`]: #method.accept
134 /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None
135 ///
136 /// ## Examples
137 ///
138 /// ```no_run
139 /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
140 /// #
141 /// use async_std::net::TcpListener;
142 /// use async_std::prelude::*;
143 ///
144 /// let listener = TcpListener::bind("127.0.0.1:0").await?;
145 /// let mut incoming = listener.incoming();
146 ///
147 /// while let Some(stream) = incoming.next().await {
148 /// let mut stream = stream?;
149 /// stream.write_all(b"hello world").await?;
150 /// }
151 /// #
152 /// # Ok(()) }) }
153 /// ```
154 pub fn incoming(&self) -> Incoming<'_> {
155 Incoming(self)
156 }
157
158 /// Returns the local address that this listener is bound to.
159 ///
160 /// This can be useful, for example, to identify when binding to port 0 which port was assigned
161 /// by the OS.
162 ///
163 /// # Examples
164 ///
165 /// ```no_run
166 /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
167 /// #
168 /// use async_std::net::TcpListener;
169 ///
170 /// let listener = TcpListener::bind("127.0.0.1:8080").await?;
171 /// let addr = listener.local_addr()?;
172 /// #
173 /// # Ok(()) }) }
174 /// ```
175 pub fn local_addr(&self) -> io::Result<SocketAddr> {
176 self.watcher.get_ref().local_addr()
177 }
178}
179
/// A stream of incoming TCP connections.
///
/// This stream is infinite, i.e. awaiting the next connection will never result in [`None`]. It
/// is created by the [`incoming`] method on [`TcpListener`].
///
/// This type is an async version of [`std::net::Incoming`].
///
/// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None
/// [`incoming`]: struct.TcpListener.html#method.incoming
/// [`TcpListener`]: struct.TcpListener.html
/// [`std::net::Incoming`]: https://doc.rust-lang.org/std/net/struct.Incoming.html
// The single field borrows the listener whose connections this stream yields.
#[derive(Debug)]
pub struct Incoming<'a>(&'a TcpListener);
193
194impl<'a> Stream for Incoming<'a> {
195 type Item = io::Result<TcpStream>;
196
197 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
198 let future = self.0.accept();
199 pin_utils::pin_mut!(future);
200
201 let (socket, _) = futures_core::ready!(future.poll(cx))?;
202 Poll::Ready(Some(Ok(socket)))
203 }
204}
205
206impl From<std::net::TcpListener> for TcpListener {
207 /// Converts a `std::net::TcpListener` into its asynchronous equivalent.
208 fn from(listener: std::net::TcpListener) -> TcpListener {
209 let mio_listener = mio::net::TcpListener::from_std(listener).unwrap();
210 TcpListener {
211 watcher: Watcher::new(mio_listener),
212 }
213 }
214}
215
216cfg_unix! {
217 use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
218
219 impl AsRawFd for TcpListener {
220 fn as_raw_fd(&self) -> RawFd {
221 self.watcher.get_ref().as_raw_fd()
222 }
223 }
224
225 impl FromRawFd for TcpListener {
226 unsafe fn from_raw_fd(fd: RawFd) -> TcpListener {
227 std::net::TcpListener::from_raw_fd(fd).into()
228 }
229 }
230
231 impl IntoRawFd for TcpListener {
232 fn into_raw_fd(self) -> RawFd {
233 self.watcher.into_inner().into_raw_fd()
234 }
235 }
236}
237
cfg_windows! {
    // TODO: raw-socket conversions are not implemented on Windows; the sketch below is disabled.
    //
    // NOTE(review): before enabling, reconcile the sketch with the rest of the file:
    //   - the import names the `*RawHandle` items while the impls use the `*RawSocket` ones;
    //   - `self.raw_socket` refers to a field that `TcpListener` does not have (it only holds
    //     `watcher`);
    //   - `net` and `try_into` are not brought into scope by the imports visible in this file.
    //
    // use crate::os::windows::io::{AsRawHandle, FromRawHandle, IntoRawHandle, RawHandle};
    //
    // impl AsRawSocket for TcpListener {
    //     fn as_raw_socket(&self) -> RawSocket {
    //         self.raw_socket
    //     }
    // }
    //
    // impl FromRawSocket for TcpListener {
    //     unsafe fn from_raw_socket(handle: RawSocket) -> TcpListener {
    //         net::TcpListener::from_raw_socket(handle).try_into().unwrap()
    //     }
    // }
    //
    // impl IntoRawSocket for TcpListener {
    //     fn into_raw_socket(self) -> RawSocket {
    //         self.raw_socket
    //     }
    // }
}