fd_queue/
tokio.rs

1// Copyright 2020 Steven Bosnick
2//
3// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE-2.0 or
4// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
5// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
6// option. This file may not be copied, modified, or distributed
7// except according to those terms
8
9//! An implementation of [`EnqueueFd`] and [`DequeueFd`] that is integrated with tokio.
10
11use std::{
12    convert::TryFrom,
13    io::{ErrorKind, IoSlice, IoSliceMut},
14    net::Shutdown,
15    os::unix::{
16        io::{AsRawFd, RawFd},
17        net::{SocketAddr, UnixStream as StdUnixStream},
18    },
19    path::Path,
20    pin::Pin,
21    task::{Context, Poll},
22};
23
24use futures_core::stream::Stream;
25use futures_util::ready;
26use pin_project::pin_project;
27
28use tokio::{
29    io::{self, AsyncRead, AsyncWrite, Interest, ReadBuf},
30    net::{
31        unix::SocketAddr as TokioSocketAddr, UnixListener as TokioUnixListener,
32        UnixStream as TokioUnixStream,
33    },
34};
35
36use crate::{biqueue::BiQueue, DequeueFd, EnqueueFd, QueueFullError};
37
38/// A structure representing a connected Unix socket with support for passing
39/// [`RawFd`].
40///
41/// This is the implementation of [`EnqueueFd`] and [`DequeueFd`] that is based
42/// on `tokio` [`UnixStream`][TokioUnixStream]. Conceptually the key interfaces
43/// on `UnixStream` interact as shown in the following diagram:
44///
45/// ```text
46/// EnqueueFd => AsyncWrite => AsyncRead => DequeueFd
47/// ```
48///
49/// That is, you first enqueue a [`RawFd`] to the `UnixStream` and then
50/// [`AsyncWrite`] at least one byte. On the other side  of the `UnixStream` you
51/// then [`AsyncRead`] at least one byte and then dequeue the [`RawFd`].
52///
53/// This socket can be connected directly with [`UnixStream::connect`] or accepted
54/// from a listener with [`UnixListener::accept`]. Additionally, a pair of
55/// anonymous Unix sockets can be created with [`UnixStream::pair`].
56///
57/// # Examples
58///
59/// ```
60/// # use fd_queue::{EnqueueFd, DequeueFd, tokio::UnixStream};
61/// # use std::io::prelude::*;
62/// # use std::os::unix::io::FromRawFd;
63/// # use tempfile::tempfile;
64/// use tokio::io::{AsyncReadExt, AsyncWriteExt};
65/// use std::fs::File;
66///
67/// # tokio_test::block_on(async {
68/// #
69/// let (mut sock1, mut sock2) = UnixStream::pair()?;
70///
71/// // sender side
72/// # let file1: File = tempfile()?;
73/// // let file1: File = ...
74/// sock1.enqueue(&file1).expect("Can't enqueue the file descriptor.");
75/// sock1.write(b"a").await?;
76/// sock1.flush().await?;
77///
78/// // receiver side
79/// let mut buf = [0u8; 1];
80/// sock2.read(&mut buf).await?;
81/// let fd = sock2.dequeue().expect("Can't dequeue the file descriptor.");
82/// let file2 = unsafe { File::from_raw_fd(fd) };
83/// #
84/// # Ok::<(), std::io::Error>(())
85/// #
86/// # });
87/// # Ok::<(), std::io::Error>(())
88/// ```
89#[pin_project]
90#[derive(Debug)]
91pub struct UnixStream {
92    #[pin]
93    inner: TokioUnixStream,
94    biqueue: BiQueue,
95}
96
97/// A Unix socket which can accept connections from other Unix sockets.
98///
99/// You can accept a new connection by using the accept method. Alternatively
100/// UnixListener implements the Stream trait, which allows you to use the
101/// listener in places that want a stream. The stream will never return None and
102/// will also not yield the peer's SocketAddr structure. Iterating over it is
103/// equivalent to calling accept in a loop.
104///
105/// # Examples
106///
107/// ```
108/// # use tempfile::tempdir;
109/// use fd_queue::tokio::{UnixStream, UnixListener};
110/// use futures_util::stream::StreamExt;
111/// use tokio::io::{AsyncReadExt, AsyncWriteExt};
112///
113/// # tokio_test::block_on(async {
114/// # let dir = tempdir()?;
115/// # let path = dir.path().join("mysock");
116/// // let path: Path = ...
117/// let mut listener = UnixListener::bind(&path)?;
118///
119/// tokio::spawn(async move {
120///     let mut sock1 = UnixStream::connect(path).await?;
121///     sock1.write(b"Hello World!").await?;
122/// #   Ok::<(), std::io::Error>(())
123/// });
124///
125/// let mut sock2 = listener.next().await.expect("Listener stream unexpectedly empty")?;
126///
127/// let mut buf = [0u8; 256];
128/// sock2.read(&mut buf).await?;
129///
130/// assert!(buf.starts_with(b"Hello World!"));
131/// #
132/// # Ok::<(), std::io::Error>(())
133/// # });
134#[derive(Debug)]
135pub struct UnixListener {
136    inner: TokioUnixListener,
137}
138
139// === impl UnixStream ===
140
141impl UnixStream {
142    /// Connects to the socket named by path.
143    ///
144    /// This function will create a new socket and connect the the path specified,
145    /// associating the returned stream with the default event loop's handle.
146    ///
147    /// # Examples
148    ///
149    /// ```
150    /// # use tempfile::tempdir;
151    /// # use fd_queue::tokio::UnixListener;
152    /// use fd_queue::tokio::UnixStream;
153    /// # tokio_test::block_on(async {
154    /// # let dir = tempdir()?;
155    /// # let path = dir.path().join("mysock");
156    /// // let path: Path = ...
157    /// # let mut listener = UnixListener::bind(&path)?;
158    /// # tokio::spawn(async move { listener.accept().await.expect("Can't accept")});
159    ///
160    /// UnixStream::connect(path).await?;
161    /// #
162    /// # Ok::<(), std::io::Error>(())
163    /// # });
164    /// ```
165    pub async fn connect(path: impl AsRef<Path>) -> io::Result<UnixStream> {
166        TokioUnixStream::connect(path).await.map(|s| s.into())
167    }
168
169    /// Creates an unnamed pair of connected sockets.
170    ///
171    /// This function will create an unnamed pair of interconnected Unix sockets for
172    /// communicating back and forth between one another. Each socket will be
173    /// associated with the default event loop's handle.
174    ///
175    /// # Examples
176    ///
177    /// ```
178    /// use fd_queue::tokio::UnixStream;
179    ///
180    /// # tokio_test::block_on(async {
181    /// let (sock1, sock2) = UnixStream::pair()?;
182    /// # Ok::<(), std::io::Error>(())
183    /// # });
184    /// ```
185    pub fn pair() -> io::Result<(UnixStream, UnixStream)> {
186        TokioUnixStream::pair().map(|(s1, s2)| (s1.into(), s2.into()))
187    }
188
189    /// Returns the socket address of the local half of this connection.
190    ///
191    /// # Examples
192    ///
193    /// ```
194    /// # use tempfile::tempdir;
195    /// # use fd_queue::tokio::UnixListener;
196    /// use fd_queue::tokio::UnixStream;
197    ///
198    /// # tokio_test::block_on(async {
199    /// # let dir = tempdir()?;
200    /// # let path = dir.path().join("mysock");
201    /// // let path: Path = ...
202    /// # let mut listener = UnixListener::bind(&path)?;
203    /// # tokio::spawn(async move { listener.accept().await.expect("Can't accept")});
204    ///
205    /// let sock = UnixStream::connect(path).await?;
206    ///
207    /// sock.local_addr()?;
208    /// #
209    /// # Ok::<(), std::io::Error>(())
210    /// # });
211    /// ```
212    pub fn local_addr(&self) -> io::Result<SocketAddr> {
213        to_addr(self.inner.local_addr()?)
214    }
215
216    /// Returns the socket address of the remote half of this connection.
217    ///
218    /// # Examples
219    ///
220    /// ```
221    /// # use tempfile::tempdir;
222    /// # use fd_queue::tokio::UnixListener;
223    /// use fd_queue::tokio::UnixStream;
224    ///
225    /// # tokio_test::block_on(async {
226    /// # let dir = tempdir()?;
227    /// # let path = dir.path().join("mysock");
228    /// // let path: Path = ...
229    /// # let mut listener = UnixListener::bind(&path)?;
230    /// # tokio::spawn(async move { listener.accept().await.expect("Can't accept")});
231    ///
232    /// let sock = UnixStream::connect(path).await?;
233    ///
234    /// sock.peer_addr()?;
235    /// #
236    /// # Ok::<(), std::io::Error>(())
237    /// # });
238    /// ```
239    pub fn peer_addr(&self) -> io::Result<SocketAddr> {
240        to_addr(self.inner.peer_addr()?)
241    }
242
243    /// Returns the value of the SO_ERROR option.
244    ///
245    /// # Examples
246    ///
247    /// ```
248    /// # use tempfile::tempdir;
249    /// # use fd_queue::tokio::UnixListener;
250    /// use fd_queue::tokio::UnixStream;
251    ///
252    /// # tokio_test::block_on(async {
253    /// # let dir = tempdir()?;
254    /// # let path = dir.path().join("mysock");
255    /// // let path: Path = ...
256    /// # let mut listener = UnixListener::bind(&path)?;
257    /// # tokio::spawn(async move { listener.accept().await.expect("Can't accept")});
258    ///
259    /// let sock = UnixStream::connect(path).await?;
260    ///
261    /// let err = match sock.take_error() {
262    ///     Ok(Some(err)) => err,
263    ///     Ok(None) => {
264    ///         println!("No error found.");
265    ///         return Ok(());
266    ///     }
267    ///     Err(e) => {
268    ///         println!("Couldn't take the SO_ERROR option: {}", e);
269    ///         return Ok(());
270    ///     }
271    /// };
272    /// #
273    /// # Ok::<(), std::io::Error>(())
274    /// # });
275    /// ```
276    pub fn take_error(&self) -> io::Result<Option<io::Error>> {
277        self.inner.take_error()
278    }
279
280    /// Shuts down the read, write, or both halves of this connection.
281    ///
282    /// This function will cause all pending and future I/O calls on the specified
283    /// portions to immediately return with an appropriate value (see the
284    /// documentation of `Shutdown`).
285    ///
286    /// # Examples
287    ///
288    /// ```
289    /// use std::net::Shutdown;
290    /// use tokio::io::AsyncReadExt;
291    /// use fd_queue::tokio::UnixStream;
292    ///
293    /// # tokio_test::block_on(async {
294    /// let (mut sock, _) = UnixStream::pair()?;
295    ///
296    /// sock.shutdown(Shutdown::Read)?;
297    ///
298    /// let mut buf = [0u8; 256];
299    /// match sock.read(&mut buf).await {
300    ///     Ok(0) => {},
301    ///     _ => panic!("Read unexpectedly not shut down."),
302    /// }
303    /// #
304    /// # Ok::<(), std::io::Error>(())
305    /// # });
306    /// ```
307    pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
308        shutdown(self, how)
309    }
310}
311
312impl EnqueueFd for UnixStream {
313    fn enqueue(&mut self, fd: &impl AsRawFd) -> Result<(), QueueFullError> {
314        self.biqueue.enqueue(fd)
315    }
316}
317
318impl DequeueFd for UnixStream {
319    fn dequeue(&mut self) -> Option<RawFd> {
320        self.biqueue.dequeue()
321    }
322}
323
324impl AsRawFd for UnixStream {
325    fn as_raw_fd(&self) -> RawFd {
326        self.inner.as_raw_fd()
327    }
328}
329
330impl AsyncRead for UnixStream {
331    fn poll_read(
332        self: Pin<&mut Self>,
333        cx: &mut Context,
334        buf: &mut ReadBuf,
335    ) -> Poll<io::Result<()>> {
336        let this = self.project();
337        let inner = this.inner;
338        let biqueue = this.biqueue;
339        let fd = inner.as_raw_fd();
340
341        loop {
342            ready!(inner.poll_read_ready(cx))?;
343
344            match inner.try_io(Interest::READABLE, || {
345                // TODO: find a way to handle uninitialized memory on buf
346                biqueue.read_vectored(fd, &mut [IoSliceMut::new(buf.initialize_unfilled())])
347            }) {
348                Ok(count) => {
349                    buf.advance(count);
350                    return Poll::Ready(Ok(()));
351                }
352                Err(e) if e.kind() == ErrorKind::WouldBlock => {}
353                Err(e) => return Poll::Ready(Err(e)),
354            }
355        }
356    }
357}
358
359impl AsyncWrite for UnixStream {
360    fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
361        self.poll_write_vectored(cx, &[IoSlice::new(buf)])
362    }
363
364    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
365        self.project().inner.poll_flush(cx)
366    }
367
368    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
369        self.project().inner.poll_shutdown(cx)
370    }
371
372    fn poll_write_vectored(
373        self: Pin<&mut Self>,
374        cx: &mut Context<'_>,
375        bufs: &[std::io::IoSlice<'_>],
376    ) -> Poll<Result<usize, std::io::Error>> {
377        let this = self.project();
378        let inner = this.inner;
379        let biqueue = this.biqueue;
380        let fd = inner.as_raw_fd();
381
382        loop {
383            ready!(inner.poll_write_ready(cx))?;
384
385            match inner.try_io(Interest::WRITABLE, || biqueue.write_vectored(fd, bufs)) {
386                Ok(count) => return Poll::Ready(Ok(count)),
387                Err(e) if e.kind() == ErrorKind::WouldBlock => {}
388                Err(e) => return Poll::Ready(Err(e)),
389            }
390        }
391    }
392
393    fn is_write_vectored(&self) -> bool {
394        true
395    }
396}
397
398impl From<TokioUnixStream> for UnixStream {
399    fn from(inner: TokioUnixStream) -> UnixStream {
400        UnixStream {
401            inner,
402            biqueue: BiQueue::new(),
403        }
404    }
405}
406
407impl TryFrom<StdUnixStream> for UnixStream {
408    type Error = io::Error;
409
410    fn try_from(inner: StdUnixStream) -> Result<Self, Self::Error> {
411        inner.set_nonblocking(true)?;
412        TokioUnixStream::from_std(inner).map(|stream| stream.into())
413    }
414}
415
416// === impl UnixListener ===
417
418impl UnixListener {
419    /// Creates a new UnixListener bound to the specified path.
420    ///
421    /// This function will bind a UnixListener to the specified path and associate it
422    /// with the default event loop's handler.
423    ///
424    /// # Panics
425    ///
426    /// This function panics if thread-local runtime is not set.
427    ///
428    /// The runtime is usually set implicitly when this function is called from a
429    /// future driven by a tokio runtime, otherwise runtime can be set explicitly
430    /// with `Handle::enter` function.
431    ///
432    /// # Examples
433    ///
434    /// ```
435    /// # use tempfile::tempdir;
436    /// use fd_queue::tokio::UnixListener;
437    ///
438    /// # tokio_test::block_on(async {
439    /// # let dir = tempdir()?;
440    /// # let path = dir.path().join("mysock");
441    /// // let path: Path = ...
442    /// let listener = UnixListener::bind(&path)?;
443    /// #
444    /// # Ok::<(), std::io::Error>(())
445    /// # });
446    pub fn bind(path: impl AsRef<Path>) -> io::Result<UnixListener> {
447        TokioUnixListener::bind(path).map(|l| l.into())
448    }
449
450    /// Returns the local socket address of this listener.
451    ///
452    /// # Examples
453    ///
454    /// ```
455    /// # use tempfile::tempdir;
456    /// use fd_queue::tokio::UnixListener;
457    ///
458    /// # tokio_test::block_on(async {
459    /// # let dir = tempdir()?;
460    /// # let path = dir.path().join("mysock");
461    /// // let path: Path = ...
462    /// let listener = UnixListener::bind(&path)?;
463    ///
464    /// let addr = listener.local_addr()?;
465    ///
466    /// match addr.as_pathname() {
467    ///     Some(path) => println!("The local address is {}.", path.display()),
468    ///     None => println!("The local address does not have a pathname"),
469    /// }
470    /// #
471    /// # Ok::<(), std::io::Error>(())
472    /// # });
473    pub fn local_addr(&self) -> io::Result<SocketAddr> {
474        to_addr(self.inner.local_addr()?)
475    }
476
477    /// Returns the value of the `SO_ERROR` option.
478    ///
479    /// # Examples
480    ///
481    /// ```
482    /// # use tempfile::tempdir;
483    /// use fd_queue::tokio::UnixListener;
484    ///
485    /// # tokio_test::block_on(async {
486    /// # let dir = tempdir()?;
487    /// # let path = dir.path().join("mysock");
488    /// // let path: Path = ...
489    /// let listener = UnixListener::bind(&path)?;
490    ///
491    /// let so_error = listener.take_error()?;
492    ///
493    /// match so_error {
494    ///     Some(err) => println!("The SO_ERROR was {}.", err),
495    ///     None => println!("There was no SO_ERROR."),
496    /// }
497    /// #
498    /// # Ok::<(), std::io::Error>(())
499    /// # });
500    pub fn take_error(&self) -> io::Result<Option<io::Error>> {
501        self.inner.take_error()
502    }
503
504    /// Accepts a new incoming connection on this listener.
505    ///
506    /// # Panics
507    ///
508    /// This function panics if thread-local runtime is not set.
509    ///
510    /// The runtime is usually set implicitly when this function is called from a
511    /// future driven by a tokio runtime, otherwise runtime can be set explicitly
512    /// with `Handle::enter` function.
513    ///
514    /// # Examples
515    ///
516    /// ```
517    /// # use tempfile::tempdir;
518    /// # use fd_queue::tokio::UnixStream;
519    /// use fd_queue::tokio::UnixListener;
520    ///
521    /// # tokio_test::block_on(async {
522    /// # let dir = tempdir()?;
523    /// # let path = dir.path().join("mysock");
524    /// // let path: Path = ...
525    /// let mut listener = UnixListener::bind(&path)?;
526    /// # tokio::spawn(async move { UnixStream::connect(path).await });
527    ///
528    /// let (sock, addr) = listener.accept().await?;
529    /// #
530    /// # Ok::<(), std::io::Error>(())
531    /// # });
532    pub async fn accept(&mut self) -> io::Result<(UnixStream, SocketAddr)> {
533        self.inner
534            .accept()
535            .await
536            .and_then(|(stream, addr)| to_addr(addr).map(|addr| (stream.into(), addr)))
537    }
538
539    fn poll_accept(&self, cx: &mut Context) -> Poll<io::Result<(UnixStream, SocketAddr)>> {
540        self.inner.poll_accept(cx).map(|result| {
541            result.and_then(|(stream, addr)| to_addr(addr).map(|addr| (stream.into(), addr)))
542        })
543    }
544}
545
546impl AsRawFd for UnixListener {
547    fn as_raw_fd(&self) -> RawFd {
548        self.inner.as_raw_fd()
549    }
550}
551
552/// Produces a continuous stream of accepted connections.
553///
554/// This is the equivalent of calling `accept()` in a loop. It will never be ready
555/// with `None`.
556///
557/// # Panics
558///
559/// Polling the stream panics if thread-local runtime is not set.
560///
561/// The runtime is usually set implicitly when this function is called from a
562/// future driven by a tokio runtime, otherwise runtime can be set explicitly
563/// with `Handle::enter` function.
564///
565/// # Examples
566///
567/// ```
568/// # use tempfile::tempdir;
569/// # use fd_queue::tokio::UnixStream;
570/// use fd_queue::tokio::UnixListener;
571/// use futures_util::stream::StreamExt;
572///
573/// # tokio_test::block_on(async {
574/// # let dir = tempdir()?;
575/// # let path = dir.path().join("mysock");
576/// // let path: Path = ...
577/// let mut listener = UnixListener::bind(&path)?;
578/// # tokio::spawn(async move { UnixStream::connect(path).await });
579///
580/// let sock = listener.next().await.expect("Listener stream unexpectedly empty");
581/// #
582/// # Ok::<(), std::io::Error>(())
583/// # });
584impl Stream for UnixListener {
585    type Item = io::Result<UnixStream>;
586
587    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
588        use Poll::{Pending, Ready};
589
590        match self.poll_accept(cx) {
591            Pending => Pending,
592            Ready(Ok((stream, _))) => Ready(Some(Ok(stream))),
593            Ready(Err(err)) => Ready(Some(Err(err))),
594        }
595    }
596}
597
598impl From<TokioUnixListener> for UnixListener {
599    fn from(inner: TokioUnixListener) -> UnixListener {
600        UnixListener { inner }
601    }
602}
603
604// === utility functions ===
605
606fn to_addr(addr: TokioSocketAddr) -> io::Result<SocketAddr> {
607    addr.as_pathname()
608        .map_or(SocketAddr::from_pathname(""), |path| {
609            SocketAddr::from_pathname(path)
610        })
611}
612
613fn shutdown(socket: &impl AsRawFd, how: Shutdown) -> io::Result<()> {
614    let how = match how {
615        Shutdown::Write => libc::SHUT_WR,
616        Shutdown::Read => libc::SHUT_RD,
617        Shutdown::Both => libc::SHUT_RDWR,
618    };
619    // Safety: complies with the FFI documentation and the system call
620    // check the validity of the parameters.
621    let code = unsafe { libc::shutdown(socket.as_raw_fd(), how) };
622    if code == -1 {
623        Err(io::Error::last_os_error())
624    } else {
625        Ok(())
626    }
627}
628
629#[cfg(test)]
630mod tests {
631    use super::*;
632
633    use std::fs::File;
634    use std::io::{prelude::*, SeekFrom};
635    use std::os::unix::io::FromRawFd as _;
636
637    use tempfile::{tempdir, tempfile};
638    use tokio::io::{AsyncReadExt, AsyncWriteExt};
639
640    #[tokio::test]
641    async fn unix_stream_reads_other_sides_writes() {
642        let mut buf: [u8; 12] = [0; 12];
643
644        let (mut sut, mut other) = UnixStream::pair().expect("Can't create UnixStream's");
645        tokio::spawn(async move {
646            other
647                .write_all(b"Hello World!".as_ref())
648                .await
649                .expect("Can't write to UnixStream");
650        });
651        sut.read_exact(buf.as_mut())
652            .await
653            .expect("Can't read from UnixStream");
654
655        assert_eq!(&buf, b"Hello World!");
656    }
657
658    #[tokio::test]
659    async fn unix_stream_passes_fd() {
660        let mut file1 = tempfile().expect("Can't create temp file.");
661        file1
662            .write_all(b"Hello World!\0")
663            .expect("Can't write to temp file.");
664        file1.flush().expect("Can't flush temp file.");
665        file1
666            .seek(SeekFrom::Start(0))
667            .expect("Couldn't seek the file.");
668        let mut buf = [0u8];
669
670        let (mut sut, mut other) = UnixStream::pair().expect("Can't create UnixStream's");
671        tokio::spawn(async move {
672            other.enqueue(&file1).expect("Can't enqueue fd.");
673            other
674                .write_all(b"1".as_ref())
675                .await
676                .expect("Can't write to UnixStream");
677        });
678        sut.read_exact(buf.as_mut())
679            .await
680            .expect("Can't read from UnixStream");
681        let fd = sut.dequeue().expect("Can't dequeue fd");
682
683        let mut file2 = unsafe { File::from_raw_fd(fd) };
684        let mut buf2 = Vec::new();
685        file2.read_to_end(&mut buf2).expect("Can't read from file");
686        assert_eq!(&buf2[..], b"Hello World!\0".as_ref());
687    }
688
689    #[tokio::test]
690    async fn unix_stream_connects_to_listner() {
691        let dir = tempdir().expect("Can't create temp dir");
692        let sock_addr = dir.as_ref().join("socket");
693        let mut buf: [u8; 12] = [0; 12];
694
695        let mut listener = UnixListener::bind(&sock_addr).expect("Can't bind listener");
696        tokio::spawn(async move {
697            let mut client = UnixStream::connect(sock_addr)
698                .await
699                .expect("Can't connect to listener");
700            client
701                .write_all(b"Hello World!".as_ref())
702                .await
703                .expect("Can't write to client");
704        });
705        let (mut server, _) = listener.accept().await.expect("Can't accept on listener");
706        server
707            .read_exact(buf.as_mut())
708            .await
709            .expect("Can't read from server");
710
711        assert_eq!(&buf, b"Hello World!");
712    }
713}