Skip to main content

runng/asyncio/
reply.rs

1//! Async request/reply
2
3use super::*;
4use crate::ctx::NngCtx;
5use std::sync::Mutex;
6
7#[derive(Debug, PartialEq)]
8enum ReplyState {
9    Idle,
10    Receiving,
11    Wait,
12    Sending,
13}
14
15#[derive(Debug)]
16struct ReplyContextAioArg {
17    aio: NngAio,
18    ctx: NngCtx,
19    queue: Mutex<WorkQueue>,
20    reply_sender: Option<oneshot::Sender<Result<()>>>,
21    socket: NngSocket,
22    state: ReplyState,
23}
24
25impl ReplyContextAioArg {
26    pub fn new(socket: NngSocket) -> Result<AioArg<Self>> {
27        let ctx = NngCtx::new(socket.clone())?;
28        let queue = Mutex::new(WorkQueue::default());
29        let mut context = NngAio::create(
30            |aio| Self {
31                aio,
32                ctx,
33                queue,
34                reply_sender: None,
35                socket,
36                state: ReplyState::Idle,
37            },
38            reply_callback,
39        )?;
40
41        context.receive();
42        Ok(context)
43    }
44
45    fn receive(&mut self) {
46        if self.state != ReplyState::Idle {
47            panic!();
48        }
49        self.state = ReplyState::Receiving;
50        unsafe {
51            nng_ctx_recv(self.ctx.ctx(), self.aio.nng_aio());
52        }
53    }
54
55    pub fn reply(&mut self, msg: NngMsg, sender: oneshot::Sender<Result<()>>) {
56        if self.state != ReplyState::Wait {
57            panic!();
58        }
59
60        self.reply_sender = Some(sender);
61        unsafe {
62            let aio = self.aio.nng_aio();
63
64            self.state = ReplyState::Sending;
65            // Nng assumes ownership of the message
66            nng_aio_set_msg(aio, msg.take());
67            nng_ctx_send(self.ctx.ctx(), aio);
68        }
69    }
70}
71
72impl Aio for ReplyContextAioArg {
73    fn aio(&self) -> &NngAio {
74        &self.aio
75    }
76    fn aio_mut(&mut self) -> &mut NngAio {
77        &mut self.aio
78    }
79}
80
81/// Async reply context for request/reply pattern.
82#[derive(Debug)]
83pub struct ReplyAsyncHandle {
84    aio_arg: AioArg<ReplyContextAioArg>,
85}
86
87impl AsyncContext for ReplyAsyncHandle {
88    fn new(socket: NngSocket) -> Result<Self> {
89        let aio_arg = ReplyContextAioArg::new(socket)?;
90        Ok(Self { aio_arg })
91    }
92}
93
94/// Trait for asynchronous contexts that can receive a request and then send a reply.
95pub trait ReplyAsync {
96    // FIXME: Can change this to -> impl Future later?
97    /// Asynchronously receive a request.
98    fn receive(&mut self) -> AsyncMsg;
99    /// Asynchronously reply to previously received request.
100    fn reply(&mut self, msg: NngMsg) -> oneshot::Receiver<Result<()>>;
101}
102
103impl ReplyAsync for ReplyAsyncHandle {
104    fn receive(&mut self) -> AsyncMsg {
105        let mut queue = self.aio_arg.queue.lock().unwrap();
106        if let Some(item) = queue.ready.pop_front() {
107            Box::pin(future::ready(item))
108        } else {
109            let (sender, receiver) = oneshot::channel();
110            queue.waiting.push_back(sender);
111            let receiver = receiver.map(result::flatten_result);
112            Box::pin(receiver)
113        }
114    }
115
116    fn reply(&mut self, msg: NngMsg) -> oneshot::Receiver<Result<()>> {
117        let (sender, receiver) = oneshot::channel();
118        self.aio_arg.reply(msg, sender);
119        receiver
120    }
121}
122
123unsafe extern "C" fn reply_callback(arg: AioArgPtr) {
124    let ctx = &mut *(arg as *mut ReplyContextAioArg);
125    let aio_nng = ctx.aio.nng_aio();
126    trace!("reply_callback::{:?}", ctx.state);
127    match ctx.state {
128        ReplyState::Idle => panic!(),
129        ReplyState::Receiving => {
130            let res = nng_int_to_result(nng_aio_result(aio_nng));
131            match res {
132                Err(res) => {
133                    match res {
134                        Error::Errno(NngErrno::ECLOSED) | Error::Errno(NngErrno::ECANCELED) => {
135                            debug!("reply_callback {:?}", res);
136                        }
137                        _ => {
138                            trace!("reply_callback::Err({:?})", res);
139                            ctx.receive();
140                        }
141                    }
142
143                    ctx.queue.lock().unwrap().push_back(Err(res));
144                }
145                Ok(()) => {
146                    let msg = NngMsg::from_raw(nng_aio_get_msg(aio_nng));
147                    // Reset state before signaling completion
148                    ctx.state = ReplyState::Wait;
149                    ctx.queue.lock().unwrap().push_back(Ok(msg));
150                }
151            }
152        }
153        ReplyState::Wait => panic!(),
154        ReplyState::Sending => {
155            let res = nng_int_to_result(nng_aio_result(aio_nng));
156            if res.is_err() {
157                // Nng requires we resume ownership of the message
158                let _ = NngMsg::from_raw(nng_aio_get_msg(aio_nng));
159            }
160
161            let sender = ctx.reply_sender.take().unwrap();
162            // Reset state and start receiving again before
163            // signaling completion to avoid race condition where we say we're done, but
164            // not yet ready for receive() to be called.
165            ctx.state = ReplyState::Idle;
166            ctx.receive();
167            sender.send(res).unwrap();
168        }
169    }
170}