iot_device_bridge 1.1.1

Bridge between messaging of the device and the cloud IoT (e.g., AWS).
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
//! DeviceShadow  processing that is not device or shadow specific (see also the DeviceAdapter).
//! The DeviceShadow concept includes the named shadows for device state and iot state..
//!
//! Operations:
//!   - Retrieving DeviceShadow:
//!      - device or cloud client => IoT : `get` message
//!   - Updating DeviceShadow:
//!      - device => IoT : `update` message with state `reported`
//!      - cloud client => IoT : `update` message with state `desired`
//!   - Deleting DeviceShadow:
//!      - device or cloud client => IoT :
//!         - a property : `update` message with property : null
//!         - entire shadow : `delete` message
//!
//! To enable low coupling between adapting to the device specificity and the generic shadow,
//! the device specific processing is defined in the `device_adapter::DeviceState`.

use crate::connector_aws::{self, IotMessage};
use crate::error::IoTError::{self, AWSResponseError, ChannelSendError};
use log::debug;
use rumqttc::{self, Event, EventLoop, Packet, QoS, Request, Sender, Subscribe, SubscribeFilter};
use serde_json::Value;
use tokio::sync::broadcast::Sender as Tx;
use tokio::sync::mpsc::Sender as Xmit;
use tokio::sync::RwLock;

// Device Shadow Subscription topics
pub const UPDATE_ACCEPTED: &str = "update/accepted";
const UPDATE_REJECTED: &str = "update/rejected";
pub const UPDATE_DELTA: &str = "update/delta";
pub const UPDATE_DOCUMENTS: &str = "update/documents";
pub const GET_ACCEPTED: &str = "get/accepted";
const GET_REJECTED: &str = "get/rejected";
const DELETE_ACCEPTED: &str = "delete/accepted";
const DELETE_REJECTED: &str = "delete/rejected";

/// Device Shadow Publishing topics
pub const GET: &str = "get";
pub const UPDATE: &str = "update";
// currently not used publishing topics: DELETE
// const DELETE: &str =    "delete";

/// Device Shadow Subscribing topics
pub const SHADOW_TOPICS_NUM: usize = 8;
const SHADOW_SUB_TOPICS: [&'static str; SHADOW_TOPICS_NUM] = [
    UPDATE_ACCEPTED,
    UPDATE_REJECTED,
    UPDATE_DELTA,
    UPDATE_DOCUMENTS,
    GET_ACCEPTED,
    GET_REJECTED,
    DELETE_ACCEPTED,
    DELETE_REJECTED,
];

const TOPIC_STRING_CAPACITY: usize = 100;

/// Structure holding the `DeviceShadowDescriptor` for device and iot shadows
#[derive(Debug)]
pub struct ShadowDescriptors {
    pub device: DeviceShadowDescriptor,
    pub iot: DeviceShadowDescriptor,
}

impl ShadowDescriptors {
    pub fn new(device_descr: DeviceShadowDescriptor, iot_descr: DeviceShadowDescriptor) -> Self {
        ShadowDescriptors {
            device: device_descr,
            iot: iot_descr,
        }
    }
}

/// DeviceShadowDescriptor holds the easy accessible parameters of a DeviceShadow
/// - name : name of the shadow
/// - shadow : the device shadow structure containing shadow value and processing info
/// - tx : channel for sending shadow to the task processing value of shadow delta (or desired)
#[derive(Debug)]
pub struct DeviceShadowDescriptor {
    name: String,
    shadow: DeviceShadow,
    tx: Tx<serde_json::Value>,
}

impl DeviceShadowDescriptor {
    pub fn new(name: String, shadow: DeviceShadow, tx: Tx<serde_json::Value>) -> Self {
        DeviceShadowDescriptor {
            name: name,
            shadow: shadow,
            tx: tx,
        }
    }

    pub fn get_name(&self) -> String {
        self.name.clone()
    }

    pub fn get_shadow_ref(&mut self) -> &mut DeviceShadow {
        &mut self.shadow
    }

    pub fn get_tx(&self) -> Tx<serde_json::Value> {
        self.tx.clone()
    }
}

/// ShadowTopic holds pob/sub prefix and shadow name
/// - prefix : specific part of the pub/sub topics used to co0mmunication with cloud IoT
/// - name : shadow name
#[derive(Debug, Clone)]
pub struct ShadowTopic {
    prefix: String,
    name: String,
}

