fd_queue/
net.rs

1// Copyright 2020 Steven Bosnick
2//
3// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE-2.0 or
4// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
5// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
6// option. This file may not be copied, modified, or distributed
7// except according to those terms
8
9use std::{
10    io::{self, prelude::*, Error, IoSlice, IoSliceMut},
11    net::Shutdown,
12    os::unix::{
13        io::{AsRawFd, FromRawFd, IntoRawFd, RawFd},
14        net::{SocketAddr, UnixListener as StdUnixListner, UnixStream as StdUnixStream},
15    },
16    path::Path,
17};
18
19// needed until the MSRV is 1.43 when the associated constant becomes available
20use std::usize;
21
22use crate::biqueue::BiQueue;
23
24use crate::{DequeueFd, EnqueueFd, QueueFullError};
25
26/// A structure representing a connected Unix socket with support for passing
27/// [`RawFd`][RawFd].
28///
29/// This is the primary implementation of `EnqueueFd` and `DequeueFd` and it is based
30/// on a blocking, Unix domain socket. Conceptually the key interfaces on
31/// `UnixStream` interact as shown in the following diagram:
32///
33/// ```text
34/// EnqueueFd => Write => Read => DequeueFd
35/// ```
36///
37/// That is, you first enqueue a [`RawFd`][RawFd] to the `UnixStream` and then
38/// `Write` at least one byte. On the other side of the `UnixStream` you then `Read`
39/// at least one byte and then dequeue the [`RawFd`][RawFd].
40///
41/// # Examples
42///
43/// ```
44/// # use fd_queue::{EnqueueFd, DequeueFd, UnixStream};
45/// # use std::io::prelude::*;
46/// # use std::os::unix::io::FromRawFd;
47/// # use tempfile::tempfile;
48/// use std::fs::File;
49///
50/// let (mut sock1, mut sock2) = UnixStream::pair()?;
51///
52/// // sender side
53/// # let file1: File = tempfile()?;
54/// // let file1: File = ...
55/// sock1.enqueue(&file1).expect("Can't endqueue the file descriptor.");
56/// sock1.write(b"a")?;
57/// sock1.flush()?;
58///
59/// // receiver side
60/// let mut buf = [0u8; 1];
61/// sock2.read(&mut buf)?;
62/// let fd = sock2.dequeue().expect("Can't dequeue the file descriptor.");
63/// let file2 = unsafe { File::from_raw_fd(fd) };
64///
65/// # Ok::<(),std::io::Error>(())
66/// ```
67///
68/// [RawFd]: https://doc.rust-lang.org/stable/std/os/unix/io/type.RawFd.html
69#[derive(Debug)]
70pub struct UnixStream {
71    inner: StdUnixStream,
72    biqueue: BiQueue,
73}
74
/// A Unix domain socket server.
///
/// Every socket accepted from a `UnixListener` is a `UnixStream`, and so
/// supports passing [`RawFd`][RawFd].
///
/// [RawFd]: https://doc.rust-lang.org/stable/std/os/unix/io/type.RawFd.html
#[derive(Debug)]
pub struct UnixListener {
    // The wrapped standard-library listener that does the actual work.
    inner: StdUnixListner,
}
83
84/// An iterator over incoming connections to a `UnixListener`.
85///
86/// It is an infinite iterator that will never return `None`
87#[derive(Debug)]
88pub struct Incoming<'a> {
89    listener: &'a UnixListener,
90}
91
92// === impl UnixStream ===
93impl UnixStream {
94    /// The size of the bounded queue of outbound [`RawFd`][RawFd].
95    ///
96    /// [RawFd]: https://doc.rust-lang.org/stable/std/os/unix/io/type.RawFd.html
97    pub const FD_QUEUE_SIZE: usize = BiQueue::FD_QUEUE_SIZE;
98
99    /// Connects to the socket named by `path`.
100    ///
101    /// # Examples
102    ///
103    /// ```
104    /// # use std::thread;
105    /// # use fd_queue::UnixListener;
106    /// # use tempfile::tempdir;
107    /// use fd_queue::UnixStream;
108    ///
109    /// # let dir = tempdir()?;
110    /// # let path = dir.path().join("mysock");
111    /// // let path = ...
112    /// # let listener = UnixListener::bind(&path)?;
113    /// # thread::spawn(move || listener.accept());
114    ///
115    /// let sock = match UnixStream::connect(path) {
116    ///     Ok(sock) => sock,
117    ///     Err(e) => {
118    ///         println!("Couldn't connect to a socket: {}", e);
119    ///         return Ok(());
120    ///     }
121    /// };
122    ///
123    /// # Ok::<(), std::io::Error>(())
124    /// ```
125    pub fn connect<P: AsRef<Path>>(path: P) -> io::Result<UnixStream> {
126        StdUnixStream::connect(path).map(|s| s.into())
127    }
128
129    /// Creates an unnamed pair of connected sockets.
130    ///
131    /// Returns two `UnixStream`s which are connected to each other.
132    ///
133    /// # Examples
134    ///
135    /// ```
136    /// use fd_queue::UnixStream;
137    ///
138    /// let (sock1, sock2) = match UnixStream::pair() {
139    ///     Ok((sock1, sock2)) => (sock1, sock2),
140    ///     Err(e) => {
141    ///         println!("Couldn't create a pair of sockets: {}", e);
142    ///         return;
143    ///     }
144    /// };
145    /// ```
146    pub fn pair() -> io::Result<(UnixStream, UnixStream)> {
147        StdUnixStream::pair().map(|(s1, s2)| (s1.into(), s2.into()))
148    }
149
150    /// Creates a new independently owned handle to the underlying socket.
151    ///
152    /// The returned `UnixStream` is a reference to the same stream that this object references.
153    /// Both handles will read and write the same stream of data, and options set on one stream
154    /// will be propagated to the other stream.
155    ///
156    /// # Examples
157    ///
158    /// ```
159    /// use fd_queue::UnixStream;
160    ///
161    /// let (sock1, _) = UnixStream::pair()?;
162    ///
163    /// let sock2 = match sock1.try_clone() {
164    ///     Ok(sock) => sock,
165    ///     Err(e) => {
166    ///         println!("Couldn't clone a socket: {}", e);
167    ///         return Ok(());
168    ///     }
169    /// };
170    ///
171    /// # Ok::<(),std::io::Error>(())
172    /// ```
173    pub fn try_clone(&self) -> io::Result<UnixStream> {
174        self.inner.try_clone().map(|s| s.into())
175    }
176
177    /// Returns the socket address of the local half of this connection.
178    ///
179    /// # Examples
180    ///
181    /// ```
182    /// # use std::thread;
183    /// # use fd_queue::UnixListener;
184    /// # use tempfile::tempdir;
185    /// use fd_queue::UnixStream;
186    ///
187    /// # let dir = tempdir()?;
188    /// # let path = dir.path().join("mysock");
189    /// // let path = ...
190    /// # let listener = UnixListener::bind(&path)?;
191    /// # thread::spawn(move || listener.accept());
192    /// #
193    /// let sock = UnixStream::connect(path)?;
194    ///
195    /// let addr = match sock.local_addr() {
196    ///     Ok(addr) => addr,
197    ///     Err(e) => {
198    ///         println!("Couldn't get the local address: {}", e);
199    ///         return Ok(());
200    ///     }
201    /// };
202    ///
203    /// # Ok::<(),std::io::Error>(())
204    /// ```
205    pub fn local_addr(&self) -> io::Result<SocketAddr> {
206        self.inner.local_addr()
207    }
208
209    /// Returns the socket address of the remote half of this connection.
210    ///
211    /// # Examples
212    ///
213    /// ```
214    /// # use std::thread;
215    /// # use fd_queue::UnixListener;
216    /// # use tempfile::tempdir;
217    /// use fd_queue::UnixStream;
218    ///
219    /// # let dir = tempdir()?;
220    /// # let path = dir.path().join("mysock");
221    /// // let path = ...
222    /// # let listener = UnixListener::bind(&path)?;
223    /// # thread::spawn(move || listener.accept());
224    /// #
225    /// let sock = UnixStream::connect(path)?;
226    ///
227    /// let addr = match sock.peer_addr() {
228    ///     Ok(addr) => addr,
229    ///     Err(e) => {
230    ///         println!("Couldn't get the local address: {}", e);
231    ///         return Ok(());
232    ///     }
233    /// };
234    ///
235    /// # Ok::<(),std::io::Error>(())
236    /// ```
237    pub fn peer_addr(&self) -> io::Result<SocketAddr> {
238        self.inner.peer_addr()
239    }
240
241    /// Returns the value of the `SO_ERROR` option.
242    ///
243    /// # Examples
244    ///
245    /// ```
246    /// use fd_queue::UnixStream;
247    ///
248    /// let (sock, _) = UnixStream::pair()?;
249    ///
250    /// let err = match sock.take_error() {
251    ///     Ok(Some(err)) => err,
252    ///     Ok(None) => {
253    ///         println!("No error found.");
254    ///         return Ok(());
255    ///     }
256    ///     Err(e) => {
257    ///         println!("Couldn't take the SO_ERROR option: {}", e);
258    ///         return Ok(());
259    ///     }
260    /// };
261    ///
262    /// # Ok::<(),std::io::Error>(())
263    /// ```
264    pub fn take_error(&self) -> io::Result<Option<Error>> {
265        self.inner.take_error()
266    }
267
268    /// Shuts down the read, write, or both halves of this connection.
269    ///
270    /// This function will cause all pending and future I/O calls on the specified portions to
271    /// immediately return with an appropriate value.
272    ///
273    /// # Examples
274    ///
275    /// ```
276    /// use fd_queue::UnixStream;
277    /// use std::net::Shutdown;
278    /// use std::io::Read;
279    ///
280    /// let (mut sock, _) = UnixStream::pair()?;
281    ///
282    /// sock.shutdown(Shutdown::Read).expect("Couldn't shutdown.");
283    ///
284    /// let mut buf = [0u8; 256];
285    /// match sock.read(buf.as_mut()) {
286    ///     Ok(0) => {},
287    ///     _ => panic!("Read unexpectly not shut down."),
288    /// }
289    ///
290    /// # Ok::<(),std::io::Error>(())
291    /// ```
292    pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
293        self.inner.shutdown(how)
294    }
295
296    #[allow(dead_code)]
297    pub(crate) fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> {
298        self.inner.set_nonblocking(nonblocking)
299    }
300}
301
302/// Enqueue a [`RawFd`][RawFd] for later transmission across the `UnixStream`.
303///
304/// The [`RawFd`][RawFd] will be transmitted on a later call to a method of `Write`.
305/// The number of [`RawFd`][RawFd] that can be enqueued before being transmitted is
306/// bounded by `FD_QUEUE_SIZE`.
307///
308/// [RawFd]: https://doc.rust-lang.org/stable/std/os/unix/io/type.RawFd.html
309impl EnqueueFd for UnixStream {
310    fn enqueue(&mut self, fd: &impl AsRawFd) -> std::result::Result<(), QueueFullError> {
311        self.biqueue.enqueue(fd)
312    }
313}
314
315/// Dequeue a [`RawFd`][RawFd] that was previously transmitted across the
316/// `UnixStream`.
317///
318/// The [`RawFd`][RawFd] that are dequeued were transmitted by a previous call to a
319/// method of `Read`.
320///
321/// [RawFd]: https://doc.rust-lang.org/stable/std/os/unix/io/type.RawFd.html
322impl DequeueFd for UnixStream {
323    fn dequeue(&mut self) -> Option<RawFd> {
324        self.biqueue.dequeue()
325    }
326}
327
328/// Receive bytes and [`RawFd`][RawFd] that are transmitted across the `UnixStream`.
329///
330/// The [`RawFd`][RawFd] that are received along with the bytes will be available
331/// through the method of the `DequeueFd` implementation. The number of
332/// [`RawFd`][RawFd] that can be received in a single call to one of the `Read`
333/// methods is bounded by `FD_QUEUE_SIZE`. It is an error if the other side of this
334/// `UnixStream` attempted to send more control messages (including [`RawFd`][RawFd])
335/// than will fit in the buffer that has been sized for receiving up to
336/// `FD_QUEUE_SIZE` [`RawFd`][RawFd].
337///
338/// [RawFd]: https://doc.rust-lang.org/stable/std/os/unix/io/type.RawFd.html
339impl Read for UnixStream {
340    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
341        self.read_vectored(&mut [IoSliceMut::new(buf)])
342    }
343
344    fn read_vectored(&mut self, bufs: &mut [IoSliceMut]) -> io::Result<usize> {
345        self.biqueue.read_vectored(self.as_raw_fd(), bufs)
346    }
347}
348
349/// Transmit bytes and [`RawFd`][RawFd] across the `UnixStream`.
350///
351/// The [`RawFd`][RawFd] that are transmitted along with the bytes are ones that were
352/// previously enqueued for transmission through the method of `EnqueueFd`.
353///
354/// [RawFd]: https://doc.rust-lang.org/stable/std/os/unix/io/type.RawFd.html
355impl Write for UnixStream {
356    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
357        self.write_vectored(&[IoSlice::new(buf)])
358    }
359
360    fn write_vectored(&mut self, bufs: &[IoSlice]) -> io::Result<usize> {
361        self.biqueue.write_vectored(self.as_raw_fd(), bufs)
362    }
363
364    fn flush(&mut self) -> io::Result<()> {
365        self.inner.flush()
366    }
367}
368
369impl AsRawFd for UnixStream {
370    fn as_raw_fd(&self) -> RawFd {
371        self.inner.as_raw_fd()
372    }
373}
374
375impl FromRawFd for UnixStream {
376    unsafe fn from_raw_fd(fd: RawFd) -> Self {
377        StdUnixStream::from_raw_fd(fd).into()
378    }
379}
380
381impl IntoRawFd for UnixStream {
382    fn into_raw_fd(self) -> RawFd {
383        self.inner.into_raw_fd()
384    }
385}
386
387impl From<StdUnixStream> for UnixStream {
388    fn from(inner: StdUnixStream) -> Self {
389        Self {
390            inner,
391            biqueue: BiQueue::new(),
392        }
393    }
394}
395
396// === impl UnixListener ===
397impl UnixListener {
398    /// Create a new `UnixListener` bound to the specified socket.
399    ///
400    /// # Examples
401    ///
402    /// ```
403    /// use fd_queue::UnixListener;
404    /// # use tempfile::tempdir;
405    /// # let dir = tempdir()?;
406    /// # let path = dir.path().join("mysocket");
407    /// // let path = ...
408    /// let listener = match UnixListener::bind(&path) {
409    ///     Ok(listener) => listener,
410    ///     Err(e) => {
411    ///         println!("Can't bind the unix socket libtest: {}", e);
412    ///         return Ok(());
413    ///     }
414    /// };
415    ///
416    /// # Ok::<(),std::io::Error>(())
417    /// ```
418    pub fn bind(path: impl AsRef<Path>) -> io::Result<UnixListener> {
419        StdUnixListner::bind(path).map(|s| s.into())
420    }
421
422    /// Accepts a new incoming connection to this server.
423    ///
424    /// This function will block the calling thread until a new Unix connection is
425    /// established. When established the corresponding `UnixStream` and the remote
426    /// peer's address will be returned.
427    ///
428    /// # Examples
429    ///
430    /// ```
431    /// use fd_queue::UnixListener;
432    /// # use fd_queue::UnixStream;
433    /// # use std::thread;
434    /// # use tempfile::tempdir;
435    /// # let dir = tempdir()?;
436    /// # let path = dir.path().join("mysocket");
437    ///
438    /// // let path = ...
439    /// let listener = UnixListener::bind(&path)?;
440    /// # thread::spawn(move || UnixStream::connect(path).expect("Can't connect"));
441    ///
442    /// let (sock, addr) = match listener.accept() {
443    ///     Ok((sock, addr)) => (sock, addr),
444    ///     Err(e) => {
445    ///         println!("Can't accept unix stream: {}", e);
446    ///         return Ok(());
447    ///     }
448    /// };
449    ///
450    /// # Ok::<(),std::io::Error>(())
451    /// ```
452    pub fn accept(&self) -> io::Result<(UnixStream, SocketAddr)> {
453        self.inner.accept().map(|(s, a)| (s.into(), a))
454    }
455
456    /// Create a new independently owned handle to the underlying socket.
457    ///
458    /// The returned `UnixListener` is a reference to the same socket that this
459    /// object references. Both handles can be used to accept incoming connections
460    /// and options set on one will affect the other.
461    ///
462    /// # Examples
463    ///
464    /// ```
465    /// use fd_queue::UnixListener;
466    /// # use tempfile::tempdir;
467    /// # let dir = tempdir()?;
468    /// # let path = dir.path().join("mysocket");
469    ///
470    /// // let path = ...
471    /// let listener1 = UnixListener::bind(&path)?;
472    ///
473    /// let listener2 = match listener1.try_clone() {
474    ///     Ok(listener) => listener,
475    ///     Err(e) => {
476    ///         println!("Can't clone listener: {}", e);
477    ///         return Ok(());
478    ///     }
479    /// };
480    ///
481    /// # Ok::<(),std::io::Error>(())
482    /// ```
483    pub fn try_clone(&self) -> io::Result<UnixListener> {
484        self.inner.try_clone().map(|s| s.into())
485    }
486
487    /// Returns the local address of of this listener.
488    ///
489    /// # Examples
490    ///
491    /// ```
492    /// use fd_queue::UnixListener;
493    /// # use tempfile::tempdir;
494    /// # let dir = tempdir()?;
495    /// # let path = dir.path().join("mysocket");
496    ///
497    /// // let path = ...
498    /// let listener = UnixListener::bind(&path)?;
499    ///
500    /// let addr = match listener.local_addr() {
501    ///     Ok(addr) => addr,
502    ///     Err(e) => {
503    ///         println!("Couldn't get local address: {}", e);
504    ///         return Ok(());
505    ///     }
506    /// };
507    ///
508    /// # Ok::<(),std::io::Error>(())
509    /// ```
510    pub fn local_addr(&self) -> io::Result<SocketAddr> {
511        self.inner.local_addr()
512    }
513
514    /// Return the value of the `SO_ERROR` option.
515    ///
516    /// # Examples
517    ///
518    /// ```
519    /// use fd_queue::UnixListener;
520    /// # use tempfile::tempdir;
521    /// # let dir = tempdir()?;
522    /// # let path = dir.path().join("mysocket");
523    ///
524    /// // let path = ...
525    /// let listener = UnixListener::bind(&path)?;
526    ///
527    /// let err = match listener.take_error() {
528    ///     Ok(Some(err)) => err,
529    ///     Ok(None) => {
530    ///         println!("There was no SO_ERROR option pending.");
531    ///         return Ok(());
532    ///     }
533    ///     Err(e) => {
534    ///         println!("Couldn't get the SO_ERROR option: {}", e);
535    ///         return Ok(())
536    ///     }
537    /// };
538    ///
539    /// # Ok::<(),std::io::Error>(())
540    /// ```
541    pub fn take_error(&self) -> io::Result<Option<Error>> {
542        self.inner.take_error()
543    }
544
545    /// Returns an iterator over incoming connections.
546    ///
547    /// The iterator will never return `None` and also will not yield the peer's
548    /// [`SocketAddr`][SocketAddr] structure.
549    ///
550    /// # Examples
551    ///
552    /// ```
553    /// use fd_queue::UnixListener;
554    /// # use fd_queue::UnixStream;
555    /// # use std::thread;
556    /// # use tempfile::tempdir;
557    /// # let dir = tempdir()?;
558    /// # let path = dir.path().join("mysocket");
559    ///
560    /// // let path = ...
561    /// let listener = UnixListener::bind(&path)?;
562    /// # thread::spawn(move || UnixStream::connect(path).expect("Can't connect"));
563    ///
564    /// let mut incoming = listener.incoming();
565    ///
566    /// let sock = match incoming.next() {
567    ///     Some(Ok(sock)) => sock,
568    ///     Some(Err(e)) => {
569    ///         println!("Can't get the next incoming socket: {}", e);
570    ///         return Ok(());
571    ///     }
572    ///     None => unreachable!(),
573    /// };
574    ///
575    /// # Ok::<(),std::io::Error>(())
576    /// ```
577    ///
578    /// [SocketAddr]: https://doc.rust-lang.org/stable/std/os/unix/net/struct.SocketAddr.html
579    pub fn incoming(&self) -> Incoming {
580        Incoming { listener: self }
581    }
582}
583
584impl AsRawFd for UnixListener {
585    fn as_raw_fd(&self) -> RawFd {
586        self.inner.as_raw_fd()
587    }
588}
589
590impl FromRawFd for UnixListener {
591    unsafe fn from_raw_fd(fd: RawFd) -> Self {
592        StdUnixListner::from_raw_fd(fd).into()
593    }
594}
595
596impl IntoRawFd for UnixListener {
597    fn into_raw_fd(self) -> RawFd {
598        self.inner.into_raw_fd()
599    }
600}
601
602impl<'a> IntoIterator for &'a UnixListener {
603    type Item = io::Result<UnixStream>;
604    type IntoIter = Incoming<'a>;
605
606    fn into_iter(self) -> Self::IntoIter {
607        self.incoming()
608    }
609}
610
611impl From<StdUnixListner> for UnixListener {
612    fn from(inner: StdUnixListner) -> Self {
613        UnixListener { inner }
614    }
615}
616
617// === impl Incoming ===
618impl Iterator for Incoming<'_> {
619    type Item = io::Result<UnixStream>;
620
621    fn next(&mut self) -> Option<Self::Item> {
622        Some(self.listener.accept().map(|(s, _)| s))
623    }
624
625    fn size_hint(&self) -> (usize, Option<usize>) {
626        (usize::MAX, None)
627    }
628}
629
#[cfg(test)]
mod test {
    use super::*;

