1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
//! Asynchronous message receiving for pull and subscribe sockets.

use super::*;
use std::sync::Mutex;

/// State passed as the callback argument for asynchronous receives on a pull socket.
///
/// `read_callback` casts its raw argument back to this type, pushes every
/// completed receive (message or error) onto `queue`, and the owning handle
/// pops results from the same queue.
#[derive(Debug)]
struct PullAioArg {
    /// AIO handle that receive operations are submitted on.
    aio: NngAio,
    /// Results of completed receives; the mutex guards access to the queue.
    queue: Mutex<WorkQueue>,
    /// Socket that messages are received from.
    socket: NngSocket,
}

impl PullAioArg {
    /// Builds the callback argument for `socket` and immediately submits the
    /// first receive.
    ///
    /// The work queue starts out as `WorkQueue::default()` and `read_callback`
    /// is registered as the AIO completion callback.
    ///
    /// # Errors
    /// Propagates any error returned by `NngAio::create`.
    pub fn new(socket: NngSocket) -> Result<AioArg<Self>> {
        let arg = NngAio::create(
            |aio| Self {
                aio,
                queue: Mutex::new(WorkQueue::default()),
                socket,
            },
            read_callback,
        )?;
        // Kick off the first receive; the callback submits the following ones.
        arg.receive();
        Ok(arg)
    }

    /// Submits an asynchronous receive on the socket using this argument's AIO handle.
    fn receive(&self) {
        let raw_socket = self.socket.nng_socket();
        let raw_aio = self.aio.nng_aio();
        unsafe {
            nng_recv_aio(raw_socket, raw_aio);
        }
    }
}

impl Aio for PullAioArg {
    /// Shared access to the AIO handle owned by this argument.
    fn aio(&self) -> &NngAio {
        let Self { aio, .. } = self;
        aio
    }

    /// Exclusive access to the AIO handle owned by this argument.
    fn aio_mut(&mut self) -> &mut NngAio {
        let Self { aio, .. } = self;
        aio
    }
}

/// Async pull context for push/pull pattern.
///
/// Received messages are obtained through the [`ReadAsync`] implementation,
/// which pops them from the work queue held by the callback argument.
#[derive(Debug)]
pub struct PullAsyncHandle {
    /// Callback argument holding the AIO handle, work queue and socket.
    aio_arg: AioArg<PullAioArg>,
}

impl AsyncContext for PullAsyncHandle {
    /// Creates the handle for `socket`; `PullAioArg::new` also starts the first receive.
    ///
    /// # Errors
    /// Returns whatever error `PullAioArg::new` reports.
    fn new(socket: NngSocket) -> Result<Self> {
        PullAioArg::new(socket).map(|aio_arg| Self { aio_arg })
    }
}

/// Receive side of an asynchronous socket handle.
pub trait ReadAsync {
    /// Returns the `AsyncMsg` for the next incoming message.
    ///
    /// The implementations in this file obtain it by popping the front of
    /// their work queue.
    fn receive(&mut self) -> AsyncMsg;
}

impl ReadAsync for PullAsyncHandle {
    /// Pops the next entry from the front of the work queue.
    ///
    /// # Panics
    /// Panics if the queue mutex is poisoned.
    fn receive(&mut self) -> AsyncMsg {
        self.aio_arg.queue.lock().unwrap().pop_front()
    }
}

/// State passed as the callback argument for asynchronous receives on a subscribe socket.
#[derive(Debug)]
pub struct SubAioArg {
    /// AIO handle that receive operations are submitted on.
    aio: NngAio,
    /// NNG context created from the socket in `new`.
    ctx: NngCtx,
    /// Results of completed receives; the mutex guards access to the queue.
    queue: Mutex<WorkQueue>,
    /// Socket that messages are received from.
    socket: NngSocket,
}

impl SubAioArg {
    pub fn new(socket: NngSocket) -> Result<AioArg<Self>> {
        let ctx = NngCtx::new(socket.clone())?;
        let queue = Mutex::new(WorkQueue::default());
        let context = NngAio::create(
            |aio| Self {
                aio,
                ctx,
                queue,
                socket,
            },
            read_callback,
        )?;
        context.receive();
        Ok(context)
    }

    fn receive(&self) {
        unsafe {
            nng_recv_aio(self.socket.nng_socket(), self.aio.nng_aio());
        }
    }
}

impl Aio for SubAioArg {
    /// Shared access to the AIO handle owned by this argument.
    fn aio(&self) -> &NngAio {
        let Self { aio, .. } = self;
        aio
    }

    /// Exclusive access to the AIO handle owned by this argument.
    fn aio_mut(&mut self) -> &mut NngAio {
        let Self { aio, .. } = self;
        aio
    }
}

/// Asynchronous context for subscribe socket.
///
/// Received messages are obtained through the [`ReadAsync`] implementation,
/// which pops them from the work queue held by the callback argument.
#[derive(Debug)]
pub struct SubscribeAsyncHandle {
    /// Callback argument holding the AIO handle, context, work queue and socket.
    aio_arg: AioArg<SubAioArg>,
}

impl AsyncContext for SubscribeAsyncHandle {
    /// Creates the handle for `socket`; `SubAioArg::new` also starts the first receive.
    ///
    /// # Errors
    /// Returns whatever error `SubAioArg::new` reports.
    fn new(socket: NngSocket) -> Result<Self> {
        Ok(Self {
            aio_arg: SubAioArg::new(socket)?,
        })
    }
}

impl ReadAsync for SubscribeAsyncHandle {
    /// Pops the next entry from the front of the work queue.
    ///
    /// # Panics
    /// Panics if the queue mutex is poisoned.
    fn receive(&mut self) -> AsyncMsg {
        self.aio_arg.queue.lock().unwrap().pop_front()
    }
}

/// AIO completion callback for receives submitted through a `PullAioArg`.
///
/// Queues the outcome of the finished receive (message or error) and then
/// submits the next receive, unless the AIO was closed or canceled.
///
/// # Safety
/// `arg` must point to a live `PullAioArg`; the pointer is reinterpreted as
/// that type without any check.
unsafe extern "C" fn read_callback(arg: AioArgPtr) {
    // Only `&self` methods are used and the queue is behind a mutex, so a
    // shared reference is sufficient. A `&mut` here could alias the shared
    // reference the owning handle takes when it pops from the queue.
    let ctx = &*(arg as *const PullAioArg);
    let aio = ctx.aio.nng_aio();
    let res = nng_int_to_result(nng_aio_result(aio));
    trace!("read_callback::{:?}", res);
    match res {
        Err(err) => {
            // nng_aio_close() calls nng_aio_stop which nng_aio_abort(NNG_ECANCELED) and waits.
            // Calling receive() again in that state would fail with ECANCELED and loop forever.
            let rearm = match err {
                Error::Errno(NngErrno::ECLOSED) | Error::Errno(NngErrno::ECANCELED) => {
                    debug!("read_callback {:?}", err);
                    false
                }
                _ => {
                    trace!("read_callback::Err({:?})", err);
                    true
                }
            };
            // Queue this result before starting the next read, matching the
            // success path, so results are queued in completion order.
            ctx.queue.lock().unwrap().push_back(Err(err));
            if rearm {
                ctx.receive();
            }
        }
        Ok(()) => {
            let msg = NngMsg::from_raw(nng_aio_get_msg(aio));
            ctx.queue.lock().unwrap().push_back(Ok(msg));
            // Don't start next read until after notifying this one is complete.
            ctx.receive();
        }
    }
}