1use core::fmt::{Display, Write};
2use core::str::FromStr;
3
4use mqttrust::{Mqtt, QoS, SubscribeTopic};
5
6use crate::ota::error::OtaError;
7use crate::{
8 jobs::{MAX_STREAM_ID_LEN, MAX_THING_NAME_LEN},
9 ota::{
10 config::Config,
11 data_interface::{DataInterface, FileBlock, Protocol},
12 encoding::{cbor, FileContext},
13 },
14};
15
16#[derive(Debug, Clone, Copy, PartialEq)]
17pub enum Encoding {
18 Cbor,
19 Json,
20}
21
22impl Display for Encoding {
23 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
24 match self {
25 Encoding::Cbor => write!(f, "cbor"),
26 Encoding::Json => write!(f, "json"),
27 }
28 }
29}
30
31impl FromStr for Encoding {
32 type Err = ();
33
34 fn from_str(s: &str) -> Result<Self, Self::Err> {
35 match s {
36 "cbor" => Ok(Self::Cbor),
37 "json" => Ok(Self::Json),
38 _ => Err(()),
39 }
40 }
41}
42
43#[derive(Debug)]
44pub enum Topic<'a> {
45 Data(Encoding, &'a str),
46 Description(Encoding, &'a str),
47 Rejected(Encoding, &'a str),
48}
49
50impl<'a> Topic<'a> {
51 pub fn from_str(s: &'a str) -> Option<Self> {
52 let tt = s.splitn(8, '/').collect::<heapless::Vec<&str, 8>>();
53 Some(match (tt.get(0), tt.get(1), tt.get(2), tt.get(3)) {
54 (Some(&"$aws"), Some(&"things"), _, Some(&"streams")) => {
55 match (tt.get(4), tt.get(5), tt.get(6), tt.get(7)) {
57 (Some(stream_name), Some(&"data"), Some(encoding), None) => {
58 Topic::Data(Encoding::from_str(encoding).ok()?, stream_name)
59 }
60 (Some(stream_name), Some(&"description"), Some(encoding), None) => {
61 Topic::Description(Encoding::from_str(encoding).ok()?, stream_name)
62 }
63 (Some(stream_name), Some(&"rejected"), Some(encoding), None) => {
64 Topic::Rejected(Encoding::from_str(encoding).ok()?, stream_name)
65 }
66 _ => return None,
67 }
68 }
69 _ => return None,
70 })
71 }
72}
73
74impl<'a> From<&Topic<'a>> for OtaTopic<'a> {
75 fn from(t: &Topic<'a>) -> Self {
76 match t {
77 Topic::Data(encoding, job_id) => Self::Data(*encoding, job_id),
78 Topic::Description(encoding, job_id) => Self::Description(*encoding, job_id),
79 Topic::Rejected(encoding, job_id) => Self::Rejected(*encoding, job_id),
80 }
81 }
82}
83
84enum OtaTopic<'a> {
85 Data(Encoding, &'a str),
86 Description(Encoding, &'a str),
87 Rejected(Encoding, &'a str),
88
89 Get(Encoding, &'a str),
90}
91
92impl<'a> OtaTopic<'a> {
93 pub fn format<const L: usize>(&self, client_id: &str) -> Result<heapless::String<L>, OtaError> {
94 let mut topic_path = heapless::String::new();
95 match self {
96 Self::Data(encoding, stream_name) => topic_path.write_fmt(format_args!(
97 "$aws/things/{}/streams/{}/data/{}",
98 client_id, stream_name, encoding
99 )),
100 Self::Description(encoding, stream_name) => topic_path.write_fmt(format_args!(
101 "$aws/things/{}/streams/{}/description/{}",
102 client_id, stream_name, encoding
103 )),
104 Self::Rejected(encoding, stream_name) => topic_path.write_fmt(format_args!(
105 "$aws/things/{}/streams/{}/rejected/{}",
106 client_id, stream_name, encoding
107 )),
108 Self::Get(encoding, stream_name) => topic_path.write_fmt(format_args!(
109 "$aws/things/{}/streams/{}/get/{}",
110 client_id, stream_name, encoding
111 )),
112 }
113 .map_err(|_| OtaError::Overflow)?;
114
115 Ok(topic_path)
116 }
117}
118
119impl<'a, M> DataInterface for &'a M
120where
121 M: Mqtt,
122{
123 const PROTOCOL: Protocol = Protocol::Mqtt;
124
125 fn init_file_transfer(&self, file_ctx: &mut FileContext) -> Result<(), OtaError> {
127 let topic_path = OtaTopic::Data(Encoding::Cbor, file_ctx.stream_name.as_str())
128 .format::<256>(self.client_id())?;
129 let topic = SubscribeTopic {
130 topic_path: topic_path.as_str(),
131 qos: mqttrust::QoS::AtLeastOnce,
132 };
133
134 debug!("Subscribing to: [{:?}]", &topic_path);
135
136 self.subscribe(&[topic])?;
137
138 Ok(())
139 }
140
141 fn request_file_block(
143 &self,
144 file_ctx: &mut FileContext,
145 config: &Config,
146 ) -> Result<(), OtaError> {
147 file_ctx.request_block_remaining = file_ctx.bitmap.len() as u32;
149
150 let buf = &mut [0u8; 32];
151 let len = cbor::to_slice(
152 &cbor::GetStreamRequest {
153 client_token: None,
155 stream_version: None,
156 file_id: file_ctx.fileid,
157 block_size: config.block_size,
158 block_offset: Some(file_ctx.block_offset),
159 block_bitmap: Some(&file_ctx.bitmap),
160 number_of_blocks: None,
161 },
162 buf,
163 )
164 .map_err(|_| OtaError::Encoding)?;
165
166 self.publish(
167 OtaTopic::Get(Encoding::Cbor, file_ctx.stream_name.as_str())
168 .format::<{ MAX_STREAM_ID_LEN + MAX_THING_NAME_LEN + 30 }>(self.client_id())?
169 .as_str(),
170 &buf[..len],
171 QoS::AtMostOnce,
172 )?;
173
174 Ok(())
175 }
176
177 fn decode_file_block<'c>(
179 &self,
180 _file_ctx: &mut FileContext,
181 payload: &'c mut [u8],
182 ) -> Result<FileBlock<'c>, OtaError> {
183 Ok(
184 serde_cbor::de::from_mut_slice::<cbor::GetStreamResponse>(payload)
185 .map_err(|_| OtaError::Encoding)?
186 .into(),
187 )
188 }
189
190 fn cleanup(&self, file_ctx: &mut FileContext, config: &Config) -> Result<(), OtaError> {
192 if config.unsubscribe_on_shutdown {
193 self.unsubscribe(&[
195 OtaTopic::Data(Encoding::Cbor, file_ctx.stream_name.as_str())
196 .format::<256>(self.client_id())?
197 .as_str(),
198 ])?;
199 }
200 Ok(())
201 }
202}
203
204#[cfg(test)]
205mod tests {
206 use mqttrust::{encoding::v4::decode_slice, Packet, SubscribeTopic};
207
208 use super::*;
209 use crate::{ota::test::test_file_ctx, test::MockMqtt};
210
211 #[test]
212 fn protocol_fits() {
213 assert_eq!(<&MockMqtt as DataInterface>::PROTOCOL, Protocol::Mqtt);
214 }
215
216 #[test]
217 fn init_file_transfer_subscribes() {
218 let mqtt = &MockMqtt::new();
219
220 let mut file_ctx = test_file_ctx(&Config::default());
221
222 mqtt.init_file_transfer(&mut file_ctx).unwrap();
223
224 assert_eq!(mqtt.tx.borrow_mut().len(), 1);
225
226 let bytes = mqtt.tx.borrow_mut().pop_front().unwrap();
227
228 let packet = decode_slice(bytes.as_slice()).unwrap();
229 let topics = match packet {
230 Some(Packet::Subscribe(ref s)) => s.topics().collect::<Vec<_>>(),
231 _ => panic!(),
232 };
233 assert_eq!(
234 topics,
235 vec![SubscribeTopic {
236 topic_path: "$aws/things/test_client/streams/test_stream/data/cbor",
237 qos: QoS::AtLeastOnce
238 }]
239 );
240 }
241
242 #[test]
243 fn request_file_block_publish() {
244 let mqtt = &MockMqtt::new();
245
246 let config = Config::default();
247 let mut file_ctx = test_file_ctx(&config);
248
249 mqtt.request_file_block(&mut file_ctx, &config).unwrap();
250
251 assert_eq!(mqtt.tx.borrow_mut().len(), 1);
252
253 let bytes = mqtt.tx.borrow_mut().pop_front().unwrap();
254
255 let publish = match decode_slice(bytes.as_slice()).unwrap() {
256 Some(Packet::Publish(s)) => s,
257 _ => panic!(),
258 };
259
260 assert_eq!(
261 publish,
262 mqttrust::encoding::v4::publish::Publish {
263 dup: false,
264 qos: QoS::AtMostOnce,
265 retain: false,
266 topic_name: "$aws/things/test_client/streams/test_stream/get/cbor",
267 payload: &[
268 164, 97, 102, 0, 97, 108, 25, 1, 0, 97, 111, 0, 97, 98, 68, 255, 255, 255, 127
269 ],
270 pid: None
271 }
272 );
273 }
274
275 #[test]
276 fn decode_file_block_cbor() {
277 let mqtt = &MockMqtt::new();
278
279 let mut file_ctx = test_file_ctx(&Config::default());
280
281 let payload = &mut [
282 191, 97, 102, 0, 97, 105, 0, 97, 108, 25, 4, 0, 97, 112, 89, 4, 0, 141, 62, 28, 246,
283 80, 193, 2, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
284 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
285 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
286 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
287 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
288 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
289 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
290 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
291 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
292 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
293 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
294 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
295 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
296 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
297 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
298 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
299 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
300 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
301 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
302 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
303 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
304 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
305 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
306 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
307 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
308 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
309 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
310 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
311 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
312 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
313 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
314 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
315 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
316 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
317 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
318 0, 0, 0, 0, 0, 0, 255,
319 ];
320
321 let file_blk = mqtt.decode_file_block(&mut file_ctx, payload).unwrap();
322
323 assert_eq!(mqtt.tx.borrow_mut().len(), 0);
324 assert_eq!(file_blk.file_id, 0);
325 assert_eq!(file_blk.block_id, 0);
326 assert_eq!(
327 file_blk.block_payload,
328 &[
329 141, 62, 28, 246, 80, 193, 2, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
330 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
331 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
332 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
333 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
334 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
335 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
336 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
337 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
338 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
339 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
340 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
341 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
342 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
343 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
344 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
345 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
346 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
347 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
348 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
349 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
350 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
351 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
352 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
353 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
354 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
355 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
356 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
357 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
358 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
359 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
360 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
361 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
362 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
363 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
364 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
365 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0
366 ]
367 );
368 assert_eq!(file_blk.block_size, 1024);
369 assert_eq!(file_blk.client_token, None);
370 }
371
372 #[test]
373 fn cleanup_unsubscribe() {
374 let mqtt = &MockMqtt::new();
375
376 let config = Config::default();
377
378 let mut file_ctx = test_file_ctx(&config);
379
380 mqtt.cleanup(&mut file_ctx, &config).unwrap();
381
382 assert_eq!(mqtt.tx.borrow_mut().len(), 1);
383 let bytes = mqtt.tx.borrow_mut().pop_front().unwrap();
384
385 let packet = decode_slice(bytes.as_slice()).unwrap();
386 let topics = match packet {
387 Some(Packet::Unsubscribe(ref s)) => s.topics().collect::<Vec<_>>(),
388 _ => panic!(),
389 };
390
391 assert_eq!(
392 topics,
393 vec!["$aws/things/test_client/streams/test_stream/data/cbor"]
394 );
395 }
396
397 #[test]
398 fn cleanup_no_unsubscribe() {
399 let mqtt = &MockMqtt::new();
400
401 let mut config = Config::default();
402 config.unsubscribe_on_shutdown = false;
403
404 let mut file_ctx = test_file_ctx(&config);
405
406 mqtt.cleanup(&mut file_ctx, &config).unwrap();
407
408 assert_eq!(mqtt.tx.borrow_mut().len(), 0);
409 }
410}