lmc/
futures.rs

1use std::future::Future;
2use std::task::{Poll, Context};
3use std::sync::Arc;
4use std::pin::Pin;
5
6use crate::QoS;
7use crate::shared::{ClientShared, NotifierMap, SubscriptionState, NotifyResult};
8
9/// A trait used to statically index a [`NotifierMap`] in [`ClientShared`]
10pub(super) trait NotifierMapAccessor: Unpin
11{
12    fn access_notifier_map(&self) -> &NotifierMap;
13}
14
15macro_rules! def_notifier_acessors {
16    ($($name:ident => |$var:ident| $expr:expr),+) => {
17        $(
18            pub struct $name(pub(super) Arc<ClientShared>);
19
20            impl NotifierMapAccessor for $name
21            {
22                fn access_notifier_map(&self) -> &NotifierMap { let $var = &self.0; $expr }
23            }
24        )+
25    };
26}
27
28def_notifier_acessors! {
29    AckNotifierMapAccessor  => |shared| &shared.notify_ack,
30    RecNotifierMapAccessor  => |shared| &shared.notify_rec,
31    CompNotifierMapAccessor => |shared| &shared.notify_comp
32}
33
34/// A future that can be used to await any of the message publish stages:
35/// - `PUBACK` using [`AckNotifierMapAccessor`]
36/// - `PUBREC` using [`RecNotifierMapAccessor`]
37/// - `PUBCOMP` using [`CompNotifierMapAccessor`]
38/// 
39/// Can be cancelled safely.
40pub(super) struct PublishFuture<NMA: NotifierMapAccessor>
41{
42    packet_id: u16,
43    notifier: NMA,
44    result: Option<bool>
45}
46
47impl<NMA: NotifierMapAccessor> PublishFuture<NMA>
48{
49    /// Instantiates a [`PublishFuture`] for the specified packet ID and
50    /// publish stage. Automatically registers itself into the corresponding
51    /// [`NotifierMap`].
52    pub fn new(packet_id: u16, notifier: NMA) -> Self
53    {
54        notifier.access_notifier_map().lock().insert(packet_id, NotifyResult::WithoutWaker);
55
56        Self {
57            packet_id,
58            notifier,
59            result: None
60        }
61    }
62}
63
64impl<NMA: NotifierMapAccessor> Future for PublishFuture<NMA>
65{
66    type Output = bool;
67
68    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<bool>
69    {
70        if let Some(ret) = self.result {
71            return Poll::Ready(ret);
72        }
73
74        let mut map = self.notifier.access_notifier_map().lock();
75
76        match map.get_mut(&self.packet_id) {
77            Some(entry) => match entry {
78                NotifyResult::Failed => {
79                    drop(map);
80                    self.as_mut().result = Some(true);
81                    Poll::Ready(true)
82                },
83                _ => {
84                    *entry = NotifyResult::WithWaker(cx.waker().clone());
85                    Poll::Pending
86                }
87            },
88            None => {
89                drop(map);
90                self.as_mut().result = Some(false);
91                Poll::Ready(false)
92            }
93        }
94    }
95}
96
97impl<NMA: NotifierMapAccessor> Drop for PublishFuture<NMA>
98{
99    fn drop(&mut self)
100    {
101        if self.result.is_none() {
102            self.notifier.access_notifier_map().lock().remove(&self.packet_id);
103        }
104    }
105}
106
107/// A future that can be used to await a `SUBACK` packet for the
108/// specified topic.
109/// 
110/// Can be cancelled safely.
111pub(super) struct SubscribeFuture<'a>
112{
113    client_shared: &'a ClientShared,
114    topic: &'a str,
115    waker_index: Option<usize>,
116    result: Option<Result<QoS, ()>>
117}
118
119impl<'a> SubscribeFuture<'a>
120{
121    /// Instantiates a [`SubscribeFuture`] for the specified topic. Note that
122    /// an entry for the corresponding topic in the subscription map should
123    /// exist prior to a call to this function or at least, prior to awaiting
124    /// the returned future.
125    pub(super) fn new(client_shared: &'a ClientShared, topic: &'a str) -> Self
126    {
127        Self {
128            client_shared, topic,
129            waker_index: None,
130            result: None
131        }
132    }
133}
134
135impl<'a> Future for SubscribeFuture<'a>
136{
137    type Output = Result<QoS, ()>;
138
139    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<QoS, ()>>
140    {
141        if let Some(result) = self.result {
142            return Poll::Ready(result);
143        }
144
145        let mut sub_map = self.client_shared.subs.lock();
146
147        let sub_state = match sub_map.get_mut(self.topic) {
148            Some(x) => x,
149            None => {
150                //Subscription failed
151                drop(sub_map);
152                self.as_mut().result = Some(Err(()));
153                return Poll::Ready(Err(()));
154            }
155        };
156
157        match sub_state {
158            SubscriptionState::Existing(qos) => {
159                //Subscription succeeded
160                let qos = *qos;
161                drop(sub_map);
162                self.as_mut().result = Some(Ok(qos));
163
164                Poll::Ready(Ok(qos))
165            },
166            SubscriptionState::Pending(wakers) => {
167                let waker = Some(cx.waker().clone());
168
169                if let Some(i) = self.waker_index {
170                    wakers[i] = waker;
171                } else {
172                    let i = wakers.len();
173                    wakers.push(waker);
174
175                    drop(sub_map);
176                    self.as_mut().waker_index = Some(i);
177                }
178
179                Poll::Pending
180            }
181        }
182    }
183}
184
185impl<'a> Drop for SubscribeFuture<'a>
186{
187    fn drop(&mut self)
188    {
189        if self.result.is_none() {
190            if let Some(i) = self.waker_index {
191                match self.client_shared.subs.lock().get_mut(self.topic) {
192                    Some(SubscriptionState::Pending(wakers)) => wakers[i] = None,
193                    _ => {}
194                }
195            }
196        }
197    }
198}