fd_queue/tokio.rs
1// Copyright 2020 Steven Bosnick
2//
3// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE-2.0 or
4// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
5// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
6// option. This file may not be copied, modified, or distributed
7// except according to those terms
8
9//! An implementation of [`EnqueueFd`] and [`DequeueFd`] that is integrated with tokio.
10
11use std::{
12 convert::TryFrom,
13 io::{ErrorKind, IoSlice, IoSliceMut},
14 net::Shutdown,
15 os::unix::{
16 io::{AsRawFd, RawFd},
17 net::{SocketAddr, UnixStream as StdUnixStream},
18 },
19 path::Path,
20 pin::Pin,
21 task::{Context, Poll},
22};
23
24use futures_core::stream::Stream;
25use futures_util::ready;
26use pin_project::pin_project;
27
28use tokio::{
29 io::{self, AsyncRead, AsyncWrite, Interest, ReadBuf},
30 net::{
31 unix::SocketAddr as TokioSocketAddr, UnixListener as TokioUnixListener,
32 UnixStream as TokioUnixStream,
33 },
34};
35
36use crate::{biqueue::BiQueue, DequeueFd, EnqueueFd, QueueFullError};
37
38/// A structure representing a connected Unix socket with support for passing
39/// [`RawFd`].
40///
41/// This is the implementation of [`EnqueueFd`] and [`DequeueFd`] that is based
42/// on `tokio` [`UnixStream`][TokioUnixStream]. Conceptually the key interfaces
43/// on `UnixStream` interact as shown in the following diagram:
44///
45/// ```text
46/// EnqueueFd => AsyncWrite => AsyncRead => DequeueFd
47/// ```
48///
49/// That is, you first enqueue a [`RawFd`] to the `UnixStream` and then
50/// [`AsyncWrite`] at least one byte. On the other side of the `UnixStream` you
51/// then [`AsyncRead`] at least one byte and then dequeue the [`RawFd`].
52///
53/// This socket can be connected directly with [`UnixStream::connect`] or accepted
54/// from a listener with [`UnixListener::accept`]. Additionally, a pair of
55/// anonymous Unix sockets can be created with [`UnixStream::pair`].
56///
57/// # Examples
58///
59/// ```
60/// # use fd_queue::{EnqueueFd, DequeueFd, tokio::UnixStream};
61/// # use std::io::prelude::*;
62/// # use std::os::unix::io::FromRawFd;
63/// # use tempfile::tempfile;
64/// use tokio::io::{AsyncReadExt, AsyncWriteExt};
65/// use std::fs::File;
66///
67/// # tokio_test::block_on(async {
68/// #
69/// let (mut sock1, mut sock2) = UnixStream::pair()?;
70///
71/// // sender side
72/// # let file1: File = tempfile()?;
73/// // let file1: File = ...
74/// sock1.enqueue(&file1).expect("Can't enqueue the file descriptor.");
75/// sock1.write(b"a").await?;
76/// sock1.flush().await?;
77///
78/// // receiver side
79/// let mut buf = [0u8; 1];
80/// sock2.read(&mut buf).await?;
81/// let fd = sock2.dequeue().expect("Can't dequeue the file descriptor.");
82/// let file2 = unsafe { File::from_raw_fd(fd) };
83/// #
84/// # Ok::<(), std::io::Error>(())
85/// #
86/// # });
87/// # Ok::<(), std::io::Error>(())
88/// ```
89#[pin_project]
90#[derive(Debug)]
91pub struct UnixStream {
92 #[pin]
93 inner: TokioUnixStream,
94 biqueue: BiQueue,
95}
96
97/// A Unix socket which can accept connections from other Unix sockets.
98///
99/// You can accept a new connection by using the accept method. Alternatively
100/// UnixListener implements the Stream trait, which allows you to use the
101/// listener in places that want a stream. The stream will never return None and
102/// will also not yield the peer's SocketAddr structure. Iterating over it is
103/// equivalent to calling accept in a loop.
104///
105/// # Examples
106///
107/// ```
108/// # use tempfile::tempdir;
109/// use fd_queue::tokio::{UnixStream, UnixListener};
110/// use futures_util::stream::StreamExt;
111/// use tokio::io::{AsyncReadExt, AsyncWriteExt};
112///
113/// # tokio_test::block_on(async {
114/// # let dir = tempdir()?;
115/// # let path = dir.path().join("mysock");
116/// // let path: Path = ...
117/// let mut listener = UnixListener::bind(&path)?;
118///
119/// tokio::spawn(async move {
120/// let mut sock1 = UnixStream::connect(path).await?;
121/// sock1.write(b"Hello World!").await?;
122/// # Ok::<(), std::io::Error>(())
123/// });
124///
125/// let mut sock2 = listener.next().await.expect("Listener stream unexpectedly empty")?;
126///
127/// let mut buf = [0u8; 256];
128/// sock2.read(&mut buf).await?;
129///
130/// assert!(buf.starts_with(b"Hello World!"));
131/// #
132/// # Ok::<(), std::io::Error>(())
133/// # });
134#[derive(Debug)]
135pub struct UnixListener {
136 inner: TokioUnixListener,
137}
138
139// === impl UnixStream ===
140
141impl UnixStream {
142 /// Connects to the socket named by path.
143 ///
144 /// This function will create a new socket and connect the the path specified,
145 /// associating the returned stream with the default event loop's handle.
146 ///
147 /// # Examples
148 ///
149 /// ```
150 /// # use tempfile::tempdir;
151 /// # use fd_queue::tokio::UnixListener;
152 /// use fd_queue::tokio::UnixStream;
153 /// # tokio_test::block_on(async {
154 /// # let dir = tempdir()?;
155 /// # let path = dir.path().join("mysock");
156 /// // let path: Path = ...
157 /// # let mut listener = UnixListener::bind(&path)?;
158 /// # tokio::spawn(async move { listener.accept().await.expect("Can't accept")});
159 ///
160 /// UnixStream::connect(path).await?;
161 /// #
162 /// # Ok::<(), std::io::Error>(())
163 /// # });
164 /// ```
165 pub async fn connect(path: impl AsRef<Path>) -> io::Result<UnixStream> {
166 TokioUnixStream::connect(path).await.map(|s| s.into())
167 }
168
169 /// Creates an unnamed pair of connected sockets.
170 ///
171 /// This function will create an unnamed pair of interconnected Unix sockets for
172 /// communicating back and forth between one another. Each socket will be
173 /// associated with the default event loop's handle.
174 ///
175 /// # Examples
176 ///
177 /// ```
178 /// use fd_queue::tokio::UnixStream;
179 ///
180 /// # tokio_test::block_on(async {
181 /// let (sock1, sock2) = UnixStream::pair()?;
182 /// # Ok::<(), std::io::Error>(())
183 /// # });
184 /// ```
185 pub fn pair() -> io::Result<(UnixStream, UnixStream)> {
186 TokioUnixStream::pair().map(|(s1, s2)| (s1.into(), s2.into()))
187 }
188
189 /// Returns the socket address of the local half of this connection.
190 ///
191 /// # Examples
192 ///
193 /// ```
194 /// # use tempfile::tempdir;
195 /// # use fd_queue::tokio::UnixListener;
196 /// use fd_queue::tokio::UnixStream;
197 ///
198 /// # tokio_test::block_on(async {
199 /// # let dir = tempdir()?;
200 /// # let path = dir.path().join("mysock");
201 /// // let path: Path = ...
202 /// # let mut listener = UnixListener::bind(&path)?;
203 /// # tokio::spawn(async move { listener.accept().await.expect("Can't accept")});
204 ///
205 /// let sock = UnixStream::connect(path).await?;
206 ///
207 /// sock.local_addr()?;
208 /// #
209 /// # Ok::<(), std::io::Error>(())
210 /// # });
211 /// ```
212 pub fn local_addr(&self) -> io::Result<SocketAddr> {
213 to_addr(self.inner.local_addr()?)
214 }
215
216 /// Returns the socket address of the remote half of this connection.
217 ///
218 /// # Examples
219 ///
220 /// ```
221 /// # use tempfile::tempdir;
222 /// # use fd_queue::tokio::UnixListener;
223 /// use fd_queue::tokio::UnixStream;
224 ///
225 /// # tokio_test::block_on(async {
226 /// # let dir = tempdir()?;
227 /// # let path = dir.path().join("mysock");
228 /// // let path: Path = ...
229 /// # let mut listener = UnixListener::bind(&path)?;
230 /// # tokio::spawn(async move { listener.accept().await.expect("Can't accept")});
231 ///
232 /// let sock = UnixStream::connect(path).await?;
233 ///
234 /// sock.peer_addr()?;
235 /// #
236 /// # Ok::<(), std::io::Error>(())
237 /// # });
238 /// ```
239 pub fn peer_addr(&self) -> io::Result<SocketAddr> {
240 to_addr(self.inner.peer_addr()?)
241 }
242
243 /// Returns the value of the SO_ERROR option.
244 ///
245 /// # Examples
246 ///
247 /// ```
248 /// # use tempfile::tempdir;
249 /// # use fd_queue::tokio::UnixListener;
250 /// use fd_queue::tokio::UnixStream;
251 ///
252 /// # tokio_test::block_on(async {
253 /// # let dir = tempdir()?;
254 /// # let path = dir.path().join("mysock");
255 /// // let path: Path = ...
256 /// # let mut listener = UnixListener::bind(&path)?;
257 /// # tokio::spawn(async move { listener.accept().await.expect("Can't accept")});
258 ///
259 /// let sock = UnixStream::connect(path).await?;
260 ///
261 /// let err = match sock.take_error() {
262 /// Ok(Some(err)) => err,
263 /// Ok(None) => {
264 /// println!("No error found.");
265 /// return Ok(());
266 /// }
267 /// Err(e) => {
268 /// println!("Couldn't take the SO_ERROR option: {}", e);
269 /// return Ok(());
270 /// }
271 /// };
272 /// #
273 /// # Ok::<(), std::io::Error>(())
274 /// # });
275 /// ```
276 pub fn take_error(&self) -> io::Result<Option<io::Error>> {
277 self.inner.take_error()
278 }
279
280 /// Shuts down the read, write, or both halves of this connection.
281 ///
282 /// This function will cause all pending and future I/O calls on the specified
283 /// portions to immediately return with an appropriate value (see the
284 /// documentation of `Shutdown`).
285 ///
286 /// # Examples
287 ///
288 /// ```
289 /// use std::net::Shutdown;
290 /// use tokio::io::AsyncReadExt;
291 /// use fd_queue::tokio::UnixStream;
292 ///
293 /// # tokio_test::block_on(async {
294 /// let (mut sock, _) = UnixStream::pair()?;
295 ///
296 /// sock.shutdown(Shutdown::Read)?;
297 ///
298 /// let mut buf = [0u8; 256];
299 /// match sock.read(&mut buf).await {
300 /// Ok(0) => {},
301 /// _ => panic!("Read unexpectedly not shut down."),
302 /// }
303 /// #
304 /// # Ok::<(), std::io::Error>(())
305 /// # });
306 /// ```
307 pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
308 shutdown(self, how)
309 }
310}
311
312impl EnqueueFd for UnixStream {
313 fn enqueue(&mut self, fd: &impl AsRawFd) -> Result<(), QueueFullError> {
314 self.biqueue.enqueue(fd)
315 }
316}
317
318impl DequeueFd for UnixStream {
319 fn dequeue(&mut self) -> Option<RawFd> {
320 self.biqueue.dequeue()
321 }
322}
323
324impl AsRawFd for UnixStream {
325 fn as_raw_fd(&self) -> RawFd {
326 self.inner.as_raw_fd()
327 }
328}
329
330impl AsyncRead for UnixStream {
331 fn poll_read(
332 self: Pin<&mut Self>,
333 cx: &mut Context,
334 buf: &mut ReadBuf,
335 ) -> Poll<io::Result<()>> {
336 let this = self.project();
337 let inner = this.inner;
338 let biqueue = this.biqueue;
339 let fd = inner.as_raw_fd();
340
341 loop {
342 ready!(inner.poll_read_ready(cx))?;
343
344 match inner.try_io(Interest::READABLE, || {
345 // TODO: find a way to handle uninitialized memory on buf
346 biqueue.read_vectored(fd, &mut [IoSliceMut::new(buf.initialize_unfilled())])
347 }) {
348 Ok(count) => {
349 buf.advance(count);
350 return Poll::Ready(Ok(()));
351 }
352 Err(e) if e.kind() == ErrorKind::WouldBlock => {}
353 Err(e) => return Poll::Ready(Err(e)),
354 }
355 }
356 }
357}
358
359impl AsyncWrite for UnixStream {
360 fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
361 self.poll_write_vectored(cx, &[IoSlice::new(buf)])
362 }
363
364 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
365 self.project().inner.poll_flush(cx)
366 }
367
368 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
369 self.project().inner.poll_shutdown(cx)
370 }
371
372 fn poll_write_vectored(
373 self: Pin<&mut Self>,
374 cx: &mut Context<'_>,
375 bufs: &[std::io::IoSlice<'_>],
376 ) -> Poll<Result<usize, std::io::Error>> {
377 let this = self.project();
378 let inner = this.inner;
379 let biqueue = this.biqueue;
380 let fd = inner.as_raw_fd();
381
382 loop {
383 ready!(inner.poll_write_ready(cx))?;
384
385 match inner.try_io(Interest::WRITABLE, || biqueue.write_vectored(fd, bufs)) {
386 Ok(count) => return Poll::Ready(Ok(count)),
387 Err(e) if e.kind() == ErrorKind::WouldBlock => {}
388 Err(e) => return Poll::Ready(Err(e)),
389 }
390 }
391 }
392
393 fn is_write_vectored(&self) -> bool {
394 true
395 }
396}
397
398impl From<TokioUnixStream> for UnixStream {
399 fn from(inner: TokioUnixStream) -> UnixStream {
400 UnixStream {
401 inner,
402 biqueue: BiQueue::new(),
403 }
404 }
405}
406
407impl TryFrom<StdUnixStream> for UnixStream {
408 type Error = io::Error;
409
410 fn try_from(inner: StdUnixStream) -> Result<Self, Self::Error> {
411 inner.set_nonblocking(true)?;
412 TokioUnixStream::from_std(inner).map(|stream| stream.into())
413 }
414}
415
416// === impl UnixListener ===
417
418impl UnixListener {
419 /// Creates a new UnixListener bound to the specified path.
420 ///
421 /// This function will bind a UnixListener to the specified path and associate it
422 /// with the default event loop's handler.
423 ///
424 /// # Panics
425 ///
426 /// This function panics if thread-local runtime is not set.
427 ///
428 /// The runtime is usually set implicitly when this function is called from a
429 /// future driven by a tokio runtime, otherwise runtime can be set explicitly
430 /// with `Handle::enter` function.
431 ///
432 /// # Examples
433 ///
434 /// ```
435 /// # use tempfile::tempdir;
436 /// use fd_queue::tokio::UnixListener;
437 ///
438 /// # tokio_test::block_on(async {
439 /// # let dir = tempdir()?;
440 /// # let path = dir.path().join("mysock");
441 /// // let path: Path = ...
442 /// let listener = UnixListener::bind(&path)?;
443 /// #
444 /// # Ok::<(), std::io::Error>(())
445 /// # });
446 pub fn bind(path: impl AsRef<Path>) -> io::Result<UnixListener> {
447 TokioUnixListener::bind(path).map(|l| l.into())
448 }
449
450 /// Returns the local socket address of this listener.
451 ///
452 /// # Examples
453 ///
454 /// ```
455 /// # use tempfile::tempdir;
456 /// use fd_queue::tokio::UnixListener;
457 ///
458 /// # tokio_test::block_on(async {
459 /// # let dir = tempdir()?;
460 /// # let path = dir.path().join("mysock");
461 /// // let path: Path = ...
462 /// let listener = UnixListener::bind(&path)?;
463 ///
464 /// let addr = listener.local_addr()?;
465 ///
466 /// match addr.as_pathname() {
467 /// Some(path) => println!("The local address is {}.", path.display()),
468 /// None => println!("The local address does not have a pathname"),
469 /// }
470 /// #
471 /// # Ok::<(), std::io::Error>(())
472 /// # });
473 pub fn local_addr(&self) -> io::Result<SocketAddr> {
474 to_addr(self.inner.local_addr()?)
475 }
476
477 /// Returns the value of the `SO_ERROR` option.
478 ///
479 /// # Examples
480 ///
481 /// ```
482 /// # use tempfile::tempdir;
483 /// use fd_queue::tokio::UnixListener;
484 ///
485 /// # tokio_test::block_on(async {
486 /// # let dir = tempdir()?;
487 /// # let path = dir.path().join("mysock");
488 /// // let path: Path = ...
489 /// let listener = UnixListener::bind(&path)?;
490 ///
491 /// let so_error = listener.take_error()?;
492 ///
493 /// match so_error {
494 /// Some(err) => println!("The SO_ERROR was {}.", err),
495 /// None => println!("There was no SO_ERROR."),
496 /// }
497 /// #
498 /// # Ok::<(), std::io::Error>(())
499 /// # });
500 pub fn take_error(&self) -> io::Result<Option<io::Error>> {
501 self.inner.take_error()
502 }
503
504 /// Accepts a new incoming connection on this listener.
505 ///
506 /// # Panics
507 ///
508 /// This function panics if thread-local runtime is not set.
509 ///
510 /// The runtime is usually set implicitly when this function is called from a
511 /// future driven by a tokio runtime, otherwise runtime can be set explicitly
512 /// with `Handle::enter` function.
513 ///
514 /// # Examples
515 ///
516 /// ```
517 /// # use tempfile::tempdir;
518 /// # use fd_queue::tokio::UnixStream;
519 /// use fd_queue::tokio::UnixListener;
520 ///
521 /// # tokio_test::block_on(async {
522 /// # let dir = tempdir()?;
523 /// # let path = dir.path().join("mysock");
524 /// // let path: Path = ...
525 /// let mut listener = UnixListener::bind(&path)?;
526 /// # tokio::spawn(async move { UnixStream::connect(path).await });
527 ///
528 /// let (sock, addr) = listener.accept().await?;
529 /// #
530 /// # Ok::<(), std::io::Error>(())
531 /// # });
532 pub async fn accept(&mut self) -> io::Result<(UnixStream, SocketAddr)> {
533 self.inner
534 .accept()
535 .await
536 .and_then(|(stream, addr)| to_addr(addr).map(|addr| (stream.into(), addr)))
537 }
538
539 fn poll_accept(&self, cx: &mut Context) -> Poll<io::Result<(UnixStream, SocketAddr)>> {
540 self.inner.poll_accept(cx).map(|result| {
541 result.and_then(|(stream, addr)| to_addr(addr).map(|addr| (stream.into(), addr)))
542 })
543 }
544}
545
546impl AsRawFd for UnixListener {
547 fn as_raw_fd(&self) -> RawFd {
548 self.inner.as_raw_fd()
549 }
550}
551
552/// Produces a continuous stream of accepted connections.
553///
554/// This is the equivalent of calling `accept()` in a loop. It will never be ready
555/// with `None`.
556///
557/// # Panics
558///
559/// Polling the stream panics if thread-local runtime is not set.
560///
561/// The runtime is usually set implicitly when this function is called from a
562/// future driven by a tokio runtime, otherwise runtime can be set explicitly
563/// with `Handle::enter` function.
564///
565/// # Examples
566///
567/// ```
568/// # use tempfile::tempdir;
569/// # use fd_queue::tokio::UnixStream;
570/// use fd_queue::tokio::UnixListener;
571/// use futures_util::stream::StreamExt;
572///
573/// # tokio_test::block_on(async {
574/// # let dir = tempdir()?;
575/// # let path = dir.path().join("mysock");
576/// // let path: Path = ...
577/// let mut listener = UnixListener::bind(&path)?;
578/// # tokio::spawn(async move { UnixStream::connect(path).await });
579///
580/// let sock = listener.next().await.expect("Listener stream unexpectedly empty");
581/// #
582/// # Ok::<(), std::io::Error>(())
583/// # });
584impl Stream for UnixListener {
585 type Item = io::Result<UnixStream>;
586
587 fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
588 use Poll::{Pending, Ready};
589
590 match self.poll_accept(cx) {
591 Pending => Pending,
592 Ready(Ok((stream, _))) => Ready(Some(Ok(stream))),
593 Ready(Err(err)) => Ready(Some(Err(err))),
594 }
595 }
596}
597
598impl From<TokioUnixListener> for UnixListener {
599 fn from(inner: TokioUnixListener) -> UnixListener {
600 UnixListener { inner }
601 }
602}
603
604// === utility functions ===
605
606fn to_addr(addr: TokioSocketAddr) -> io::Result<SocketAddr> {
607 addr.as_pathname()
608 .map_or(SocketAddr::from_pathname(""), |path| {
609 SocketAddr::from_pathname(path)
610 })
611}
612
613fn shutdown(socket: &impl AsRawFd, how: Shutdown) -> io::Result<()> {
614 let how = match how {
615 Shutdown::Write => libc::SHUT_WR,
616 Shutdown::Read => libc::SHUT_RD,
617 Shutdown::Both => libc::SHUT_RDWR,
618 };
619 // Safety: complies with the FFI documentation and the system call
620 // check the validity of the parameters.
621 let code = unsafe { libc::shutdown(socket.as_raw_fd(), how) };
622 if code == -1 {
623 Err(io::Error::last_os_error())
624 } else {
625 Ok(())
626 }
627}
628
#[cfg(test)]
mod tests {
    use super::*;

