fd_queue/net.rs
1// Copyright 2020 Steven Bosnick
2//
3// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE-2.0 or
4// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
5// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
6// option. This file may not be copied, modified, or distributed
7// except according to those terms
8
9use std::{
10 io::{self, prelude::*, Error, IoSlice, IoSliceMut},
11 net::Shutdown,
12 os::unix::{
13 io::{AsRawFd, FromRawFd, IntoRawFd, RawFd},
14 net::{SocketAddr, UnixListener as StdUnixListner, UnixStream as StdUnixStream},
15 },
16 path::Path,
17};
18
19// needed until the MSRV is 1.43 when the associated constant becomes available
20use std::usize;
21
22use crate::biqueue::BiQueue;
23
24use crate::{DequeueFd, EnqueueFd, QueueFullError};
25
26/// A structure representing a connected Unix socket with support for passing
27/// [`RawFd`][RawFd].
28///
29/// This is the primary implementation of `EnqueueFd` and `DequeueFd` and it is based
30/// on a blocking, Unix domain socket. Conceptually the key interfaces on
31/// `UnixStream` interact as shown in the following diagram:
32///
33/// ```text
34/// EnqueueFd => Write => Read => DequeueFd
35/// ```
36///
37/// That is, you first enqueue a [`RawFd`][RawFd] to the `UnixStream` and then
38/// `Write` at least one byte. On the other side of the `UnixStream` you then `Read`
39/// at least one byte and then dequeue the [`RawFd`][RawFd].
40///
41/// # Examples
42///
43/// ```
44/// # use fd_queue::{EnqueueFd, DequeueFd, UnixStream};
45/// # use std::io::prelude::*;
46/// # use std::os::unix::io::FromRawFd;
47/// # use tempfile::tempfile;
48/// use std::fs::File;
49///
50/// let (mut sock1, mut sock2) = UnixStream::pair()?;
51///
52/// // sender side
53/// # let file1: File = tempfile()?;
54/// // let file1: File = ...
/// sock1.enqueue(&file1).expect("Can't enqueue the file descriptor.");
56/// sock1.write(b"a")?;
57/// sock1.flush()?;
58///
59/// // receiver side
60/// let mut buf = [0u8; 1];
61/// sock2.read(&mut buf)?;
62/// let fd = sock2.dequeue().expect("Can't dequeue the file descriptor.");
63/// let file2 = unsafe { File::from_raw_fd(fd) };
64///
65/// # Ok::<(),std::io::Error>(())
66/// ```
67///
68/// [RawFd]: https://doc.rust-lang.org/stable/std/os/unix/io/type.RawFd.html
#[derive(Debug)]
pub struct UnixStream {
    /// The standard library socket that every socket-level operation is delegated to.
    inner: StdUnixStream,
    /// The queue that `enqueue`/`dequeue` operate on and that performs the
    /// vectored reads and writes on the socket's file descriptor.
    biqueue: BiQueue,
}
74
75/// A structure representing a Unix domain socket server whose connected sockets
76/// have support for passing [`RawFd`][RawFd].
77///
78/// [RawFd]: https://doc.rust-lang.org/stable/std/os/unix/io/type.RawFd.html
#[derive(Debug)]
pub struct UnixListener {
    /// The standard library listener that every operation is delegated to.
    inner: StdUnixListner,
}
83
84/// An iterator over incoming connections to a `UnixListener`.
85///
/// It is an infinite iterator that will never return `None`.
#[derive(Debug)]
pub struct Incoming<'a> {
    /// The borrowed listener whose `accept` is called for each item.
    listener: &'a UnixListener,
}
91
92// === impl UnixStream ===
93impl UnixStream {
    /// The size of the bounded queue of outbound [`RawFd`][RawFd].
    ///
    /// The value is taken directly from the `FD_QUEUE_SIZE` constant of the
    /// internal queue type that backs this stream.
    ///
    /// [RawFd]: https://doc.rust-lang.org/stable/std/os/unix/io/type.RawFd.html
    pub const FD_QUEUE_SIZE: usize = BiQueue::FD_QUEUE_SIZE;
98
99 /// Connects to the socket named by `path`.
100 ///
101 /// # Examples
102 ///
103 /// ```
104 /// # use std::thread;
105 /// # use fd_queue::UnixListener;
106 /// # use tempfile::tempdir;
107 /// use fd_queue::UnixStream;
108 ///
109 /// # let dir = tempdir()?;
110 /// # let path = dir.path().join("mysock");
111 /// // let path = ...
112 /// # let listener = UnixListener::bind(&path)?;
113 /// # thread::spawn(move || listener.accept());
114 ///
115 /// let sock = match UnixStream::connect(path) {
116 /// Ok(sock) => sock,
117 /// Err(e) => {
118 /// println!("Couldn't connect to a socket: {}", e);
119 /// return Ok(());
120 /// }
121 /// };
122 ///
123 /// # Ok::<(), std::io::Error>(())
124 /// ```
125 pub fn connect<P: AsRef<Path>>(path: P) -> io::Result<UnixStream> {
126 StdUnixStream::connect(path).map(|s| s.into())
127 }
128
129 /// Creates an unnamed pair of connected sockets.
130 ///
131 /// Returns two `UnixStream`s which are connected to each other.
132 ///
133 /// # Examples
134 ///
135 /// ```
136 /// use fd_queue::UnixStream;
137 ///
138 /// let (sock1, sock2) = match UnixStream::pair() {
139 /// Ok((sock1, sock2)) => (sock1, sock2),
140 /// Err(e) => {
141 /// println!("Couldn't create a pair of sockets: {}", e);
142 /// return;
143 /// }
144 /// };
145 /// ```
146 pub fn pair() -> io::Result<(UnixStream, UnixStream)> {
147 StdUnixStream::pair().map(|(s1, s2)| (s1.into(), s2.into()))
148 }
149
150 /// Creates a new independently owned handle to the underlying socket.
151 ///
152 /// The returned `UnixStream` is a reference to the same stream that this object references.
153 /// Both handles will read and write the same stream of data, and options set on one stream
154 /// will be propagated to the other stream.
155 ///
156 /// # Examples
157 ///
158 /// ```
159 /// use fd_queue::UnixStream;
160 ///
161 /// let (sock1, _) = UnixStream::pair()?;
162 ///
163 /// let sock2 = match sock1.try_clone() {
164 /// Ok(sock) => sock,
165 /// Err(e) => {
166 /// println!("Couldn't clone a socket: {}", e);
167 /// return Ok(());
168 /// }
169 /// };
170 ///
171 /// # Ok::<(),std::io::Error>(())
172 /// ```
173 pub fn try_clone(&self) -> io::Result<UnixStream> {
174 self.inner.try_clone().map(|s| s.into())
175 }
176
177 /// Returns the socket address of the local half of this connection.
178 ///
179 /// # Examples
180 ///
181 /// ```
182 /// # use std::thread;
183 /// # use fd_queue::UnixListener;
184 /// # use tempfile::tempdir;
185 /// use fd_queue::UnixStream;
186 ///
187 /// # let dir = tempdir()?;
188 /// # let path = dir.path().join("mysock");
189 /// // let path = ...
190 /// # let listener = UnixListener::bind(&path)?;
191 /// # thread::spawn(move || listener.accept());
192 /// #
193 /// let sock = UnixStream::connect(path)?;
194 ///
195 /// let addr = match sock.local_addr() {
196 /// Ok(addr) => addr,
197 /// Err(e) => {
198 /// println!("Couldn't get the local address: {}", e);
199 /// return Ok(());
200 /// }
201 /// };
202 ///
203 /// # Ok::<(),std::io::Error>(())
204 /// ```
205 pub fn local_addr(&self) -> io::Result<SocketAddr> {
206 self.inner.local_addr()
207 }
208
209 /// Returns the socket address of the remote half of this connection.
210 ///
211 /// # Examples
212 ///
213 /// ```
214 /// # use std::thread;
215 /// # use fd_queue::UnixListener;
216 /// # use tempfile::tempdir;
217 /// use fd_queue::UnixStream;
218 ///
219 /// # let dir = tempdir()?;
220 /// # let path = dir.path().join("mysock");
221 /// // let path = ...
222 /// # let listener = UnixListener::bind(&path)?;
223 /// # thread::spawn(move || listener.accept());
224 /// #
225 /// let sock = UnixStream::connect(path)?;
226 ///
227 /// let addr = match sock.peer_addr() {
228 /// Ok(addr) => addr,
229 /// Err(e) => {
    ///         println!("Couldn't get the peer address: {}", e);
231 /// return Ok(());
232 /// }
233 /// };
234 ///
235 /// # Ok::<(),std::io::Error>(())
236 /// ```
237 pub fn peer_addr(&self) -> io::Result<SocketAddr> {
238 self.inner.peer_addr()
239 }
240
241 /// Returns the value of the `SO_ERROR` option.
242 ///
243 /// # Examples
244 ///
245 /// ```
246 /// use fd_queue::UnixStream;
247 ///
248 /// let (sock, _) = UnixStream::pair()?;
249 ///
250 /// let err = match sock.take_error() {
251 /// Ok(Some(err)) => err,
252 /// Ok(None) => {
253 /// println!("No error found.");
254 /// return Ok(());
255 /// }
256 /// Err(e) => {
257 /// println!("Couldn't take the SO_ERROR option: {}", e);
258 /// return Ok(());
259 /// }
260 /// };
261 ///
262 /// # Ok::<(),std::io::Error>(())
263 /// ```
264 pub fn take_error(&self) -> io::Result<Option<Error>> {
265 self.inner.take_error()
266 }
267
268 /// Shuts down the read, write, or both halves of this connection.
269 ///
270 /// This function will cause all pending and future I/O calls on the specified portions to
271 /// immediately return with an appropriate value.
272 ///
273 /// # Examples
274 ///
275 /// ```
276 /// use fd_queue::UnixStream;
277 /// use std::net::Shutdown;
278 /// use std::io::Read;
279 ///
280 /// let (mut sock, _) = UnixStream::pair()?;
281 ///
282 /// sock.shutdown(Shutdown::Read).expect("Couldn't shutdown.");
283 ///
284 /// let mut buf = [0u8; 256];
285 /// match sock.read(buf.as_mut()) {
286 /// Ok(0) => {},
    ///     _ => panic!("Read unexpectedly not shut down."),
288 /// }
289 ///
290 /// # Ok::<(),std::io::Error>(())
291 /// ```
292 pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
293 self.inner.shutdown(how)
294 }
295
296 #[allow(dead_code)]
297 pub(crate) fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> {
298 self.inner.set_nonblocking(nonblocking)
299 }
300}
301
302/// Enqueue a [`RawFd`][RawFd] for later transmission across the `UnixStream`.
303///
304/// The [`RawFd`][RawFd] will be transmitted on a later call to a method of `Write`.
305/// The number of [`RawFd`][RawFd] that can be enqueued before being transmitted is
306/// bounded by `FD_QUEUE_SIZE`.
307///
308/// [RawFd]: https://doc.rust-lang.org/stable/std/os/unix/io/type.RawFd.html
309impl EnqueueFd for UnixStream {
310 fn enqueue(&mut self, fd: &impl AsRawFd) -> std::result::Result<(), QueueFullError> {
311 self.biqueue.enqueue(fd)
312 }
313}
314
315/// Dequeue a [`RawFd`][RawFd] that was previously transmitted across the
316/// `UnixStream`.
317///
318/// The [`RawFd`][RawFd] that are dequeued were transmitted by a previous call to a
319/// method of `Read`.
320///
321/// [RawFd]: https://doc.rust-lang.org/stable/std/os/unix/io/type.RawFd.html
322impl DequeueFd for UnixStream {
323 fn dequeue(&mut self) -> Option<RawFd> {
324 self.biqueue.dequeue()
325 }
326}
327
328/// Receive bytes and [`RawFd`][RawFd] that are transmitted across the `UnixStream`.
329///
330/// The [`RawFd`][RawFd] that are received along with the bytes will be available
331/// through the method of the `DequeueFd` implementation. The number of
332/// [`RawFd`][RawFd] that can be received in a single call to one of the `Read`
333/// methods is bounded by `FD_QUEUE_SIZE`. It is an error if the other side of this
334/// `UnixStream` attempted to send more control messages (including [`RawFd`][RawFd])
335/// than will fit in the buffer that has been sized for receiving up to
336/// `FD_QUEUE_SIZE` [`RawFd`][RawFd].
337///
338/// [RawFd]: https://doc.rust-lang.org/stable/std/os/unix/io/type.RawFd.html
339impl Read for UnixStream {
340 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
341 self.read_vectored(&mut [IoSliceMut::new(buf)])
342 }
343
344 fn read_vectored(&mut self, bufs: &mut [IoSliceMut]) -> io::Result<usize> {
345 self.biqueue.read_vectored(self.as_raw_fd(), bufs)
346 }
347}
348
349/// Transmit bytes and [`RawFd`][RawFd] across the `UnixStream`.
350///
351/// The [`RawFd`][RawFd] that are transmitted along with the bytes are ones that were
352/// previously enqueued for transmission through the method of `EnqueueFd`.
353///
354/// [RawFd]: https://doc.rust-lang.org/stable/std/os/unix/io/type.RawFd.html
355impl Write for UnixStream {
356 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
357 self.write_vectored(&[IoSlice::new(buf)])
358 }
359
360 fn write_vectored(&mut self, bufs: &[IoSlice]) -> io::Result<usize> {
361 self.biqueue.write_vectored(self.as_raw_fd(), bufs)
362 }
363
364 fn flush(&mut self) -> io::Result<()> {
365 self.inner.flush()
366 }
367}
368
369impl AsRawFd for UnixStream {
370 fn as_raw_fd(&self) -> RawFd {
371 self.inner.as_raw_fd()
372 }
373}
374
375impl FromRawFd for UnixStream {
376 unsafe fn from_raw_fd(fd: RawFd) -> Self {
377 StdUnixStream::from_raw_fd(fd).into()
378 }
379}
380
381impl IntoRawFd for UnixStream {
382 fn into_raw_fd(self) -> RawFd {
383 self.inner.into_raw_fd()
384 }
385}
386
387impl From<StdUnixStream> for UnixStream {
388 fn from(inner: StdUnixStream) -> Self {
389 Self {
390 inner,
391 biqueue: BiQueue::new(),
392 }
393 }
394}
395
396// === impl UnixListener ===
397impl UnixListener {
398 /// Create a new `UnixListener` bound to the specified socket.
399 ///
400 /// # Examples
401 ///
402 /// ```
403 /// use fd_queue::UnixListener;
404 /// # use tempfile::tempdir;
405 /// # let dir = tempdir()?;
406 /// # let path = dir.path().join("mysocket");
407 /// // let path = ...
408 /// let listener = match UnixListener::bind(&path) {
409 /// Ok(listener) => listener,
410 /// Err(e) => {
411 /// println!("Can't bind the unix socket libtest: {}", e);
412 /// return Ok(());
413 /// }
414 /// };
415 ///
416 /// # Ok::<(),std::io::Error>(())
417 /// ```
418 pub fn bind(path: impl AsRef<Path>) -> io::Result<UnixListener> {
419 StdUnixListner::bind(path).map(|s| s.into())
420 }
421
422 /// Accepts a new incoming connection to this server.
423 ///
424 /// This function will block the calling thread until a new Unix connection is
425 /// established. When established the corresponding `UnixStream` and the remote
426 /// peer's address will be returned.
427 ///
428 /// # Examples
429 ///
430 /// ```
431 /// use fd_queue::UnixListener;
432 /// # use fd_queue::UnixStream;
433 /// # use std::thread;
434 /// # use tempfile::tempdir;
435 /// # let dir = tempdir()?;
436 /// # let path = dir.path().join("mysocket");
437 ///
438 /// // let path = ...
439 /// let listener = UnixListener::bind(&path)?;
440 /// # thread::spawn(move || UnixStream::connect(path).expect("Can't connect"));
441 ///
442 /// let (sock, addr) = match listener.accept() {
443 /// Ok((sock, addr)) => (sock, addr),
444 /// Err(e) => {
445 /// println!("Can't accept unix stream: {}", e);
446 /// return Ok(());
447 /// }
448 /// };
449 ///
450 /// # Ok::<(),std::io::Error>(())
451 /// ```
452 pub fn accept(&self) -> io::Result<(UnixStream, SocketAddr)> {
453 self.inner.accept().map(|(s, a)| (s.into(), a))
454 }
455
456 /// Create a new independently owned handle to the underlying socket.
457 ///
458 /// The returned `UnixListener` is a reference to the same socket that this
459 /// object references. Both handles can be used to accept incoming connections
460 /// and options set on one will affect the other.
461 ///
462 /// # Examples
463 ///
464 /// ```
465 /// use fd_queue::UnixListener;
466 /// # use tempfile::tempdir;
467 /// # let dir = tempdir()?;
468 /// # let path = dir.path().join("mysocket");
469 ///
470 /// // let path = ...
471 /// let listener1 = UnixListener::bind(&path)?;
472 ///
473 /// let listener2 = match listener1.try_clone() {
474 /// Ok(listener) => listener,
475 /// Err(e) => {
476 /// println!("Can't clone listener: {}", e);
477 /// return Ok(());
478 /// }
479 /// };
480 ///
481 /// # Ok::<(),std::io::Error>(())
482 /// ```
483 pub fn try_clone(&self) -> io::Result<UnixListener> {
484 self.inner.try_clone().map(|s| s.into())
485 }
486
    /// Returns the local address of this listener.
488 ///
489 /// # Examples
490 ///
491 /// ```
492 /// use fd_queue::UnixListener;
493 /// # use tempfile::tempdir;
494 /// # let dir = tempdir()?;
495 /// # let path = dir.path().join("mysocket");
496 ///
497 /// // let path = ...
498 /// let listener = UnixListener::bind(&path)?;
499 ///
500 /// let addr = match listener.local_addr() {
501 /// Ok(addr) => addr,
502 /// Err(e) => {
503 /// println!("Couldn't get local address: {}", e);
504 /// return Ok(());
505 /// }
506 /// };
507 ///
508 /// # Ok::<(),std::io::Error>(())
509 /// ```
510 pub fn local_addr(&self) -> io::Result<SocketAddr> {
511 self.inner.local_addr()
512 }
513
514 /// Return the value of the `SO_ERROR` option.
515 ///
516 /// # Examples
517 ///
518 /// ```
519 /// use fd_queue::UnixListener;
520 /// # use tempfile::tempdir;
521 /// # let dir = tempdir()?;
522 /// # let path = dir.path().join("mysocket");
523 ///
524 /// // let path = ...
525 /// let listener = UnixListener::bind(&path)?;
526 ///
527 /// let err = match listener.take_error() {
528 /// Ok(Some(err)) => err,
529 /// Ok(None) => {
530 /// println!("There was no SO_ERROR option pending.");
531 /// return Ok(());
532 /// }
533 /// Err(e) => {
534 /// println!("Couldn't get the SO_ERROR option: {}", e);
535 /// return Ok(())
536 /// }
537 /// };
538 ///
539 /// # Ok::<(),std::io::Error>(())
540 /// ```
541 pub fn take_error(&self) -> io::Result<Option<Error>> {
542 self.inner.take_error()
543 }
544
545 /// Returns an iterator over incoming connections.
546 ///
547 /// The iterator will never return `None` and also will not yield the peer's
548 /// [`SocketAddr`][SocketAddr] structure.
549 ///
550 /// # Examples
551 ///
552 /// ```
553 /// use fd_queue::UnixListener;
554 /// # use fd_queue::UnixStream;
555 /// # use std::thread;
556 /// # use tempfile::tempdir;
557 /// # let dir = tempdir()?;
558 /// # let path = dir.path().join("mysocket");
559 ///
560 /// // let path = ...
561 /// let listener = UnixListener::bind(&path)?;
562 /// # thread::spawn(move || UnixStream::connect(path).expect("Can't connect"));
563 ///
564 /// let mut incoming = listener.incoming();
565 ///
566 /// let sock = match incoming.next() {
567 /// Some(Ok(sock)) => sock,
568 /// Some(Err(e)) => {
569 /// println!("Can't get the next incoming socket: {}", e);
570 /// return Ok(());
571 /// }
572 /// None => unreachable!(),
573 /// };
574 ///
575 /// # Ok::<(),std::io::Error>(())
576 /// ```
577 ///
578 /// [SocketAddr]: https://doc.rust-lang.org/stable/std/os/unix/net/struct.SocketAddr.html
579 pub fn incoming(&self) -> Incoming {
580 Incoming { listener: self }
581 }
582}
583
584impl AsRawFd for UnixListener {
585 fn as_raw_fd(&self) -> RawFd {
586 self.inner.as_raw_fd()
587 }
588}
589
590impl FromRawFd for UnixListener {
591 unsafe fn from_raw_fd(fd: RawFd) -> Self {
592 StdUnixListner::from_raw_fd(fd).into()
593 }
594}
595
596impl IntoRawFd for UnixListener {
597 fn into_raw_fd(self) -> RawFd {
598 self.inner.into_raw_fd()
599 }
600}
601
602impl<'a> IntoIterator for &'a UnixListener {
603 type Item = io::Result<UnixStream>;
604 type IntoIter = Incoming<'a>;
605
606 fn into_iter(self) -> Self::IntoIter {
607 self.incoming()
608 }
609}
610
611impl From<StdUnixListner> for UnixListener {
612 fn from(inner: StdUnixListner) -> Self {
613 UnixListener { inner }
614 }
615}
616
617// === impl Incoming ===
618impl Iterator for Incoming<'_> {
619 type Item = io::Result<UnixStream>;
620
621 fn next(&mut self) -> Option<Self::Item> {
622 Some(self.listener.accept().map(|(s, _)| s))
623 }
624
625 fn size_hint(&self) -> (usize, Option<usize>) {
626 (usize::MAX, None)
627 }
628}
629
#[cfg(test)]
mod test {
    use super::*;

