actix_mqtt_client/actors/packets/
mod.rs1#[macro_use]
2mod macros;
3pub mod connack;
4pub mod connect;
5pub mod disconnect;
6pub mod pingreq;
7pub mod pingresp;
8pub mod puback;
9pub mod pubcomp;
10pub mod publish;
11pub mod pubrec;
12pub mod pubrel;
13pub mod suback;
14pub mod subscribe;
15pub mod unsuback;
16pub mod unsubscribe;
17
18use std::vec::Vec;
19
20use actix::dev::ToEnvelope;
21use actix::{Actor, AsyncContext, Context, Handler, Message, Recipient};
22use mqtt::packet::VariablePacket;
23
24use crate::actors::actions::status::{PacketStatus, StatusOperationMessage};
25use crate::actors::{ErrorMessage, StopMessage};
26use crate::consts::COMMAND_TIMEOUT;
27
28use super::{handle_mailbox_error_with_resend, handle_send_error_with_resend, send_error};
29
30#[derive(Clone)]
31pub struct PacketMessage<T: Clone> {
32 pub packet: T,
33 pub retry_count: u16,
34}
35
36impl<T: Clone> Message for PacketMessage<T> {
37 type Result = ();
38}
39
40impl<T: Clone> PacketMessage<T> {
41 pub fn new(packet: T, retry_count: u16) -> Self {
42 PacketMessage {
43 packet,
44 retry_count,
45 }
46 }
47}
48
49pub type VariablePacketMessage = PacketMessage<VariablePacket>;
50
51#[derive(Debug, Message, Clone)]
53#[rtype(result = "()")]
54pub struct PublishMessage {
55 pub id: u16,
57 pub topic_name: String,
59 pub payload: Vec<u8>,
61}
62
63#[derive(PartialEq)]
64pub enum PublishPacketStatus {
65 PendingAck,
66 PendingRec,
67 PendingRel,
68 PendingComp,
69}
70
71fn schedule_status_check<TActor, TMessage, TStatusPayload, TStatusCheckFunc>(
72 ctx: &mut Context<TActor>,
73 status_recipient: &Recipient<StatusOperationMessage<TStatusPayload>>,
74 error_recipient: &Recipient<ErrorMessage>,
75 stop_recipient: &Recipient<StopMessage>,
76 id: u16,
77 retry_msg: TMessage,
78 status_check_func: TStatusCheckFunc,
79) where
80 TActor: Actor<Context = Context<TActor>> + Handler<TMessage>,
81 TMessage: Message + Send + 'static + Clone,
82 TMessage::Result: Send,
83 TActor::Context: ToEnvelope<TActor, TMessage>,
84 TStatusPayload: Send + 'static,
85 TStatusCheckFunc: FnOnce(&Option<PacketStatus<TStatusPayload>>) -> bool + Send + 'static,
86{
87 let error_recipient = error_recipient.clone();
88 let stop_recipient = stop_recipient.clone();
89 let status_recipient = status_recipient.clone();
90 let addr = ctx.address();
91 let addr_clone = addr.clone();
92 let msg_clone = retry_msg.clone();
93 ctx.run_later(COMMAND_TIMEOUT.clone(), move |_, _| {
94 let status_future = async move {
95 let status_result = status_recipient
96 .send(
97 crate::actors::actions::status::StatusOperationMessage::GetAndRemovePacketStatus(
98 id,
99 ),
100 )
101 .await;
102 match status_result {
103 Ok(status) => {
104 if status_check_func(&status) {
105 addr.do_send(retry_msg);
106 }
107 }
108 Err(e) => {
109 handle_mailbox_error_with_resend(
110 "schedule_status_check",
111 e,
112 &error_recipient,
113 &stop_recipient,
114 addr_clone,
115 msg_clone,
116 );
117 }
118 }
119 };
120
121 actix::Arbiter::current().spawn(status_future);
122 });
123}
124
125fn set_packet_status<TActor, TMessage, TStatusPayload>(
126 tag: &str,
127 ctx: &mut Context<TActor>,
128 status_recipient: &Recipient<StatusOperationMessage<TStatusPayload>>,
129 error_recipient: &Recipient<ErrorMessage>,
130 stop_recipient: &Recipient<StopMessage>,
131 resend_msg: TMessage,
132 status_message: StatusOperationMessage<TStatusPayload>,
133) -> bool
134where
135 TActor: Actor<Context = Context<TActor>> + Handler<TMessage>,
136 TMessage: Message + Send + 'static,
137 TMessage::Result: Send,
138 TActor::Context: ToEnvelope<TActor, TMessage>,
139 TStatusPayload: Send + 'static,
140{
141 if let Err(e) = status_recipient.try_send(status_message) {
142 let addr = ctx.address();
143 handle_send_error_with_resend(tag, e, error_recipient, stop_recipient, addr, resend_msg);
144 false
145 } else {
146 true
147 }
148}
149
150fn reset_packet_status<TActor, TMessage, TStatusPayload>(
151 tag: &str,
152 ctx: &mut Context<TActor>,
153 status_recipient: &Recipient<StatusOperationMessage<TStatusPayload>>,
154 error_recipient: &Recipient<ErrorMessage>,
155 stop_recipient: &Recipient<StopMessage>,
156 id: u16,
157 resend_msg: TMessage,
158) -> bool
159where
160 TActor: Actor<Context = Context<TActor>> + Handler<TMessage>,
161 TMessage: Message + Send + 'static,
162 TMessage::Result: Send,
163 TActor::Context: ToEnvelope<TActor, TMessage>,
164 TStatusPayload: Send + 'static,
165{
166 if let Err(e) = status_recipient.try_send(StatusOperationMessage::RemovePacketStatus(id)) {
167 let addr = ctx.address();
168 handle_send_error_with_resend(tag, e, error_recipient, stop_recipient, addr, resend_msg);
169 false
170 } else {
171 true
172 }
173}
174
175fn send_packet<TActor, TMessage>(
176 tag: &str,
177 ctx: &Context<TActor>,
178 send_recipient: &Recipient<VariablePacketMessage>,
179 error_recipient: &Recipient<ErrorMessage>,
180 stop_recipient: &Recipient<StopMessage>,
181 packet: VariablePacket,
182 resend_msg: TMessage,
183) -> bool
184where
185 TActor: Actor<Context = Context<TActor>> + Handler<TMessage>,
186 TMessage: Message + Send + 'static,
187 TMessage::Result: Send,
188 TActor::Context: ToEnvelope<TActor, TMessage>,
189{
190 let message = VariablePacketMessage::new(packet, 0);
191 if let Err(e) = send_recipient.try_send(message) {
192 let addr = ctx.address();
193 handle_send_error_with_resend(tag, e, error_recipient, stop_recipient, addr, resend_msg);
194 false
195 } else {
196 true
197 }
198}