nb_connect/lib.rs
1//! Non-blocking TCP or Unix connect.
2//!
3//! This crate allows you to create a [`TcpStream`] or a [`UnixStream`] in a non-blocking way,
4//! without waiting for the connection to become fully established.
5//!
6//! [`TcpStream`]: https://doc.rust-lang.org/stable/std/net/struct.TcpStream.html
7//! [`UnixStream`]: https://doc.rust-lang.org/stable/std/os/unix/net/struct.UnixStream.html
8//!
9//! # Examples
10//!
11//! ```
12//! use polling::{Event, Poller};
13//! use std::time::Duration;
14//!
15//! // Create a pending TCP connection.
16//! let stream = nb_connect::tcp(([127, 0, 0, 1], 80))?;
17//!
18//! // Create a poller that waits for the stream to become writable.
19//! let poller = Poller::new()?;
20//! poller.add(&stream, Event::writable(0))?;
21//!
22//! // Wait for at most 1 second.
23//! if poller.wait(&mut Vec::new(), Some(Duration::from_secs(1)))? == 0 {
24//! println!("timeout");
25//! } else if let Some(err) = stream.take_error()? {
26//! println!("error: {}", err);
27//! } else {
28//! println!("connected");
29//! }
30//! # std::io::Result::Ok(())
31//! ```
32
33#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
34#."
37)]
38
39use std::io;
40use std::net::{SocketAddr, TcpStream};
41
42use socket2::{Domain, Protocol, SockAddr, Socket, Type};
43
44#[cfg(unix)]
45use std::{os::unix::net::UnixStream, path::Path};
46
47fn connect(addr: SockAddr, domain: Domain, protocol: Option<Protocol>) -> io::Result<Socket> {
48 let sock_type = Type::STREAM;
49 #[cfg(any(
50 target_os = "android",
51 target_os = "dragonfly",
52 target_os = "freebsd",
53 target_os = "fuchsia",
54 target_os = "illumos",
55 target_os = "linux",
56 target_os = "netbsd",
57 target_os = "openbsd"
58 ))]
59 // If we can, set nonblocking at socket creation for unix
60 let sock_type = sock_type.nonblocking();
61 // This automatically handles cloexec on unix, no_inherit on windows and nosigpipe on macos
62 let socket = Socket::new(domain, sock_type, protocol)?;
63 #[cfg(not(any(
64 target_os = "android",
65 target_os = "dragonfly",
66 target_os = "freebsd",
67 target_os = "fuchsia",
68 target_os = "illumos",
69 target_os = "linux",
70 target_os = "netbsd",
71 target_os = "openbsd"
72 )))]
73 // If the current platform doesn't support nonblocking at creation, enable it after creation
74 socket.set_nonblocking(true)?;
75 match socket.connect(&addr) {
76 Ok(_) => {}
77 #[cfg(unix)]
78 Err(err) if err.raw_os_error() == Some(libc::EINPROGRESS) => {}
79 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
80 Err(err) => return Err(err),
81 }
82 Ok(socket)
83}
84
85/// Creates a pending Unix connection to the specified path.
86///
87/// The returned Unix stream will be in non-blocking mode and in the process of connecting to the
88/// specified path.
89///
90/// The stream becomes writable when connected.
91///
92/// # Examples
93///
94/// ```no_run
95/// use polling::{Event, Poller};
96/// use std::time::Duration;
97///
98/// // Create a pending Unix connection.
99/// let stream = nb_connect::unix("/tmp/socket")?;
100///
101/// // Create a poller that waits for the stream to become writable.
102/// let poller = Poller::new()?;
103/// poller.add(&stream, Event::writable(0))?;
104///
105/// // Wait for at most 1 second.
106/// if poller.wait(&mut Vec::new(), Some(Duration::from_secs(1)))? == 0 {
107/// println!("timeout");
108/// } else {
109/// println!("connected");
110/// }
111/// # std::io::Result::Ok(())
112/// ```
113#[cfg(unix)]
114pub fn unix<P: AsRef<Path>>(path: P) -> io::Result<UnixStream> {
115 let socket = connect(SockAddr::unix(path)?, Domain::UNIX, None)?;
116 Ok(socket.into())
117}
118
119/// Creates a pending TCP connection to the specified address.
120///
121/// The returned TCP stream will be in non-blocking mode and in the process of connecting to the
122/// specified address.
123///
124/// The stream becomes writable when connected.
125///
126/// # Examples
127///
128/// ```
129/// use polling::{Event, Poller};
130/// use std::time::Duration;
131///
132/// // Create a pending TCP connection.
133/// let stream = nb_connect::tcp(([127, 0, 0, 1], 80))?;
134///
135/// // Create a poller that waits for the stream to become writable.
136/// let poller = Poller::new()?;
137/// poller.add(&stream, Event::writable(0))?;
138///
139/// // Wait for at most 1 second.
140/// if poller.wait(&mut Vec::new(), Some(Duration::from_secs(1)))? == 0 {
141/// println!("timeout");
142/// } else if let Some(err) = stream.take_error()? {
143/// println!("error: {}", err);
144/// } else {
145/// println!("connected");
146/// }
147/// # std::io::Result::Ok(())
148/// ```
149pub fn tcp<A: Into<SocketAddr>>(addr: A) -> io::Result<TcpStream> {
150 let addr = addr.into();
151 let domain = Domain::for_address(addr);
152 let socket = connect(addr.into(), domain, Some(Protocol::TCP))?;
153 Ok(socket.into())
154}