    use std::convert::AsMut;
    use std::ffi::c_void;
    use std::ptr;
    use std::slice;

    use nix::fcntl::OFlag;
    use nix::sys::mman::{mmap, munmap, shm_open, shm_unlink, MapFlags, ProtFlags};
    use nix::sys::stat::Mode;
    use nix::unistd::{close, ftruncate};

    /// Bytes written into the shared memory object and expected back on the
    /// receiving side.
    const HELLO: &[u8] = b"Hello World!\0";

    /// A shared memory object mapped into this process.
    ///
    /// It provides a descriptor whose contents can be checked after the
    /// descriptor has travelled across a `UnixStream`.
    struct Shm {
        fd: RawFd,
        ptr: *mut u8,
        len: usize,
        // Empty when this value did not create the object and so must not
        // unlink it on drop.
        name: String,
    }

    /// Maps `len` bytes of `fd` as shared, readable and writable memory.
    fn map_shared(fd: RawFd, len: usize) -> *mut u8 {
        let prot = ProtFlags::PROT_READ | ProtFlags::PROT_WRITE;
        let flags = MapFlags::MAP_SHARED;

        unsafe { mmap(ptr::null_mut(), len, prot, flags, fd, 0).expect("Can't mmap") as *mut u8 }
    }

    impl Shm {
        /// Creates (or opens) the shared memory object `name`, resizes it to
        /// `size` bytes and maps it.
        fn new(name: &str, size: i64) -> Shm {
            let fd = shm_open(
                name,
                OFlag::O_CREAT | OFlag::O_RDWR,
                Mode::S_IRUSR | Mode::S_IWUSR,
            )
            .expect("Can't create shm.");
            ftruncate(fd, size).expect("Can't ftruncate");

            let len = size as usize;
            let ptr = map_shared(fd, len);

            Shm {
                fd,
                ptr,
                len,
                name: name.to_string(),
            }
        }

