1use super::*;
4use crate::ctx::NngCtx;
5use std::sync::Mutex;
6
/// Phase of the receive/reply cycle that a reply context is currently in.
///
/// Transitions performed in this file:
/// `Idle` -> `Receiving` (a receive is submitted),
/// `Receiving` -> `Wait` (a message arrived),
/// `Wait` -> `Sending` (a reply is submitted),
/// `Sending` -> `Idle` (the send completed; a new receive follows at once).
#[derive(Debug, PartialEq)]
enum ReplyState {
    /// Nothing is outstanding; a receive may be started.
    Idle,
    /// A receive has been submitted and its completion is pending.
    Receiving,
    /// A message has been received and no reply has been submitted yet.
    Wait,
    /// A reply has been submitted and its completion is pending.
    Sending,
}
14
15#[derive(Debug)]
16struct ReplyContextAioArg {
17 aio: NngAio,
18 ctx: NngCtx,
19 queue: Mutex<WorkQueue>,
20 reply_sender: Option<oneshot::Sender<Result<()>>>,
21 socket: NngSocket,
22 state: ReplyState,
23}
24
25impl ReplyContextAioArg {
26 pub fn new(socket: NngSocket) -> Result<AioArg<Self>> {
27 let ctx = NngCtx::new(socket.clone())?;
28 let queue = Mutex::new(WorkQueue::default());
29 let mut context = NngAio::create(
30 |aio| Self {
31 aio,
32 ctx,
33 queue,
34 reply_sender: None,
35 socket,
36 state: ReplyState::Idle,
37 },
38 reply_callback,
39 )?;
40
41 context.receive();
42 Ok(context)
43 }
44
45 fn receive(&mut self) {
46 if self.state != ReplyState::Idle {
47 panic!();
48 }
49 self.state = ReplyState::Receiving;
50 unsafe {
51 nng_ctx_recv(self.ctx.ctx(), self.aio.nng_aio());
52 }
53 }
54
55 pub fn reply(&mut self, msg: NngMsg, sender: oneshot::Sender<Result<()>>) {
56 if self.state != ReplyState::Wait {
57 panic!();
58 }
59
60 self.reply_sender = Some(sender);
61 unsafe {
62 let aio = self.aio.nng_aio();
63
64 self.state = ReplyState::Sending;
65 nng_aio_set_msg(aio, msg.take());
67 nng_ctx_send(self.ctx.ctx(), aio);
68 }
69 }
70}
71
72impl Aio for ReplyContextAioArg {
73 fn aio(&self) -> &NngAio {
74 &self.aio
75 }
76 fn aio_mut(&mut self) -> &mut NngAio {
77 &mut self.aio
78 }
79}
80
81#[derive(Debug)]
83pub struct ReplyAsyncHandle {
84 aio_arg: AioArg<ReplyContextAioArg>,
85}
86
87impl AsyncContext for ReplyAsyncHandle {
88 fn new(socket: NngSocket) -> Result<Self> {
89 let aio_arg = ReplyContextAioArg::new(socket)?;
90 Ok(Self { aio_arg })
91 }
92}
93
94pub trait ReplyAsync {
96 fn receive(&mut self) -> AsyncMsg;
99 fn reply(&mut self, msg: NngMsg) -> oneshot::Receiver<Result<()>>;
101}
102
103impl ReplyAsync for ReplyAsyncHandle {
104 fn receive(&mut self) -> AsyncMsg {
105 let mut queue = self.aio_arg.queue.lock().unwrap();
106 if let Some(item) = queue.ready.pop_front() {
107 Box::pin(future::ready(item))
108 } else {
109 let (sender, receiver) = oneshot::channel();
110 queue.waiting.push_back(sender);
111 let receiver = receiver.map(result::flatten_result);
112 Box::pin(receiver)
113 }
114 }
115
116 fn reply(&mut self, msg: NngMsg) -> oneshot::Receiver<Result<()>> {
117 let (sender, receiver) = oneshot::channel();
118 self.aio_arg.reply(msg, sender);
119 receiver
120 }
121}
122
123unsafe extern "C" fn reply_callback(arg: AioArgPtr) {
124 let ctx = &mut *(arg as *mut ReplyContextAioArg);
125 let aio_nng = ctx.aio.nng_aio();
126 trace!("reply_callback::{:?}", ctx.state);
127 match ctx.state {
128 ReplyState::Idle => panic!(),
129 ReplyState::Receiving => {
130 let res = nng_int_to_result(nng_aio_result(aio_nng));
131 match res {
132 Err(res) => {
133 match res {
134 Error::Errno(NngErrno::ECLOSED) | Error::Errno(NngErrno::ECANCELED) => {
135 debug!("reply_callback {:?}", res);
136 }
137 _ => {
138 trace!("reply_callback::Err({:?})", res);
139 ctx.receive();
140 }
141 }
142
143 ctx.queue.lock().unwrap().push_back(Err(res));
144 }
145 Ok(()) => {
146 let msg = NngMsg::from_raw(nng_aio_get_msg(aio_nng));
147 ctx.state = ReplyState::Wait;
149 ctx.queue.lock().unwrap().push_back(Ok(msg));
150 }
151 }
152 }
153 ReplyState::Wait => panic!(),
154 ReplyState::Sending => {
155 let res = nng_int_to_result(nng_aio_result(aio_nng));
156 if res.is_err() {
157 let _ = NngMsg::from_raw(nng_aio_get_msg(aio_nng));
159 }
160
161 let sender = ctx.reply_sender.take().unwrap();
162 ctx.state = ReplyState::Idle;
166 ctx.receive();
167 sender.send(res).unwrap();
168 }
169 }
170}