use std::time::Duration;
use std::sync::{Arc, Mutex};
use std::panic::{catch_unwind, RefUnwindSafe};
use std::os::raw::{c_int, c_void};
use std::ptr;
use crate::error::{ErrorKind, Result, SendResult};
use crate::message::Message;
use crate::aio::Context;
pub struct Aio
{
inner: SharedInner,
callback: Option<Box<FnMut() + Send + RefUnwindSafe + 'static>>,
}
impl Aio
{
pub fn new() -> Result<Aio>
{
Ok(Aio { inner: Inner::new()? , callback: None })
}
pub fn with_callback<F>(callback: F) -> Result<Aio>
where F: FnMut(&Aio) + Send + RefUnwindSafe + 'static
{
let (inner, box_cb) = Inner::with_callback(callback)?;
Ok(Aio { inner, callback: Some(box_cb) })
}
pub fn cancel(&self)
{
unsafe {
let l = self.inner.lock().unwrap();
nng_sys::nng_aio_cancel(*l.aio);
}
}
pub fn get_msg(&self) -> Option<Message>
{
let mut l = self.inner.lock().unwrap();
if let State::Inactive(ref mut m) = l.state {
m.take()
} else { None }
}
pub fn result(&self) -> Option<Result<()>>
{
let l = self.inner.lock().unwrap();
match l.state {
State::Sending | State::Receiving | State::Sleeping => None,
State::Inactive(_) => unsafe { Some(rv2res!(nng_sys::nng_aio_result(*l.aio))) },
}
}
pub fn set_timeout(&self, dur: Option<Duration>)
{
let ms = crate::duration_to_nng(dur);
unsafe {
let l = self.inner.lock().unwrap();
nng_sys::nng_aio_set_timeout(*l.aio, ms);
}
}
pub fn wait(&self)
{
if self.callback.is_none() && Arc::strong_count(&self.inner) > 1 {
return;
}
let ptr = {
*self.inner.lock().unwrap().aio
};
unsafe { nng_sys::nng_aio_wait(ptr) }
if self.callback.is_none() {
self.event_update_state();
}
}
pub fn send(&self, ctx: &Context, msg: Message) -> SendResult<()>
{
let mut l = self.inner.lock().unwrap();
if let State::Inactive(_) = l.state {
unsafe {
nng_sys::nng_aio_set_msg(*l.aio, msg.into_ptr());
nng_sys::nng_ctx_send(ctx.handle(), *l.aio);
l.state = State::Sending;
Ok(())
}
} else { Err((msg, ErrorKind::TryAgain.into())) }
}
pub fn recv(&self, ctx: &Context) -> Result<()>
{
let mut l = self.inner.lock().unwrap();
match l.state {
State::Inactive(_) | State::Receiving => unsafe {
nng_sys::nng_ctx_recv(ctx.handle(), *l.aio);
l.state = State::Receiving;
Ok(())
},
_ => Err(ErrorKind::TryAgain.into()),
}
}
pub fn sleep(&self, dur: Duration) -> Result<()>
{
let ms = crate::duration_to_nng(Some(dur));
let mut l = self.inner.lock().unwrap();
if let State::Inactive(_) = l.state {
unsafe { nng_sys::nng_sleep_aio(ms, *l.aio) }
l.state = State::Sleeping;
Ok(())
} else { Err(ErrorKind::TryAgain.into()) }
}
fn event_update_state(&self)
{
assert!(
self.callback.is_none() || Arc::strong_count(&self.inner) == 1,
"Trying to update state on incorrect Aio instance"
);
let mut l = self.inner.lock().unwrap();
let mut old_state = State::Inactive(None);
std::mem::swap(&mut l.state, &mut old_state);
l.state = match old_state {
State::Inactive(m) => State::Inactive(m),
State::Sleeping => State::Inactive(None),
State::Sending => unsafe {
let rv = nng_sys::nng_aio_result(*l.aio);
let msg = if rv != 0 {
Some(Message::from_ptr(nng_sys::nng_aio_get_msg(*l.aio)))
} else { None };
State::Inactive(msg)
},
State::Receiving => unsafe {
let rv = nng_sys::nng_aio_result(*l.aio);
let msg = if rv == 0 {
Some(Message::from_ptr(nng_sys::nng_aio_get_msg(*l.aio)))
} else { None };
State::Inactive(msg)
},
};
}
}
type SharedInner = Arc<Mutex<Inner>>;
struct Inner
{
aio: AioPtr,
state: State,
}
impl Inner
{
fn new() -> Result<SharedInner>
{
let mut aio = ptr::null_mut();
let rv = unsafe { nng_sys::nng_aio_alloc(&mut aio, None, ptr::null_mut()) };
validate_ptr!(rv, aio);
Ok(Arc::new(Mutex::new(Inner { aio: AioPtr(aio), state: State::Inactive(None) })))
}
fn with_callback<F>(mut callback: F) -> Result<(SharedInner, Box<FnMut() + Send + RefUnwindSafe + 'static>)>
where F: FnMut(&Aio) + Send + RefUnwindSafe + 'static
{
let shared_inner = Arc::new(Mutex::new(Inner {
aio: AioPtr(ptr::null_mut()),
state: State::Inactive(None),
}));
let cb_aio = Aio {
inner: shared_inner.clone(),
callback: None,
};
let trampoline = move || {
cb_aio.event_update_state();
callback(&cb_aio)
};
let (rv, box_fn) = unsafe {
let mut l = shared_inner.lock().unwrap();
Inner::aio_alloc_trampoline(&mut *l.aio, trampoline)
};
if rv != 0 {
if !shared_inner.lock().unwrap().aio.is_null() {
std::mem::forget(shared_inner);
}
Err(ErrorKind::from_code(rv).into())
} else {
assert!(!shared_inner.lock().unwrap().aio.is_null(), "Nng returned null pointer from successful function");
Ok((shared_inner, box_fn))
}
}
unsafe fn aio_alloc_trampoline<F>(
aio: *mut *mut nng_sys::nng_aio,
trampoline: F,
) -> (c_int, Box<FnMut() + Send + RefUnwindSafe + 'static>)
where F: FnMut() + Send + RefUnwindSafe + 'static
{
let mut box_fn = Box::new(trampoline);
let rv = nng_sys::nng_aio_alloc(aio, Some(Inner::trampoline::<F>), &mut *box_fn as *mut _ as _);
(rv, box_fn)
}
extern "C" fn trampoline<F>(arg: *mut c_void)
where F: FnMut() + RefUnwindSafe + Send + 'static
{
let res = catch_unwind(|| unsafe {
let callback_ptr = arg as *mut F;
if callback_ptr.is_null() {
panic!("Null argument given to trampoline function");
}
(*callback_ptr)()
});
if let Err(e) = res {
error!("Panic in callback function: {:?}", e);
}
}
}
impl Drop for Inner
{
fn drop(&mut self)
{
if !self.aio.is_null() {
unsafe { nng_sys::nng_aio_free(*self.aio) }
}
}
}
#[repr(transparent)]
struct AioPtr(*mut nng_sys::nng_aio);
impl std::ops::Deref for AioPtr
{
type Target = *mut nng_sys::nng_aio;
fn deref(&self) -> &Self::Target
{
&self.0
}
}
impl std::ops::DerefMut for AioPtr
{
fn deref_mut(&mut self) -> &mut Self::Target
{
&mut self.0
}
}
unsafe impl Send for AioPtr {}
enum State
{
Inactive(Option<Message>),
Sleeping,
Sending,
Receiving,
}