actix_mqtt_client/actors/packets/
subscribe.rs1use std::vec::Vec;
2
3use actix::{AsyncContext, Handler, Message};
4use log::error;
5use mqtt::packet::SubscribePacket;
6pub use mqtt::{QualityOfService, TopicFilter};
7
8use crate::actors::utils;
9
10#[derive(Message, Clone)]
11#[rtype(result = "()")]
12pub struct Subscribe {
13 topic: String,
14 qos: QualityOfService,
15}
16
17impl Subscribe {
18 pub fn new(topic: String, qos: QualityOfService) -> Self {
19 Subscribe { topic, qos }
20 }
21}
22
23#[derive(Message, Clone)]
24#[rtype(result = "()")]
25pub struct BatchSubscribe {
26 subscriptions: Vec<Subscribe>,
27 retry_count: u16,
28}
29
30impl BatchSubscribe {
31 pub fn new(subscriptions: Vec<Subscribe>) -> Self {
32 BatchSubscribe {
33 subscriptions,
34 retry_count: 0,
35 }
36 }
37}
38
39fn get_retry_count_from_message(msg: &BatchSubscribe) -> u16 {
40 msg.retry_count
41}
42
43fn create_retry_message_from_message(msg: BatchSubscribe) -> BatchSubscribe {
44 let mut retry_msg = msg;
45 retry_msg.retry_count += 1;
46 retry_msg
47}
48
49fn create_packet_and_id_from_message(msg: &BatchSubscribe) -> Option<(SubscribePacket, u16)> {
50 let subscriptions: Vec<(TopicFilter, QualityOfService)> = msg
51 .subscriptions
52 .clone()
53 .into_iter()
54 .map(|s| (TopicFilter::new(s.topic), s.qos))
55 .filter(|(result, _)| match result {
56 Ok(_) => true,
57 Err(e) => {
58 error!("Error pasing topic: {}, ignore", e);
59 false
60 }
61 })
62 .map(|(result, qos)| (result.unwrap(), qos))
63 .collect();
64 if subscriptions.is_empty() {
65 error!("No valid topic found");
66 return None;
67 }
68
69 let id = utils::next_id();
70 let subscribe_packet = SubscribePacket::new(id, subscriptions);
71 Some((subscribe_packet, id))
72}
73
74define_send_packet_actor!(SubscribeActor);
75impl_empty_actor!(SubscribeActor);
76impl_send_packet_actor!(
77 SubscribeActor,
78 BatchSubscribe,
79 SubscribePacket,
80 get_retry_count_from_message,
81 create_retry_message_from_message,
82 create_packet_and_id_from_message
83);
84
85impl Handler<Subscribe> for SubscribeActor {
86 type Result = ();
87 fn handle(&mut self, msg: Subscribe, ctx: &mut Self::Context) -> Self::Result {
88 let batch_msg = BatchSubscribe::new(vec![msg]);
89 ctx.notify(batch_msg);
90 }
91}