1#![warn(missing_docs)]
3#![doc(html_root_url="https://doc.rust-lang.org/unix-socket/doc/v0.5.0")]
4
5extern crate libc;
6
7use std::ascii;
8use std::cmp::{self, Ordering};
9use std::convert::AsRef;
10use std::ffi::OsStr;
11use std::fmt;
12use std::io;
13use std::iter::IntoIterator;
14use std::mem;
15use std::net::Shutdown;
16use std::os::unix::ffi::OsStrExt;
17use std::os::unix::io::{RawFd, AsRawFd, FromRawFd, IntoRawFd};
18use std::path::Path;
19use std::time::{Duration, Instant};
20
21use libc::c_int;
22
23fn sun_path_offset() -> usize {
24 unsafe {
25 let addr: libc::sockaddr_un = mem::uninitialized();
27 let base = &addr as *const _ as usize;
28 let path = &addr.sun_path as *const _ as usize;
29 path - base
30 }
31}
32
33fn cvt(v: libc::c_int) -> io::Result<libc::c_int> {
34 if v < 0 {
35 Err(io::Error::last_os_error())
36 } else {
37 Ok(v)
38 }
39}
40
41fn cvt_s(v: libc::ssize_t) -> io::Result<libc::ssize_t> {
42 if v < 0 {
43 Err(io::Error::last_os_error())
44 } else {
45 Ok(v)
46 }
47}
48
49struct Inner(RawFd);
50
51impl Drop for Inner {
52 fn drop(&mut self) {
53 unsafe {
54 libc::close(self.0);
55 }
56 }
57}
58
59impl Inner {
60 fn new(kind: libc::c_int) -> io::Result<Inner> {
61 unsafe { cvt(libc::socket(libc::AF_UNIX, kind, 0)).map(Inner) }
62 }
63
64 fn new_pair(kind: libc::c_int) -> io::Result<(Inner, Inner)> {
65 unsafe {
66 let mut fds = [0, 0];
67 try!(cvt(libc::socketpair(libc::AF_UNIX, kind, 0, fds.as_mut_ptr())));
68 Ok((Inner(fds[0]), Inner(fds[1])))
69 }
70 }
71
72 fn try_clone(&self) -> io::Result<Inner> {
73 unsafe { cvt(libc::dup(self.0)).map(Inner) }
74 }
75
76 fn shutdown(&self, how: Shutdown) -> io::Result<()> {
77 let how = match how {
78 Shutdown::Read => libc::SHUT_RD,
79 Shutdown::Write => libc::SHUT_WR,
80 Shutdown::Both => libc::SHUT_RDWR,
81 };
82
83 unsafe { cvt(libc::shutdown(self.0, how)).map(|_| ()) }
84 }
85
86 fn timeout(&self, kind: libc::c_int) -> io::Result<Option<Duration>> {
87 let timeout = unsafe {
88 let mut timeout: libc::timeval = mem::zeroed();
89 let mut size = mem::size_of::<libc::timeval>() as libc::socklen_t;
90 try!(cvt(libc::getsockopt(self.0,
91 libc::SOL_SOCKET,
92 kind,
93 &mut timeout as *mut _ as *mut _,
94 &mut size as *mut _ as *mut _)));
95 timeout
96 };
97
98 if timeout.tv_sec == 0 && timeout.tv_usec == 0 {
99 Ok(None)
100 } else {
101 Ok(Some(Duration::new(timeout.tv_sec as u64, (timeout.tv_usec as u32) * 1000)))
102 }
103 }
104
105 fn set_timeout(&self, dur: Option<Duration>, kind: libc::c_int) -> io::Result<()> {
106 let timeout = match dur {
107 Some(dur) => {
108 if dur.as_secs() == 0 && dur.subsec_nanos() == 0 {
109 return Err(io::Error::new(io::ErrorKind::InvalidInput,
110 "cannot set a 0 duration timeout"));
111 }
112
113 let (secs, usecs) = if dur.as_secs() > libc::time_t::max_value() as u64 {
114 (libc::time_t::max_value(), 999_999)
115 } else {
116 (dur.as_secs() as libc::time_t,
117 (dur.subsec_nanos() / 1000) as libc::suseconds_t)
118 };
119 let mut timeout = libc::timeval {
120 tv_sec: secs,
121 tv_usec: usecs,
122 };
123 if timeout.tv_sec == 0 && timeout.tv_usec == 0 {
124 timeout.tv_usec = 1;
125 }
126 timeout
127 }
128 None => {
129 libc::timeval {
130 tv_sec: 0,
131 tv_usec: 0,
132 }
133 }
134 };
135
136 unsafe {
137 cvt(libc::setsockopt(self.0,
138 libc::SOL_SOCKET,
139 kind,
140 &timeout as *const _ as *const _,
141 mem::size_of::<libc::timeval>() as libc::socklen_t))
142 .map(|_| ())
143 }
144 }
145
146 fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> {
147 let mut nonblocking = nonblocking as libc::c_ulong;
148 unsafe { cvt(libc::ioctl(self.0, libc::FIONBIO, &mut nonblocking)).map(|_| ()) }
149 }
150
151 fn take_error(&self) -> io::Result<Option<io::Error>> {
152 let mut errno: libc::c_int = 0;
153
154 unsafe {
155 try!(cvt(libc::getsockopt(self.0,
156 libc::SOL_SOCKET,
157 libc::SO_ERROR,
158 &mut errno as *mut _ as *mut _,
159 &mut mem::size_of_val(&errno) as *mut _ as *mut _)));
160 }
161
162 if errno == 0 {
163 Ok(None)
164 } else {
165 Ok(Some(io::Error::from_raw_os_error(errno)))
166 }
167 }
168}
169
170unsafe fn sockaddr_un<P: AsRef<Path>>(path: P) -> io::Result<(libc::sockaddr_un, libc::socklen_t)> {
171 let mut addr: libc::sockaddr_un = mem::zeroed();
172 addr.sun_family = libc::AF_UNIX as libc::sa_family_t;
173
174 let bytes = path.as_ref().as_os_str().as_bytes();
175
176 match (bytes.get(0), bytes.len().cmp(&addr.sun_path.len())) {
177 (Some(&0), Ordering::Greater) => {
179 return Err(io::Error::new(io::ErrorKind::InvalidInput,
180 "path must be no longer than SUN_LEN"));
181 }
182 (Some(&0), _) => {},
183 (_, Ordering::Greater) | (_, Ordering::Equal) => {
184 return Err(io::Error::new(io::ErrorKind::InvalidInput,
185 "path must be shorter than SUN_LEN"));
186 }
187 _ => {}
188 }
189 for (dst, src) in addr.sun_path.iter_mut().zip(bytes.iter()) {
190 *dst = *src as libc::c_char;
191 }
192 let mut len = sun_path_offset() + bytes.len();
196 match bytes.get(0) {
197 Some(&0) | None => {}
198 Some(_) => len += 1,
199 }
200 Ok((addr, len as libc::socklen_t))
201}
202
203enum AddressKind<'a> {
204 Unnamed,
205 Pathname(&'a Path),
206 Abstract(&'a [u8]),
207}
208
209#[derive(Clone)]
211pub struct SocketAddr {
212 addr: libc::sockaddr_un,
213 len: libc::socklen_t,
214}
215
216impl SocketAddr {
217 fn new<F>(f: F) -> io::Result<SocketAddr>
218 where F: FnOnce(*mut libc::sockaddr, *mut libc::socklen_t) -> libc::c_int
219 {
220 unsafe {
221 let mut addr: libc::sockaddr_un = mem::zeroed();
222 let mut len = mem::size_of::<libc::sockaddr_un>() as libc::socklen_t;
223 try!(cvt(f(&mut addr as *mut _ as *mut _, &mut len)));
224
225 if len == 0 {
226 len = sun_path_offset() as libc::socklen_t; } else if addr.sun_family != libc::AF_UNIX as libc::sa_family_t {
230 return Err(io::Error::new(io::ErrorKind::InvalidInput,
231 "file descriptor did not correspond to a Unix socket"));
232 }
233
234 Ok(SocketAddr {
235 addr: addr,
236 len: len,
237 })
238 }
239 }
240
241 pub fn is_unnamed(&self) -> bool {
243 if let AddressKind::Unnamed = self.address() {
244 true
245 } else {
246 false
247 }
248 }
249
250 pub fn as_pathname(&self) -> Option<&Path> {
252 if let AddressKind::Pathname(path) = self.address() {
253 Some(path)
254 } else {
255 None
256 }
257 }
258
259 fn address<'a>(&'a self) -> AddressKind<'a> {
260 let len = self.len as usize - sun_path_offset();
261 let path = unsafe { mem::transmute::<&[libc::c_char], &[u8]>(&self.addr.sun_path) };
262
263 if len == 0 || (cfg!(not(target_os = "linux")) && self.addr.sun_path[0] == 0) {
265 AddressKind::Unnamed
266 } else if self.addr.sun_path[0] == 0 {
267 AddressKind::Abstract(&path[1..len])
268 } else {
269 AddressKind::Pathname(OsStr::from_bytes(&path[..len - 1]).as_ref())
270 }
271 }
272}
273
274impl fmt::Debug for SocketAddr {
275 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
276 match self.address() {
277 AddressKind::Unnamed => write!(fmt, "(unnamed)"),
278 AddressKind::Abstract(name) => write!(fmt, "{} (abstract)", AsciiEscaped(name)),
279 AddressKind::Pathname(path) => write!(fmt, "{:?} (pathname)", path),
280 }
281 }
282}
283
284struct AsciiEscaped<'a>(&'a [u8]);
285
286impl<'a> fmt::Display for AsciiEscaped<'a> {
287 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
288 try!(write!(fmt, "\""));
289 for byte in self.0.iter().cloned().flat_map(ascii::escape_default) {
290 try!(write!(fmt, "{}", byte as char));
291 }
292 write!(fmt, "\"")
293 }
294}
295
296pub mod os {
298 #[cfg(target_os = "linux")]
300 pub mod linux {
301 use {AddressKind, SocketAddr};
302
303 pub trait SocketAddrExt {
305 fn as_abstract(&self) -> Option<&[u8]>;
308 }
309
310 impl SocketAddrExt for SocketAddr {
311 fn as_abstract(&self) -> Option<&[u8]> {
312 if let AddressKind::Abstract(path) = self.address() {
313 Some(path)
314 } else {
315 None
316 }
317 }
318 }
319 }
320}
321
322pub struct UnixStream {
337 inner: Inner,
338}
339
340impl fmt::Debug for UnixStream {
341 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
342 let mut builder = fmt.debug_struct("UnixStream");
343 builder.field("fd", &self.inner.0);
344 if let Ok(addr) = self.local_addr() {
345 builder.field("local", &addr);
346 }
347 if let Ok(addr) = self.peer_addr() {
348 builder.field("peer", &addr);
349 }
350 builder.finish()
351 }
352}
353
354impl UnixStream {
355 pub fn connect<P: AsRef<Path>>(path: P) -> io::Result<UnixStream> {
371 unsafe {
372 let inner = try!(Inner::new(libc::SOCK_STREAM));
373 let (addr, len) = try!(sockaddr_un(path));
374
375 let ret = libc::connect(inner.0, &addr as *const _ as *const _, len);
376 if ret < 0 {
377 Err(io::Error::last_os_error())
378 } else {
379 Ok(UnixStream { inner: inner })
380 }
381 }
382 }
383
384 pub fn connect_timeout<P: AsRef<Path>>(path: P, timeout: Duration) -> io::Result<UnixStream> {
386 let inner = try!(Inner::new(libc::SOCK_STREAM));
387
388 inner.set_nonblocking(true)?;
389 let r = unsafe {
390 let (addr, len) = try!(sockaddr_un(path));
391 let ret = libc::connect(inner.0, &addr as *const _ as *const _, len);
392 if ret < 0 {
393 Err(io::Error::last_os_error())
394 } else {
395 Ok(())
396 }
397 };
398 inner.set_nonblocking(false)?;
399
400 match r {
401 Ok(_) => return Ok(UnixStream { inner: inner }),
402 Err(ref e) if e.raw_os_error() == Some(libc::EINPROGRESS) => {}
404 Err(e) => return Err(e),
405 }
406
407 let mut pollfd = libc::pollfd {
408 fd: inner.0,
409 events: libc::POLLOUT,
410 revents: 0,
411 };
412
413 if timeout.as_secs() == 0 && timeout.subsec_nanos() == 0 {
414 return Err(io::Error::new(io::ErrorKind::InvalidInput,
415 "cannot set a 0 duration timeout"));
416 }
417
418 let start = Instant::now();
419
420 loop {
421 let elapsed = start.elapsed();
422 if elapsed >= timeout {
423 return Err(io::Error::new(io::ErrorKind::TimedOut, "connection timed out"));
424 }
425
426 let timeout = timeout - elapsed;
427 let mut timeout = timeout.as_secs()
428 .saturating_mul(1_000)
429 .saturating_add(timeout.subsec_nanos() as u64 / 1_000_000);
430 if timeout == 0 {
431 timeout = 1;
432 }
433
434 let timeout = cmp::min(timeout, c_int::max_value() as u64) as c_int;
435
436 match unsafe { libc::poll(&mut pollfd, 1, timeout) } {
437 -1 => {
438 let err = io::Error::last_os_error();
439 if err.kind() != io::ErrorKind::Interrupted {
440 return Err(err);
441 }
442 }
443 0 => {}
444 _ => {
445 if pollfd.revents & libc::POLLHUP != 0 {
448 let e = inner.take_error()?
449 .unwrap_or_else(|| {
450 io::Error::new(io::ErrorKind::Other, "no error set after POLLHUP")
451 });
452 return Err(e);
453 }
454
455 return Ok(UnixStream { inner: inner });
456 }
457 }
458 }
459 }
460
461
462 pub fn pair() -> io::Result<(UnixStream, UnixStream)> {
474 let (i1, i2) = try!(Inner::new_pair(libc::SOCK_STREAM));
475 Ok((UnixStream { inner: i1 }, UnixStream { inner: i2 }))
476 }
477
478 pub fn try_clone(&self) -> io::Result<UnixStream> {
494 Ok(UnixStream { inner: try!(self.inner.try_clone()) })
495 }
496
497 pub fn local_addr(&self) -> io::Result<SocketAddr> {
511 SocketAddr::new(|addr, len| unsafe { libc::getsockname(self.inner.0, addr, len) })
512 }
513
514 pub fn peer_addr(&self) -> io::Result<SocketAddr> {
528 SocketAddr::new(|addr, len| unsafe { libc::getpeername(self.inner.0, addr, len) })
529 }
530
531 pub fn set_read_timeout(&self, timeout: Option<Duration>) -> io::Result<()> {
547 self.inner.set_timeout(timeout, libc::SO_RCVTIMEO)
548 }
549
550 pub fn set_write_timeout(&self, timeout: Option<Duration>) -> io::Result<()> {
566 self.inner.set_timeout(timeout, libc::SO_SNDTIMEO)
567 }
568
569 pub fn read_timeout(&self) -> io::Result<Option<Duration>> {
583 self.inner.timeout(libc::SO_RCVTIMEO)
584 }
585
586 pub fn write_timeout(&self) -> io::Result<Option<Duration>> {
600 self.inner.timeout(libc::SO_SNDTIMEO)
601 }
602
603 pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> {
614 self.inner.set_nonblocking(nonblocking)
615 }
616
617 pub fn take_error(&self) -> io::Result<Option<io::Error>> {
631 self.inner.take_error()
632 }
633
634 pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
650 self.inner.shutdown(how)
651 }
652}
653
654impl io::Read for UnixStream {
655 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
656 io::Read::read(&mut &*self, buf)
657 }
658}
659
660impl<'a> io::Read for &'a UnixStream {
661 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
662 unsafe {
663 cvt_s(libc::recv(self.inner.0, buf.as_mut_ptr() as *mut _, buf.len(), 0))
664 .map(|r| r as usize)
665 }
666 }
667}
668
669impl io::Write for UnixStream {
670 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
671 io::Write::write(&mut &*self, buf)
672 }
673
674 fn flush(&mut self) -> io::Result<()> {
675 io::Write::flush(&mut &*self)
676 }
677}
678
679impl<'a> io::Write for &'a UnixStream {
680 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
681 unsafe {
682 cvt_s(libc::send(self.inner.0, buf.as_ptr() as *const _, buf.len(), 0))
683 .map(|r| r as usize)
684 }
685 }
686
687 fn flush(&mut self) -> io::Result<()> {
688 Ok(())
689 }
690}
691
692impl AsRawFd for UnixStream {
693 fn as_raw_fd(&self) -> RawFd {
694 self.inner.0
695 }
696}
697
698impl FromRawFd for UnixStream {
699 unsafe fn from_raw_fd(fd: RawFd) -> UnixStream {
700 UnixStream { inner: Inner(fd) }
701 }
702}
703
704impl IntoRawFd for UnixStream {
705 fn into_raw_fd(self) -> RawFd {
706 let fd = self.inner.0;
707 mem::forget(self);
708 fd
709 }
710}
711
712pub struct UnixListener {
744 inner: Inner,
745}
746
747impl fmt::Debug for UnixListener {
748 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
749 let mut builder = fmt.debug_struct("UnixListener");
750 builder.field("fd", &self.inner.0);
751 if let Ok(addr) = self.local_addr() {
752 builder.field("local", &addr);
753 }
754 builder.finish()
755 }
756}
757
758impl UnixListener {
759 pub fn bind<P: AsRef<Path>>(path: P) -> io::Result<UnixListener> {
775 unsafe {
776 let inner = try!(Inner::new(libc::SOCK_STREAM));
777 let (addr, len) = try!(sockaddr_un(path));
778
779 try!(cvt(libc::bind(inner.0, &addr as *const _ as *const _, len)));
780 try!(cvt(libc::listen(inner.0, 128)));
781
782 Ok(UnixListener { inner: inner })
783 }
784 }
785
786 pub fn accept(&self) -> io::Result<(UnixStream, SocketAddr)> {
801 unsafe {
802 let mut fd = 0;
803 let addr = try!(SocketAddr::new(|addr, len| {
804 fd = libc::accept(self.inner.0, addr, len);
805 fd
806 }));
807
808 Ok((UnixStream { inner: Inner(fd) }, addr))
809 }
810 }
811
812 pub fn try_clone(&self) -> io::Result<UnixListener> {
827 Ok(UnixListener { inner: try!(self.inner.try_clone()) })
828 }
829
830 pub fn local_addr(&self) -> io::Result<SocketAddr> {
844 SocketAddr::new(|addr, len| unsafe { libc::getsockname(self.inner.0, addr, len) })
845 }
846
847 pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> {
858 self.inner.set_nonblocking(nonblocking)
859 }
860
861 pub fn take_error(&self) -> io::Result<Option<io::Error>> {
875 self.inner.take_error()
876 }
877
878 pub fn incoming<'a>(&'a self) -> Incoming<'a> {
911 Incoming { listener: self }
912 }
913}
914
915impl AsRawFd for UnixListener {
916 fn as_raw_fd(&self) -> RawFd {
917 self.inner.0
918 }
919}
920
921impl FromRawFd for UnixListener {
922 unsafe fn from_raw_fd(fd: RawFd) -> UnixListener {
923 UnixListener { inner: Inner(fd) }
924 }
925}
926
927impl IntoRawFd for UnixListener {
928 fn into_raw_fd(self) -> RawFd {
929 let fd = self.inner.0;
930 mem::forget(self);
931 fd
932 }
933}
934
935impl<'a> IntoIterator for &'a UnixListener {
936 type Item = io::Result<UnixStream>;
937 type IntoIter = Incoming<'a>;
938
939 fn into_iter(self) -> Incoming<'a> {
940 self.incoming()
941 }
942}
943
944#[derive(Debug)]
948pub struct Incoming<'a> {
949 listener: &'a UnixListener,
950}
951
952impl<'a> Iterator for Incoming<'a> {
953 type Item = io::Result<UnixStream>;
954
955 fn next(&mut self) -> Option<io::Result<UnixStream>> {
956 Some(self.listener.accept().map(|s| s.0))
957 }
958
959 fn size_hint(&self) -> (usize, Option<usize>) {
960 (usize::max_value(), None)
961 }
962}
963
964pub struct UnixDatagram {
978 inner: Inner,
979}
980
981impl fmt::Debug for UnixDatagram {
982 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
983 let mut builder = fmt.debug_struct("UnixDatagram");
984 builder.field("fd", &self.inner.0);
985 if let Ok(addr) = self.local_addr() {
986 builder.field("local", &addr);
987 }
988 if let Ok(addr) = self.peer_addr() {
989 builder.field("peer", &addr);
990 }
991 builder.finish()
992 }
993}
994
995impl UnixDatagram {
996 pub fn bind<P: AsRef<Path>>(path: P) -> io::Result<UnixDatagram> {
1012 unsafe {
1013 let inner = try!(Inner::new(libc::SOCK_DGRAM));
1014 let (addr, len) = try!(sockaddr_un(path));
1015
1016 try!(cvt(libc::bind(inner.0, &addr as *const _ as *const _, len)));
1017
1018 Ok(UnixDatagram { inner: inner })
1019 }
1020 }
1021
1022 pub fn unbound() -> io::Result<UnixDatagram> {
1032 let inner = try!(Inner::new(libc::SOCK_DGRAM));
1033 Ok(UnixDatagram { inner: inner })
1034 }
1035
1036 pub fn pair() -> io::Result<(UnixDatagram, UnixDatagram)> {
1048 let (i1, i2) = try!(Inner::new_pair(libc::SOCK_DGRAM));
1049 Ok((UnixDatagram { inner: i1 }, UnixDatagram { inner: i2 }))
1050 }
1051
1052 pub fn connect<P: AsRef<Path>>(&self, path: P) -> io::Result<()> {
1066 unsafe {
1067 let (addr, len) = try!(sockaddr_un(path));
1068
1069 try!(cvt(libc::connect(self.inner.0, &addr as *const _ as *const _, len)));
1070
1071 Ok(())
1072 }
1073 }
1074
1075 pub fn try_clone(&self) -> io::Result<UnixDatagram> {
1090 Ok(UnixDatagram { inner: try!(self.inner.try_clone()) })
1091 }
1092
1093 pub fn local_addr(&self) -> io::Result<SocketAddr> {
1107 SocketAddr::new(|addr, len| unsafe { libc::getsockname(self.inner.0, addr, len) })
1108 }
1109
1110 pub fn peer_addr(&self) -> io::Result<SocketAddr> {
1126 SocketAddr::new(|addr, len| unsafe { libc::getpeername(self.inner.0, addr, len) })
1127 }
1128
1129 pub fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
1145 let mut count = 0;
1146 let addr = try!(SocketAddr::new(|addr, len| {
1147 unsafe {
1148 count = libc::recvfrom(self.inner.0,
1149 buf.as_mut_ptr() as *mut _,
1150 buf.len(),
1151 0,
1152 addr,
1153 len);
1154 if count > 0 {
1155 1
1156 } else if count == 0 {
1157 0
1158 } else {
1159 -1
1160 }
1161 }
1162 }));
1163
1164 Ok((count as usize, addr))
1165 }
1166
1167 pub fn recv(&self, buf: &mut [u8]) -> io::Result<usize> {
1183 unsafe {
1184 let count = try!(cvt_s(libc::recv(self.inner.0,
1185 buf.as_mut_ptr() as *mut _,
1186 buf.len(),
1187 0)));
1188 Ok(count as usize)
1189 }
1190 }
1191
1192 pub fn send_to<P: AsRef<Path>>(&self, buf: &[u8], path: P) -> io::Result<usize> {
1216 unsafe {
1217 let (addr, len) = try!(sockaddr_un(path));
1218
1219 let count = try!(cvt_s(libc::sendto(self.inner.0,
1220 buf.as_ptr() as *const _,
1221 buf.len(),
1222 0,
1223 &addr as *const _ as *const _,
1224 len)));
1225 Ok(count as usize)
1226 }
1227 }
1228
1229 pub fn send(&self, buf: &[u8]) -> io::Result<usize> {
1257 unsafe {
1258 let count = try!(cvt_s(libc::send(self.inner.0,
1259 buf.as_ptr() as *const _,
1260 buf.len(),
1261 0)));
1262 Ok(count as usize)
1263 }
1264 }
1265
1266 pub fn set_read_timeout(&self, timeout: Option<Duration>) -> io::Result<()> {
1282 self.inner.set_timeout(timeout, libc::SO_RCVTIMEO)
1283 }
1284
1285 pub fn set_write_timeout(&self, timeout: Option<Duration>) -> io::Result<()> {
1301 self.inner.set_timeout(timeout, libc::SO_SNDTIMEO)
1302 }
1303
1304 pub fn read_timeout(&self) -> io::Result<Option<Duration>> {
1318 self.inner.timeout(libc::SO_RCVTIMEO)
1319 }
1320
1321 pub fn write_timeout(&self) -> io::Result<Option<Duration>> {
1335 self.inner.timeout(libc::SO_SNDTIMEO)
1336 }
1337
1338 pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> {
1349 self.inner.set_nonblocking(nonblocking)
1350 }
1351
1352 pub fn take_error(&self) -> io::Result<Option<io::Error>> {
1366 self.inner.take_error()
1367 }
1368
1369 pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
1386 self.inner.shutdown(how)
1387 }
1388}
1389
1390impl AsRawFd for UnixDatagram {
1391 fn as_raw_fd(&self) -> RawFd {
1392 self.inner.0
1393 }
1394}
1395
1396impl FromRawFd for UnixDatagram {
1397 unsafe fn from_raw_fd(fd: RawFd) -> UnixDatagram {
1398 UnixDatagram { inner: Inner(fd) }
1399 }
1400}
1401
1402impl IntoRawFd for UnixDatagram {
1403 fn into_raw_fd(self) -> RawFd {
1404 let fd = self.inner.0;
1405 mem::forget(self);
1406 fd
1407 }
1408}
1409
1410#[cfg(test)]
1411mod test {
1412 extern crate tempdir;
1413 extern crate libc;
1414
1415 use std::thread;
1416 use std::io;
1417 use std::io::prelude::*;
1418 use std::time::Duration;
1419 use self::tempdir::TempDir;
1420 use std::net::Shutdown;
1421
1422 use super::*;
1423
1424 macro_rules! or_panic {
1425 ($e:expr) => {
1426 match $e {
1427 Ok(e) => e,
1428 Err(e) => panic!("{}", e),
1429 }
1430 }
1431 }
1432
1433 #[test]
1434 fn basic() {
1435 let dir = or_panic!(TempDir::new("unix_socket"));
1436 let socket_path = dir.path().join("sock");
1437 let msg1 = b"hello";
1438 let msg2 = b"world!";
1439
1440 let listener = or_panic!(UnixListener::bind(&socket_path));
1441 let thread = thread::spawn(move || {
1442 let mut stream = or_panic!(listener.accept()).0;
1443 let mut buf = [0; 5];
1444 or_panic!(stream.read(&mut buf));
1445 assert_eq!(&msg1[..], &buf[..]);
1446 or_panic!(stream.write_all(msg2));
1447 });
1448
1449 let mut stream = or_panic!(UnixStream::connect(&socket_path));
1450 assert_eq!(Some(&*socket_path),
1451 stream.peer_addr().unwrap().as_pathname());
1452 or_panic!(stream.write_all(msg1));
1453 let mut buf = vec![];
1454 or_panic!(stream.read_to_end(&mut buf));
1455 assert_eq!(&msg2[..], &buf[..]);
1456 drop(stream);
1457
1458 thread.join().unwrap();
1459 }
1460
1461 #[test]
1462 fn pair() {
1463 let msg1 = b"hello";
1464 let msg2 = b"world!";
1465
1466 let (mut s1, mut s2) = or_panic!(UnixStream::pair());
1467 let thread = thread::spawn(move || {
1468 let mut buf = [0; 5];
1470 or_panic!(s1.read(&mut buf));
1471 assert_eq!(&msg1[..], &buf[..]);
1472 or_panic!(s1.write_all(msg2));
1473 });
1474
1475 or_panic!(s2.write_all(msg1));
1476 let mut buf = vec![];
1477 or_panic!(s2.read_to_end(&mut buf));
1478 assert_eq!(&msg2[..], &buf[..]);
1479 drop(s2);
1480
1481 thread.join().unwrap();
1482 }
1483
1484 #[test]
1485 #[cfg(target_os = "linux")]
1486 fn abstract_address() {
1487 use os::linux::SocketAddrExt;
1488
1489 let socket_path = "\0the path";
1490 let msg1 = b"hello";
1491 let msg2 = b"world!";
1492
1493 let listener = or_panic!(UnixListener::bind(&socket_path));
1494 let thread = thread::spawn(move || {
1495 let mut stream = or_panic!(listener.accept()).0;
1496 let mut buf = [0; 5];
1497 or_panic!(stream.read(&mut buf));
1498 assert_eq!(&msg1[..], &buf[..]);
1499 or_panic!(stream.write_all(msg2));
1500 });
1501
1502 let mut stream = or_panic!(UnixStream::connect(&socket_path));
1503 assert_eq!(Some(&b"the path"[..]),
1504 stream.peer_addr().unwrap().as_abstract());
1505 or_panic!(stream.write_all(msg1));
1506 let mut buf = vec![];
1507 or_panic!(stream.read_to_end(&mut buf));
1508 assert_eq!(&msg2[..], &buf[..]);
1509 drop(stream);
1510
1511 thread.join().unwrap();
1512 }
1513
1514 #[test]
1515 #[cfg(target_os = "linux")]
1516 fn abstract_address_max_len() {
1517 use os::linux::SocketAddrExt;
1518 use std::ffi::OsStr;
1519 use std::io::Write;
1520 use std::mem;
1521 use std::os::unix::ffi::OsStrExt;
1522
1523 let len = unsafe {
1524 let addr: libc::sockaddr_un = mem::zeroed();
1525 addr.sun_path.len()
1526 };
1527
1528 let mut socket_path = vec![0; len];
1529 (&mut socket_path[1..9]).write_all(b"the path").unwrap();
1530 let socket_path: &OsStr = OsStr::from_bytes(&socket_path).into();
1531
1532 let msg1 = b"hello";
1533 let msg2 = b"world!";
1534
1535 let listener = or_panic!(UnixListener::bind(&socket_path));
1536 let thread = thread::spawn(move || {
1537 let mut stream = or_panic!(listener.accept()).0;
1538 let mut buf = [0; 5];
1539 or_panic!(stream.read(&mut buf));
1540 assert_eq!(&msg1[..], &buf[..]);
1541 or_panic!(stream.write_all(msg2));
1542 });
1543
1544 let mut stream = or_panic!(UnixStream::connect(&socket_path));
1545 assert_eq!(Some(&socket_path.as_bytes()[1..]),
1546 stream.peer_addr().unwrap().as_abstract());
1547 or_panic!(stream.write_all(msg1));
1548 let mut buf = vec![];
1549 or_panic!(stream.read_to_end(&mut buf));
1550 assert_eq!(&msg2[..], &buf[..]);
1551 drop(stream);
1552
1553 thread.join().unwrap();
1554 }
1555
1556 #[test]
1557 fn try_clone() {
1558 let dir = or_panic!(TempDir::new("unix_socket"));
1559 let socket_path = dir.path().join("sock");
1560 let msg1 = b"hello";
1561 let msg2 = b"world";
1562
1563 let listener = or_panic!(UnixListener::bind(&socket_path));
1564 let thread = thread::spawn(move || {
1565 let mut stream = or_panic!(listener.accept()).0;
1566 or_panic!(stream.write_all(msg1));
1567 or_panic!(stream.write_all(msg2));
1568 });
1569
1570 let mut stream = or_panic!(UnixStream::connect(&socket_path));
1571 let mut stream2 = or_panic!(stream.try_clone());
1572
1573 let mut buf = [0; 5];
1574 or_panic!(stream.read(&mut buf));
1575 assert_eq!(&msg1[..], &buf[..]);
1576 or_panic!(stream2.read(&mut buf));
1577 assert_eq!(&msg2[..], &buf[..]);
1578
1579 thread.join().unwrap();
1580 }
1581
1582 #[test]
1583 fn iter() {
1584 let dir = or_panic!(TempDir::new("unix_socket"));
1585 let socket_path = dir.path().join("sock");
1586
1587 let listener = or_panic!(UnixListener::bind(&socket_path));
1588 let thread = thread::spawn(move || {
1589 for stream in listener.incoming().take(2) {
1590 let mut stream = or_panic!(stream);
1591 let mut buf = [0];
1592 or_panic!(stream.read(&mut buf));
1593 }
1594 });
1595
1596 for _ in 0..2 {
1597 let mut stream = or_panic!(UnixStream::connect(&socket_path));
1598 or_panic!(stream.write_all(&[0]));
1599 }
1600
1601 thread.join().unwrap();
1602 }
1603
1604 #[test]
1605 fn long_path() {
1606 let dir = or_panic!(TempDir::new("unix_socket"));
1607 let socket_path = dir.path()
1608 .join("asdfasdfasdfasdfasdfasdfasdfasdfasdfasdfasdfasdfasdfasdfasdfa\
1609 sasdfasdfasdasdfasdfasdfadfasdfasdfasdfasdfasdf");
1610 match UnixStream::connect(&socket_path) {
1611 Err(ref e) if e.kind() == io::ErrorKind::InvalidInput => {}
1612 Err(e) => panic!("unexpected error {}", e),
1613 Ok(_) => panic!("unexpected success"),
1614 }
1615
1616 match UnixListener::bind(&socket_path) {
1617 Err(ref e) if e.kind() == io::ErrorKind::InvalidInput => {}
1618 Err(e) => panic!("unexpected error {}", e),
1619 Ok(_) => panic!("unexpected success"),
1620 }
1621
1622 match UnixDatagram::bind(&socket_path) {
1623 Err(ref e) if e.kind() == io::ErrorKind::InvalidInput => {}
1624 Err(e) => panic!("unexpected error {}", e),
1625 Ok(_) => panic!("unexpected success"),
1626 }
1627 }
1628
1629 #[test]
1630 fn timeouts() {
1631 let dir = or_panic!(TempDir::new("unix_socket"));
1632 let socket_path = dir.path().join("sock");
1633
1634 let _listener = or_panic!(UnixListener::bind(&socket_path));
1635
1636 let stream = or_panic!(UnixStream::connect(&socket_path));
1637 let dur = Duration::new(15410, 0);
1638
1639 assert_eq!(None, or_panic!(stream.read_timeout()));
1640
1641 or_panic!(stream.set_read_timeout(Some(dur)));
1642 assert_eq!(Some(dur), or_panic!(stream.read_timeout()));
1643
1644 assert_eq!(None, or_panic!(stream.write_timeout()));
1645
1646 or_panic!(stream.set_write_timeout(Some(dur)));
1647 assert_eq!(Some(dur), or_panic!(stream.write_timeout()));
1648
1649 or_panic!(stream.set_read_timeout(None));
1650 assert_eq!(None, or_panic!(stream.read_timeout()));
1651
1652 or_panic!(stream.set_write_timeout(None));
1653 assert_eq!(None, or_panic!(stream.write_timeout()));
1654 }
1655
1656 #[test]
1657 fn test_read_timeout() {
1658 let dir = or_panic!(TempDir::new("unix_socket"));
1659 let socket_path = dir.path().join("sock");
1660
1661 let _listener = or_panic!(UnixListener::bind(&socket_path));
1662
1663 let mut stream = or_panic!(UnixStream::connect(&socket_path));
1664 or_panic!(stream.set_read_timeout(Some(Duration::from_millis(1000))));
1665
1666 let mut buf = [0; 10];
1667 let kind = stream.read(&mut buf).err().expect("expected error").kind();
1668 assert!(kind == io::ErrorKind::WouldBlock || kind == io::ErrorKind::TimedOut);
1669 }
1670
1671 #[test]
1672 fn test_read_with_timeout() {
1673 let dir = or_panic!(TempDir::new("unix_socket"));
1674 let socket_path = dir.path().join("sock");
1675
1676 let listener = or_panic!(UnixListener::bind(&socket_path));
1677
1678 let mut stream = or_panic!(UnixStream::connect(&socket_path));
1679 or_panic!(stream.set_read_timeout(Some(Duration::from_millis(1000))));
1680
1681 let mut other_end = or_panic!(listener.accept()).0;
1682 or_panic!(other_end.write_all(b"hello world"));
1683
1684 let mut buf = [0; 11];
1685 or_panic!(stream.read(&mut buf));
1686 assert_eq!(b"hello world", &buf[..]);
1687
1688 let kind = stream.read(&mut buf).err().expect("expected error").kind();
1689 assert!(kind == io::ErrorKind::WouldBlock || kind == io::ErrorKind::TimedOut);
1690 }
1691
1692 #[test]
1693 fn test_unix_datagram() {
1694 let dir = or_panic!(TempDir::new("unix_socket"));
1695 let path1 = dir.path().join("sock1");
1696 let path2 = dir.path().join("sock2");
1697
1698 let sock1 = or_panic!(UnixDatagram::bind(&path1));
1699 let sock2 = or_panic!(UnixDatagram::bind(&path2));
1700
1701 let msg = b"hello world";
1702 or_panic!(sock1.send_to(msg, &path2));
1703 let mut buf = [0; 11];
1704 or_panic!(sock2.recv_from(&mut buf));
1705 assert_eq!(msg, &buf[..]);
1706 }
1707
1708 #[test]
1709 fn test_unnamed_unix_datagram() {
1710 let dir = or_panic!(TempDir::new("unix_socket"));
1711 let path1 = dir.path().join("sock1");
1712
1713 let sock1 = or_panic!(UnixDatagram::bind(&path1));
1714 let sock2 = or_panic!(UnixDatagram::unbound());
1715
1716 let msg = b"hello world";
1717 or_panic!(sock2.send_to(msg, &path1));
1718 let mut buf = [0; 11];
1719 let (usize, addr) = or_panic!(sock1.recv_from(&mut buf));
1720 assert_eq!(usize, 11);
1721 assert!(addr.is_unnamed());
1722 assert_eq!(msg, &buf[..]);
1723 }
1724
1725 #[test]
1726 fn test_connect_unix_datagram() {
1727 let dir = or_panic!(TempDir::new("unix_socket"));
1728 let path1 = dir.path().join("sock1");
1729 let path2 = dir.path().join("sock2");
1730
1731 let bsock1 = or_panic!(UnixDatagram::bind(&path1));
1732 let bsock2 = or_panic!(UnixDatagram::bind(&path2));
1733 let sock = or_panic!(UnixDatagram::unbound());
1734 or_panic!(sock.connect(&path1));
1735
1736 let msg = b"hello there";
1738 or_panic!(sock.send(msg));
1739 let mut buf = [0; 11];
1740 let (usize, addr) = or_panic!(bsock1.recv_from(&mut buf));
1741 assert_eq!(usize, 11);
1742 assert!(addr.is_unnamed());
1743 assert_eq!(msg, &buf[..]);
1744
1745 or_panic!(sock.connect(&path2));
1747 or_panic!(sock.send(msg));
1748 or_panic!(bsock2.recv_from(&mut buf));
1749 }
1750
1751 #[test]
1752 fn test_unix_datagram_recv() {
1753 let dir = or_panic!(TempDir::new("unix_socket"));
1754 let path1 = dir.path().join("sock1");
1755
1756 let sock1 = or_panic!(UnixDatagram::bind(&path1));
1757 let sock2 = or_panic!(UnixDatagram::unbound());
1758 or_panic!(sock2.connect(&path1));
1759
1760 let msg = b"hello world";
1761 or_panic!(sock2.send(msg));
1762 let mut buf = [0; 11];
1763 let size = or_panic!(sock1.recv(&mut buf));
1764 assert_eq!(size, 11);
1765 assert_eq!(msg, &buf[..]);
1766 }
1767
1768 #[test]
1769 fn datagram_pair() {
1770 let msg1 = b"hello";
1771 let msg2 = b"world!";
1772
1773 let (s1, s2) = or_panic!(UnixDatagram::pair());
1774 let thread = thread::spawn(move || {
1775 let mut buf = [0; 5];
1777 or_panic!(s1.recv(&mut buf));
1778 assert_eq!(&msg1[..], &buf[..]);
1779 or_panic!(s1.send(msg2));
1780 });
1781
1782 or_panic!(s2.send(msg1));
1783 let mut buf = [0; 6];
1784 or_panic!(s2.recv(&mut buf));
1785 assert_eq!(&msg2[..], &buf[..]);
1786 drop(s2);
1787
1788 thread.join().unwrap();
1789 }
1790
1791 #[test]
1792 fn datagram_shutdown() {
1793 let s1 = UnixDatagram::unbound().unwrap();
1794 let s2 = s1.try_clone().unwrap();
1795
1796 let thread = thread::spawn(move || {
1797 let mut buf = [0; 1];
1798 assert_eq!(0, s1.recv_from(&mut buf).unwrap().0);
1799 });
1800
1801 thread::sleep(Duration::from_millis(100));
1802 s2.shutdown(Shutdown::Read).unwrap();;
1803
1804 thread.join().unwrap();
1805 }
1806}