actix_mqtt_client/actors/
mod.rs1use std::io;
2
3#[macro_use]
4pub mod macros;
5pub mod actions;
6pub mod packets;
7mod utils;
8
9use std::io::ErrorKind;
10
11use actix::dev::ToEnvelope;
12use actix::prelude::SendError;
13use actix::{Actor, Addr, Arbiter, Handler, MailboxError, Message, Recipient};
14use log::trace;
15use tokio::time::{sleep_until, Instant};
16
17use crate::consts::RESEND_DELAY;
18
19#[derive(Message)]
21#[rtype(result = "()")]
22pub struct StopMessage;
23
24#[derive(Message)]
26#[rtype(result = "()")]
27pub struct ErrorMessage(pub io::Error);
28
29pub fn send_error<T: AsRef<str>>(
30 tag: &str,
31 error_recipient: &Recipient<ErrorMessage>,
32 kind: io::ErrorKind,
33 message: T,
34) {
35 let error = io::Error::new(kind, message.as_ref());
36 let send_result = error_recipient.try_send(ErrorMessage(error));
37 log::debug!(
38 "[{}] Send result for error recipient: {:?}",
39 tag,
40 send_result
41 );
42}
43
44fn resend<TActor, TMessage>(addr: Addr<TActor>, msg: TMessage)
45where
46 TMessage: Message + Send + 'static,
47 TActor: Actor + Handler<TMessage>,
48 TMessage::Result: Send,
49 TActor::Context: ToEnvelope<TActor, TMessage>,
50{
51 trace!("Schedule resend message");
52 let later_func = async move {
53 let delay_time = Instant::now() + RESEND_DELAY.clone();
54 sleep_until(delay_time).await;
55 addr.do_send(msg);
56 };
57 Arbiter::current().spawn(later_func);
58}
59
60fn handle_send_error<T>(
61 tag: &str,
62 e: SendError<T>,
63 error_recipient: &Recipient<ErrorMessage>,
64 stop_recipient: &Recipient<StopMessage>,
65) {
66 match e {
67 SendError::Closed(_) => {
68 send_error(
69 tag,
70 error_recipient,
71 ErrorKind::Interrupted,
72 format!("[{}] Target mailbox is closed", tag),
73 );
74 let _ = stop_recipient.do_send(StopMessage);
75 }
76 SendError::Full(_) => {
77 send_error(
79 tag,
80 error_recipient,
81 ErrorKind::Other,
82 format!("[{}] Target mailbox is full", tag),
83 );
84 }
85 }
86}
87
88fn handle_send_error_with_resend<T, TActor, TMessage>(
89 tag: &str,
90 e: SendError<T>,
91 error_recipient: &Recipient<ErrorMessage>,
92 stop_recipient: &Recipient<StopMessage>,
93 addr: Addr<TActor>,
94 msg: TMessage,
95) where
96 TMessage: Message + Send + 'static,
97 TActor: Actor + Handler<TMessage>,
98 TMessage::Result: Send,
99 TActor::Context: ToEnvelope<TActor, TMessage>,
100{
101 match e {
102 SendError::Closed(_) => {
103 send_error(
104 tag,
105 error_recipient,
106 ErrorKind::Interrupted,
107 format!("[{}] Target mailbox is closed", tag),
108 );
109 let _ = stop_recipient.do_send(StopMessage);
110 }
111 SendError::Full(_) => {
112 send_error(
113 tag,
114 error_recipient,
115 ErrorKind::Other,
116 format!("[{}] Target mailbox is full, will retry send", tag),
117 );
118 resend(addr, msg);
119 }
120 }
121}
122
123fn handle_mailbox_error_with_resend<TActor, TMessage>(
145 tag: &str,
146 e: MailboxError,
147 error_recipient: &Recipient<ErrorMessage>,
148 stop_recipient: &Recipient<StopMessage>,
149 addr: Addr<TActor>,
150 msg: TMessage,
151) where
152 TMessage: Message + Send + 'static,
153 TActor: Actor + Handler<TMessage>,
154 TMessage::Result: Send,
155 TActor::Context: ToEnvelope<TActor, TMessage>,
156{
157 match e {
158 MailboxError::Closed => {
159 send_error(
160 tag,
161 error_recipient,
162 ErrorKind::Interrupted,
163 "Target mailbox is closed",
164 );
165 let _ = stop_recipient.do_send(StopMessage);
166 }
167 MailboxError::Timeout => {
168 send_error(
169 tag,
170 error_recipient,
171 ErrorKind::Other,
172 "Send timeout, will resend",
173 );
174
175 resend(addr, msg);
176 }
177 }
178}