actix_mqtt/
subs.rs

1use std::marker::PhantomData;
2
3use bytestring::ByteString;
4use mqtt_codec as mqtt;
5
6use crate::dispatcher::MqttState;
7use crate::sink::MqttSink;
8
/// Subscribe message
///
/// Holds the requested topics, the return code recorded for each of them
/// and the state of the connection the message belongs to.
pub struct Subscribe<S> {
    // requested topics, each paired with the QoS asked for it
    topics: Vec<(ByteString, mqtt::QoS)>,
    // one return code per entry of `topics`, stored at the same index
    codes: Vec<mqtt::SubscribeReturnCode>,
    // connection state; gives access to the session and to the sink
    state: MqttState<S>,
}
15
/// Result of a subscribe message
///
/// Produced by `Subscribe::into_result`; carries the return codes that were
/// recorded for the subscription topics.
pub struct SubscribeResult {
    // return codes taken over from the `Subscribe` message
    pub(crate) codes: Vec<mqtt::SubscribeReturnCode>,
}
20
21impl<S> Subscribe<S> {
22    pub(crate) fn new(state: MqttState<S>, topics: Vec<(ByteString, mqtt::QoS)>) -> Self {
23        let mut codes = Vec::with_capacity(topics.len());
24        (0..topics.len()).for_each(|_| codes.push(mqtt::SubscribeReturnCode::Failure));
25
26        Self {
27            topics,
28            state,
29            codes,
30        }
31    }
32
33    #[inline]
34    /// returns reference to a connection session
35    pub fn session(&self) -> &S {
36        self.state.session()
37    }
38
39    #[inline]
40    /// returns mutable reference to a connection session
41    pub fn session_mut(&mut self) -> &mut S {
42        self.state.session_mut()
43    }
44
45    #[inline]
46    /// Mqtt client sink object
47    pub fn sink(&self) -> MqttSink {
48        self.state.sink().clone()
49    }
50
51    #[inline]
52    /// returns iterator over subscription topics
53    pub fn iter_mut(&mut self) -> SubscribeIter<S> {
54        SubscribeIter {
55            subs: self as *const _ as *mut _,
56            entry: 0,
57            lt: PhantomData,
58        }
59    }
60
61    #[inline]
62    /// convert subscription to a result
63    pub fn into_result(self) -> SubscribeResult {
64        SubscribeResult { codes: self.codes }
65    }
66}
67
68impl<'a, S> IntoIterator for &'a mut Subscribe<S> {
69    type Item = Subscription<'a, S>;
70    type IntoIter = SubscribeIter<'a, S>;
71
72    fn into_iter(self) -> SubscribeIter<'a, S> {
73        self.iter_mut()
74    }
75}
76
/// Iterator over subscription topics
///
/// Yields one `Subscription` per topic of the underlying `Subscribe`
/// message, in index order.
pub struct SubscribeIter<'a, S> {
    // raw pointer to the message being iterated; dereferenced in `next_unsafe`
    subs: *mut Subscribe<S>,
    // index of the next topic to yield
    entry: usize,
    // ties the iterator to the `'a` mutable borrow of the message, which the
    // raw pointer alone does not express
    lt: PhantomData<&'a mut Subscribe<S>>,
}
83
84impl<'a, S> SubscribeIter<'a, S> {
85    fn next_unsafe(&mut self) -> Option<Subscription<'a, S>> {
86        let subs = unsafe { &mut *self.subs };
87
88        if self.entry < subs.topics.len() {
89            let s = Subscription {
90                topic: &subs.topics[self.entry].0,
91                qos: subs.topics[self.entry].1,
92                state: subs.state.clone(),
93                code: &mut subs.codes[self.entry],
94            };
95            self.entry += 1;
96            Some(s)
97        } else {
98            None
99        }
100    }
101}
102
103impl<'a, S> Iterator for SubscribeIter<'a, S> {
104    type Item = Subscription<'a, S>;
105
106    #[inline]
107    fn next(&mut self) -> Option<Subscription<'a, S>> {
108        self.next_unsafe()
109    }
110}
111
/// Subscription topic
///
/// One entry of a `Subscribe` message: the requested topic and QoS together
/// with the slot that receives the return code for that topic.
pub struct Subscription<'a, S> {
    // requested topic, borrowed from the parent message
    topic: &'a ByteString,
    // connection state, cloned from the parent message by the iterator
    state: MqttState<S>,
    // QoS requested for the topic
    qos: mqtt::QoS,
    // return code slot for this topic; written by `fail` and `subscribe`
    code: &'a mut mqtt::SubscribeReturnCode,
}
119
120impl<'a, S> Subscription<'a, S> {
121    #[inline]
122    /// reference to a connection session
123    pub fn session(&self) -> &S {
124        self.state.session()
125    }
126
127    #[inline]
128    /// mutable reference to a connection session
129    pub fn session_mut(&mut self) -> &mut S {
130        self.state.session_mut()
131    }
132
133    #[inline]
134    /// subscription topic
135    pub fn topic(&self) -> &'a ByteString {
136        &self.topic
137    }
138
139    #[inline]
140    /// the level of assurance for delivery of an Application Message.
141    pub fn qos(&self) -> mqtt::QoS {
142        self.qos
143    }
144
145    #[inline]
146    /// fail to subscribe to the topic
147    pub fn fail(&mut self) {
148        *self.code = mqtt::SubscribeReturnCode::Failure
149    }
150
151    #[inline]
152    /// subscribe to a topic with specific qos
153    pub fn subscribe(&mut self, qos: mqtt::QoS) {
154        *self.code = mqtt::SubscribeReturnCode::Success(qos)
155    }
156}
157
/// Unsubscribe message
///
/// Holds the topics to unsubscribe from together with the state of the
/// connection the message belongs to.
pub struct Unsubscribe<S> {
    // connection state; gives access to the session and to the sink
    state: MqttState<S>,
    // topics listed in the unsubscribe message
    topics: Vec<ByteString>,
}
163
164impl<S> Unsubscribe<S> {
165    pub(crate) fn new(state: MqttState<S>, topics: Vec<ByteString>) -> Self {
166        Self { topics, state }
167    }
168
169    #[inline]
170    /// reference to a connection session
171    pub fn session(&self) -> &S {
172        self.state.session()
173    }
174
175    #[inline]
176    /// mutable reference to a connection session
177    pub fn session_mut(&mut self) -> &mut S {
178        self.state.session_mut()
179    }
180
181    #[inline]
182    /// Mqtt client sink object
183    pub fn sink(&self) -> MqttSink {
184        self.state.sink().clone()
185    }
186
187    /// returns iterator over unsubscribe topics
188    pub fn iter(&self) -> impl Iterator<Item = &ByteString> {
189        self.topics.iter()
190    }
191}