rustot/ota/data_interface/
mqtt.rs

1use core::fmt::{Display, Write};
2use core::str::FromStr;
3
4use mqttrust::{Mqtt, QoS, SubscribeTopic};
5
6use crate::ota::error::OtaError;
7use crate::{
8    jobs::{MAX_STREAM_ID_LEN, MAX_THING_NAME_LEN},
9    ota::{
10        config::Config,
11        data_interface::{DataInterface, FileBlock, Protocol},
12        encoding::{cbor, FileContext},
13    },
14};
15
16#[derive(Debug, Clone, Copy, PartialEq)]
17pub enum Encoding {
18    Cbor,
19    Json,
20}
21
22impl Display for Encoding {
23    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
24        match self {
25            Encoding::Cbor => write!(f, "cbor"),
26            Encoding::Json => write!(f, "json"),
27        }
28    }
29}
30
31impl FromStr for Encoding {
32    type Err = ();
33
34    fn from_str(s: &str) -> Result<Self, Self::Err> {
35        match s {
36            "cbor" => Ok(Self::Cbor),
37            "json" => Ok(Self::Json),
38            _ => Err(()),
39        }
40    }
41}
42
43#[derive(Debug)]
44pub enum Topic<'a> {
45    Data(Encoding, &'a str),
46    Description(Encoding, &'a str),
47    Rejected(Encoding, &'a str),
48}
49
50impl<'a> Topic<'a> {
51    pub fn from_str(s: &'a str) -> Option<Self> {
52        let tt = s.splitn(8, '/').collect::<heapless::Vec<&str, 8>>();
53        Some(match (tt.get(0), tt.get(1), tt.get(2), tt.get(3)) {
54            (Some(&"$aws"), Some(&"things"), _, Some(&"streams")) => {
55                // This is a stream topic! Figure out which
56                match (tt.get(4), tt.get(5), tt.get(6), tt.get(7)) {
57                    (Some(stream_name), Some(&"data"), Some(encoding), None) => {
58                        Topic::Data(Encoding::from_str(encoding).ok()?, stream_name)
59                    }
60                    (Some(stream_name), Some(&"description"), Some(encoding), None) => {
61                        Topic::Description(Encoding::from_str(encoding).ok()?, stream_name)
62                    }
63                    (Some(stream_name), Some(&"rejected"), Some(encoding), None) => {
64                        Topic::Rejected(Encoding::from_str(encoding).ok()?, stream_name)
65                    }
66                    _ => return None,
67                }
68            }
69            _ => return None,
70        })
71    }
72}
73
74impl<'a> From<&Topic<'a>> for OtaTopic<'a> {
75    fn from(t: &Topic<'a>) -> Self {
76        match t {
77            Topic::Data(encoding, job_id) => Self::Data(*encoding, job_id),
78            Topic::Description(encoding, job_id) => Self::Description(*encoding, job_id),
79            Topic::Rejected(encoding, job_id) => Self::Rejected(*encoding, job_id),
80        }
81    }
82}
83
84enum OtaTopic<'a> {
85    Data(Encoding, &'a str),
86    Description(Encoding, &'a str),
87    Rejected(Encoding, &'a str),
88
89    Get(Encoding, &'a str),
90}
91
92impl<'a> OtaTopic<'a> {
93    pub fn format<const L: usize>(&self, client_id: &str) -> Result<heapless::String<L>, OtaError> {
94        let mut topic_path = heapless::String::new();
95        match self {
96            Self::Data(encoding, stream_name) => topic_path.write_fmt(format_args!(
97                "$aws/things/{}/streams/{}/data/{}",
98                client_id, stream_name, encoding
99            )),
100            Self::Description(encoding, stream_name) => topic_path.write_fmt(format_args!(
101                "$aws/things/{}/streams/{}/description/{}",
102                client_id, stream_name, encoding
103            )),
104            Self::Rejected(encoding, stream_name) => topic_path.write_fmt(format_args!(
105                "$aws/things/{}/streams/{}/rejected/{}",
106                client_id, stream_name, encoding
107            )),
108            Self::Get(encoding, stream_name) => topic_path.write_fmt(format_args!(
109                "$aws/things/{}/streams/{}/get/{}",
110                client_id, stream_name, encoding
111            )),
112        }
113        .map_err(|_| OtaError::Overflow)?;
114
115        Ok(topic_path)
116    }
117}
118
119impl<'a, M> DataInterface for &'a M
120where
121    M: Mqtt,
122{
123    const PROTOCOL: Protocol = Protocol::Mqtt;
124
125    /// Init file transfer by subscribing to the OTA data stream topic
126    fn init_file_transfer(&self, file_ctx: &mut FileContext) -> Result<(), OtaError> {
127        let topic_path = OtaTopic::Data(Encoding::Cbor, file_ctx.stream_name.as_str())
128            .format::<256>(self.client_id())?;
129        let topic = SubscribeTopic {
130            topic_path: topic_path.as_str(),
131            qos: mqttrust::QoS::AtLeastOnce,
132        };
133
134        debug!("Subscribing to: [{:?}]", &topic_path);
135
136        self.subscribe(&[topic])?;
137
138        Ok(())
139    }
140
141    /// Request file block by publishing to the get stream topic
142    fn request_file_block(
143        &self,
144        file_ctx: &mut FileContext,
145        config: &Config,
146    ) -> Result<(), OtaError> {
147        // Reset number of blocks requested
148        file_ctx.request_block_remaining = file_ctx.bitmap.len() as u32;
149
150        let buf = &mut [0u8; 32];
151        let len = cbor::to_slice(
152            &cbor::GetStreamRequest {
153                // Arbitrary client token sent in the stream "GET" message
154                client_token: None,
155                stream_version: None,
156                file_id: file_ctx.fileid,
157                block_size: config.block_size,
158                block_offset: Some(file_ctx.block_offset),
159                block_bitmap: Some(&file_ctx.bitmap),
160                number_of_blocks: None,
161            },
162            buf,
163        )
164        .map_err(|_| OtaError::Encoding)?;
165
166        self.publish(
167            OtaTopic::Get(Encoding::Cbor, file_ctx.stream_name.as_str())
168                .format::<{ MAX_STREAM_ID_LEN + MAX_THING_NAME_LEN + 30 }>(self.client_id())?
169                .as_str(),
170            &buf[..len],
171            QoS::AtMostOnce,
172        )?;
173
174        Ok(())
175    }
176
177    /// Decode a cbor encoded fileblock received from streaming service
178    fn decode_file_block<'c>(
179        &self,
180        _file_ctx: &mut FileContext,
181        payload: &'c mut [u8],
182    ) -> Result<FileBlock<'c>, OtaError> {
183        Ok(
184            serde_cbor::de::from_mut_slice::<cbor::GetStreamResponse>(payload)
185                .map_err(|_| OtaError::Encoding)?
186                .into(),
187        )
188    }
189
190    /// Perform any cleanup operations required for data plane
191    fn cleanup(&self, file_ctx: &mut FileContext, config: &Config) -> Result<(), OtaError> {
192        if config.unsubscribe_on_shutdown {
193            // Unsubscribe from data stream topics
194            self.unsubscribe(&[
195                OtaTopic::Data(Encoding::Cbor, file_ctx.stream_name.as_str())
196                    .format::<256>(self.client_id())?
197                    .as_str(),
198            ])?;
199        }
200        Ok(())
201    }
202}
203
204#[cfg(test)]
205mod tests {
206    use mqttrust::{encoding::v4::decode_slice, Packet, SubscribeTopic};
207
208    use super::*;
209    use crate::{ota::test::test_file_ctx, test::MockMqtt};
210
211    #[test]
212    fn protocol_fits() {
213        assert_eq!(<&MockMqtt as DataInterface>::PROTOCOL, Protocol::Mqtt);
214    }
215
216    #[test]
217    fn init_file_transfer_subscribes() {
218        let mqtt = &MockMqtt::new();
219
220        let mut file_ctx = test_file_ctx(&Config::default());
221
222        mqtt.init_file_transfer(&mut file_ctx).unwrap();
223
224        assert_eq!(mqtt.tx.borrow_mut().len(), 1);
225
226        let bytes = mqtt.tx.borrow_mut().pop_front().unwrap();
227
228        let packet = decode_slice(bytes.as_slice()).unwrap();
229        let topics = match packet {
230            Some(Packet::Subscribe(ref s)) => s.topics().collect::<Vec<_>>(),
231            _ => panic!(),
232        };
233        assert_eq!(
234            topics,
235            vec![SubscribeTopic {
236                topic_path: "$aws/things/test_client/streams/test_stream/data/cbor",
237                qos: QoS::AtLeastOnce
238            }]
239        );
240    }
241
242    #[test]
243    fn request_file_block_publish() {
244        let mqtt = &MockMqtt::new();
245
246        let config = Config::default();
247        let mut file_ctx = test_file_ctx(&config);
248
249        mqtt.request_file_block(&mut file_ctx, &config).unwrap();
250
251        assert_eq!(mqtt.tx.borrow_mut().len(), 1);
252
253        let bytes = mqtt.tx.borrow_mut().pop_front().unwrap();
254
255        let publish = match decode_slice(bytes.as_slice()).unwrap() {
256            Some(Packet::Publish(s)) => s,
257            _ => panic!(),
258        };
259
260        assert_eq!(
261            publish,
262            mqttrust::encoding::v4::publish::Publish {
263                dup: false,
264                qos: QoS::AtMostOnce,
265                retain: false,
266                topic_name: "$aws/things/test_client/streams/test_stream/get/cbor",
267                payload: &[
268                    164, 97, 102, 0, 97, 108, 25, 1, 0, 97, 111, 0, 97, 98, 68, 255, 255, 255, 127
269                ],
270                pid: None
271            }
272        );
273    }
274
275    #[test]
276    fn decode_file_block_cbor() {
277        let mqtt = &MockMqtt::new();
278
279        let mut file_ctx = test_file_ctx(&Config::default());
280
281        let payload = &mut [
282            191, 97, 102, 0, 97, 105, 0, 97, 108, 25, 4, 0, 97, 112, 89, 4, 0, 141, 62, 28, 246,
283            80, 193, 2, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
284            0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
285            0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
286            0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
287            0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
288            0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
289            0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
290            0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
291            0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
292            0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
293            0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
294            0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
295            0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
296            0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
297            0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
298            0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
299            0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
300            0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
301            0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
302            0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
303            0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
304            0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
305            0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
306            0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
307            0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
308            0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
309            0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
310            0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
311            0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
312            0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
313            0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
314            0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
315            0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
316            0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
317            0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
318            0, 0, 0, 0, 0, 0, 255,
319        ];
320
321        let file_blk = mqtt.decode_file_block(&mut file_ctx, payload).unwrap();
322
323        assert_eq!(mqtt.tx.borrow_mut().len(), 0);
324        assert_eq!(file_blk.file_id, 0);
325        assert_eq!(file_blk.block_id, 0);
326        assert_eq!(
327            file_blk.block_payload,
328            &[
329                141, 62, 28, 246, 80, 193, 2, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
330                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
331                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
332                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
333                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
334                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
335                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
336                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
337                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
338                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
339                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
340                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
341                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
342                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
343                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
344                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
345                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
346                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
347                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
348                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
349                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
350                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
351                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
352                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
353                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
354                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
355                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
356                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
357                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
358                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
359                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
360                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
361                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
362                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
363                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
364                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
365                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0
366            ]
367        );
368        assert_eq!(file_blk.block_size, 1024);
369        assert_eq!(file_blk.client_token, None);
370    }
371
372    #[test]
373    fn cleanup_unsubscribe() {
374        let mqtt = &MockMqtt::new();
375
376        let config = Config::default();
377
378        let mut file_ctx = test_file_ctx(&config);
379
380        mqtt.cleanup(&mut file_ctx, &config).unwrap();
381
382        assert_eq!(mqtt.tx.borrow_mut().len(), 1);
383        let bytes = mqtt.tx.borrow_mut().pop_front().unwrap();
384
385        let packet = decode_slice(bytes.as_slice()).unwrap();
386        let topics = match packet {
387            Some(Packet::Unsubscribe(ref s)) => s.topics().collect::<Vec<_>>(),
388            _ => panic!(),
389        };
390
391        assert_eq!(
392            topics,
393            vec!["$aws/things/test_client/streams/test_stream/data/cbor"]
394        );
395    }
396
397    #[test]
398    fn cleanup_no_unsubscribe() {
399        let mqtt = &MockMqtt::new();
400
401        let mut config = Config::default();
402        config.unsubscribe_on_shutdown = false;
403
404        let mut file_ctx = test_file_ctx(&config);
405
406        mqtt.cleanup(&mut file_ctx, &config).unwrap();
407
408        assert_eq!(mqtt.tx.borrow_mut().len(), 0);
409    }
410}