uds_windows/stdnet/
net.rs

1use std::fmt;
2use std::io;
3use std::mem;
4use std::net::Shutdown;
5use std::os::raw::c_int;
6use std::os::windows::io::{
7    AsRawSocket, AsSocket, BorrowedSocket, FromRawSocket, IntoRawSocket, RawSocket,
8};
9use std::path::Path;
10use std::time::Duration;
11
12use winapi::um::winsock2::{
13    bind, connect, getpeername, getsockname, listen, SO_RCVTIMEO, SO_SNDTIMEO,
14};
15
16use super::socket::{init, Socket};
17use super::{c, cvt, from_sockaddr_un, sockaddr_un, SocketAddr};
18
19/// A Unix stream socket
20///
21/// # Examples
22///
23/// ```no_run
24/// use uds_windows::UnixStream;
25/// use std::io::prelude::*;
26///
27/// let mut stream = UnixStream::connect("/path/to/my/socket").unwrap();
28/// stream.write_all(b"hello world").unwrap();
29/// let mut response = String::new();
30/// stream.read_to_string(&mut response).unwrap();
31/// println!("{}", response);
32/// ```
33pub struct UnixStream(Socket);
34
35impl fmt::Debug for UnixStream {
36    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
37        let mut builder = fmt.debug_struct("UnixStream");
38        builder.field("socket", &self.0.as_raw_socket());
39        if let Ok(addr) = self.local_addr() {
40            builder.field("local", &addr);
41        }
42        if let Ok(addr) = self.peer_addr() {
43            builder.field("peer", &addr);
44        }
45        builder.finish()
46    }
47}
48
49impl UnixStream {
50    /// Connects to the socket named by `path`.
51    ///
52    /// # Examples
53    ///
54    /// ```no_run
55    /// use uds_windows::UnixStream;
56    ///
57    /// let socket = match UnixStream::connect("/tmp/sock") {
58    ///     Ok(sock) => sock,
59    ///     Err(e) => {
60    ///         println!("Couldn't connect: {:?}", e);
61    ///         return
62    ///     }
63    /// };
64    /// ```
65    pub fn connect<P: AsRef<Path>>(path: P) -> io::Result<UnixStream> {
66        init();
67        fn inner(path: &Path) -> io::Result<UnixStream> {
68            unsafe {
69                let inner = Socket::new()?;
70                let (addr, len) = sockaddr_un(path)?;
71
72                cvt(connect(
73                    inner.as_raw_socket() as _,
74                    &addr as *const _ as *const _,
75                    len,
76                ))?;
77                Ok(UnixStream(inner))
78            }
79        }
80        inner(path.as_ref())
81    }
82
83    /// Creates a new independently owned handle to the underlying socket.
84    ///
85    /// The returned `UnixStream` is a reference to the same stream that this
86    /// object references. Both handles will read and write the same stream of
87    /// data, and options set on one stream will be propagated to the other
88    /// stream.
89    ///
90    /// # Examples
91    ///
92    /// ```no_run
93    /// use uds_windows::UnixStream;
94    ///
95    /// let socket = UnixStream::connect("/tmp/sock").unwrap();
96    /// let sock_copy = socket.try_clone().expect("Couldn't clone socket");
97    /// ```
98    pub fn try_clone(&self) -> io::Result<UnixStream> {
99        self.0.duplicate().map(UnixStream)
100    }
101
102    /// Returns the socket address of the local half of this connection.
103    ///
104    /// # Examples
105    ///
106    /// ```no_run
107    /// use uds_windows::UnixStream;
108    ///
109    /// let socket = UnixStream::connect("/tmp/sock").unwrap();
110    /// let addr = socket.local_addr().expect("Couldn't get local address");
111    /// ```
112    pub fn local_addr(&self) -> io::Result<SocketAddr> {
113        SocketAddr::new(|addr, len| unsafe { getsockname(self.0.as_raw_socket() as _, addr, len) })
114    }
115
116    /// Returns the socket address of the remote half of this connection.
117    ///
118    /// # Examples
119    ///
120    /// ```no_run
121    /// use uds_windows::UnixStream;
122    ///
123    /// let socket = UnixStream::connect("/tmp/sock").unwrap();
124    /// let addr = socket.peer_addr().expect("Couldn't get peer address");
125    /// ```
126    pub fn peer_addr(&self) -> io::Result<SocketAddr> {
127        SocketAddr::new(|addr, len| unsafe { getpeername(self.0.as_raw_socket() as _, addr, len) })
128    }
129
130    /// Moves the socket into or out of nonblocking mode.
131    ///
132    /// # Examples
133    ///
134    /// ```no_run
135    /// use uds_windows::UnixStream;
136    ///
137    /// let socket = UnixStream::connect("/tmp/sock").unwrap();
138    /// socket.set_nonblocking(true).expect("Couldn't set nonblocking");
139    /// ```
140    pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> {
141        self.0.set_nonblocking(nonblocking)
142    }
143
144    /// Returns the value of the `SO_ERROR` option.
145    ///
146    /// # Examples
147    ///
148    /// ```no_run
149    /// use uds_windows::UnixStream;
150    ///
151    /// let socket = UnixStream::connect("/tmp/sock").unwrap();
152    /// if let Ok(Some(err)) = socket.take_error() {
153    ///     println!("Got error: {:?}", err);
154    /// }
155    /// ```
156    ///
157    pub fn take_error(&self) -> io::Result<Option<io::Error>> {
158        self.0.take_error()
159    }
160
161    /// Shuts down the read, write, or both halves of this connection.
162    ///
163    /// This function will cause all pending and future I/O calls on the
164    /// specified portions to immediately return with an appropriate value
165    /// (see the documentation for `Shutdown`).
166    ///
167    /// # Examples
168    ///
169    /// ```no_run
170    /// use uds_windows::UnixStream;
171    /// use std::net::Shutdown;
172    ///
173    /// let socket = UnixStream::connect("/tmp/sock").unwrap();
174    /// socket.shutdown(Shutdown::Both).expect("shutdown function failed");
175    /// ```
176    pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
177        self.0.shutdown(how)
178    }
179
180    pub fn pair() -> io::Result<(Self, Self)> {
181        use std::sync::{Arc, RwLock};
182        use std::thread::spawn;
183
184        let dir = tempfile::tempdir()?;
185        let file_path = dir.path().join("socket");
186        let a: Arc<RwLock<Option<io::Result<UnixStream>>>> = Arc::new(RwLock::new(None));
187        let ul = UnixListener::bind(&file_path).unwrap();
188        let server = {
189            let a = a.clone();
190            spawn(move || {
191                let mut store = a.write().unwrap();
192                let stream0 = ul.accept().map(|s| s.0);
193                *store = Some(stream0);
194            })
195        };
196        let stream1 = UnixStream::connect(&file_path)?;
197        server
198            .join()
199            .map_err(|_| io::Error::from(io::ErrorKind::ConnectionRefused))?;
200        let stream0 = (*(a.write().unwrap())).take().unwrap()?;
201        Ok((stream0, stream1))
202    }
203
204    /// Sets the read timeout to the timeout specified.
205    ///
206    /// If the value specified is `None`, then `read` calls will block
207    /// indefinitely. An `Err` is returned if the zero `Duration` is
208    /// passed to this method.
209    ///
210    /// # Examples
211    ///
212    /// ```no_run
213    /// use uds_windows::UnixStream;
214    ///
215    /// let socket = UnixStream::connect("/tmp/sock").unwrap();
216    /// socket.set_read_timeout(None).expect("Couldn't set read timeout");
217    /// ```
218    pub fn set_read_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
219        self.0.set_timeout(dur, SO_RCVTIMEO)
220    }
221
222    /// Sets the write timeout to the timeout specified.
223    ///
224    /// If the value specified is `None`, then `write` calls will block
225    /// indefinitely. An `Err` is returned if the zero `Duration` is
226    /// passed to this method.
227    ///
228    /// # Examples
229    ///
230    /// ```no_run
231    /// use uds_windows::UnixStream;
232    ///
233    /// let socket = UnixStream::connect("/tmp/sock").unwrap();
234    /// socket.set_write_timeout(None).expect("Couldn't set write timeout");
235    /// ```
236    pub fn set_write_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
237        self.0.set_timeout(dur, SO_SNDTIMEO)
238    }
239
240    /// Returns the read timeout of this socket.
241    ///
242    /// # Examples
243    ///
244    /// ```no_run
245    /// use uds_windows::UnixStream;
246    ///
247    /// let socket = UnixStream::connect("/tmp/sock").unwrap();
248    /// socket.set_read_timeout(None).expect("Couldn't set read timeout");
249    /// assert_eq!(socket.read_timeout().unwrap(), None);
250    /// ```
251    pub fn read_timeout(&self) -> io::Result<Option<Duration>> {
252        self.0.timeout(SO_RCVTIMEO)
253    }
254
255    /// Returns the write timeout of this socket.
256    ///
257    /// # Examples
258    ///
259    /// ```no_run
260    /// use uds_windows::UnixStream;
261    ///
262    /// let socket = UnixStream::connect("/tmp/sock").unwrap();
263    /// socket.set_write_timeout(None).expect("Couldn't set write timeout");
264    /// assert_eq!(socket.write_timeout().unwrap(), None);
265    /// ```
266    pub fn write_timeout(&self) -> io::Result<Option<Duration>> {
267        self.0.timeout(SO_SNDTIMEO)
268    }
269}
270
271impl io::Read for UnixStream {
272    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
273        io::Read::read(&mut &*self, buf)
274    }
275}
276
277impl<'a> io::Read for &'a UnixStream {
278    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
279        self.0.read(buf)
280    }
281}
282
283impl io::Write for UnixStream {
284    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
285        io::Write::write(&mut &*self, buf)
286    }
287
288    fn flush(&mut self) -> io::Result<()> {
289        io::Write::flush(&mut &*self)
290    }
291}
292
293impl<'a> io::Write for &'a UnixStream {
294    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
295        self.0.write(buf)
296    }
297
298    fn flush(&mut self) -> io::Result<()> {
299        Ok(())
300    }
301}
302
303impl AsSocket for UnixStream {
304    fn as_socket(&self) -> BorrowedSocket {
305        self.0.as_socket()
306    }
307}
308
309impl AsRawSocket for UnixStream {
310    fn as_raw_socket(&self) -> RawSocket {
311        self.0.as_raw_socket()
312    }
313}
314
315impl FromRawSocket for UnixStream {
316    unsafe fn from_raw_socket(sock: RawSocket) -> Self {
317        UnixStream(Socket::from_raw_socket(sock))
318    }
319}
320
321impl IntoRawSocket for UnixStream {
322    fn into_raw_socket(self) -> RawSocket {
323        let ret = self.0.as_raw_socket();
324        mem::forget(self);
325        ret
326    }
327}
328
329/// A Unix domain socket server
330///
331/// # Examples
332///
333/// ```no_run
334/// use std::thread;
335/// use uds_windows::{UnixStream, UnixListener};
336///
337/// fn handle_client(stream: UnixStream) {
338///     // ...
339/// }
340///
341/// let listener = UnixListener::bind("/path/to/the/socket").unwrap();
342///
343/// // accept connections and process them, spawning a new thread for each one
344/// for stream in listener.incoming() {
345///     match stream {
346///         Ok(stream) => {
347///             /* connection succeeded */
348///             thread::spawn(|| handle_client(stream));
349///         }
350///         Err(err) => {
351///             /* connection failed */
352///             break;
353///         }
354///     }
355/// }
356/// ```
357pub struct UnixListener(Socket);
358
359impl fmt::Debug for UnixListener {
360    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
361        let mut builder = fmt.debug_struct("UnixListener");
362        builder.field("socket", &self.0.as_raw_socket());
363        if let Ok(addr) = self.local_addr() {
364            builder.field("local", &addr);
365        }
366        builder.finish()
367    }
368}
369
370impl UnixListener {
371    /// Creates a new `UnixListener` bound to the specified socket.
372    ///
373    /// # Examples
374    ///
375    /// ```no_run
376    /// use uds_windows::UnixListener;
377    ///
378    /// let listener = match UnixListener::bind("/path/to/the/socket") {
379    ///     Ok(sock) => sock,
380    ///     Err(e) => {
381    ///         println!("Couldn't connect: {:?}", e);
382    ///         return
383    ///     }
384    /// };
385    /// ```
386    pub fn bind<P: AsRef<Path>>(path: P) -> io::Result<UnixListener> {
387        init();
388        fn inner(path: &Path) -> io::Result<UnixListener> {
389            unsafe {
390                let inner = Socket::new()?;
391                let (addr, len) = sockaddr_un(path)?;
392
393                cvt(bind(
394                    inner.as_raw_socket() as _,
395                    &addr as *const _ as *const _,
396                    len as _,
397                ))?;
398                cvt(listen(inner.as_raw_socket() as _, 128))?;
399
400                Ok(UnixListener(inner))
401            }
402        }
403        inner(path.as_ref())
404    }
405
406    /// Accepts a new incoming connection to this listener.
407    ///
408    /// This function will block the calling thread until a new Unix connection
409    /// is established. When established, the corresponding [`UnixStream`] and
410    /// the remote peer's address will be returned.
411    ///
412    /// [`UnixStream`]: struct.UnixStream.html
413    ///
414    /// # Examples
415    ///
416    /// ```no_run
417    /// use uds_windows::UnixListener;
418    ///
419    /// let listener = UnixListener::bind("/path/to/the/socket").unwrap();
420    ///
421    /// match listener.accept() {
422    ///     Ok((socket, addr)) => println!("Got a client: {:?}", addr),
423    ///     Err(e) => println!("accept function failed: {:?}", e),
424    /// }
425    /// ```
426    pub fn accept(&self) -> io::Result<(UnixStream, SocketAddr)> {
427        let mut storage: c::sockaddr_un = unsafe { mem::zeroed() };
428        let mut len = mem::size_of_val(&storage) as c_int;
429        let sock = self.0.accept(&mut storage as *mut _ as *mut _, &mut len)?;
430        let addr = from_sockaddr_un(storage, len)?;
431        Ok((UnixStream(sock), addr))
432    }
433
434    /// Creates a new independently owned handle to the underlying socket.
435    ///
436    /// The returned `UnixListener` is a reference to the same socket that this
437    /// object references. Both handles can be used to accept incoming
438    /// connections and options set on one listener will affect the other.
439    ///
440    /// # Examples
441    ///
442    /// ```no_run
443    /// use uds_windows::UnixListener;
444    ///
445    /// let listener = UnixListener::bind("/path/to/the/socket").unwrap();
446    ///
447    /// let listener_copy = listener.try_clone().expect("Couldn't clone socket");
448    /// ```
449    pub fn try_clone(&self) -> io::Result<UnixListener> {
450        self.0.duplicate().map(UnixListener)
451    }
452
453    /// Returns the local socket address of this listener.
454    ///
455    /// # Examples
456    ///
457    /// ```no_run
458    /// use uds_windows::UnixListener;
459    ///
460    /// let listener = UnixListener::bind("/path/to/the/socket").unwrap();
461    ///
462    /// let addr = listener.local_addr().expect("Couldn't get local address");
463    /// ```
464    pub fn local_addr(&self) -> io::Result<SocketAddr> {
465        SocketAddr::new(|addr, len| unsafe { getsockname(self.0.as_raw_socket() as _, addr, len) })
466    }
467
468    /// Moves the socket into or out of nonblocking mode.
469    ///
470    /// # Examples
471    ///
472    /// ```no_run
473    /// use uds_windows::UnixListener;
474    ///
475    /// let listener = UnixListener::bind("/path/to/the/socket").unwrap();
476    ///
477    /// listener.set_nonblocking(true).expect("Couldn't set nonblocking");
478    /// ```
479    pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> {
480        self.0.set_nonblocking(nonblocking)
481    }
482
483    /// Returns the value of the `SO_ERROR` option.
484    ///
485    /// # Examples
486    ///
487    /// ```no_run
488    /// use uds_windows::UnixListener;
489    ///
490    /// let listener = UnixListener::bind("/tmp/sock").unwrap();
491    ///
492    /// if let Ok(Some(err)) = listener.take_error() {
493    ///     println!("Got error: {:?}", err);
494    /// }
495    /// ```
496    ///
497    pub fn take_error(&self) -> io::Result<Option<io::Error>> {
498        self.0.take_error()
499    }
500
501    /// Returns an iterator over incoming connections.
502    ///
503    /// The iterator will never return `None` and will also not yield the
504    /// peer's [`SocketAddr`] structure.
505    ///
506    /// [`SocketAddr`]: struct.SocketAddr.html
507    ///
508    /// # Examples
509    ///
510    /// ```no_run
511    /// use std::thread;
512    /// use uds_windows::{UnixStream, UnixListener};
513    ///
514    /// fn handle_client(stream: UnixStream) {
515    ///     // ...
516    /// }
517    ///
518    /// let listener = UnixListener::bind("/path/to/the/socket").unwrap();
519    ///
520    /// for stream in listener.incoming() {
521    ///     match stream {
522    ///         Ok(stream) => {
523    ///             thread::spawn(|| handle_client(stream));
524    ///         }
525    ///         Err(err) => {
526    ///             break;
527    ///         }
528    ///     }
529    /// }
530    /// ```
531    pub fn incoming(&self) -> Incoming<'_> {
532        Incoming { listener: self }
533    }
534}
535
536impl AsRawSocket for UnixListener {
537    fn as_raw_socket(&self) -> RawSocket {
538        self.0.as_raw_socket()
539    }
540}
541
542impl FromRawSocket for UnixListener {
543    unsafe fn from_raw_socket(sock: RawSocket) -> Self {
544        UnixListener(Socket::from_raw_socket(sock))
545    }
546}
547
548impl IntoRawSocket for UnixListener {
549    fn into_raw_socket(self) -> RawSocket {
550        let ret = self.0.as_raw_socket();
551        mem::forget(self);
552        ret
553    }
554}
555
556impl<'a> IntoIterator for &'a UnixListener {
557    type Item = io::Result<UnixStream>;
558    type IntoIter = Incoming<'a>;
559
560    fn into_iter(self) -> Incoming<'a> {
561        self.incoming()
562    }
563}
564
565/// An iterator over incoming connections to a [`UnixListener`].
566///
567/// It will never return `None`.
568///
569/// [`UnixListener`]: struct.UnixListener.html
570///
571/// # Examples
572///
573/// ```no_run
574/// use std::thread;
575/// use uds_windows::{UnixStream, UnixListener};
576///
577/// fn handle_client(stream: UnixStream) {
578///     // ...
579/// }
580///
581/// let listener = UnixListener::bind("/path/to/the/socket").unwrap();
582///
583/// for stream in listener.incoming() {
584///     match stream {
585///         Ok(stream) => {
586///             thread::spawn(|| handle_client(stream));
587///         }
588///         Err(err) => {
589///             break;
590///         }
591///     }
592/// }
593/// ```
594#[derive(Debug)]
595pub struct Incoming<'a> {
596    listener: &'a UnixListener,
597}
598
599impl<'a> Iterator for Incoming<'a> {
600    type Item = io::Result<UnixStream>;
601
602    fn next(&mut self) -> Option<io::Result<UnixStream>> {
603        Some(self.listener.accept().map(|s| s.0))
604    }
605
606    fn size_hint(&self) -> (usize, Option<usize>) {
607        (usize::max_value(), None)
608    }
609}
610
#[cfg(test)]
mod test {
    extern crate tempfile;

