use crate::{dialer::NngDialer, listener::NngListener, *};
use bitflags::bitflags;
use runng_sys::*;
use std::{fmt, result, sync::Arc};
bitflags! {
#[derive(Default)]
pub struct Flags: i32 {
const NONBLOCK = NNG_FLAG_NONBLOCK as i32;
const ALLOC = NNG_FLAG_ALLOC as i32;
}
}
bitflags! {
#[derive(Default)]
pub struct SocketFlags: i32 {
const NONBLOCK = NNG_FLAG_NONBLOCK as i32;
}
}
#[derive(Debug)]
pub struct NngSocket {
socket: Arc<InnerSocket>,
}
impl NngSocket {
pub fn new(socket: nng_socket) -> Self {
let socket = Arc::new(InnerSocket { socket });
NngSocket { socket }
}
pub unsafe fn nng_socket(&self) -> nng_socket {
self.socket.socket
}
#[cfg(feature = "pipes")]
pub fn notify(
&self,
event: nng_pipe_ev,
callback: pipe::PipeNotifyCallback,
argument: pipe::PipeNotifyCallbackArg,
) -> Result<()> {
unsafe {
nng_int_to_result(nng_pipe_notify(
self.nng_socket(),
event,
Some(callback),
argument,
))
}
}
}
impl<T> NngWrapper for T
where
T: Socket,
{
type NngType = nng_socket;
unsafe fn get_nng_type(&self) -> Self::NngType {
self.socket().nng_socket()
}
}
impl GetSocket for NngSocket {
fn socket(&self) -> &NngSocket {
self
}
fn socket_mut(&mut self) -> &mut NngSocket {
self
}
}
impl Socket for NngSocket {}
impl SendSocket for NngSocket {}
impl RecvSocket for NngSocket {}
impl Clone for NngSocket {
fn clone(&self) -> Self {
let socket = self.socket.clone();
Self { socket }
}
}
pub trait GetSocket {
fn socket(&self) -> &NngSocket;
fn socket_mut(&mut self) -> &mut NngSocket;
unsafe fn nng_socket(&self) -> nng_socket {
self.socket().nng_socket()
}
}
pub trait Socket: GetSocket + Sized {
fn with<T>(mut self, setup: T) -> Result<Self>
where
T: FnOnce(&mut Self) -> Result<&mut Self>,
{
setup(&mut self)?;
Ok(self)
}
}
pub trait Listen: Socket {
fn listen(&mut self, url: &str) -> Result<&mut Self> {
self.listen_flags(url, Default::default())
}
fn listen_flags(&mut self, url: &str, flags: SocketFlags) -> Result<&mut Self> {
unsafe {
let (_cstring, ptr) = to_cstr(url)?;
let res = nng_listen(self.nng_socket(), ptr, std::ptr::null_mut(), flags.bits());
Error::zero_map(res, || self)
}
}
fn listener_create(&self, url: &str) -> Result<NngListener> {
NngListener::new(self.socket().clone(), url)
}
}
pub trait Dial: Socket {
fn dial(&mut self, url: &str) -> Result<&mut Self> {
self.dial_flags(url, Default::default())
}
fn dial_flags(&mut self, url: &str, flags: SocketFlags) -> Result<&mut Self> {
unsafe {
let (_cstring, ptr) = to_cstr(url)?;
let res = nng_dial(self.nng_socket(), ptr, std::ptr::null_mut(), flags.bits());
Error::zero_map(res, || self)
}
}
fn dialer_create(&self, url: &str) -> Result<NngDialer> {
NngDialer::new(self.socket().clone(), url)
}
}
#[derive(Debug)]
pub struct SendError<T: fmt::Debug> {
pub error: Error,
pub message: T,
}
impl<T: fmt::Debug> SendError<T> {
pub fn into_inner(self) -> T {
self.message
}
}
impl<T: fmt::Debug> std::error::Error for SendError<T> {}
impl<T: fmt::Debug> fmt::Display for SendError<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{} {:?}", self.error, self.message)
}
}
pub trait SendSocket: Socket {
fn send(&self, data: &[u8]) -> Result<()> {
self.send_flags(data, Default::default())
}
fn send_flags(&self, data: &[u8], flags: Flags) -> Result<()> {
unsafe {
let ptr = data.as_ptr() as *mut std::os::raw::c_void;
let res = nng_send(self.nng_socket(), ptr, data.len(), flags.bits());
nng_int_to_result(res)
}
}
fn send_zerocopy(&self, data: mem::Alloc) -> result::Result<(), SendError<mem::Alloc>> {
self.send_zerocopy_flags(data, Flags::ALLOC)
}
fn send_zerocopy_flags(
&self,
data: mem::Alloc,
flags: Flags,
) -> result::Result<(), SendError<mem::Alloc>> {
let flags = (flags | Flags::ALLOC).bits();
unsafe {
let (ptr, size) = data.take();
let res = nng_send(self.nng_socket(), ptr, size, flags);
let error = nng_int_to_result(res);
error.map_err(|error| {
let message = mem::Alloc::from_raw_parts(ptr, size);
SendError { error, message }
})
}
}
fn sendmsg(&self, msg: msg::NngMsg) -> Result<()> {
let res = self.sendmsg_flags(msg, Default::default());
res.map_err(|err| err.error)
}
fn sendmsg_flags(
&self,
msg: msg::NngMsg,
flags: Flags,
) -> result::Result<(), SendError<msg::NngMsg>> {
unsafe {
let ptr = msg.take();
assert!(!ptr.is_null());
let res = nng_sendmsg(self.nng_socket(), ptr, flags.bits());
let error = nng_int_to_result(res);
error.map_err(|error| {
let message = msg::NngMsg::from_raw(ptr);
SendError { error, message }
})
}
}
}
pub trait RecvSocket: Socket {
fn recv<'a>(&self, buffer: &'a mut [u8]) -> Result<&'a [u8]> {
self.recv_flags(buffer, Default::default())
}
fn recv_flags<'a>(&self, buffer: &'a mut [u8], flags: Flags) -> Result<&'a [u8]> {
unsafe {
let ptr = buffer.as_mut_ptr() as *mut core::ffi::c_void;
let mut size = buffer.len();
let res = nng_recv(self.nng_socket(), ptr, &mut size, flags.bits());
let res = nng_int_to_result(res);
res.map(|_| std::slice::from_raw_parts(buffer.as_ptr(), size))
}
}
fn recv_zerocopy(&self) -> Result<mem::Alloc> {
self.recv_zerocopy_flags(Default::default())
}
fn recv_zerocopy_flags(&self, flags: Flags) -> Result<mem::Alloc> {
let flags = (flags | Flags::ALLOC).bits();
unsafe {
let mut ptr: *mut core::ffi::c_void = std::ptr::null_mut();
let mut size: usize = 0;
let ptr_ptr = (&mut ptr) as *mut _ as *mut core::ffi::c_void;
let res = nng_recv(self.nng_socket(), ptr_ptr, &mut size, flags);
let res = nng_int_to_result(res);
res.map(|_| mem::Alloc::from_raw_parts(ptr, size))
}
}
fn recvmsg(&self) -> Result<msg::NngMsg> {
self.recvmsg_flags(Default::default())
}
fn recvmsg_flags(&self, flags: Flags) -> Result<msg::NngMsg> {
unsafe {
let mut recv_ptr: *mut nng_msg = std::ptr::null_mut();
let res = nng_recvmsg(self.nng_socket(), &mut recv_ptr, flags.bits());
Error::zero_map(res, || msg::NngMsg::from_raw(recv_ptr))
}
}
}
#[derive(Debug)]
pub struct UnsafeSocket {
socket: nng_socket,
}
impl UnsafeSocket {
pub fn new(socket: nng_socket) -> Self {
Self { socket }
}
pub fn id(&self) -> i32 {
unsafe { nng_socket_id(self.socket) }
}
}
#[derive(Debug)]
struct InnerSocket {
socket: nng_socket,
}
impl Drop for InnerSocket {
fn drop(&mut self) {
unsafe {
trace!("Socket close: {:?}", self.socket);
let res = nng_int_to_result(nng_close(self.socket));
match res {
Ok(()) => {}
Err(Error::Errno(NngErrno::ECLOSED)) => {}
Err(res) => {
debug!("nng_close {:?}", res);
panic!("nng_close {:?}", res);
}
}
}
}
}