1use std::marker::PhantomData;
2
3use bytestring::ByteString;
4use mqtt_codec as mqtt;
5
6use crate::dispatcher::MqttState;
7use crate::sink::MqttSink;
8
9pub struct Subscribe<S> {
11 topics: Vec<(ByteString, mqtt::QoS)>,
12 codes: Vec<mqtt::SubscribeReturnCode>,
13 state: MqttState<S>,
14}
15
16pub struct SubscribeResult {
18 pub(crate) codes: Vec<mqtt::SubscribeReturnCode>,
19}
20
21impl<S> Subscribe<S> {
22 pub(crate) fn new(state: MqttState<S>, topics: Vec<(ByteString, mqtt::QoS)>) -> Self {
23 let mut codes = Vec::with_capacity(topics.len());
24 (0..topics.len()).for_each(|_| codes.push(mqtt::SubscribeReturnCode::Failure));
25
26 Self {
27 topics,
28 state,
29 codes,
30 }
31 }
32
33 #[inline]
34 pub fn session(&self) -> &S {
36 self.state.session()
37 }
38
39 #[inline]
40 pub fn session_mut(&mut self) -> &mut S {
42 self.state.session_mut()
43 }
44
45 #[inline]
46 pub fn sink(&self) -> MqttSink {
48 self.state.sink().clone()
49 }
50
51 #[inline]
52 pub fn iter_mut(&mut self) -> SubscribeIter<S> {
54 SubscribeIter {
55 subs: self as *const _ as *mut _,
56 entry: 0,
57 lt: PhantomData,
58 }
59 }
60
61 #[inline]
62 pub fn into_result(self) -> SubscribeResult {
64 SubscribeResult { codes: self.codes }
65 }
66}
67
68impl<'a, S> IntoIterator for &'a mut Subscribe<S> {
69 type Item = Subscription<'a, S>;
70 type IntoIter = SubscribeIter<'a, S>;
71
72 fn into_iter(self) -> SubscribeIter<'a, S> {
73 self.iter_mut()
74 }
75}
76
77pub struct SubscribeIter<'a, S> {
79 subs: *mut Subscribe<S>,
80 entry: usize,
81 lt: PhantomData<&'a mut Subscribe<S>>,
82}
83
84impl<'a, S> SubscribeIter<'a, S> {
85 fn next_unsafe(&mut self) -> Option<Subscription<'a, S>> {
86 let subs = unsafe { &mut *self.subs };
87
88 if self.entry < subs.topics.len() {
89 let s = Subscription {
90 topic: &subs.topics[self.entry].0,
91 qos: subs.topics[self.entry].1,
92 state: subs.state.clone(),
93 code: &mut subs.codes[self.entry],
94 };
95 self.entry += 1;
96 Some(s)
97 } else {
98 None
99 }
100 }
101}
102
103impl<'a, S> Iterator for SubscribeIter<'a, S> {
104 type Item = Subscription<'a, S>;
105
106 #[inline]
107 fn next(&mut self) -> Option<Subscription<'a, S>> {
108 self.next_unsafe()
109 }
110}
111
112pub struct Subscription<'a, S> {
114 topic: &'a ByteString,
115 state: MqttState<S>,
116 qos: mqtt::QoS,
117 code: &'a mut mqtt::SubscribeReturnCode,
118}
119
120impl<'a, S> Subscription<'a, S> {
121 #[inline]
122 pub fn session(&self) -> &S {
124 self.state.session()
125 }
126
127 #[inline]
128 pub fn session_mut(&mut self) -> &mut S {
130 self.state.session_mut()
131 }
132
133 #[inline]
134 pub fn topic(&self) -> &'a ByteString {
136 &self.topic
137 }
138
139 #[inline]
140 pub fn qos(&self) -> mqtt::QoS {
142 self.qos
143 }
144
145 #[inline]
146 pub fn fail(&mut self) {
148 *self.code = mqtt::SubscribeReturnCode::Failure
149 }
150
151 #[inline]
152 pub fn subscribe(&mut self, qos: mqtt::QoS) {
154 *self.code = mqtt::SubscribeReturnCode::Success(qos)
155 }
156}
157
158pub struct Unsubscribe<S> {
160 state: MqttState<S>,
161 topics: Vec<ByteString>,
162}
163
164impl<S> Unsubscribe<S> {
165 pub(crate) fn new(state: MqttState<S>, topics: Vec<ByteString>) -> Self {
166 Self { topics, state }
167 }
168
169 #[inline]
170 pub fn session(&self) -> &S {
172 self.state.session()
173 }
174
175 #[inline]
176 pub fn session_mut(&mut self) -> &mut S {
178 self.state.session_mut()
179 }
180
181 #[inline]
182 pub fn sink(&self) -> MqttSink {
184 self.state.sink().clone()
185 }
186
187 pub fn iter(&self) -> impl Iterator<Item = &ByteString> {
189 self.topics.iter()
190 }
191}