uds_windows/stdnet/net.rs
1use std::fmt;
2use std::io;
3use std::mem;
4use std::net::Shutdown;
5use std::os::raw::c_int;
6use std::os::windows::io::{
7 AsRawSocket, AsSocket, BorrowedSocket, FromRawSocket, IntoRawSocket, RawSocket,
8};
9use std::path::Path;
10use std::time::Duration;
11
12use winapi::um::winsock2::{
13 bind, connect, getpeername, getsockname, listen, SO_RCVTIMEO, SO_SNDTIMEO,
14};
15
16use super::socket::{init, Socket};
17use super::{c, cvt, from_sockaddr_un, sockaddr_un, SocketAddr};
18
19/// A Unix stream socket
20///
21/// # Examples
22///
23/// ```no_run
24/// use uds_windows::UnixStream;
25/// use std::io::prelude::*;
26///
27/// let mut stream = UnixStream::connect("/path/to/my/socket").unwrap();
28/// stream.write_all(b"hello world").unwrap();
29/// let mut response = String::new();
30/// stream.read_to_string(&mut response).unwrap();
31/// println!("{}", response);
32/// ```
33pub struct UnixStream(Socket);
34
35impl fmt::Debug for UnixStream {
36 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
37 let mut builder = fmt.debug_struct("UnixStream");
38 builder.field("socket", &self.0.as_raw_socket());
39 if let Ok(addr) = self.local_addr() {
40 builder.field("local", &addr);
41 }
42 if let Ok(addr) = self.peer_addr() {
43 builder.field("peer", &addr);
44 }
45 builder.finish()
46 }
47}
48
49impl UnixStream {
50 /// Connects to the socket named by `path`.
51 ///
52 /// # Examples
53 ///
54 /// ```no_run
55 /// use uds_windows::UnixStream;
56 ///
57 /// let socket = match UnixStream::connect("/tmp/sock") {
58 /// Ok(sock) => sock,
59 /// Err(e) => {
60 /// println!("Couldn't connect: {:?}", e);
61 /// return
62 /// }
63 /// };
64 /// ```
65 pub fn connect<P: AsRef<Path>>(path: P) -> io::Result<UnixStream> {
66 init();
67 fn inner(path: &Path) -> io::Result<UnixStream> {
68 unsafe {
69 let inner = Socket::new()?;
70 let (addr, len) = sockaddr_un(path)?;
71
72 cvt(connect(
73 inner.as_raw_socket() as _,
74 &addr as *const _ as *const _,
75 len,
76 ))?;
77 Ok(UnixStream(inner))
78 }
79 }
80 inner(path.as_ref())
81 }
82
83 /// Creates a new independently owned handle to the underlying socket.
84 ///
85 /// The returned `UnixStream` is a reference to the same stream that this
86 /// object references. Both handles will read and write the same stream of
87 /// data, and options set on one stream will be propagated to the other
88 /// stream.
89 ///
90 /// # Examples
91 ///
92 /// ```no_run
93 /// use uds_windows::UnixStream;
94 ///
95 /// let socket = UnixStream::connect("/tmp/sock").unwrap();
96 /// let sock_copy = socket.try_clone().expect("Couldn't clone socket");
97 /// ```
98 pub fn try_clone(&self) -> io::Result<UnixStream> {
99 self.0.duplicate().map(UnixStream)
100 }
101
102 /// Returns the socket address of the local half of this connection.
103 ///
104 /// # Examples
105 ///
106 /// ```no_run
107 /// use uds_windows::UnixStream;
108 ///
109 /// let socket = UnixStream::connect("/tmp/sock").unwrap();
110 /// let addr = socket.local_addr().expect("Couldn't get local address");
111 /// ```
112 pub fn local_addr(&self) -> io::Result<SocketAddr> {
113 SocketAddr::new(|addr, len| unsafe { getsockname(self.0.as_raw_socket() as _, addr, len) })
114 }
115
116 /// Returns the socket address of the remote half of this connection.
117 ///
118 /// # Examples
119 ///
120 /// ```no_run
121 /// use uds_windows::UnixStream;
122 ///
123 /// let socket = UnixStream::connect("/tmp/sock").unwrap();
124 /// let addr = socket.peer_addr().expect("Couldn't get peer address");
125 /// ```
126 pub fn peer_addr(&self) -> io::Result<SocketAddr> {
127 SocketAddr::new(|addr, len| unsafe { getpeername(self.0.as_raw_socket() as _, addr, len) })
128 }
129
130 /// Moves the socket into or out of nonblocking mode.
131 ///
132 /// # Examples
133 ///
134 /// ```no_run
135 /// use uds_windows::UnixStream;
136 ///
137 /// let socket = UnixStream::connect("/tmp/sock").unwrap();
138 /// socket.set_nonblocking(true).expect("Couldn't set nonblocking");
139 /// ```
140 pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> {
141 self.0.set_nonblocking(nonblocking)
142 }
143
144 /// Returns the value of the `SO_ERROR` option.
145 ///
146 /// # Examples
147 ///
148 /// ```no_run
149 /// use uds_windows::UnixStream;
150 ///
151 /// let socket = UnixStream::connect("/tmp/sock").unwrap();
152 /// if let Ok(Some(err)) = socket.take_error() {
153 /// println!("Got error: {:?}", err);
154 /// }
155 /// ```
156 ///
157 pub fn take_error(&self) -> io::Result<Option<io::Error>> {
158 self.0.take_error()
159 }
160
161 /// Shuts down the read, write, or both halves of this connection.
162 ///
163 /// This function will cause all pending and future I/O calls on the
164 /// specified portions to immediately return with an appropriate value
165 /// (see the documentation for `Shutdown`).
166 ///
167 /// # Examples
168 ///
169 /// ```no_run
170 /// use uds_windows::UnixStream;
171 /// use std::net::Shutdown;
172 ///
173 /// let socket = UnixStream::connect("/tmp/sock").unwrap();
174 /// socket.shutdown(Shutdown::Both).expect("shutdown function failed");
175 /// ```
176 pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
177 self.0.shutdown(how)
178 }
179
180 pub fn pair() -> io::Result<(Self, Self)> {
181 use std::sync::{Arc, RwLock};
182 use std::thread::spawn;
183
184 let dir = tempfile::tempdir()?;
185 let file_path = dir.path().join("socket");
186 let a: Arc<RwLock<Option<io::Result<UnixStream>>>> = Arc::new(RwLock::new(None));
187 let ul = UnixListener::bind(&file_path).unwrap();
188 let server = {
189 let a = a.clone();
190 spawn(move || {
191 let mut store = a.write().unwrap();
192 let stream0 = ul.accept().map(|s| s.0);
193 *store = Some(stream0);
194 })
195 };
196 let stream1 = UnixStream::connect(&file_path)?;
197 server
198 .join()
199 .map_err(|_| io::Error::from(io::ErrorKind::ConnectionRefused))?;
200 let stream0 = (*(a.write().unwrap())).take().unwrap()?;
201 Ok((stream0, stream1))
202 }
203
204 /// Sets the read timeout to the timeout specified.
205 ///
206 /// If the value specified is `None`, then `read` calls will block
207 /// indefinitely. An `Err` is returned if the zero `Duration` is
208 /// passed to this method.
209 ///
210 /// # Examples
211 ///
212 /// ```no_run
213 /// use uds_windows::UnixStream;
214 ///
215 /// let socket = UnixStream::connect("/tmp/sock").unwrap();
216 /// socket.set_read_timeout(None).expect("Couldn't set read timeout");
217 /// ```
218 pub fn set_read_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
219 self.0.set_timeout(dur, SO_RCVTIMEO)
220 }
221
222 /// Sets the write timeout to the timeout specified.
223 ///
224 /// If the value specified is `None`, then `write` calls will block
225 /// indefinitely. An `Err` is returned if the zero `Duration` is
226 /// passed to this method.
227 ///
228 /// # Examples
229 ///
230 /// ```no_run
231 /// use uds_windows::UnixStream;
232 ///
233 /// let socket = UnixStream::connect("/tmp/sock").unwrap();
234 /// socket.set_write_timeout(None).expect("Couldn't set write timeout");
235 /// ```
236 pub fn set_write_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
237 self.0.set_timeout(dur, SO_SNDTIMEO)
238 }
239
240 /// Returns the read timeout of this socket.
241 ///
242 /// # Examples
243 ///
244 /// ```no_run
245 /// use uds_windows::UnixStream;
246 ///
247 /// let socket = UnixStream::connect("/tmp/sock").unwrap();
248 /// socket.set_read_timeout(None).expect("Couldn't set read timeout");
249 /// assert_eq!(socket.read_timeout().unwrap(), None);
250 /// ```
251 pub fn read_timeout(&self) -> io::Result<Option<Duration>> {
252 self.0.timeout(SO_RCVTIMEO)
253 }
254
255 /// Returns the write timeout of this socket.
256 ///
257 /// # Examples
258 ///
259 /// ```no_run
260 /// use uds_windows::UnixStream;
261 ///
262 /// let socket = UnixStream::connect("/tmp/sock").unwrap();
263 /// socket.set_write_timeout(None).expect("Couldn't set write timeout");
264 /// assert_eq!(socket.write_timeout().unwrap(), None);
265 /// ```
266 pub fn write_timeout(&self) -> io::Result<Option<Duration>> {
267 self.0.timeout(SO_SNDTIMEO)
268 }
269}
270
271impl io::Read for UnixStream {
272 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
273 io::Read::read(&mut &*self, buf)
274 }
275}
276
277impl<'a> io::Read for &'a UnixStream {
278 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
279 self.0.read(buf)
280 }
281}
282
283impl io::Write for UnixStream {
284 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
285 io::Write::write(&mut &*self, buf)
286 }
287
288 fn flush(&mut self) -> io::Result<()> {
289 io::Write::flush(&mut &*self)
290 }
291}
292
293impl<'a> io::Write for &'a UnixStream {
294 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
295 self.0.write(buf)
296 }
297
298 fn flush(&mut self) -> io::Result<()> {
299 Ok(())
300 }
301}
302
303impl AsSocket for UnixStream {
304 fn as_socket(&self) -> BorrowedSocket {
305 self.0.as_socket()
306 }
307}
308
309impl AsRawSocket for UnixStream {
310 fn as_raw_socket(&self) -> RawSocket {
311 self.0.as_raw_socket()
312 }
313}
314
315impl FromRawSocket for UnixStream {
316 unsafe fn from_raw_socket(sock: RawSocket) -> Self {
317 UnixStream(Socket::from_raw_socket(sock))
318 }
319}
320
321impl IntoRawSocket for UnixStream {
322 fn into_raw_socket(self) -> RawSocket {
323 let ret = self.0.as_raw_socket();
324 mem::forget(self);
325 ret
326 }
327}
328
329/// A Unix domain socket server
330///
331/// # Examples
332///
333/// ```no_run
334/// use std::thread;
335/// use uds_windows::{UnixStream, UnixListener};
336///
337/// fn handle_client(stream: UnixStream) {
338/// // ...
339/// }
340///
341/// let listener = UnixListener::bind("/path/to/the/socket").unwrap();
342///
343/// // accept connections and process them, spawning a new thread for each one
344/// for stream in listener.incoming() {
345/// match stream {
346/// Ok(stream) => {
347/// /* connection succeeded */
348/// thread::spawn(|| handle_client(stream));
349/// }
350/// Err(err) => {
351/// /* connection failed */
352/// break;
353/// }
354/// }
355/// }
356/// ```
357pub struct UnixListener(Socket);
358
359impl fmt::Debug for UnixListener {
360 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
361 let mut builder = fmt.debug_struct("UnixListener");
362 builder.field("socket", &self.0.as_raw_socket());
363 if let Ok(addr) = self.local_addr() {
364 builder.field("local", &addr);
365 }
366 builder.finish()
367 }
368}
369
370impl UnixListener {
371 /// Creates a new `UnixListener` bound to the specified socket.
372 ///
373 /// # Examples
374 ///
375 /// ```no_run
376 /// use uds_windows::UnixListener;
377 ///
378 /// let listener = match UnixListener::bind("/path/to/the/socket") {
379 /// Ok(sock) => sock,
380 /// Err(e) => {
381 /// println!("Couldn't connect: {:?}", e);
382 /// return
383 /// }
384 /// };
385 /// ```
386 pub fn bind<P: AsRef<Path>>(path: P) -> io::Result<UnixListener> {
387 init();
388 fn inner(path: &Path) -> io::Result<UnixListener> {
389 unsafe {
390 let inner = Socket::new()?;
391 let (addr, len) = sockaddr_un(path)?;
392
393 cvt(bind(
394 inner.as_raw_socket() as _,
395 &addr as *const _ as *const _,
396 len as _,
397 ))?;
398 cvt(listen(inner.as_raw_socket() as _, 128))?;
399
400 Ok(UnixListener(inner))
401 }
402 }
403 inner(path.as_ref())
404 }
405
406 /// Accepts a new incoming connection to this listener.
407 ///
408 /// This function will block the calling thread until a new Unix connection
409 /// is established. When established, the corresponding [`UnixStream`] and
410 /// the remote peer's address will be returned.
411 ///
412 /// [`UnixStream`]: struct.UnixStream.html
413 ///
414 /// # Examples
415 ///
416 /// ```no_run
417 /// use uds_windows::UnixListener;
418 ///
419 /// let listener = UnixListener::bind("/path/to/the/socket").unwrap();
420 ///
421 /// match listener.accept() {
422 /// Ok((socket, addr)) => println!("Got a client: {:?}", addr),
423 /// Err(e) => println!("accept function failed: {:?}", e),
424 /// }
425 /// ```
426 pub fn accept(&self) -> io::Result<(UnixStream, SocketAddr)> {
427 let mut storage: c::sockaddr_un = unsafe { mem::zeroed() };
428 let mut len = mem::size_of_val(&storage) as c_int;
429 let sock = self.0.accept(&mut storage as *mut _ as *mut _, &mut len)?;
430 let addr = from_sockaddr_un(storage, len)?;
431 Ok((UnixStream(sock), addr))
432 }
433
434 /// Creates a new independently owned handle to the underlying socket.
435 ///
436 /// The returned `UnixListener` is a reference to the same socket that this
437 /// object references. Both handles can be used to accept incoming
438 /// connections and options set on one listener will affect the other.
439 ///
440 /// # Examples
441 ///
442 /// ```no_run
443 /// use uds_windows::UnixListener;
444 ///
445 /// let listener = UnixListener::bind("/path/to/the/socket").unwrap();
446 ///
447 /// let listener_copy = listener.try_clone().expect("Couldn't clone socket");
448 /// ```
449 pub fn try_clone(&self) -> io::Result<UnixListener> {
450 self.0.duplicate().map(UnixListener)
451 }
452
453 /// Returns the local socket address of this listener.
454 ///
455 /// # Examples
456 ///
457 /// ```no_run
458 /// use uds_windows::UnixListener;
459 ///
460 /// let listener = UnixListener::bind("/path/to/the/socket").unwrap();
461 ///
462 /// let addr = listener.local_addr().expect("Couldn't get local address");
463 /// ```
464 pub fn local_addr(&self) -> io::Result<SocketAddr> {
465 SocketAddr::new(|addr, len| unsafe { getsockname(self.0.as_raw_socket() as _, addr, len) })
466 }
467
468 /// Moves the socket into or out of nonblocking mode.
469 ///
470 /// # Examples
471 ///
472 /// ```no_run
473 /// use uds_windows::UnixListener;
474 ///
475 /// let listener = UnixListener::bind("/path/to/the/socket").unwrap();
476 ///
477 /// listener.set_nonblocking(true).expect("Couldn't set nonblocking");
478 /// ```
479 pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> {
480 self.0.set_nonblocking(nonblocking)
481 }
482
483 /// Returns the value of the `SO_ERROR` option.
484 ///
485 /// # Examples
486 ///
487 /// ```no_run
488 /// use uds_windows::UnixListener;
489 ///
490 /// let listener = UnixListener::bind("/tmp/sock").unwrap();
491 ///
492 /// if let Ok(Some(err)) = listener.take_error() {
493 /// println!("Got error: {:?}", err);
494 /// }
495 /// ```
496 ///
497 pub fn take_error(&self) -> io::Result<Option<io::Error>> {
498 self.0.take_error()
499 }
500
501 /// Returns an iterator over incoming connections.
502 ///
503 /// The iterator will never return `None` and will also not yield the
504 /// peer's [`SocketAddr`] structure.
505 ///
506 /// [`SocketAddr`]: struct.SocketAddr.html
507 ///
508 /// # Examples
509 ///
510 /// ```no_run
511 /// use std::thread;
512 /// use uds_windows::{UnixStream, UnixListener};
513 ///
514 /// fn handle_client(stream: UnixStream) {
515 /// // ...
516 /// }
517 ///
518 /// let listener = UnixListener::bind("/path/to/the/socket").unwrap();
519 ///
520 /// for stream in listener.incoming() {
521 /// match stream {
522 /// Ok(stream) => {
523 /// thread::spawn(|| handle_client(stream));
524 /// }
525 /// Err(err) => {
526 /// break;
527 /// }
528 /// }
529 /// }
530 /// ```
531 pub fn incoming(&self) -> Incoming<'_> {
532 Incoming { listener: self }
533 }
534}
535
536impl AsRawSocket for UnixListener {
537 fn as_raw_socket(&self) -> RawSocket {
538 self.0.as_raw_socket()
539 }
540}
541
542impl FromRawSocket for UnixListener {
543 unsafe fn from_raw_socket(sock: RawSocket) -> Self {
544 UnixListener(Socket::from_raw_socket(sock))
545 }
546}
547
548impl IntoRawSocket for UnixListener {
549 fn into_raw_socket(self) -> RawSocket {
550 let ret = self.0.as_raw_socket();
551 mem::forget(self);
552 ret
553 }
554}
555
556impl<'a> IntoIterator for &'a UnixListener {
557 type Item = io::Result<UnixStream>;
558 type IntoIter = Incoming<'a>;
559
560 fn into_iter(self) -> Incoming<'a> {
561 self.incoming()
562 }
563}
564
565/// An iterator over incoming connections to a [`UnixListener`].
566///
567/// It will never return `None`.
568///
569/// [`UnixListener`]: struct.UnixListener.html
570///
571/// # Examples
572///
573/// ```no_run
574/// use std::thread;
575/// use uds_windows::{UnixStream, UnixListener};
576///
577/// fn handle_client(stream: UnixStream) {
578/// // ...
579/// }
580///
581/// let listener = UnixListener::bind("/path/to/the/socket").unwrap();
582///
583/// for stream in listener.incoming() {
584/// match stream {
585/// Ok(stream) => {
586/// thread::spawn(|| handle_client(stream));
587/// }
588/// Err(err) => {
589/// break;
590/// }
591/// }
592/// }
593/// ```
594#[derive(Debug)]
595pub struct Incoming<'a> {
596 listener: &'a UnixListener,
597}
598
599impl<'a> Iterator for Incoming<'a> {
600 type Item = io::Result<UnixStream>;
601
602 fn next(&mut self) -> Option<io::Result<UnixStream>> {
603 Some(self.listener.accept().map(|s| s.0))
604 }
605
606 fn size_hint(&self) -> (usize, Option<usize>) {
607 (usize::max_value(), None)
608 }
609}
610
611#[cfg(test)]
612mod test {
613 extern crate tempfile;
614
615 use std::io::{self, Read, Write};
616 use std::path::PathBuf;
617 use std::thread;
618
619 use self::tempfile::TempDir;
620
621 use super::*;
622
623 macro_rules! or_panic {
624 ($e:expr) => {
625 match $e {
626 Ok(e) => e,
627 Err(e) => panic!("{}", e),
628 }
629 };
630 }
631
632 fn tmpdir() -> Result<(TempDir, PathBuf), io::Error> {
633 let dir = tempfile::tempdir()?;
634 let path = dir.path().join("sock");
635 Ok((dir, path))
636 }
637
638 #[test]
639 fn basic() {
640 let (_dir, socket_path) = or_panic!(tmpdir());
641 let msg1 = b"hello";
642 let msg2 = b"world!";
643
644 let listener = or_panic!(UnixListener::bind(&socket_path));
645 let thread = thread::spawn(move || {
646 let mut stream = or_panic!(listener.accept()).0;
647 let mut buf = [0; 5];
648 or_panic!(stream.read(&mut buf));
649 assert_eq!(&msg1[..], &buf[..]);
650 or_panic!(stream.write_all(msg2));
651 });
652
653 let mut stream = or_panic!(UnixStream::connect(&socket_path));
654 assert_eq!(
655 Some(&*socket_path),
656 stream.peer_addr().unwrap().as_pathname()
657 );
658 or_panic!(stream.write_all(msg1));
659 let mut buf = vec![];
660 or_panic!(stream.read_to_end(&mut buf));
661 assert_eq!(&msg2[..], &buf[..]);
662 drop(stream);
663
664 thread.join().unwrap();
665 }
666
667 #[test]
668 fn try_clone() {
669 let (_dir, socket_path) = or_panic!(tmpdir());
670 let msg1 = b"hello";
671 let msg2 = b"world";
672
673 let listener = or_panic!(UnixListener::bind(&socket_path));
674 let thread = thread::spawn(move || {
675 #[allow(unused_mut)]
676 let mut stream = or_panic!(listener.accept()).0;
677 or_panic!(stream.write_all(msg1));
678 or_panic!(stream.write_all(msg2));
679 });
680
681 let mut stream = or_panic!(UnixStream::connect(&socket_path));
682 let mut stream2 = or_panic!(stream.try_clone());
683 assert_eq!(
684 Some(&*socket_path),
685 stream2.peer_addr().unwrap().as_pathname()
686 );
687
688 let mut buf = [0; 5];
689 or_panic!(stream.read(&mut buf));
690 assert_eq!(&msg1[..], &buf[..]);
691 or_panic!(stream2.read(&mut buf));
692 assert_eq!(&msg2[..], &buf[..]);
693
694 thread.join().unwrap();
695 }
696
697 #[test]
698 fn iter() {
699 let (_dir, socket_path) = or_panic!(tmpdir());
700
701 let listener = or_panic!(UnixListener::bind(&socket_path));
702 let thread = thread::spawn(move || {
703 for stream in listener.incoming().take(2) {
704 let mut stream = or_panic!(stream);
705 let mut buf = [0];
706 or_panic!(stream.read(&mut buf));
707 }
708 });
709
710 for _ in 0..2 {
711 let mut stream = or_panic!(UnixStream::connect(&socket_path));
712 or_panic!(stream.write_all(&[0]));
713 }
714
715 thread.join().unwrap();
716 }
717
718 #[test]
719 fn long_path() {
720 let dir = or_panic!(tempfile::tempdir());
721 let socket_path = dir.path().join(
722 "asdfasdfasdfasdfasdfasdfasdfasdfasdfasdfasdfasdfasdfasdfasdfa\
723 sasdfasdfasdasdfasdfasdfadfasdfasdfasdfasdfasdf",
724 );
725 match UnixStream::connect(&socket_path) {
726 Err(ref e) if e.kind() == io::ErrorKind::InvalidInput => {}
727 Err(e) => panic!("unexpected error {}", e),
728 Ok(_) => panic!("unexpected success"),
729 }
730
731 match UnixListener::bind(&socket_path) {
732 Err(ref e) if e.kind() == io::ErrorKind::InvalidInput => {}
733 Err(e) => panic!("unexpected error {}", e),
734 Ok(_) => panic!("unexpected success"),
735 }
736 }
737
738 #[test]
739 fn abstract_namespace_not_allowed() {
740 assert!(UnixStream::connect("\0asdf").is_err());
741 }
742}