        /// Maps `size` bytes of an already open descriptor, taking over the
        /// descriptor (it is closed on drop) but not any name.
        fn from_raw_fd(fd: RawFd, size: usize) -> Shm {
            let ptr = map_shared(fd, size);

            Shm {
                fd,
                ptr,
                len: size,
                name: String::new(),
            }
        }
    }

    impl Drop for Shm {
        /// Unmaps the memory, closes the descriptor and, when this value has a
        /// name, unlinks the shared memory object.
        fn drop(&mut self) {
            unsafe {
                munmap(self.ptr as *mut c_void, self.len).expect("Can't munmap");
            }
            close(self.fd).expect("Can't close");
            if self.name.is_empty() {
                return;
            }
            shm_unlink(self.name.as_str()).expect("Can't shm_unlink");
        }
    }

    impl AsMut<[u8]> for Shm {
        fn as_mut(&mut self) -> &mut [u8] {
            // The slice spans exactly the `len` bytes mapped at `ptr`.
            let (start, count) = (self.ptr, self.len);
            unsafe { slice::from_raw_parts_mut(start, count) }
        }
    }

    impl AsRawFd for Shm {
        fn as_raw_fd(&self) -> RawFd {
            self.fd
        }
    }

    /// Creates the shared memory object `name` holding exactly the greeting.
    fn make_hello(name: &str) -> Shm {
        let mut shm = Shm::new(name, HELLO.len() as i64);
        shm.as_mut().copy_from_slice(HELLO);
        shm
    }

    /// Maps `fd` and reports whether it starts with the greeting.
    fn compare_hello(fd: RawFd) -> bool {
        let mut mapped = Shm::from_raw_fd(fd, HELLO.len());
        mapped.as_mut()[..HELLO.len()] == *HELLO
    }

    #[test]
    fn unix_stream_passes_fd() {
        let shm = make_hello("/unix_stream_passes_fd");
        let mut received = [0u8; 20];

        let (mut sender, mut receiver) = UnixStream::pair().expect("Can't make pair");

        // Sender side: enqueue the descriptor, then push bytes to carry it.
        sender.enqueue(&shm).expect("Can't enqueue");
        sender.write(b"abc").expect("Can't write");
        sender.flush().expect("Can't flush");

        // Receiver side: read the bytes, then collect the descriptor.
        receiver.read(&mut received).expect("Can't read");
        let fd = receiver.dequeue().expect("Empty fd queue");

        assert!(fd != shm.fd, "fd's unexpectedly equal");
        assert!(compare_hello(fd), "fd didn't contain expect contents");
    }
}