actix_mqtt_client/actors/packets/
pingreq.rs

1use std::time::Duration;
2
3use actix::{Actor, Arbiter, AsyncContext, Context, Handler, Message, Recipient};
4use log::{error, trace};
5use mqtt::packet::PingreqPacket;
6
7use crate::actors::actions::status::{StatusExistenceMessage, StatusOperationMessage};
8use crate::actors::{ErrorMessage, StopMessage};
9
10use super::handle_mailbox_error_with_resend;
11use super::VariablePacketMessage;
12
13#[derive(Message)]
14#[rtype(result = "()")]
15pub struct Pingreq(pub u16);
16
17#[derive(Message, Clone)]
18#[rtype(result = "()")]
19struct SendPing(pub u16);
20
21pub struct PingreqActor {
22    status_recipient: Recipient<StatusOperationMessage<()>>,
23    connect_status_recipient: Recipient<StatusExistenceMessage>,
24    send_recipient: Recipient<VariablePacketMessage>,
25    error_recipient: Recipient<ErrorMessage>,
26    stop_recipient: Recipient<StopMessage>,
27    interval: Duration,
28}
29
30impl PingreqActor {
31    pub fn new(
32        status_recipient: Recipient<StatusOperationMessage<()>>,
33        connect_status_recipient: Recipient<StatusExistenceMessage>,
34        send_recipient: Recipient<VariablePacketMessage>,
35        error_recipient: Recipient<ErrorMessage>,
36        stop_recipient: Recipient<StopMessage>,
37        interval: Duration,
38    ) -> Self {
39        PingreqActor {
40            status_recipient,
41            connect_status_recipient,
42            send_recipient,
43            error_recipient,
44            stop_recipient,
45            interval,
46        }
47    }
48}
49
50impl Actor for PingreqActor {
51    type Context = Context<Self>;
52    fn started(&mut self, ctx: &mut Self::Context) {
53        trace!("PingreqActor started");
54        ctx.notify(Pingreq(0));
55        ctx.run_interval(self.interval.clone(), |_, ctx| {
56            trace!("Start to send ping");
57            ctx.notify(Pingreq(0));
58        });
59    }
60
61    fn stopped(&mut self, _: &mut Self::Context) {
62        trace!("PingreqActor stopped");
63    }
64}
65
66impl Handler<Pingreq> for PingreqActor {
67    type Result = ();
68    fn handle(&mut self, msg: Pingreq, ctx: &mut Self::Context) -> Self::Result {
69        let last_retry_count = msg.0;
70        assert_valid_retry_count!(PingreqActor, self, last_retry_count, 0);
71        let status_recipient = self.status_recipient.clone();
72        let connect_status_recipient = self.connect_status_recipient.clone();
73        let error_recipient = self.error_recipient.clone();
74        let stop_recipient = self.stop_recipient.clone();
75        let addr = ctx.address();
76        let addr_clone = addr.clone();
77        let status_future = async move {
78            // For connect status:
79            //      status message with id = 0 indicating the connecing status
80            //      status message with id = 1 indicating the connected status
81            let connect_status_result = connect_status_recipient
82                .send(StatusExistenceMessage(1u16))
83                .await;
84            match connect_status_result {
85                Ok(false) => {
86                    trace!("Server not connected yet, do nothing.");
87                    return;
88                }
89                Err(e) => {
90                    error!("Failed to get connect status: {}", e);
91                    return;
92                }
93                _ => {
94                    trace!("Server connected, will send ping");
95                }
96            }
97
98            let status_result = status_recipient
99                .send(StatusOperationMessage::GetAndRemovePacketStatus(0))
100                .await;
101            match status_result {
102                Ok(status) => {
103                    if status.is_none() {
104                        // Only try send ping if no previous on-going ping
105                        addr.do_send(SendPing(0));
106                    }
107                }
108                Err(e) => {
109                    handle_mailbox_error_with_resend(
110                        "PingreqActor:status_recipient",
111                        e,
112                        &error_recipient,
113                        &stop_recipient,
114                        addr_clone,
115                        Pingreq(last_retry_count + 1),
116                    );
117                }
118            }
119        };
120        Arbiter::current().spawn(status_future);
121    }
122}
123
124fn get_retry_count_from_message(msg: &SendPing) -> u16 {
125    msg.0
126}
127
128fn create_retry_msessage_from_message(msg: SendPing) -> SendPing {
129    SendPing(msg.0 + 1)
130}
131
132fn create_packet_and_id_from_message(_: &SendPing) -> Option<(PingreqPacket, u16)> {
133    Some((PingreqPacket::new(), 0))
134}
135
136impl_send_packet_actor!(
137    PingreqActor,
138    SendPing,
139    PingreqPacket,
140    get_retry_count_from_message,
141    create_retry_msessage_from_message,
142    create_packet_and_id_from_message
143);