actix_mqtt/
sink.rs

1use std::collections::VecDeque;
2use std::fmt;
3
4use actix_ioframe::Sink;
5use actix_utils::oneshot;
6use bytes::Bytes;
7use bytestring::ByteString;
8use futures::future::{Future, TryFutureExt};
9use mqtt_codec as mqtt;
10
11use crate::cell::Cell;
12
13#[derive(Clone)]
14pub struct MqttSink {
15    sink: Sink<mqtt::Packet>,
16    pub(crate) inner: Cell<MqttSinkInner>,
17}
18
19#[derive(Default)]
20pub(crate) struct MqttSinkInner {
21    pub(crate) idx: u16,
22    pub(crate) queue: VecDeque<(u16, oneshot::Sender<()>)>,
23}
24
25impl MqttSink {
26    pub(crate) fn new(sink: Sink<mqtt::Packet>) -> Self {
27        MqttSink {
28            sink,
29            inner: Cell::new(MqttSinkInner::default()),
30        }
31    }
32
33    /// Close mqtt connection
34    pub fn close(&self) {
35        self.sink.close();
36    }
37
38    /// Send publish packet with qos set to 0
39    pub fn publish_qos0(&self, topic: ByteString, payload: Bytes, dup: bool) {
40        log::trace!("Publish (QoS0) to {:?}", topic);
41        let publish = mqtt::Publish {
42            topic,
43            payload,
44            dup,
45            retain: false,
46            qos: mqtt::QoS::AtMostOnce,
47            packet_id: None,
48        };
49        self.sink.send(mqtt::Packet::Publish(publish));
50    }
51
52    /// Send publish packet
53    pub fn publish_qos1(
54        &mut self,
55        topic: ByteString,
56        payload: Bytes,
57        dup: bool,
58    ) -> impl Future<Output = Result<(), ()>> {
59        let (tx, rx) = oneshot::channel();
60
61        let inner = self.inner.get_mut();
62        inner.idx += 1;
63        if inner.idx == 0 {
64            inner.idx = 1
65        }
66        inner.queue.push_back((inner.idx, tx));
67
68        let publish = mqtt::Packet::Publish(mqtt::Publish {
69            topic,
70            payload,
71            dup,
72            retain: false,
73            qos: mqtt::QoS::AtLeastOnce,
74            packet_id: Some(inner.idx),
75        });
76        log::trace!("Publish (QoS1) to {:#?}", publish);
77
78        self.sink.send(publish);
79        rx.map_err(|_| ())
80    }
81
82    pub(crate) fn complete_publish_qos1(&mut self, packet_id: u16) {
83        if let Some((idx, tx)) = self.inner.get_mut().queue.pop_front() {
84            if idx != packet_id {
85                log::trace!(
86                    "MQTT protocol error, packet_id order does not match, expected {}, got: {}",
87                    idx,
88                    packet_id
89                );
90                self.close();
91            } else {
92                log::trace!("Ack publish packet with id: {}", packet_id);
93                let _ = tx.send(());
94            }
95        } else {
96            log::trace!("Unexpected PublishAck packet");
97            self.close();
98        }
99    }
100}
101
102impl fmt::Debug for MqttSink {
103    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
104        fmt.debug_struct("MqttSink").finish()
105    }
106}