actix_mqtt_client/actors/packets/
pingreq.rs1use std::time::Duration;
2
3use actix::{Actor, Arbiter, AsyncContext, Context, Handler, Message, Recipient};
4use log::{error, trace};
5use mqtt::packet::PingreqPacket;
6
7use crate::actors::actions::status::{StatusExistenceMessage, StatusOperationMessage};
8use crate::actors::{ErrorMessage, StopMessage};
9
10use super::handle_mailbox_error_with_resend;
11use super::VariablePacketMessage;
12
13#[derive(Message)]
14#[rtype(result = "()")]
15pub struct Pingreq(pub u16);
16
17#[derive(Message, Clone)]
18#[rtype(result = "()")]
19struct SendPing(pub u16);
20
21pub struct PingreqActor {
22 status_recipient: Recipient<StatusOperationMessage<()>>,
23 connect_status_recipient: Recipient<StatusExistenceMessage>,
24 send_recipient: Recipient<VariablePacketMessage>,
25 error_recipient: Recipient<ErrorMessage>,
26 stop_recipient: Recipient<StopMessage>,
27 interval: Duration,
28}
29
30impl PingreqActor {
31 pub fn new(
32 status_recipient: Recipient<StatusOperationMessage<()>>,
33 connect_status_recipient: Recipient<StatusExistenceMessage>,
34 send_recipient: Recipient<VariablePacketMessage>,
35 error_recipient: Recipient<ErrorMessage>,
36 stop_recipient: Recipient<StopMessage>,
37 interval: Duration,
38 ) -> Self {
39 PingreqActor {
40 status_recipient,
41 connect_status_recipient,
42 send_recipient,
43 error_recipient,
44 stop_recipient,
45 interval,
46 }
47 }
48}
49
50impl Actor for PingreqActor {
51 type Context = Context<Self>;
52 fn started(&mut self, ctx: &mut Self::Context) {
53 trace!("PingreqActor started");
54 ctx.notify(Pingreq(0));
55 ctx.run_interval(self.interval.clone(), |_, ctx| {
56 trace!("Start to send ping");
57 ctx.notify(Pingreq(0));
58 });
59 }
60
61 fn stopped(&mut self, _: &mut Self::Context) {
62 trace!("PingreqActor stopped");
63 }
64}
65
66impl Handler<Pingreq> for PingreqActor {
67 type Result = ();
68 fn handle(&mut self, msg: Pingreq, ctx: &mut Self::Context) -> Self::Result {
69 let last_retry_count = msg.0;
70 assert_valid_retry_count!(PingreqActor, self, last_retry_count, 0);
71 let status_recipient = self.status_recipient.clone();
72 let connect_status_recipient = self.connect_status_recipient.clone();
73 let error_recipient = self.error_recipient.clone();
74 let stop_recipient = self.stop_recipient.clone();
75 let addr = ctx.address();
76 let addr_clone = addr.clone();
77 let status_future = async move {
78 let connect_status_result = connect_status_recipient
82 .send(StatusExistenceMessage(1u16))
83 .await;
84 match connect_status_result {
85 Ok(false) => {
86 trace!("Server not connected yet, do nothing.");
87 return;
88 }
89 Err(e) => {
90 error!("Failed to get connect status: {}", e);
91 return;
92 }
93 _ => {
94 trace!("Server connected, will send ping");
95 }
96 }
97
98 let status_result = status_recipient
99 .send(StatusOperationMessage::GetAndRemovePacketStatus(0))
100 .await;
101 match status_result {
102 Ok(status) => {
103 if status.is_none() {
104 addr.do_send(SendPing(0));
106 }
107 }
108 Err(e) => {
109 handle_mailbox_error_with_resend(
110 "PingreqActor:status_recipient",
111 e,
112 &error_recipient,
113 &stop_recipient,
114 addr_clone,
115 Pingreq(last_retry_count + 1),
116 );
117 }
118 }
119 };
120 Arbiter::current().spawn(status_future);
121 }
122}
123
124fn get_retry_count_from_message(msg: &SendPing) -> u16 {
125 msg.0
126}
127
128fn create_retry_msessage_from_message(msg: SendPing) -> SendPing {
129 SendPing(msg.0 + 1)
130}
131
132fn create_packet_and_id_from_message(_: &SendPing) -> Option<(PingreqPacket, u16)> {
133 Some((PingreqPacket::new(), 0))
134}
135
136impl_send_packet_actor!(
137 PingreqActor,
138 SendPing,
139 PingreqPacket,
140 get_retry_count_from_message,
141 create_retry_msessage_from_message,
142 create_packet_and_id_from_message
143);