    use std::fs::File;
    use std::io::{prelude::*, SeekFrom};
    use std::os::unix::io::FromRawFd as _;

    use tempfile::{tempdir, tempfile};
    use tokio::io::{AsyncReadExt, AsyncWriteExt};

    // Each test awaits the task it spawns, so a panic inside the peer task
    // fails the test with its own message instead of only showing up
    // indirectly as a failed read.

    /// Bytes written on one end of a pair are read on the other end.
    #[tokio::test]
    async fn unix_stream_reads_other_sides_writes() {
        let mut buf: [u8; 12] = [0; 12];

        let (mut sut, mut other) = UnixStream::pair().expect("Can't create UnixStream's");
        let writer = tokio::spawn(async move {
            other
                .write_all(b"Hello World!".as_ref())
                .await
                .expect("Can't write to UnixStream");
        });
        sut.read_exact(buf.as_mut())
            .await
            .expect("Can't read from UnixStream");
        writer.await.expect("Writer task failed");

        assert_eq!(&buf, b"Hello World!");
    }

    /// A file descriptor enqueued on one end can be dequeued on the other end
    /// and refers to the same file contents.
    #[tokio::test]
    async fn unix_stream_passes_fd() {
        let mut file1 = tempfile().expect("Can't create temp file.");
        file1
            .write_all(b"Hello World!\0")
            .expect("Can't write to temp file.");
        file1.flush().expect("Can't flush temp file.");
        file1
            .seek(SeekFrom::Start(0))
            .expect("Couldn't seek the file.");
        let mut buf = [0u8];

        let (mut sut, mut other) = UnixStream::pair().expect("Can't create UnixStream's");
        let sender = tokio::spawn(async move {
            other.enqueue(&file1).expect("Can't enqueue fd.");
            other
                .write_all(b"1".as_ref())
                .await
                .expect("Can't write to UnixStream");
        });
        sut.read_exact(buf.as_mut())
            .await
            .expect("Can't read from UnixStream");
        sender.await.expect("Sender task failed");
        let fd = sut.dequeue().expect("Can't dequeue fd");

        let mut file2 = unsafe { File::from_raw_fd(fd) };
        let mut buf2 = Vec::new();
        file2.read_to_end(&mut buf2).expect("Can't read from file");
        assert_eq!(&buf2[..], b"Hello World!\0".as_ref());
    }

    /// A stream connected to a bound listener can send data to the accepted
    /// server-side stream.
    #[tokio::test]
    async fn unix_stream_connects_to_listener() {
        let dir = tempdir().expect("Can't create temp dir");
        let sock_addr = dir.as_ref().join("socket");
        let mut buf: [u8; 12] = [0; 12];

        let mut listener = UnixListener::bind(&sock_addr).expect("Can't bind listener");
        let client_task = tokio::spawn(async move {
            let mut client = UnixStream::connect(sock_addr)
                .await
                .expect("Can't connect to listener");
            client
                .write_all(b"Hello World!".as_ref())
                .await
                .expect("Can't write to client");
        });
        let (mut server, _) = listener.accept().await.expect("Can't accept on listener");
        server
            .read_exact(buf.as_mut())
            .await
            .expect("Can't read from server");
        client_task.await.expect("Client task failed");

        assert_eq!(&buf, b"Hello World!");
    }
}