use super::*;
use log::debug;
/// Progress of the single in-flight send tracked by a push context.
#[derive(PartialEq, Debug)]
enum PushState {
    /// No send is outstanding; a new message may be submitted.
    Ready,
    /// A message has been submitted and its completion callback has not run yet.
    Sending,
}
/// State handed to the AIO completion callback (`publish_callback`) for push sends.
///
/// One instance tracks at most one outstanding send at a time.
#[derive(Debug)]
struct PushContextAioArg {
/// AIO handle the message is attached to and the send is submitted on.
aio: NngAio,
/// Whether a send is currently outstanding.
state: PushState,
/// Channel used by the callback to report the outcome of the outstanding send;
/// `None` until `send` stores one, and taken again by the callback.
sender: Option<oneshot::Sender<Result<()>>>,
/// Socket the messages are sent on.
socket: NngSocket,
}
impl PushContextAioArg {
    /// Creates the AIO argument for `socket`, with `publish_callback` registered as
    /// the completion callback.
    ///
    /// The context starts in `PushState::Ready` with no pending completion sender.
    ///
    /// # Errors
    ///
    /// Returns whatever error `NngAio::create` reports.
    pub fn new(socket: NngSocket) -> Result<AioArg<Self>> {
        NngAio::create(
            |aio| Self {
                aio,
                state: PushState::Ready,
                sender: None,
                socket,
            },
            publish_callback,
        )
    }

    /// Submits `msg` for asynchronous sending on the socket.
    ///
    /// The outcome is delivered through `sender` by `publish_callback` once the
    /// operation completes.
    ///
    /// # Panics
    ///
    /// Panics if a previous send has not completed yet, or if `msg` wraps a null
    /// message pointer. Both checks run before any state is modified, so a failed
    /// check leaves the context in `PushState::Ready`.
    pub fn send(&mut self, msg: NngMsg, sender: oneshot::Sender<Result<()>>) {
        assert!(
            self.state == PushState::Ready,
            "PushContextAioArg::send called while a send is already in flight (state: {:?})",
            self.state
        );
        // SAFETY: the raw message pointer is verified to be non-null before it is
        // attached to the aio, and the aio/socket handles are the ones owned by this
        // context. NOTE(review): ownership of the message is presumably transferred
        // to nng here; the callback reclaims it when the send fails.
        unsafe {
            let raw_msg = msg.take();
            assert!(
                !raw_msg.is_null(),
                "PushContextAioArg::send called with a null message"
            );
            // Only record the pending send once the inputs are known to be valid, and
            // before submitting, so the callback always observes a consistent state.
            self.sender = Some(sender);
            self.state = PushState::Sending;
            let nng_aio = self.aio.nng_aio();
            nng_aio_set_msg(nng_aio, raw_msg);
            nng_send_aio(self.socket.nng_socket(), nng_aio);
        }
    }
}
impl Aio for PushContextAioArg {
    /// Shared access to the underlying AIO handle.
    fn aio(&self) -> &NngAio {
        let Self { aio, .. } = self;
        aio
    }

    /// Exclusive access to the underlying AIO handle.
    fn aio_mut(&mut self) -> &mut NngAio {
        let Self { aio, .. } = self;
        aio
    }
}
/// Handle for sending messages asynchronously on a push socket.
#[derive(Debug)]
pub struct PushAsyncHandle {
/// AIO context (and callback argument) that performs the sends.
aio_arg: AioArg<PushContextAioArg>,
}
impl AsyncContext for PushAsyncHandle {
    /// Builds a push handle for `socket`, failing if the AIO context cannot be created.
    fn new(socket: NngSocket) -> Result<Self> {
        PushContextAioArg::new(socket).map(|aio_arg| Self { aio_arg })
    }
}
/// Asynchronous sending of messages.
pub trait AsyncPush {
/// Sends `msg` asynchronously; the returned `AsyncUnit` yields the outcome of the send.
fn send(&mut self, msg: NngMsg) -> AsyncUnit;
}
impl AsyncPush for PushAsyncHandle {
    /// Submits `msg` to the AIO context and returns a future for its completion.
    ///
    /// The completion callback reports the result over a oneshot channel; the
    /// channel's own result and the send result are flattened into one.
    fn send(&mut self, msg: NngMsg) -> AsyncUnit {
        let (tx, rx) = oneshot::channel::<Result<()>>();
        self.aio_arg.send(msg, tx);
        let completion = rx.map(result::flatten_result);
        Box::pin(completion)
    }
}
/// AIO completion callback for push sends.
///
/// Reads the result of the finished send, reclaims the message if the send failed,
/// resets the context to `PushState::Ready` and forwards the result to the waiting
/// oneshot sender.
///
/// This function never panics: it is called from C, so unexpected states are logged
/// and ignored rather than unwinding across (or aborting at) the FFI boundary.
///
/// # Safety
///
/// `arg` must point to a live `PushContextAioArg` that is not being accessed
/// elsewhere for the duration of the call.
unsafe extern "C" fn publish_callback(arg: AioArgPtr) {
    let ctx = &mut *(arg as *mut PushContextAioArg);
    trace!("callback Push:{:?}", ctx.state);
    match ctx.state {
        PushState::Ready => {
            // No send is outstanding, so there is nothing to complete and nobody to notify.
            debug!("Push callback invoked while no send was in flight; ignoring");
        }
        PushState::Sending => {
            let nng_aio = ctx.aio.nng_aio();
            let res = nng_int_to_result(nng_aio_result(nng_aio));
            if let Err(ref err) = res {
                debug!("Push failed: {:?}", err);
                // Take the message back from the aio and drop it right away so it
                // is released instead of leaking after a failed send.
                let _ = NngMsg::from_raw(nng_aio_get_msg(nng_aio));
            }
            // Become ready before notifying so the waiter can immediately send again.
            ctx.state = PushState::Ready;
            match ctx.sender.take() {
                Some(sender) => {
                    if let Err(ref err) = sender.send(res) {
                        debug!("Send finish failed: {:?}", err);
                    }
                }
                None => {
                    debug!("Push callback completed a send but no completion sender was registered");
                }
            }
        }
    }
}