1use std::future::Future;
2use std::task::{Poll, Context};
3use std::sync::Arc;
4use std::pin::Pin;
5
6use crate::QoS;
7use crate::shared::{ClientShared, NotifierMap, SubscriptionState, NotifyResult};
8
9pub(super) trait NotifierMapAccessor: Unpin
11{
12 fn access_notifier_map(&self) -> &NotifierMap;
13}
14
15macro_rules! def_notifier_acessors {
16 ($($name:ident => |$var:ident| $expr:expr),+) => {
17 $(
18 pub struct $name(pub(super) Arc<ClientShared>);
19
20 impl NotifierMapAccessor for $name
21 {
22 fn access_notifier_map(&self) -> &NotifierMap { let $var = &self.0; $expr }
23 }
24 )+
25 };
26}
27
28def_notifier_acessors! {
29 AckNotifierMapAccessor => |shared| &shared.notify_ack,
30 RecNotifierMapAccessor => |shared| &shared.notify_rec,
31 CompNotifierMapAccessor => |shared| &shared.notify_comp
32}
33
34pub(super) struct PublishFuture<NMA: NotifierMapAccessor>
41{
42 packet_id: u16,
43 notifier: NMA,
44 result: Option<bool>
45}
46
47impl<NMA: NotifierMapAccessor> PublishFuture<NMA>
48{
49 pub fn new(packet_id: u16, notifier: NMA) -> Self
53 {
54 notifier.access_notifier_map().lock().insert(packet_id, NotifyResult::WithoutWaker);
55
56 Self {
57 packet_id,
58 notifier,
59 result: None
60 }
61 }
62}
63
64impl<NMA: NotifierMapAccessor> Future for PublishFuture<NMA>
65{
66 type Output = bool;
67
68 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<bool>
69 {
70 if let Some(ret) = self.result {
71 return Poll::Ready(ret);
72 }
73
74 let mut map = self.notifier.access_notifier_map().lock();
75
76 match map.get_mut(&self.packet_id) {
77 Some(entry) => match entry {
78 NotifyResult::Failed => {
79 drop(map);
80 self.as_mut().result = Some(true);
81 Poll::Ready(true)
82 },
83 _ => {
84 *entry = NotifyResult::WithWaker(cx.waker().clone());
85 Poll::Pending
86 }
87 },
88 None => {
89 drop(map);
90 self.as_mut().result = Some(false);
91 Poll::Ready(false)
92 }
93 }
94 }
95}
96
97impl<NMA: NotifierMapAccessor> Drop for PublishFuture<NMA>
98{
99 fn drop(&mut self)
100 {
101 if self.result.is_none() {
102 self.notifier.access_notifier_map().lock().remove(&self.packet_id);
103 }
104 }
105}
106
107pub(super) struct SubscribeFuture<'a>
112{
113 client_shared: &'a ClientShared,
114 topic: &'a str,
115 waker_index: Option<usize>,
116 result: Option<Result<QoS, ()>>
117}
118
119impl<'a> SubscribeFuture<'a>
120{
121 pub(super) fn new(client_shared: &'a ClientShared, topic: &'a str) -> Self
126 {
127 Self {
128 client_shared, topic,
129 waker_index: None,
130 result: None
131 }
132 }
133}
134
135impl<'a> Future for SubscribeFuture<'a>
136{
137 type Output = Result<QoS, ()>;
138
139 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<QoS, ()>>
140 {
141 if let Some(result) = self.result {
142 return Poll::Ready(result);
143 }
144
145 let mut sub_map = self.client_shared.subs.lock();
146
147 let sub_state = match sub_map.get_mut(self.topic) {
148 Some(x) => x,
149 None => {
150 drop(sub_map);
152 self.as_mut().result = Some(Err(()));
153 return Poll::Ready(Err(()));
154 }
155 };
156
157 match sub_state {
158 SubscriptionState::Existing(qos) => {
159 let qos = *qos;
161 drop(sub_map);
162 self.as_mut().result = Some(Ok(qos));
163
164 Poll::Ready(Ok(qos))
165 },
166 SubscriptionState::Pending(wakers) => {
167 let waker = Some(cx.waker().clone());
168
169 if let Some(i) = self.waker_index {
170 wakers[i] = waker;
171 } else {
172 let i = wakers.len();
173 wakers.push(waker);
174
175 drop(sub_map);
176 self.as_mut().waker_index = Some(i);
177 }
178
179 Poll::Pending
180 }
181 }
182 }
183}
184
185impl<'a> Drop for SubscribeFuture<'a>
186{
187 fn drop(&mut self)
188 {
189 if self.result.is_none() {
190 if let Some(i) = self.waker_index {
191 match self.client_shared.subs.lock().get_mut(self.topic) {
192 Some(SubscriptionState::Pending(wakers)) => wakers[i] = None,
193 _ => {}
194 }
195 }
196 }
197 }
198}