impl ShadowTopic {
    fn new(thing_name: String, shadow_type: ShadowType) -> Self {
        let (shadow_topic_prefix, shadow_name) = match shadow_type {
            ShadowType::Classic => (format!("$aws/things/{}/shadow/", thing_name), String::new()),
            ShadowType::Named(shadow_name) => (
                format!("$aws/things/{}/shadow/name/{}/", thing_name, shadow_name),
                shadow_name,
            ),
        };
        let mut prefix = String::with_capacity(TOPIC_STRING_CAPACITY);
        prefix.push_str(&shadow_topic_prefix);
        return ShadowTopic {
            prefix: prefix,
            name: shadow_name,
        };
    }

    pub fn build(&self, postfix: &str) -> String {
        let mut full_topic = self.prefix.to_string();
        full_topic.push_str(postfix);
        full_topic
    }

    fn get_prefix(&self) -> String {
        return self.prefix.clone();
    }

    fn get_name(&self) -> String {
        return self.name.clone();
    }
}

pub enum ShadowType {
    Classic,
    Named(String),
}

impl std::fmt::Display for ShadowType {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        match self {
            ShadowType::Classic => write!(f, "CLASSIC"),
            ShadowType::Named(t) => write!(f, "NAMED:{}", t),
        }
    }
}

/// DeviceShadow holds the attributes for handling a shadow
/// - shadow_value : JSON values of the shadow
/// - eventloop_handle : handle for communocating with the cloud shadow
/// - shadow_topic : ShadowTopic
#[derive(Debug)]
pub struct DeviceShadow {
    shadow_value: RwLock<serde_json::Value>,
    eventloop_handle: Sender<Request>,
    shadow_topic: ShadowTopic,
}

impl DeviceShadow {
    pub fn new(
        thing_name: String,
        shadow_type: ShadowType,
        eventloop_handle: Sender<Request>,
    ) -> Self {
        let shadow_topic = ShadowTopic::new(thing_name, shadow_type);

        let device_shadow = DeviceShadow {
            shadow_value: RwLock::new(serde_json::Value::Null),
            eventloop_handle: eventloop_handle,
            shadow_topic: shadow_topic.clone(),
        };

        device_shadow
    }

    /// Get Device Shadow prefix, e.g., AWS specific
    pub fn get_name(&self) -> String {
        return self.shadow_topic.get_name();
    }

    /// Get Device Shadow prefix, e.g., AWS specific
    pub fn get_prefix(&self) -> String {
        return self.shadow_topic.get_prefix();
    }

    /// Get Device Shadow topic, e.g., AWS specific.
    pub fn get_shadow_topic(&self) -> ShadowTopic {
        return self.shadow_topic.clone();
    }

    /// Get Shadow Name from topic, e.g., "$aws/things/{}/shadow/name/{}/"
    pub fn get_shadow_name_from_aws_topic(&self, aws_topic: &String) -> String {
        let v: Vec<&str> = aws_topic.split('/').collect();
        return v[5].to_string();
    }

    // // Publish message helper
    // async fn publish_msg(&self, topic: String, payload: serde_json::Value) {
    //     let mut publish = Publish::new(topic, QoS::AtMostOnce, payload.to_string());
    //     publish.retain = false;
    //     let publish = Request::Publish(publish);
    //     self.eventloop_handle.send(publish).await.unwrap();
    // }

    /// Builds either UPDATE reported or UPDATE desired-and-reported Iot Message
    pub fn build_shadow_message_update(
        &self,
        shadow_value: String,
        reported_only: bool,
    ) -> IotMessage {
        IotMessage::new(
            self.get_shadow_topic().build(UPDATE).to_string(),
            build_shadow_message_payload(shadow_value, reported_only),
        )
    }

    /// Write shadow_response object to shadow object.
    async fn put_local_shadow(&self, value: &serde_json::Value) {
        let mut shadow_value = self.shadow_value.write().await;
        *shadow_value = value.clone();
    }