    use std::convert::AsMut;
    use std::ffi::c_void;
    use std::ptr;
    use std::slice;

    use nix::fcntl::OFlag;
    use nix::sys::mman::{mmap, munmap, shm_open, shm_unlink, MapFlags, ProtFlags};
    use nix::sys::stat::Mode;
    use nix::unistd::{close, ftruncate};

    // Test helper: a shared memory object together with its memory mapping.
    //
    // Dropping a `Shm` unmaps the memory, closes `fd` and, when `name` is not
    // empty, unlinks the shared memory object.
    struct Shm {
        // Descriptor of the shared memory object.
        fd: RawFd,
        // Start of the mapping returned by `mmap`.
        ptr: *mut u8,
        // Length of the mapping in bytes.
        len: usize,
        // Name passed to `shm_open`; empty when the `Shm` was built from a
        // descriptor and therefore has nothing to unlink.
        name: String,
    }

    impl Shm {
        // Creates (or opens) the shared memory object `name`, resizes it to
        // `size` bytes and maps it read/write. Panics if any step fails.
        fn new(name: &str, size: i64) -> Shm {
            let oflag = OFlag::O_CREAT | OFlag::O_RDWR;
            let fd =
                shm_open(name, oflag, Mode::S_IRUSR | Mode::S_IWUSR).expect("Can't create shm.");
            ftruncate(fd, size).expect("Can't ftruncate");
            let len: usize = size as usize;

            let prot = ProtFlags::PROT_READ | ProtFlags::PROT_WRITE;
            let flags = MapFlags::MAP_SHARED;

            // A null address is passed, so no placement is requested; the
            // object was resized to `len` bytes just above.
            let ptr = unsafe {
                mmap(ptr::null_mut(), len, prot, flags, fd, 0).expect("Can't mmap") as *mut u8
            };

            Shm {
                fd,
                ptr,
                len,
                name: name.to_string(),
            }
        }