    use std::io::{self, Read, Write};
    use std::path::PathBuf;
    use std::thread;

    use self::tempfile::TempDir;

    use super::*;

    /// Unwraps a `Result`, panicking with the error's `Display` text on `Err`.
    macro_rules! or_panic {
        ($e:expr) => {
            match $e {
                Ok(e) => e,
                Err(e) => panic!("{}", e),
            }
        };
    }

    /// Creates a temporary directory and a socket path named `sock` inside it.
    ///
    /// The `TempDir` is returned alongside the path so the caller keeps the
    /// directory alive for as long as the path is in use.
    fn tmpdir() -> Result<(TempDir, PathBuf), io::Error> {
        let dir = tempfile::tempdir()?;
        let path = dir.path().join("sock");
        Ok((dir, path))
    }

    /// Round-trips one message in each direction and checks the peer address.
    #[test]
    fn basic() {
        let (_dir, socket_path) = or_panic!(tmpdir());
        let msg1 = b"hello";
        let msg2 = b"world!";

        let listener = or_panic!(UnixListener::bind(&socket_path));
        let thread = thread::spawn(move || {
            let mut stream = or_panic!(listener.accept()).0;
            let mut buf = [0; 5];
            // `read_exact` so a short read cannot leave `buf` partially filled.
            or_panic!(stream.read_exact(&mut buf));
            assert_eq!(&msg1[..], &buf[..]);
            or_panic!(stream.write_all(msg2));
        });

        let mut stream = or_panic!(UnixStream::connect(&socket_path));
        assert_eq!(
            Some(&*socket_path),
            stream.peer_addr().unwrap().as_pathname()
        );
        or_panic!(stream.write_all(msg1));
        let mut buf = vec![];
        or_panic!(stream.read_to_end(&mut buf));
        assert_eq!(&msg2[..], &buf[..]);
        drop(stream);

        thread.join().unwrap();
    }

