nng 0.3.0

A safe wrapper for nanomsg-next-generation (nng)
Documentation
//! Nanomsg-next-generation dialers.
//!
//! A dialer is responsible for establishing and maintaining outgoing
//! connections. If a connection is ever broken, or fails, the dialer object
//! automatically attempts to reconnect.
//!
//! Directly creating a dialer object is only necessary when one wishes to
//! configure the connection before opening it or if one wants to close the
//! outgoing connection without closing the socket. Otherwise, `Socket::dial`
//! can be used.
//!
//! Note that the client/server relationship described by a dialer/listener is
//! completely orthogonal to any similar relationship in the protocols. For
//! example, a _rep_ socket may use a dialer to connect to a listener on a
//! _req_ socket. This orthogonality can lead to innovative solutions to
//! otherwise challenging communications problems.
//!
//! See the [nng documentation][1] for more information.
//!
//! [1]: https://nanomsg.github.io/nng/man/v1.0.0/nng_dialer.5.html
use std::ffi::CString;
use crate::error::{Error, ErrorKind, Result};
use crate::socket::Socket;

/// A constructed and running dialer.
///
/// This dialer has already been started on the socket and will continue
/// serving the connection until either it or the socket is dropped.
///
/// Dropping this object closes the underlying nng dialer.
pub struct Dialer
{
	/// The handle to the underlying nng dialer.
	handle: nng_sys::nng_dialer,
}
impl Dialer
{
	/// Creates a new dialer object associated with the given socket.
	///
	/// Note that this will immediately start the dialer so no configuration
	/// will be possible. Use `DialerOptions` to change the dialer options
	/// before starting it.
	///
	/// If `nonblocking` is `true`, `NNG_FLAG_NONBLOCK` is passed to
	/// `nng_dial`; otherwise no flags are set.
	///
	/// # Errors
	///
	/// Returns `ErrorKind::AddressInvalid` if `url` contains an interior nul
	/// byte and therefore cannot be converted to a C string. Otherwise the
	/// return code of `nng_dial` is converted into a `Result` by `rv2res!`.
	pub fn new(socket: &Socket, url: &str, nonblocking: bool) -> Result<Self>
	{
		// We take a Rust string instead of a c-string because the cost of
		// creating the dialer will far outweigh the cost of allocating a
		// single string. Having a full Rust interface will make it easier to
		// work with.
		let addr = CString::new(url).map_err(|_| ErrorKind::AddressInvalid)?;
		let mut handle = nng_sys::NNG_DIALER_INITIALIZER;
		let flags = if nonblocking { nng_sys::NNG_FLAG_NONBLOCK } else { 0 };

		// SAFETY: `addr` is a nul-terminated string that stays alive for the
		// duration of the call and `handle` points to a live local variable.
		let rv = unsafe {
			nng_sys::nng_dial(socket.handle(), addr.as_ptr(), &mut handle as *mut _, flags)
		};

		rv2res!(rv, Dialer { handle })
	}

	/// Returns the positive identifier for the dialer.
	///
	/// # Panics
	///
	/// Panics if nng reports a non-positive identifier for this dialer. A
	/// `Dialer` is only ever constructed from a successfully created handle,
	/// so this indicates a broken invariant rather than a recoverable error.
	pub fn id(&self) -> i32
	{
		let id = unsafe { nng_sys::nng_dialer_id(self.handle) };
		// The handle being queried is a dialer, so the message names the
		// dialer rather than the socket.
		assert!(id > 0, "Invalid dialer ID returned from valid dialer");

		id
	}
}

// Wires up option access for a started `Dialer` through the crate's
// `expose_options!` macro (defined elsewhere in the crate). The invocation
// names the struct field holding the raw handle, the nng getter/setter
// functions to use, and the options that may be read or written.
expose_options!{
	Dialer :: handle -> nng_sys::nng_dialer;

	GETOPT_BOOL = nng_sys::nng_dialer_getopt_bool;
	GETOPT_INT = nng_sys::nng_dialer_getopt_int;
	GETOPT_MS = nng_sys::nng_dialer_getopt_ms;
	GETOPT_SIZE = nng_sys::nng_dialer_getopt_size;
	GETOPT_SOCKADDR = nng_sys::nng_dialer_getopt_sockaddr;
	GETOPT_STRING = nng_sys::nng_dialer_getopt_string;

	SETOPT = nng_sys::nng_dialer_setopt;
	SETOPT_BOOL = nng_sys::nng_dialer_setopt_bool;
	SETOPT_INT = nng_sys::nng_dialer_setopt_int;
	SETOPT_MS = nng_sys::nng_dialer_setopt_ms;
	SETOPT_SIZE = nng_sys::nng_dialer_setopt_size;
	SETOPT_STRING = nng_sys::nng_dialer_setopt_string;

	// Options that can be read from a running dialer.
	Gets -> [LocalAddr, Raw, ReconnectMinTime,
	         ReconnectMaxTime, RecvBufferSize,
	         RecvMaxSize, RecvTimeout,
	         SendBufferSize, SendTimeout,
	         SocketName, MaxTtl, Url,
	         protocol::reqrep::ResendTime,
	         protocol::survey::SurveyTime,
	         transport::tcp::NoDelay,
	         transport::tcp::KeepAlive];
	// No options are listed as settable once the dialer has been started;
	// configuration goes through `DialerOptions` instead.
	Sets -> [];
}

impl Drop for Dialer
{
	/// Closes the underlying nng dialer.
	///
	/// Panics if the close call reports anything other than success or
	/// `NNG_ECLOSED`.
	fn drop(&mut self)
	{
		let rv = unsafe { nng_sys::nng_dialer_close(self.handle) };

		// Success and ECLOSED are the only results expected from closing a
		// dialer, and either one means there is nothing left to release.
		let closed = rv == 0 || rv == nng_sys::NNG_ECLOSED;
		assert!(closed, "Unexpected error code while closing dialer ({})", rv);
	}
}