        // Maps `size` bytes of an already open descriptor read/write. The
        // resulting `Shm` has an empty name, so dropping it does not unlink
        // anything. Panics if the mapping fails.
        fn from_raw_fd(fd: RawFd, size: usize) -> Shm {
            let prot = ProtFlags::PROT_READ | ProtFlags::PROT_WRITE;
            let flags = MapFlags::MAP_SHARED;

            // NOTE(review): assumes `fd` refers to an object of at least
            // `size` bytes; nothing here checks that.
            let ptr = unsafe {
                mmap(ptr::null_mut(), size, prot, flags, fd, 0).expect("Can't mmap") as *mut u8
            };

            Shm {
                fd,
                ptr,
                len: size,
                name: String::new(),
            }
        }
    }

    impl Drop for Shm {
        // Tears down in the reverse order of construction: unmap, close, then
        // unlink (only for objects created by name). Panics if a step fails.
        fn drop(&mut self) {
            unsafe {
                munmap(self.ptr as *mut c_void, self.len).expect("Can't munmap");
            }
            close(self.fd).expect("Can't close");
            if !self.name.is_empty() {
                let name: &str = self.name.as_ref();
                shm_unlink(name).expect("Can't shm_unlink");
            }
        }
    }

    impl AsMut<[u8]> for Shm {
        // Exposes the whole mapping as a mutable byte slice.
        fn as_mut(&mut self) -> &mut [u8] {
            // `ptr` and `len` are the values recorded when the mapping was
            // created, and the mapping is only removed in `drop`.
            unsafe { slice::from_raw_parts_mut(self.ptr, self.len) }
        }
    }

