rmqtt-http-api 0.18.0

This plugin provides HTTP APIs for integration with external systems, enabling operations like querying client information and publishing messages.
Documentation
use anyhow::anyhow;
use tokio::sync::oneshot;

use rmqtt::{
    context::ServerContext,
    types::{HashMap, Id, Message as MqttMessage, Subscribe, TopicFilter, Unsubscribe},
    Result,
};

use super::types::{SubscribeParams, UnsubscribeParams};

#[inline]
pub(crate) async fn subscribe(
    scx: &ServerContext,
    params: SubscribeParams,
) -> Result<HashMap<TopicFilter, Result<bool>>> {
    let topics = params.topics()?;
    let qos = params.qos()?;
    let clientid = params.clientid;
    let id = Id::from(scx.node.id(), clientid);
    let entry = scx.extends.shared().await.entry(id);
    let s = entry.session().ok_or_else(|| anyhow!("session does not exist!"))?;
    let shared_subs = scx.extends.shared_subscription().await.is_supported(s.listen_cfg());
    let limit_subs = s.listen_cfg().limit_subscription;
    let tx = entry.tx().ok_or_else(|| anyhow!("session message TX is not exist!"))?;
    let qos = qos.less_value(s.listen_cfg().max_qos_allowed);
    let subs = topics
        .iter()
        .map(|t| Subscribe::from_v3(t, qos, shared_subs, limit_subs))
        .collect::<Result<Vec<_>>>()?;

    let mut reply_rxs = Vec::new();
    for sub in subs {
        let topic_filter = sub.topic_filter.clone();
        let (reply_tx, reply_rx) = oneshot::channel();
        let send_reply = tx.unbounded_send(MqttMessage::Subscribe(sub, reply_tx));

        let reply_fut = async move {
            let reply = if let Err(send_err) = send_reply {
                Err(anyhow!(send_err.to_string()))
            } else {
                match reply_rx.await {
                    Ok(Ok(res)) => Ok(!res.failure()),
                    Ok(Err(e)) => Err(e),
                    Err(e) => Err(anyhow!(e.to_string())),
                }
            };
            (topic_filter, reply)
        };
        reply_rxs.push(reply_fut);
    }

    Ok(futures::future::join_all(reply_rxs).await.into_iter().collect())
}

#[inline]
pub(crate) async fn unsubscribe(scx: &ServerContext, params: UnsubscribeParams) -> Result<()> {
    let topic_filter = params.topic;
    let clientid = params.clientid;
    let id = Id::from(scx.node.id(), clientid);
    let entry = scx.extends.shared().await.entry(id);
    let s = entry.session().ok_or_else(|| anyhow!("session does not exist!"))?;
    let shared_subs = scx.extends.shared_subscription().await.is_supported(s.listen_cfg());
    let limit_subs = s.listen_cfg().limit_subscription;
    let tx = entry.tx().ok_or_else(|| anyhow!("session message TX is not exist!"))?;
    let unsub = Unsubscribe::from(&topic_filter, shared_subs, limit_subs)?;
    let (reply_tx, reply_rx) = oneshot::channel();
    tx.unbounded_send(MqttMessage::Unsubscribe(unsub, reply_tx)).map_err(anyhow::Error::new)?;
    reply_rx.await.map_err(anyhow::Error::new)??;
    Ok(())
}