use std::{
cmp::{Eq, Ordering, PartialEq, PartialOrd},
convert::TryFrom,
error,
ffi::CString,
fmt,
hash::{Hash, Hasher},
num::NonZeroU32,
os::raw::{c_int, c_void},
ptr,
sync::{Arc, RwLock},
};
use crate::{
aio::Aio,
error::{Error, Result, SendResult},
message::Message,
pipe::{Pipe, PipeEvent},
protocol::Protocol,
util::{abort_unwind, validate_ptr},
};
/// Unsized callback type used for pipe event notifications: a `Send + Sync + 'static`
/// closure that receives the affected `Pipe` and the `PipeEvent` that occurred.
///
/// It is stored boxed in `Inner::pipe_notify`.
type PipeNotifyFn = dyn Fn(Pipe, PipeEvent) + Send + Sync + 'static;
/// Handle to an NNG socket.
///
/// The native handle lives in a reference-counted `Inner`, so cloning a `Socket` yields
/// another handle to the same underlying socket. `Inner`'s `Drop` implementation closes
/// the native socket once the last clone goes away.
#[derive(Clone, Debug)]
pub struct Socket
{
/// Shared state: the native `nng_socket` handle and the optional pipe notify callback.
inner: Arc<Inner>,
}
impl Socket
{
pub fn new(t: Protocol) -> Result<Socket>
{
let mut socket = nng_sys::nng_socket::NNG_SOCKET_INITIALIZER;
let rv = unsafe {
match t {
Protocol::Bus0 => nng_sys::nng_bus0_open(&mut socket as *mut _),
Protocol::Pair0 => nng_sys::nng_pair0_open(&mut socket as *mut _),
Protocol::Pair1 => nng_sys::nng_pair1_open(&mut socket as *mut _),
Protocol::Pub0 => nng_sys::nng_pub0_open(&mut socket as *mut _),
Protocol::Pull0 => nng_sys::nng_pull0_open(&mut socket as *mut _),
Protocol::Push0 => nng_sys::nng_push0_open(&mut socket as *mut _),
Protocol::Rep0 => nng_sys::nng_rep0_open(&mut socket as *mut _),
Protocol::Req0 => nng_sys::nng_req0_open(&mut socket as *mut _),
Protocol::Respondent0 => nng_sys::nng_respondent0_open(&mut socket as *mut _),
Protocol::Sub0 => nng_sys::nng_sub0_open(&mut socket as *mut _),
Protocol::Surveyor0 => nng_sys::nng_surveyor0_open(&mut socket as *mut _),
}
};
rv2res!(rv, Socket {
inner: Arc::new(Inner { handle: socket, pipe_notify: RwLock::new(None) }),
})
}
pub fn dial(&self, url: &str) -> Result<()>
{
let addr = CString::new(url).map_err(|_| Error::AddressInvalid)?;
let rv = unsafe { nng_sys::nng_dial(self.inner.handle, addr.as_ptr(), ptr::null_mut(), 0) };
rv2res!(rv)
}
pub fn listen(&self, url: &str) -> Result<()>
{
let addr = CString::new(url).map_err(|_| Error::AddressInvalid)?;
let rv =
unsafe { nng_sys::nng_listen(self.inner.handle, addr.as_ptr(), ptr::null_mut(), 0) };
rv2res!(rv)
}
pub fn dial_async(&self, url: &str) -> Result<()>
{
let addr = CString::new(url).map_err(|_| Error::AddressInvalid)?;
let flags = nng_sys::NNG_FLAG_NONBLOCK as c_int;
let rv =
unsafe { nng_sys::nng_dial(self.inner.handle, addr.as_ptr(), ptr::null_mut(), flags) };
rv2res!(rv)
}
#[doc(hidden)]
#[deprecated(since = "1.0.0-rc.1", note = "This is equivalent to `Socket::listen`")]
pub fn listen_async(&self, url: &str) -> Result<()> { self.listen(url) }
pub fn recv(&self) -> Result<Message>
{
let mut msgp: *mut nng_sys::nng_msg = ptr::null_mut();
let rv = unsafe { nng_sys::nng_recvmsg(self.inner.handle, &mut msgp as _, 0) };
let msgp = validate_ptr(rv, msgp)?;
Ok(Message::from_ptr(msgp))
}
pub fn send<M: Into<Message>>(&self, msg: M) -> SendResult<()>
{
let msg = msg.into();
unsafe {
let msgp = msg.into_ptr();
let rv = nng_sys::nng_sendmsg(self.inner.handle, msgp.as_ptr(), 0);
if let Some(e) = NonZeroU32::new(rv as u32) {
Err((Message::from_ptr(msgp), Error::from(e)))
}
else {
Ok(())
}
}
}
pub fn try_recv(&self) -> Result<Message>
{
let mut msgp: *mut nng_sys::nng_msg = ptr::null_mut();
let flags = nng_sys::NNG_FLAG_NONBLOCK as c_int;
let rv = unsafe { nng_sys::nng_recvmsg(self.inner.handle, &mut msgp as _, flags) };
let msgp = validate_ptr(rv, msgp)?;
Ok(Message::from_ptr(msgp))
}
pub fn try_send<M: Into<Message>>(&self, msg: M) -> SendResult<()>
{
let msg = msg.into();
let flags = nng_sys::NNG_FLAG_NONBLOCK as c_int;
unsafe {
let msgp = msg.into_ptr();
let rv = nng_sys::nng_sendmsg(self.inner.handle, msgp.as_ptr(), flags);
if let Some(e) = NonZeroU32::new(rv as u32) {
Err((Message::from_ptr(msgp), Error::from(e)))
}
else {
Ok(())
}
}
}
pub fn recv_async(&self, aio: &Aio) -> Result<()> { aio.recv_socket(self) }
pub fn send_async<M: Into<Message>>(&self, aio: &Aio, msg: M) -> SendResult<()>
{
let msg = msg.into();
aio.send_socket(self, msg)
}
pub fn pipe_notify<F>(&self, callback: F) -> Result<()>
where
F: Fn(Pipe, PipeEvent) + Send + Sync + 'static,
{
{
let mut l = self.inner.pipe_notify.write().unwrap();
*l = Some(Box::new(callback));
}
let events = [
nng_sys::NNG_PIPE_EV_ADD_PRE,
nng_sys::NNG_PIPE_EV_ADD_POST,
nng_sys::NNG_PIPE_EV_REM_POST,
];
events
.iter()
.map(|&ev| unsafe {
nng_sys::nng_pipe_notify(
self.inner.handle,
ev,
Some(Self::trampoline),
&*self.inner as *const _ as _,
)
})
.map(|rv| rv2res!(rv))
.fold(Ok(()), std::result::Result::and)
}
#[doc(hidden)]
#[deprecated(since = "1.0.0-rc.1", note = "Use `TryFrom` instead")]
pub fn into_raw(self) -> Option<RawSocket> { RawSocket::try_from(self).ok() }
pub fn close(&self) { self.inner.close() }
pub(crate) fn handle(&self) -> nng_sys::nng_socket { self.inner.handle }
unsafe extern "C" fn trampoline(
pipe: nng_sys::nng_pipe,
ev: nng_sys::nng_pipe_ev,
arg: *mut c_void,
)
{
abort_unwind(|| {
let pipe = Pipe::from_nng_sys(pipe);
let ev = PipeEvent::from_code(ev);
assert!(!arg.is_null(), "Null pointer passed as argument to trampoline");
let inner = &*(arg as *const _ as *const Inner);
if let Some(callback) = &*inner.pipe_notify.read().unwrap() {
(*callback)(pipe, ev);
}
});
}
}
#[cfg(feature = "ffi-module")]
impl Socket
{
    /// Returns a copy of the native `nng_socket` handle wrapped by this socket.
    ///
    /// Only compiled when the `ffi-module` feature is enabled.
    pub fn nng_socket(&self) -> nng_sys::nng_socket
    {
        self.inner.handle
    }
}
impl PartialEq for Socket
{
    /// Two sockets are equal when `nng_socket_id` reports the same id for both handles.
    fn eq(&self, other: &Socket) -> bool
    {
        let (lhs, rhs) = unsafe {
            (nng_sys::nng_socket_id(self.inner.handle), nng_sys::nng_socket_id(other.inner.handle))
        };
        lhs == rhs
    }
}
// Marker impl: socket equality (a comparison of `nng_socket_id` values in `PartialEq`) is
// declared to be a full equivalence relation.
impl Eq for Socket {}
impl PartialOrd for Socket
{
    /// Always returns `Some`, deferring to the total order defined by `Ord`.
    fn partial_cmp(&self, other: &Socket) -> Option<Ordering>
    {
        let ordering = Ord::cmp(self, other);
        Some(ordering)
    }
}
impl Ord for Socket
{
    /// Orders sockets by the ids that `nng_socket_id` reports for their handles.
    fn cmp(&self, other: &Socket) -> Ordering
    {
        let (us, them) = unsafe {
            (nng_sys::nng_socket_id(self.inner.handle), nng_sys::nng_socket_id(other.inner.handle))
        };
        Ord::cmp(&us, &them)
    }
}
impl Hash for Socket
{
    /// Feeds the id reported by `nng_socket_id` into the hasher, matching the value that
    /// the `PartialEq` implementation compares.
    fn hash<H: Hasher>(&self, state: &mut H)
    {
        let socket_id = unsafe { nng_sys::nng_socket_id(self.inner.handle) };
        Hash::hash(&socket_id, state);
    }
}
// Declares which options `Socket` supports by invoking the crate's `expose_options!` macro.
//
// The invocation names `inner.handle` (of type `nng_sys::nng_socket`) as the native handle,
// lists the `nng_socket_get_*` / `nng_socket_set_*` functions to use for each value kind,
// and enumerates the option types that can be read (`Gets`) and written (`Sets`).
// NOTE(review): presumably this expands to the option trait impls (e.g. `GetOpt`) for each
// listed option — confirm against the macro definition.
//
// `#[rustfmt::skip]` keeps the hand-aligned layout of the invocation intact.
#[rustfmt::skip]
expose_options!{
Socket :: inner.handle -> nng_sys::nng_socket;
GETOPT_BOOL = nng_sys::nng_socket_get_bool;
GETOPT_INT = nng_sys::nng_socket_get_int;
GETOPT_MS = nng_sys::nng_socket_get_ms;
GETOPT_SIZE = nng_sys::nng_socket_get_size;
GETOPT_SOCKADDR = nng_sys::nng_socket_get_addr;
GETOPT_STRING = nng_sys::nng_socket_get_string;
GETOPT_UINT64 = nng_sys::nng_socket_get_uint64;
SETOPT = nng_sys::nng_socket_set;
SETOPT_BOOL = nng_sys::nng_socket_set_bool;
SETOPT_INT = nng_sys::nng_socket_set_int;
SETOPT_MS = nng_sys::nng_socket_set_ms;
SETOPT_PTR = nng_sys::nng_socket_set_ptr;
SETOPT_SIZE = nng_sys::nng_socket_set_size;
SETOPT_STRING = nng_sys::nng_socket_set_string;
Gets -> [Raw, MaxTtl, RecvBufferSize,
RecvTimeout, SendBufferSize,
SendTimeout, SocketName,
protocol::pair::Polyamorous,
protocol::reqrep::ResendTime,
protocol::survey::SurveyTime];
Sets -> [ReconnectMinTime, ReconnectMaxTime,
RecvBufferSize, RecvMaxSize,
RecvTimeout, SendBufferSize,
SendTimeout, SocketName, MaxTtl,
protocol::pair::Polyamorous,
protocol::reqrep::ResendTime,
protocol::pubsub::Subscribe,
protocol::pubsub::Unsubscribe,
protocol::survey::SurveyTime,
transport::tcp::NoDelay,
transport::tcp::KeepAlive,
transport::tls::CaFile,
transport::tls::CertKeyFile,
transport::websocket::RequestHeaders,
transport::websocket::ResponseHeaders];
}
/// Option support that is only compiled on Unix targets.
///
/// Marks `Socket` as able to read the `RecvFd` and `SendFd` options. Both impls are empty,
/// so everything they provide comes from `GetOpt` itself.
#[cfg(unix)]
mod unix_impls
{
use super::*;
use crate::options::{GetOpt, RecvFd, SendFd};
// Getting the receive file descriptor option is available on Unix.
impl GetOpt<RecvFd> for Socket {}
// Getting the send file descriptor option is available on Unix.
impl GetOpt<SendFd> for Socket {}
}
/// State shared between all clones of a `Socket`.
///
/// Dropping the last reference runs `Drop for Inner`, which closes the native socket.
struct Inner
{
/// The native NNG socket handle.
handle: nng_sys::nng_socket,
/// Callback invoked by `Socket::trampoline` on pipe events; `None` until
/// `Socket::pipe_notify` stores one. Guarded by a lock because it is written through
/// `&self` and read from the notification callback.
pipe_notify: RwLock<Option<Box<PipeNotifyFn>>>,
}
impl Inner
{
    /// Closes the native socket with `nng_close`.
    ///
    /// # Panics
    ///
    /// Panics if `nng_close` returns anything other than `0` or `NNG_ECLOSED`; an
    /// already-closed socket is therefore tolerated.
    fn close(&self)
    {
        let rv = unsafe { nng_sys::nng_close(self.handle) };
        let closed_cleanly = rv == 0 || rv == nng_sys::NNG_ECLOSED as i32;
        assert!(closed_cleanly, "Unexpected error code while closing socket ({})", rv);
    }
}
impl fmt::Debug for Inner
{
    /// Formats the handle and, instead of the callback itself (closures are not `Debug`),
    /// a boolean telling whether a pipe notify callback is currently set.
    ///
    /// # Panics
    ///
    /// Panics if the lock guarding the callback is poisoned.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result
    {
        let mut out = f.debug_struct("Inner");
        out.field("handle", &self.handle);

        let has_callback = self.pipe_notify.read().unwrap().is_some();
        out.field("pipe_notify", &has_callback);

        out.finish()
    }
}
impl Drop for Inner
{
fn drop(&mut self) { self.close() }
}
/// Wrapper marking a `Socket` as being in "raw" mode.
///
/// Values are created either by `RawSocket::new`, which opens the socket through the
/// `nng_*_open_raw` functions, or by `TryFrom<Socket>`, which checks the `Raw` option.
/// The derived comparison and hashing traits defer to the wrapped `Socket`.
#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct RawSocket
{
/// The wrapped socket.
pub socket: Socket,
/// Private unit field: prevents building a `RawSocket` with a struct literal outside
/// this module, so the provided constructors must be used.
_priv: (),
}
impl RawSocket
{
pub fn new(t: Protocol) -> Result<RawSocket>
{
let mut socket = nng_sys::nng_socket::NNG_SOCKET_INITIALIZER;
let rv = unsafe {
match t {
Protocol::Bus0 => nng_sys::nng_bus0_open_raw(&mut socket as *mut _),
Protocol::Pair0 => nng_sys::nng_pair0_open_raw(&mut socket as *mut _),
Protocol::Pair1 => nng_sys::nng_pair1_open_raw(&mut socket as *mut _),
Protocol::Pub0 => nng_sys::nng_pub0_open_raw(&mut socket as *mut _),
Protocol::Pull0 => nng_sys::nng_pull0_open_raw(&mut socket as *mut _),
Protocol::Push0 => nng_sys::nng_push0_open_raw(&mut socket as *mut _),
Protocol::Rep0 => nng_sys::nng_rep0_open_raw(&mut socket as *mut _),
Protocol::Req0 => nng_sys::nng_req0_open_raw(&mut socket as *mut _),
Protocol::Respondent0 => nng_sys::nng_respondent0_open_raw(&mut socket as *mut _),
Protocol::Sub0 => nng_sys::nng_sub0_open_raw(&mut socket as *mut _),
Protocol::Surveyor0 => nng_sys::nng_surveyor0_open_raw(&mut socket as *mut _),
}
};
if let Some(e) = NonZeroU32::new(rv as u32) {
return Err(Error::from(e));
}
let socket =
Socket { inner: Arc::new(Inner { handle: socket, pipe_notify: RwLock::new(None) }) };
Ok(RawSocket { socket, _priv: () })
}
}
impl TryFrom<Socket> for RawSocket
{
    type Error = CookedSocketError;