    impl AsRawFd for Shm {
        // Returns the descriptor of the shared memory object without giving
        // up ownership of it.
        fn as_raw_fd(&self) -> RawFd {
            self.fd
        }
    }

    // Creates a shared memory object called `name` that is exactly as long as
    // the NUL-terminated greeting and contains it.
    fn make_hello(name: &str) -> Shm {
        let hello = b"Hello World!\0";
        let mut shm = Shm::new(name, hello.len() as i64);

        shm.as_mut().copy_from_slice(hello.as_ref());

        shm
    }

    // Maps `fd` and reports whether it starts with the greeting written by
    // `make_hello`. The temporary `Shm` is dropped on return, which unmaps
    // the memory and closes `fd`.
    fn compare_hello(fd: RawFd) -> bool {
        let hello = b"Hello World!\0";
        let mut shm = Shm::from_raw_fd(fd, hello.len());

        &shm.as_mut()[..hello.len()] == hello.as_ref()
    }

    // Sends a shared memory descriptor across a socket pair and checks that
    // the descriptor received on the other end is a different descriptor
    // number whose mapped contents match what was written on the sending side.
    #[test]
    fn unix_stream_passes_fd() {
        let shm = make_hello("/unix_stream_passes_fd");
        let mut buf = vec![0; 20];

        let (mut sut1, mut sut2) = UnixStream::pair().expect("Can't make pair");
        // Enqueue first, then write: the write is what carries the fd across.
        sut1.enqueue(&shm).expect("Can't enqueue");
        sut1.write(b"abc").expect("Can't write");
        sut1.flush().expect("Can't flush");
        // Read first, then dequeue: the read is what receives the fd.
        sut2.read(&mut buf).expect("Can't read");
        let fd = sut2.dequeue().expect("Empty fd queue");

        assert!(fd != shm.fd, "fd's unexpectedly equal");
        assert!(compare_hello(fd), "fd didn't contain expect contents");
    }
}