use std::future::Future;
use std::task::{Poll, Context};
use std::sync::Arc;
use std::pin::Pin;
use crate::QoS;
use crate::shared::{ClientShared, NotifierMap, SubscriptionState, NotifyResult};
pub(super) trait NotifierMapAccessor: Unpin
{
fn access_notifier_map(&self) -> &NotifierMap;
}
macro_rules! def_notifier_acessors {
($($name:ident => |$var:ident| $expr:expr),+) => {
$(
pub struct $name(pub(super) Arc<ClientShared>);
impl NotifierMapAccessor for $name
{
fn access_notifier_map(&self) -> &NotifierMap { let $var = &self.0; $expr }
}
)+
};
}
def_notifier_acessors! {
AckNotifierMapAccessor => |shared| &shared.notify_ack,
RecNotifierMapAccessor => |shared| &shared.notify_rec,
CompNotifierMapAccessor => |shared| &shared.notify_comp
}
pub(super) struct PublishFuture<NMA: NotifierMapAccessor>
{
packet_id: u16,
notifier: NMA,
result: Option<bool>
}
impl<NMA: NotifierMapAccessor> PublishFuture<NMA>
{
pub fn new(packet_id: u16, notifier: NMA) -> Self
{
notifier.access_notifier_map().lock().insert(packet_id, NotifyResult::WithoutWaker);
Self {
packet_id,
notifier,
result: None
}
}
}
impl<NMA: NotifierMapAccessor> Future for PublishFuture<NMA>
{
type Output = bool;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<bool>
{
if let Some(ret) = self.result {
return Poll::Ready(ret);
}
let mut map = self.notifier.access_notifier_map().lock();
match map.get_mut(&self.packet_id) {
Some(entry) => match entry {
NotifyResult::Failed => {
drop(map);
self.as_mut().result = Some(true);
Poll::Ready(true)
},
_ => {
*entry = NotifyResult::WithWaker(cx.waker().clone());
Poll::Pending
}
},
None => {
drop(map);
self.as_mut().result = Some(false);
Poll::Ready(false)
}
}
}
}
impl<NMA: NotifierMapAccessor> Drop for PublishFuture<NMA>
{
fn drop(&mut self)
{
if self.result.is_none() {
self.notifier.access_notifier_map().lock().remove(&self.packet_id);
}
}
}
pub(super) struct SubscribeFuture<'a>
{
client_shared: &'a ClientShared,
topic: &'a str,
waker_index: Option<usize>,
result: Option<Result<QoS, ()>>
}
impl<'a> SubscribeFuture<'a>
{
pub(super) fn new(client_shared: &'a ClientShared, topic: &'a str) -> Self
{
Self {
client_shared, topic,
waker_index: None,
result: None
}
}
}
impl<'a> Future for SubscribeFuture<'a>
{
type Output = Result<QoS, ()>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<QoS, ()>>
{
if let Some(result) = self.result {
return Poll::Ready(result);
}
let mut sub_map = self.client_shared.subs.lock();
let sub_state = match sub_map.get_mut(self.topic) {
Some(x) => x,
None => {
drop(sub_map);
self.as_mut().result = Some(Err(()));
return Poll::Ready(Err(()));
}
};
match sub_state {
SubscriptionState::Existing(qos) => {
let qos = *qos;
drop(sub_map);
self.as_mut().result = Some(Ok(qos));
Poll::Ready(Ok(qos))
},
SubscriptionState::Pending(wakers) => {
let waker = Some(cx.waker().clone());
if let Some(i) = self.waker_index {
wakers[i] = waker;
} else {
let i = wakers.len();
wakers.push(waker);
drop(sub_map);
self.as_mut().waker_index = Some(i);
}
Poll::Pending
}
}
}
}
impl<'a> Drop for SubscribeFuture<'a>
{
fn drop(&mut self)
{
if self.result.is_none() {
if let Some(i) = self.waker_index {
match self.client_shared.subs.lock().get_mut(self.topic) {
Some(SubscriptionState::Pending(wakers)) => wakers[i] = None,
_ => {}
}
}
}
}
}