use super::*;
use crate::protocol::*;
#[derive(Debug, PartialEq)]
enum PullState {
Ready,
Receiving,
}
#[derive(Debug)]
struct PullContextAioArg {
aio: NngAio,
state: PullState,
sender: mpsc::Sender<Result<NngMsg>>,
socket: NngSocket,
}
impl PullContextAioArg {
pub fn new(socket: NngSocket, sender: mpsc::Sender<Result<NngMsg>>) -> Result<AioArg<Self>> {
NngAio::create(
|aio| Self {
aio,
state: PullState::Ready,
sender,
socket,
},
pull_callback,
)
}
pub(crate) fn start_receive(&mut self) {
self.state = PullState::Receiving;
unsafe {
nng_recv_aio(self.socket.nng_socket(), self.aio.nng_aio());
}
}
}
impl Aio for PullContextAioArg {
fn aio(&self) -> &NngAio {
&self.aio
}
fn aio_mut(&mut self) -> &mut NngAio {
&mut self.aio
}
}
#[derive(Debug)]
pub struct PullAsyncStream {
aio_arg: AioArg<PullContextAioArg>,
receiver: Option<mpsc::Receiver<Result<NngMsg>>>,
}
impl AsyncStreamContext for PullAsyncStream {
fn new(socket: NngSocket, buffer: usize) -> Result<Self> {
let (sender, receiver) = mpsc::channel::<Result<NngMsg>>(buffer);
let aio_arg = PullContextAioArg::new(socket, sender)?;
let receiver = Some(receiver);
Ok(Self { aio_arg, receiver })
}
}
pub trait AsyncPull {
fn receive(&mut self) -> Option<mpsc::Receiver<Result<NngMsg>>>;
}
impl AsyncPull for PullAsyncStream {
fn receive(&mut self) -> Option<mpsc::Receiver<Result<NngMsg>>> {
let receiver = self.receiver.take();
if receiver.is_some() {
self.aio_arg.start_receive();
}
receiver
}
}
unsafe extern "C" fn pull_callback(arg: AioArgPtr) {
let ctx = &mut *(arg as *mut PullContextAioArg);
trace!("pull_callback::{:?}", ctx.state);
match ctx.state {
PullState::Ready => panic!(),
PullState::Receiving => {
let aio = ctx.aio.nng_aio();
let aio_res = nng_aio_result(aio);
let res = nng_int_to_result(aio_res);
match res {
Err(res) => {
match res {
Error::Errno(NngErrno::ECLOSED) | Error::Errno(NngErrno::ECANCELED) => {
debug!("pull_callback {:?}", res);
}
_ => {
trace!("pull_callback::Err({:?})", res);
ctx.start_receive();
}
}
try_signal_complete(&mut ctx.sender, Err(res));
}
Ok(()) => {
let msg = NngMsg::from_raw(nng_aio_get_msg(aio));
ctx.start_receive();
try_signal_complete(&mut ctx.sender, Ok(msg));
}
}
}
}
}
#[derive(Debug)]
pub struct SubscribeAsyncStream {
ctx: PullAsyncStream,
}
impl AsyncPull for SubscribeAsyncStream {
fn receive(&mut self) -> Option<mpsc::Receiver<Result<NngMsg>>> {
self.ctx.receive()
}
}
impl AsyncStreamContext for SubscribeAsyncStream {
fn new(socket: NngSocket, buffer: usize) -> Result<Self> {
let ctx = PullAsyncStream::new(socket, buffer)?;
let ctx = Self { ctx };
Ok(ctx)
}
}
impl InternalSocket for SubscribeAsyncStream {
fn socket(&self) -> &NngSocket {
&self.ctx.aio_arg.socket
}
}
impl Subscribe for SubscribeAsyncStream {
fn subscribe(&self, topic: &[u8]) -> Result<()> {
unsafe { subscribe(self.socket().nng_socket(), topic) }
}
fn unsubscribe(&self, topic: &[u8]) -> Result<()> {
unsafe { unsubscribe(self.socket().nng_socket(), topic) }
}
}