    /// The `retrieve_shadow_delivery` function
    ///   1. performs actions on the general DeviceShadow
    ///   2. stores the delivered payload
    ///   3. returns following sub-branch to be processed by device specific function  
    ///      a. Response `UPDATE_ACCEPTED`: `state.desired.*`
    ///      b. Response `UPDATE_DELTA`: `state.*`
    ///      c. Response `UPDATE_DOCUMENTS`: `current.state.desired.*`
    ///      d. Response `GET_ACCEPTED`: `state.desired.*`
    ///      e. all other responses return `serde_json::Value::Null`
    /// Remark: The device specific processing of the "documents", "delta" and "update" shadow responses
    /// (particularly containing incremental changes only) will be done in the Device Adapter.
    pub async fn retrieve_shadow_delivery(
        &mut self,
        event: &rumqttc::Publish,
    ) -> Result<(&str, Value), IoTError> {
        let prefix = self.get_prefix();
        let topic = event.topic.to_string();
        let shadow_payload: Value = serde_json::from_slice(&(event.payload.clone())).unwrap();
        let mut shadow_value: Value = serde_json::Value::Null;
        let shadow_msg: &str;

        match topic.strip_prefix(&prefix) {
            Some(UPDATE_ACCEPTED) => {
                debug!("Shadow Received: UPDATE ACCEPTED");
                self.put_local_shadow(&shadow_payload).await;
                (shadow_msg, shadow_value) = (
                    UPDATE_ACCEPTED,
                    (shadow_payload["state"]["desired"]).clone(),
                );
            }
            Some(UPDATE_REJECTED) => {
                debug!("Shadow Received: UPDATE Rejected");
                shadow_msg = UPDATE_REJECTED
            }
            Some(UPDATE_DELTA) => {
                debug!("Shadow Received: UPDATE DELTA");
                // self.put_local_shadow(&shadow_payload).await;
                (shadow_msg, shadow_value) = (UPDATE_DELTA, (shadow_payload["state"]).clone());
            }
            Some(UPDATE_DOCUMENTS) => {
                debug!("Shadow Received: UPDATE DOCUMENTS");
                self.put_local_shadow(&shadow_payload).await;
                (shadow_msg, shadow_value) = (
                    UPDATE_DOCUMENTS,
                    (shadow_payload["current"]["state"]["desired"]).clone(),
                );
            }
            Some(GET_ACCEPTED) => {
                debug!("Shadow Received: GET ACCEPTED");
                self.put_local_shadow(&shadow_payload).await;
                (shadow_msg, shadow_value) =
                    (GET_ACCEPTED, (shadow_payload["state"]["desired"]).clone());
            }
            Some(GET_REJECTED) => {
                debug!("Shadow Received: GET Rejected");
                shadow_msg = GET_REJECTED
            }
            Some(DELETE_ACCEPTED) => {
                debug!("Shadow Received: DELETE Rejected");
                self.put_local_shadow(&serde_json::Value::Null).await;
                shadow_msg = DELETE_ACCEPTED
            }
            Some(DELETE_REJECTED) => {
                debug!("Shadow Received: DELETE Rejected");
                shadow_msg = DELETE_REJECTED
            }
            Some(_) => {
                debug!("Not DeviceShadow topic: {}", topic);
                return Err(IoTError::NotDeviceShadowTopicError);
            }
            None => {
                debug!("Prefix not found: {}", prefix);
                return Err(IoTError::DeviceShadowError);
            }
        };

        Ok((shadow_msg, shadow_value))
    }

    /// Helper function to
    ///   - subscribe to all prefixes of a DeviceShadow and
    ///   - initiate GET of the remote shadow by sending the message to the IoT sender via MPSC channel
    pub async fn initiate_shadow(
        &self,
        eventloop: &mut EventLoop,
        xmit: Xmit<IotMessage>,
        shadow_initial_update: String,
    ) -> Result<(), IoTError> {
        let mut sub_topics: Vec<SubscribeFilter> = vec![];
        for topic in SHADOW_SUB_TOPICS.iter() {
            sub_topics.push(SubscribeFilter::new(
                self.shadow_topic.build(topic).to_string(),
                QoS::AtMostOnce,
            ));
        }
        let topic_list = Request::Subscribe(Subscribe::new_many(sub_topics)?);

        self.eventloop_handle.send_async(topic_list).await.unwrap();

        loop {
            match eventloop.poll().await? {
                Event::Incoming(event) => {
                    match event {
                        Packet::SubAck(r) => {
                            debug!("SubAck event on iot_receiver: {:?}", r);
                            // check if subscription to all shadow topics is acknowledged
                            if r.return_codes.len() == SHADOW_TOPICS_NUM {
                                if !shadow_initial_update.is_empty() {
                                    let update_remote_shadow_msg: IotMessage =
                                        connector_aws::IotMessage::new(
                                            self.shadow_topic.build(UPDATE).to_string(),
                                            shadow_initial_update,
                                        );
                                    match xmit.send(update_remote_shadow_msg).await {
                                        Ok(_) => return Ok::<_, IoTError>(()),
                                        Err(_) => return Err::<_, IoTError>(ChannelSendError),
                                    }
                                }
                                let get_remote_shadow_msg: IotMessage =
                                    connector_aws::IotMessage::new(
                                        self.shadow_topic.build(GET).to_string(),
                                        serde_json::Value::Null.to_string(),
                                    );
                                match xmit.send(get_remote_shadow_msg).await {
                                    Ok(_) => return Ok::<_, IoTError>(()),
                                    Err(_) => return Err::<_, IoTError>(ChannelSendError),
                                }
                            } else {
                                return Err::<_, IoTError>(AWSResponseError);
                            }
                        }
                        _ => {
                            debug!("SHADOW: Got event on iot_receiver: {:?}", event);
                        }
                    }
                }
                _ => (),
            }
        }
    }
}

