1use std::collections::VecDeque;
2use std::fmt;
3
4use actix_ioframe::Sink;
5use actix_utils::oneshot;
6use bytes::Bytes;
7use bytestring::ByteString;
8use futures::future::{Future, TryFutureExt};
9use mqtt_codec as mqtt;
10
11use crate::cell::Cell;
12
13#[derive(Clone)]
14pub struct MqttSink {
15 sink: Sink<mqtt::Packet>,
16 pub(crate) inner: Cell<MqttSinkInner>,
17}
18
19#[derive(Default)]
20pub(crate) struct MqttSinkInner {
21 pub(crate) idx: u16,
22 pub(crate) queue: VecDeque<(u16, oneshot::Sender<()>)>,
23}
24
25impl MqttSink {
26 pub(crate) fn new(sink: Sink<mqtt::Packet>) -> Self {
27 MqttSink {
28 sink,
29 inner: Cell::new(MqttSinkInner::default()),
30 }
31 }
32
33 pub fn close(&self) {
35 self.sink.close();
36 }
37
38 pub fn publish_qos0(&self, topic: ByteString, payload: Bytes, dup: bool) {
40 log::trace!("Publish (QoS0) to {:?}", topic);
41 let publish = mqtt::Publish {
42 topic,
43 payload,
44 dup,
45 retain: false,
46 qos: mqtt::QoS::AtMostOnce,
47 packet_id: None,
48 };
49 self.sink.send(mqtt::Packet::Publish(publish));
50 }
51
52 pub fn publish_qos1(
54 &mut self,
55 topic: ByteString,
56 payload: Bytes,
57 dup: bool,
58 ) -> impl Future<Output = Result<(), ()>> {
59 let (tx, rx) = oneshot::channel();
60
61 let inner = self.inner.get_mut();
62 inner.idx += 1;
63 if inner.idx == 0 {
64 inner.idx = 1
65 }
66 inner.queue.push_back((inner.idx, tx));
67
68 let publish = mqtt::Packet::Publish(mqtt::Publish {
69 topic,
70 payload,
71 dup,
72 retain: false,
73 qos: mqtt::QoS::AtLeastOnce,
74 packet_id: Some(inner.idx),
75 });
76 log::trace!("Publish (QoS1) to {:#?}", publish);
77
78 self.sink.send(publish);
79 rx.map_err(|_| ())
80 }
81
82 pub(crate) fn complete_publish_qos1(&mut self, packet_id: u16) {
83 if let Some((idx, tx)) = self.inner.get_mut().queue.pop_front() {
84 if idx != packet_id {
85 log::trace!(
86 "MQTT protocol error, packet_id order does not match, expected {}, got: {}",
87 idx,
88 packet_id
89 );
90 self.close();
91 } else {
92 log::trace!("Ack publish packet with id: {}", packet_id);
93 let _ = tx.send(());
94 }
95 } else {
96 log::trace!("Unexpected PublishAck packet");
97 self.close();
98 }
99 }
100}
101
102impl fmt::Debug for MqttSink {
103 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
104 fmt.debug_struct("MqttSink").finish()
105 }
106}