/// Configuration utility for nanomsg-next-generation dialers.
///
/// This object allows for the configuration of dialers before they are
/// started. If it is not necessary to change dialer settings or to close the
/// dialer without closing the socket, then `Socket::dial` provides a simpler
/// interface and does not require tracking an object.
///
/// Dropping this object without calling `DialerOptions::start` closes the
/// underlying dialer.
pub struct DialerOptions
{
	/// The underlying dialer object that we are configuring. It has been
	/// created but not yet started.
	handle: nng_sys::nng_dialer,
}
impl DialerOptions
{
	/// Creates a new, not-yet-started dialer for the given socket and URL.
	///
	/// The dialer only begins connecting once this object is consumed by
	/// `DialerOptions::start`, which leaves room to configure it first.
	pub fn new(socket: &Socket, url: &str) -> Result<Self>
	{
		// Accepting a Rust string costs one allocation for the conversion,
		// which is negligible next to the cost of creating the dialer and
		// keeps the interface free of C string types.
		let c_url = CString::new(url).map_err(|_| ErrorKind::AddressInvalid)?;

		let mut handle = nng_sys::NNG_DIALER_INITIALIZER;
		let rv = unsafe {
			nng_sys::nng_dialer_create(&mut handle, socket.handle(), c_url.as_ptr())
		};

		rv2res!(rv, DialerOptions { handle })
	}

	/// Starts the dialer connecting to the address it was created with.
	///
	/// By default the first connection attempt, including any name
	/// resolution it needs, happens synchronously. A failure such as a
	/// refused connection is therefore reported immediately and nothing
	/// further is attempted.
	///
	/// When `nonblocking` is set, the connection attempt is instead made
	/// asynchronously.
	///
	/// In either mode, a connection that is later closed will be redialed
	/// asynchronously.
	///
	/// The returned `Dialer` owns the lifetime of the dialer: dropping it
	/// shuts the dialer down and no more messages will be received on it.
	///
	/// On failure the options object is handed back alongside the error so
	/// the caller can decide what to do with it.
	pub fn start(self, nonblocking: bool) -> std::result::Result<Dialer, (Self, Error)>
	{
		let flags = if nonblocking { nng_sys::NNG_FLAG_NONBLOCK } else { 0 };
		let rv = unsafe { nng_sys::nng_dialer_start(self.handle, flags) };

		// A failed start must not consume the options, so return them to the
		// caller together with the translated error.
		if rv != 0 {
			return Err((self, ErrorKind::from_code(rv).into()));
		}

		// Ownership of the handle moves to the `Dialer`. Forgetting `self`
		// skips its `Drop`, which would otherwise close the dialer that was
		// just started.
		let started = Dialer { handle: self.handle };
		std::mem::forget(self);
		Ok(started)
	}
}

// Wires up option access for a not-yet-started dialer (`DialerOptions`)
// through the crate's `expose_options!` macro (defined elsewhere in the
// crate). The invocation names the struct field holding the raw handle, the
// nng getter/setter functions to use, and the options that may be read or
// written.
expose_options!{
	DialerOptions :: handle -> nng_sys::nng_dialer;

	GETOPT_BOOL = nng_sys::nng_dialer_getopt_bool;
	GETOPT_INT = nng_sys::nng_dialer_getopt_int;
	GETOPT_MS = nng_sys::nng_dialer_getopt_ms;
	GETOPT_SIZE = nng_sys::nng_dialer_getopt_size;
	GETOPT_SOCKADDR = nng_sys::nng_dialer_getopt_sockaddr;
	GETOPT_STRING = nng_sys::nng_dialer_getopt_string;

	SETOPT = nng_sys::nng_dialer_setopt;
	SETOPT_BOOL = nng_sys::nng_dialer_setopt_bool;
	SETOPT_INT = nng_sys::nng_dialer_setopt_int;
	SETOPT_MS = nng_sys::nng_dialer_setopt_ms;
	SETOPT_SIZE = nng_sys::nng_dialer_setopt_size;
	SETOPT_STRING = nng_sys::nng_dialer_setopt_string;

	// Options that can be read from the dialer before it is started.
	Gets -> [LocalAddr, Raw, ReconnectMinTime,
	         ReconnectMaxTime, RecvBufferSize,
	         RecvMaxSize, RecvTimeout,
	         SendBufferSize, SendTimeout,
	         SocketName, MaxTtl, Url,
	         protocol::reqrep::ResendTime,
	         protocol::survey::SurveyTime,
	         transport::tcp::NoDelay,
	         transport::tcp::KeepAlive];
	// Options that can be changed while the dialer is still being configured.
	Sets -> [ReconnectMinTime, ReconnectMaxTime,
	         RecvMaxSize, transport::tcp::NoDelay,
	         transport::tcp::KeepAlive,
	         transport::tls::CaFile,
	         transport::tls::CertKeyFile,
	         transport::websocket::RequestHeaders];
}

impl Drop for DialerOptions
{
	/// Closes the underlying nng dialer that was never started.
	///
	/// Panics if the close call reports anything other than success or
	/// `NNG_ECLOSED`.
	fn drop(&mut self)
	{
		let rv = unsafe { nng_sys::nng_dialer_close(self.handle) };

		// Success and ECLOSED are the only results expected from closing a
		// dialer, and either one means there is nothing left to release.
		let closed = rv == 0 || rv == nng_sys::NNG_ECLOSED;
		assert!(closed, "Unexpected error code while closing dialer ({})", rv);
	}
}