    /// Wraps `socket` in a `RawSocket` if its `Raw` option reads as `true`.
    ///
    /// # Errors
    ///
    /// Returns `CookedSocketError` when the `Raw` option is `false`.
    ///
    /// # Panics
    ///
    /// Panics if the `Raw` option cannot be read from the socket.
    fn try_from(socket: Socket) -> std::result::Result<Self, Self::Error>
    {
        use crate::options::{Options, Raw};

        let is_raw =
            socket.get_opt::<Raw>().expect("Socket should have \"raw\" option available");
        if !is_raw {
            return Err(CookedSocketError);
        }

        Ok(RawSocket { socket, _priv: () })
    }
}
/// Error returned by `RawSocket`'s `TryFrom<Socket>` implementation when the socket's `Raw`
/// option is `false`, i.e. the socket is in "cooked" rather than "raw" mode.
#[derive(Debug)]
pub struct CookedSocketError;
impl fmt::Display for CookedSocketError
{
    /// Writes the fixed, human-readable explanation of the error.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result
    {
        f.write_str("Socket is in \"cooked\" (not \"raw\") mode")
    }
}
impl error::Error for CookedSocketError
{
    /// Returns the same fixed text that the `Display` implementation writes.
    fn description(&self) -> &str
    {
        const MESSAGE: &str = "Socket is in \"cooked\" (not \"raw\") mode";
        MESSAGE
    }
}