actix_mqtt_client/actors/packets/
mod.rs

1#[macro_use]
2mod macros;
3pub mod connack;
4pub mod connect;
5pub mod disconnect;
6pub mod pingreq;
7pub mod pingresp;
8pub mod puback;
9pub mod pubcomp;
10pub mod publish;
11pub mod pubrec;
12pub mod pubrel;
13pub mod suback;
14pub mod subscribe;
15pub mod unsuback;
16pub mod unsubscribe;
17
18use std::vec::Vec;
19
20use actix::dev::ToEnvelope;
21use actix::{Actor, AsyncContext, Context, Handler, Message, Recipient};
22use mqtt::packet::VariablePacket;
23
24use crate::actors::actions::status::{PacketStatus, StatusOperationMessage};
25use crate::actors::{ErrorMessage, StopMessage};
26use crate::consts::COMMAND_TIMEOUT;
27
28use super::{handle_mailbox_error_with_resend, handle_send_error_with_resend, send_error};
29
/// Generic envelope that carries a packet of type `T` together with a retry counter.
///
/// `T` must be `Clone`, which makes the whole envelope cloneable.
#[derive(Clone)]
pub struct PacketMessage<T>
where
    T: Clone,
{
    /// The wrapped packet.
    pub packet: T,
    /// Retry counter travelling with the packet.
    /// NOTE(review): presumably the number of send attempts already made — confirm with the senders.
    pub retry_count: u16,
}
35
36impl<T: Clone> Message for PacketMessage<T> {
37    type Result = ();
38}
39
40impl<T: Clone> PacketMessage<T> {
41    pub fn new(packet: T, retry_count: u16) -> Self {
42        PacketMessage {
43            packet,
44            retry_count,
45        }
46    }
47}
48
/// A [`PacketMessage`] whose packet is an `mqtt::packet::VariablePacket`.
pub type VariablePacketMessage = PacketMessage<VariablePacket>;
50
51/// The actix message containing the payload of a MQTT publish packet
52#[derive(Debug, Message, Clone)]
53#[rtype(result = "()")]
54pub struct PublishMessage {
55    /// The packet identifier of the publish packet for QoS Level 1 and Level 2, or 0 for QoS Level 0
56    pub id: u16,
57    /// The topic name of the message
58    pub topic_name: String,
59    /// The message payload
60    pub payload: Vec<u8>,
61}
62
/// Stage of the publish handshake that a packet is currently waiting on.
///
/// The type is a plain field-less enum, so it is `Copy` and can be compared and
/// debug-printed freely.
///
/// NOTE(review): the per-variant meanings below are inferred from the variant names and the
/// sibling `puback` / `pubrec` / `pubrel` / `pubcomp` modules — confirm against the use sites.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PublishPacketStatus {
    /// Presumably: waiting for a PUBACK.
    PendingAck,
    /// Presumably: waiting for a PUBREC.
    PendingRec,
    /// Presumably: waiting for a PUBREL.
    PendingRel,
    /// Presumably: waiting for a PUBCOMP.
    PendingComp,
}
70
71fn schedule_status_check<TActor, TMessage, TStatusPayload, TStatusCheckFunc>(
72    ctx: &mut Context<TActor>,
73    status_recipient: &Recipient<StatusOperationMessage<TStatusPayload>>,
74    error_recipient: &Recipient<ErrorMessage>,
75    stop_recipient: &Recipient<StopMessage>,
76    id: u16,
77    retry_msg: TMessage,
78    status_check_func: TStatusCheckFunc,
79) where
80    TActor: Actor<Context = Context<TActor>> + Handler<TMessage>,
81    TMessage: Message + Send + 'static + Clone,
82    TMessage::Result: Send,
83    TActor::Context: ToEnvelope<TActor, TMessage>,
84    TStatusPayload: Send + 'static,
85    TStatusCheckFunc: FnOnce(&Option<PacketStatus<TStatusPayload>>) -> bool + Send + 'static,
86{
87    let error_recipient = error_recipient.clone();
88    let stop_recipient = stop_recipient.clone();
89    let status_recipient = status_recipient.clone();
90    let addr = ctx.address();
91    let addr_clone = addr.clone();
92    let msg_clone = retry_msg.clone();
93    ctx.run_later(COMMAND_TIMEOUT.clone(), move |_, _| {
94        let status_future = async move {
95            let status_result = status_recipient
96                .send(
97                    crate::actors::actions::status::StatusOperationMessage::GetAndRemovePacketStatus(
98                        id,
99                    ),
100                )
101                .await;
102            match status_result {
103                Ok(status) => {
104                    if status_check_func(&status) {
105                        addr.do_send(retry_msg);
106                    }
107                }
108                Err(e) => {
109                    handle_mailbox_error_with_resend(
110                        "schedule_status_check",
111                        e,
112                        &error_recipient,
113                        &stop_recipient,
114                        addr_clone,
115                        msg_clone,
116                    );
117                }
118            }
119        };
120
121        actix::Arbiter::current().spawn(status_future);
122    });
123}
124
125fn set_packet_status<TActor, TMessage, TStatusPayload>(
126    tag: &str,
127    ctx: &mut Context<TActor>,
128    status_recipient: &Recipient<StatusOperationMessage<TStatusPayload>>,
129    error_recipient: &Recipient<ErrorMessage>,
130    stop_recipient: &Recipient<StopMessage>,
131    resend_msg: TMessage,
132    status_message: StatusOperationMessage<TStatusPayload>,
133) -> bool
134where
135    TActor: Actor<Context = Context<TActor>> + Handler<TMessage>,
136    TMessage: Message + Send + 'static,
137    TMessage::Result: Send,
138    TActor::Context: ToEnvelope<TActor, TMessage>,
139    TStatusPayload: Send + 'static,
140{
141    if let Err(e) = status_recipient.try_send(status_message) {
142        let addr = ctx.address();
143        handle_send_error_with_resend(tag, e, error_recipient, stop_recipient, addr, resend_msg);
144        false
145    } else {
146        true
147    }
148}
149
150fn reset_packet_status<TActor, TMessage, TStatusPayload>(
151    tag: &str,
152    ctx: &mut Context<TActor>,
153    status_recipient: &Recipient<StatusOperationMessage<TStatusPayload>>,
154    error_recipient: &Recipient<ErrorMessage>,
155    stop_recipient: &Recipient<StopMessage>,
156    id: u16,
157    resend_msg: TMessage,
158) -> bool
159where
160    TActor: Actor<Context = Context<TActor>> + Handler<TMessage>,
161    TMessage: Message + Send + 'static,
162    TMessage::Result: Send,
163    TActor::Context: ToEnvelope<TActor, TMessage>,
164    TStatusPayload: Send + 'static,
165{
166    if let Err(e) = status_recipient.try_send(StatusOperationMessage::RemovePacketStatus(id)) {
167        let addr = ctx.address();
168        handle_send_error_with_resend(tag, e, error_recipient, stop_recipient, addr, resend_msg);
169        false
170    } else {
171        true
172    }
173}
174
175fn send_packet<TActor, TMessage>(
176    tag: &str,
177    ctx: &Context<TActor>,
178    send_recipient: &Recipient<VariablePacketMessage>,
179    error_recipient: &Recipient<ErrorMessage>,
180    stop_recipient: &Recipient<StopMessage>,
181    packet: VariablePacket,
182    resend_msg: TMessage,
183) -> bool
184where
185    TActor: Actor<Context = Context<TActor>> + Handler<TMessage>,
186    TMessage: Message + Send + 'static,
187    TMessage::Result: Send,
188    TActor::Context: ToEnvelope<TActor, TMessage>,
189{
190    let message = VariablePacketMessage::new(packet, 0);
191    if let Err(e) = send_recipient.try_send(message) {
192        let addr = ctx.address();
193        handle_send_error_with_resend(tag, e, error_recipient, stop_recipient, addr, resend_msg);
194        false
195    } else {
196        true
197    }
198}