    /// Checks that a cloned stream reads from the same byte stream as the
    /// original handle.
    #[test]
    fn try_clone() {
        let (_dir, socket_path) = or_panic!(tmpdir());
        let msg1 = b"hello";
        let msg2 = b"world";

        let listener = or_panic!(UnixListener::bind(&socket_path));
        let thread = thread::spawn(move || {
            let mut stream = or_panic!(listener.accept()).0;
            or_panic!(stream.write_all(msg1));
            or_panic!(stream.write_all(msg2));
        });

        let mut stream = or_panic!(UnixStream::connect(&socket_path));
        let mut stream2 = or_panic!(stream.try_clone());
        assert_eq!(
            Some(&*socket_path),
            stream2.peer_addr().unwrap().as_pathname()
        );

        let mut buf = [0; 5];
        // Each handle must consume exactly one five-byte message.
        or_panic!(stream.read_exact(&mut buf));
        assert_eq!(&msg1[..], &buf[..]);
        or_panic!(stream2.read_exact(&mut buf));
        assert_eq!(&msg2[..], &buf[..]);

        thread.join().unwrap();
    }

    /// Accepts two connections through `incoming()` and reads one byte from
    /// each.
    #[test]
    fn iter() {
        let (_dir, socket_path) = or_panic!(tmpdir());

        let listener = or_panic!(UnixListener::bind(&socket_path));
        let thread = thread::spawn(move || {
            for stream in listener.incoming().take(2) {
                let mut stream = or_panic!(stream);
                let mut buf = [0];
                or_panic!(stream.read_exact(&mut buf));
            }
        });

        for _ in 0..2 {
            let mut stream = or_panic!(UnixStream::connect(&socket_path));
            or_panic!(stream.write_all(&[0]));
        }

        thread.join().unwrap();
    }

    /// An over-long socket path must be rejected with `InvalidInput` by both
    /// `connect` and `bind`.
    #[test]
    fn long_path() {
        let dir = or_panic!(tempfile::tempdir());
        let socket_path = dir.path().join(
            "asdfasdfasdfasdfasdfasdfasdfasdfasdfasdfasdfasdfasdfasdfasdfa\
             sasdfasdfasdasdfasdfasdfadfasdfasdfasdfasdfasdf",
        );
        match UnixStream::connect(&socket_path) {
            Err(ref e) if e.kind() == io::ErrorKind::InvalidInput => {}
            Err(e) => panic!("unexpected error {}", e),
            Ok(_) => panic!("unexpected success"),
        }

        match UnixListener::bind(&socket_path) {
            Err(ref e) if e.kind() == io::ErrorKind::InvalidInput => {}
            Err(e) => panic!("unexpected error {}", e),
            Ok(_) => panic!("unexpected success"),
        }
    }

    /// Connecting to a path that starts with a NUL byte must fail.
    #[test]
    fn abstract_namespace_not_allowed() {
        assert!(UnixStream::connect("\0asdf").is_err());
    }
}