actix_mqtt_client/actors/packets/
subscribe.rs

1use std::vec::Vec;
2
3use actix::{AsyncContext, Handler, Message};
4use log::error;
5use mqtt::packet::SubscribePacket;
6pub use mqtt::{QualityOfService, TopicFilter};
7
8use crate::actors::utils;
9
10#[derive(Message, Clone)]
11#[rtype(result = "()")]
12pub struct Subscribe {
13    topic: String,
14    qos: QualityOfService,
15}
16
17impl Subscribe {
18    pub fn new(topic: String, qos: QualityOfService) -> Self {
19        Subscribe { topic, qos }
20    }
21}
22
23#[derive(Message, Clone)]
24#[rtype(result = "()")]
25pub struct BatchSubscribe {
26    subscriptions: Vec<Subscribe>,
27    retry_count: u16,
28}
29
30impl BatchSubscribe {
31    pub fn new(subscriptions: Vec<Subscribe>) -> Self {
32        BatchSubscribe {
33            subscriptions,
34            retry_count: 0,
35        }
36    }
37}
38
39fn get_retry_count_from_message(msg: &BatchSubscribe) -> u16 {
40    msg.retry_count
41}
42
43fn create_retry_message_from_message(msg: BatchSubscribe) -> BatchSubscribe {
44    let mut retry_msg = msg;
45    retry_msg.retry_count += 1;
46    retry_msg
47}
48
49fn create_packet_and_id_from_message(msg: &BatchSubscribe) -> Option<(SubscribePacket, u16)> {
50    let subscriptions: Vec<(TopicFilter, QualityOfService)> = msg
51        .subscriptions
52        .clone()
53        .into_iter()
54        .map(|s| (TopicFilter::new(s.topic), s.qos))
55        .filter(|(result, _)| match result {
56            Ok(_) => true,
57            Err(e) => {
58                error!("Error pasing topic: {}, ignore", e);
59                false
60            }
61        })
62        .map(|(result, qos)| (result.unwrap(), qos))
63        .collect();
64    if subscriptions.is_empty() {
65        error!("No valid topic found");
66        return None;
67    }
68
69    let id = utils::next_id();
70    let subscribe_packet = SubscribePacket::new(id, subscriptions);
71    Some((subscribe_packet, id))
72}
73
74define_send_packet_actor!(SubscribeActor);
75impl_empty_actor!(SubscribeActor);
76impl_send_packet_actor!(
77    SubscribeActor,
78    BatchSubscribe,
79    SubscribePacket,
80    get_retry_count_from_message,
81    create_retry_message_from_message,
82    create_packet_and_id_from_message
83);
84
85impl Handler<Subscribe> for SubscribeActor {
86    type Result = ();
87    fn handle(&mut self, msg: Subscribe, ctx: &mut Self::Context) -> Self::Result {
88        let batch_msg = BatchSubscribe::new(vec![msg]);
89        ctx.notify(batch_msg);
90    }
91}