/// A helper function for building JSON with `desired` and `reported` nodfes or `reported` only.
pub fn build_shadow_message_payload(shadow_value: String, reported_only: bool) -> String {
    return match reported_only {
        true => format!("{{\"state\":{{\"reported\":{}}}}}", shadow_value), // "{{\"state\":{{\"desired\"null,\"reported\":{}}}}}" -- add to send only reported node
        false => format!(
            "{{\"state\":{{\"desired\":{},\"reported\":{}}}}}",
            shadow_value, shadow_value
        ),
    };
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::CONFIG_DIRNAME;
    use crate::mqtt_client;
    use find_folder::Search;
    use std::env;

    fn find_config_dir() -> String {
        let mut exe_folder = env::current_exe().unwrap();
        println!("EXE_FOLDER: {:#?}", exe_folder);
        exe_folder.pop(); // Remove the executable's name, leaving the path to the containing folder
        let pb: std::path::PathBuf = Search::ParentsThenKids(5, 5)
            .of(exe_folder)
            .for_folder(CONFIG_DIRNAME)
            .expect("Config directory not found");
        return pb.into_os_string().into_string().unwrap();
    }

    #[tokio::test]
    async fn get_shadow_elements_test() {
        let thing_name: String = "iot_client_id".to_string();
        let shadow_name: String = "iot_shadow".to_string();
        let aws_topic: String = format!("$aws/things/{}/shadow/name/{}/", thing_name, shadow_name);
        let config_dir: String = find_config_dir();

        let aws_settings = mqtt_client::ConnectionSettings::new_tls(
            thing_name.clone(),
            "ENDPOINTID-ats.iot.eu-central-1.amazonaws.com".to_string(),
            8883,
            format!("{}{}", config_dir, "/certs/AmazonRootCA1.pem"),
            format!("{}{}", config_dir, "/certs/IotCertificate.pem"),
            format!("{}{}", config_dir, "/certs/IotPrivateKey.pem"),
            None,
        );

        let (iot_client, _) = mqtt_client::AsyncClient::new(aws_settings).await.unwrap();

        let device_shadow: DeviceShadow;
        device_shadow = DeviceShadow::new(
            thing_name,
            ShadowType::Named(shadow_name),
            iot_client.get_eventloop_handle(),
        );

        let device_shadow_prefix = device_shadow.get_prefix();
        println!("device_shadow_prefix: {:?}", device_shadow_prefix);
        assert_eq!(
            device_shadow_prefix,
            "$aws/things/iot_client_id/shadow/name/iot_shadow/"
        );

        let device_shadow_topic = device_shadow.get_shadow_topic();
        println!("device_shadow_topic: {:?}", device_shadow_topic);
        assert_eq!(
            device_shadow_topic.prefix,
            "$aws/things/iot_client_id/shadow/name/iot_shadow/"
        );

        let device_shadow_name = device_shadow.get_shadow_name_from_aws_topic(&aws_topic);
        println!("device_shadow_name: {:?}", device_shadow_name);
        assert_eq!(device_shadow_name, "iot_shadow");
    }

    #[test]
    fn build_shadow_message_payload_test() {
        let shadow_value = "{\"iot_registration_status\":\"REGISTERED\"}".to_string();
        let payload = build_shadow_message_payload(shadow_value, true);
        println!("payload for reported_only=true: {}", payload);
        assert_eq!(
            payload,
            "{\"state\":{\"reported\":{\"iot_registration_status\":\"REGISTERED\"}}}"
        );

        let shadow_value =
            "{\"iot_registration_status\":\"CERTIFICATE_ROTATION_REQUESTED\"}".to_string();
        let payload = build_shadow_message_payload(shadow_value, false);
        println!("payload for reported_only=false: {}", payload);
        assert_eq!(payload, "{\"state\":{\"desired\":{\"iot_registration_status\":\"CERTIFICATE_ROTATION_REQUESTED\"},\"reported\":{\"iot_registration_status\":\"CERTIFICATE_ROTATION_REQUESTED\"}}}");
    }
}