actix_mqtt_client/actors/
mod.rs

1use std::io;
2
3#[macro_use]
4pub mod macros;
5pub mod actions;
6pub mod packets;
7mod utils;
8
9use std::io::ErrorKind;
10
11use actix::dev::ToEnvelope;
12use actix::prelude::SendError;
13use actix::{Actor, Addr, Arbiter, Handler, MailboxError, Message, Recipient};
14use log::trace;
15use tokio::time::{sleep_until, Instant};
16
17use crate::consts::RESEND_DELAY;
18
19/// The actix message indicating that the client is about to stop
20#[derive(Message)]
21#[rtype(result = "()")]
22pub struct StopMessage;
23
24/// The actix message containing the error happens inside the client
25#[derive(Message)]
26#[rtype(result = "()")]
27pub struct ErrorMessage(pub io::Error);
28
29pub fn send_error<T: AsRef<str>>(
30    tag: &str,
31    error_recipient: &Recipient<ErrorMessage>,
32    kind: io::ErrorKind,
33    message: T,
34) {
35    let error = io::Error::new(kind, message.as_ref());
36    let send_result = error_recipient.try_send(ErrorMessage(error));
37    log::debug!(
38        "[{}] Send result for error recipient: {:?}",
39        tag,
40        send_result
41    );
42}
43
44fn resend<TActor, TMessage>(addr: Addr<TActor>, msg: TMessage)
45where
46    TMessage: Message + Send + 'static,
47    TActor: Actor + Handler<TMessage>,
48    TMessage::Result: Send,
49    TActor::Context: ToEnvelope<TActor, TMessage>,
50{
51    trace!("Schedule resend message");
52    let later_func = async move {
53        let delay_time = Instant::now() + RESEND_DELAY.clone();
54        sleep_until(delay_time).await;
55        addr.do_send(msg);
56    };
57    Arbiter::current().spawn(later_func);
58}
59
60fn handle_send_error<T>(
61    tag: &str,
62    e: SendError<T>,
63    error_recipient: &Recipient<ErrorMessage>,
64    stop_recipient: &Recipient<StopMessage>,
65) {
66    match e {
67        SendError::Closed(_) => {
68            send_error(
69                tag,
70                error_recipient,
71                ErrorKind::Interrupted,
72                format!("[{}] Target mailbox is closed", tag),
73            );
74            let _ = stop_recipient.do_send(StopMessage);
75        }
76        SendError::Full(_) => {
77            // Do nothing
78            send_error(
79                tag,
80                error_recipient,
81                ErrorKind::Other,
82                format!("[{}] Target mailbox is full", tag),
83            );
84        }
85    }
86}
87
88fn handle_send_error_with_resend<T, TActor, TMessage>(
89    tag: &str,
90    e: SendError<T>,
91    error_recipient: &Recipient<ErrorMessage>,
92    stop_recipient: &Recipient<StopMessage>,
93    addr: Addr<TActor>,
94    msg: TMessage,
95) where
96    TMessage: Message + Send + 'static,
97    TActor: Actor + Handler<TMessage>,
98    TMessage::Result: Send,
99    TActor::Context: ToEnvelope<TActor, TMessage>,
100{
101    match e {
102        SendError::Closed(_) => {
103            send_error(
104                tag,
105                error_recipient,
106                ErrorKind::Interrupted,
107                format!("[{}] Target mailbox is closed", tag),
108            );
109            let _ = stop_recipient.do_send(StopMessage);
110        }
111        SendError::Full(_) => {
112            send_error(
113                tag,
114                error_recipient,
115                ErrorKind::Other,
116                format!("[{}] Target mailbox is full, will retry send", tag),
117            );
118            resend(addr, msg);
119        }
120    }
121}
122
123// fn handle_mailbox_error(
124//     e: MailboxError,
125//     error_recipient: &Recipient<ErrorMessage>,
126//     stop_recipient: &Recipient<StopMessage>,
127// ) {
128//     match e {
129//         MailboxError::Closed => {
130//             send_error(
131//                 error_recipient,
132//                 ErrorKind::Interrupted,
133//                 "Target mailbox is closed",
134//             );
135//             stop_system(stop_recipient, errors::ERROR_CODE_FAILED_TO_SEND_MESSAGE);
136//         }
137//         MailboxError::Timeout => {
138//             // Do nothing
139//             send_error(error_recipient, ErrorKind::Other, "Send timeout");
140//         }
141//     }
142// }
143
144fn handle_mailbox_error_with_resend<TActor, TMessage>(
145    tag: &str,
146    e: MailboxError,
147    error_recipient: &Recipient<ErrorMessage>,
148    stop_recipient: &Recipient<StopMessage>,
149    addr: Addr<TActor>,
150    msg: TMessage,
151) where
152    TMessage: Message + Send + 'static,
153    TActor: Actor + Handler<TMessage>,
154    TMessage::Result: Send,
155    TActor::Context: ToEnvelope<TActor, TMessage>,
156{
157    match e {
158        MailboxError::Closed => {
159            send_error(
160                tag,
161                error_recipient,
162                ErrorKind::Interrupted,
163                "Target mailbox is closed",
164            );
165            let _ = stop_recipient.do_send(StopMessage);
166        }
167        MailboxError::Timeout => {
168            send_error(
169                tag,
170                error_recipient,
171                ErrorKind::Other,
172                "Send timeout, will resend",
173            );
174
175            resend(addr, msg);
176        }
177    }
178}