iot_device_bridge 1.1.1

Bridge between the device's messaging and a cloud IoT service (e.g., AWS).
Documentation
//! IoT connector for bridging to the cloud or other IoT facilities.
//!
//! This is a minimal version.
//!
//! Objectives:
//!   1. process events from AWS related to the Device Shadow operations
//!      - these could be rules for processing messages from the device, e.g.:
//!        filtering by a whitelist or mapping of topics
//!      - these could be complex commands / tasks, e.g.:
//!        robot navigation targets / paths
//!   2. process events from AWS related to the IoT Shadow operations
//!      - e.g., initiating Certificate Rotation procedure
//!   3. process other IoT messages / events from the cloud IoT, e.g.
//!      messages directed to the device such as specific short commands

use crate::config::{IotRegistrationStatus};
use crate::device_shadow::{
    DeviceShadowDescriptor, ShadowDescriptors, GET_ACCEPTED, UPDATE_ACCEPTED, UPDATE_DELTA,
    UPDATE_DOCUMENTS,
};
use crate::error::IoTError::{self, ChannelSendError, DeviceShadowError};
use crate::crypto::{DataEncryptionConfig, DataEncryptionConfigOption};
use tokio::sync::mpsc::Sender as Xmit;

use log::debug;
use serde::{self, Deserialize, Serialize};

/// `IotMessage` is a structure representing the topic and the payload of a message.
///
/// Values of this type are what the shadow-processing functions in this module
/// push into the `Xmit<IotMessage>` channel.
#[derive(Debug)]
pub struct IotMessage {
    /// Topic the message belongs to.
    pub topic: String,
    /// Payload of the message, kept as text.
    pub message: String,
}

impl std::fmt::Display for IotMessage {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "topic: {} message: {}", self.topic, self.message)
    }
}

impl IotMessage {
    pub fn new(topic: String, message: String) -> Self {
        IotMessage {
            topic: topic,
            message: message,
        }
    }
}

/// IoT-related state kept by the bridge: the registration status and the
/// data-encryption configuration.
///
/// The type is (de)serializable with serde. Partial changes are applied through
/// `IotState::update`, which takes an `IotStateOption`.
#[derive(Serialize, Deserialize, Debug)]
pub struct IotState {
    /// Current IoT registration status.
    pub iot_registration_status: IotRegistrationStatus,
    /// Current data-encryption configuration.
    pub data_encryption_config: DataEncryptionConfig,
}

/// Partial counterpart of `IotState` in which every attribute is optional.
///
/// `IotState::update` applies only the attributes that are `Some`, leaving the
/// others untouched, so a document that carries a subset of the attributes can
/// be deserialized into this type and merged into the current state.
#[derive(Serialize, Deserialize, Debug)]
pub struct IotStateOption {
    /// New registration status, if the document carried one.
    // NOTE(review): this field is private while `data_encryption_config` is `pub` —
    // confirm whether the asymmetry is intentional.
    iot_registration_status: Option<IotRegistrationStatus>,
    /// Partial data-encryption configuration, if the document carried one.
    pub data_encryption_config: Option<DataEncryptionConfigOption>,
}

impl Default for IotState {
    /// Builds the initial state: `Registered` with an empty string and a fresh
    /// `DataEncryptionConfig::new()`.
    fn default() -> Self {
        let iot_registration_status = IotRegistrationStatus::Registered(String::new());
        let data_encryption_config = DataEncryptionConfig::new();
        Self {
            iot_registration_status,
            data_encryption_config,
        }
    }
}

impl IotState {
    pub fn new() -> IotState {
        Default::default()
    }

    pub fn set_iot_registration_status(&mut self, iot_registration_status: IotRegistrationStatus) {
        self.iot_registration_status = iot_registration_status;
    }
        
    pub fn get_iot_registration_status(&self) -> &IotRegistrationStatus {
        &self.iot_registration_status
    }

    /// `update` function only updates only attributes changed in the IotShadow
    pub fn update(&mut self, iot_state_option: &IotStateOption) {
        if let Some(x) = &iot_state_option.iot_registration_status {
            self.iot_registration_status = x.clone();
        }
        if let Some(x) = &iot_state_option.data_encryption_config {
            self.data_encryption_config.update(x);
        }
    }
    
    pub fn on_receive_iot_shadow(
        &mut self,
        shadow_value: &serde_json::Value,
    ) -> Result<IotRegistrationStatus, IoTError> {

        let val = match shadow_value {
            serde_json::Value::Null => {
                debug!("Empty IoT Shadow ");
                return Err::<IotRegistrationStatus, IoTError>(
                    IoTError::SerializeError
                );
            }
            _ => shadow_value.clone(),
        };

        let iso: IotStateOption = match serde_json::from_value(val) {
            Ok(i) => i,
            Err(e) => {
                debug!("Serialization error to IotState: {}", e);
                return Err(IoTError::SerializeError);
            }
        };
        self.update(&iso);

        return Ok::<IotRegistrationStatus, IoTError>(
            self.get_iot_registration_status().clone(),
        );

    }
}

async fn process_shadow_response(
    event: &rumqttc::Publish,
    shadow_descriptor: &mut DeviceShadowDescriptor,
    xmit: &mut Xmit<IotMessage>,
) -> Result<(), IoTError> /* -> Option<IotMessage> */ {
    match shadow_descriptor
        .get_shadow_ref()
        .retrieve_shadow_delivery(&event)
        .await
    {
        Ok((shadow_msg, shadow_value)) => {
            // shadow request / response processed -- send it to device adapter and return
            debug!("Shadow_msg: {}", shadow_msg);
            let iot_message: IotMessage;
            match shadow_msg {
                UPDATE_DELTA => {
                    debug!("Shadow_value: {:?}", shadow_value);
                    if shadow_value != serde_json::Value::Null {
                        // send to the task processing value of shadow
                        shadow_descriptor
                            .get_tx()
                            .send(shadow_value.clone())
                            .unwrap();
                    };
                    iot_message = shadow_descriptor
                        .get_shadow_ref()
                        .build_shadow_message_update(shadow_value.to_string(), true);

                    // send to the task responding to the shadow in the cloud
                    match xmit.send(iot_message).await {
                        Ok(_) => return Ok::<_, IoTError>(()),
                        Err(_) => return Err::<_, IoTError>(ChannelSendError),
                    }
                }
                GET_ACCEPTED => {
                    debug!("Shadow_value: {:?}", shadow_value);
                    if shadow_value != serde_json::Value::Null {
                        // send to the task processing value of shadow
                        shadow_descriptor
                            .get_tx()
                            .send(shadow_value.clone())
                            .unwrap();
                    };
                    return Ok::<_, IoTError>(());
                }
                UPDATE_ACCEPTED | UPDATE_DOCUMENTS => return Ok::<_, IoTError>(()),
                _ => {
                    debug!("UNEXPECTED Shadow Message ==> DeviceShadowError");
                    return Err::<_, IoTError>(DeviceShadowError);
                }
            };
        }
        Err(IoTError::NotDeviceShadowTopicError) => {
            // not this shadow request / response i.e. continue processing
            debug!(
                "{} for shadow: {}",
                IoTError::NotDeviceShadowTopicError,
                shadow_descriptor.get_name()
            );
            return Ok::<_, IoTError>(());
        }
        Err(e) => {
            debug!("Shadow event processing error: {}", e);
            return Ok::<_, IoTError>(());
        }
    };
}

/// Entry point for a publish received from the cloud.
///
/// The event is routed by its topic:
///   1. if the topic contains the name of the device shadow, the event is
///      processed against `shadow_descriptors.device`;
///   2. otherwise, if it contains the name of the IoT shadow, it is processed
///      against `shadow_descriptors.iot`;
///   3. otherwise the event is rejected.
///
/// Processing itself (forwarding shadow values and sending responses through
/// `xmit`) is done by `process_shadow_response`, whose result is returned as is.
///
/// # Errors
///
/// Returns `DeviceShadowError` when the topic contains neither shadow name, or
/// whatever error `process_shadow_response` reports.
pub async fn on_iot_event(
    event: rumqttc::Publish,
    shadow_descriptors: &mut ShadowDescriptors,
    xmit: &mut Xmit<IotMessage>,
) -> Result<(), IoTError> {
    debug!("Received IoT event: {:?}", event);

    let topic = event.topic.to_string();

    // The device shadow takes precedence when both names occur in the topic.
    if topic.contains(&shadow_descriptors.device.get_name()) {
        return process_shadow_response(&event, &mut shadow_descriptors.device, xmit).await;
    }

    if topic.contains(&shadow_descriptors.iot.get_name()) {
        return process_shadow_response(&event, &mut shadow_descriptors.iot, xmit).await;
    }

    debug!(
        "Shadow event topic {} does not contain shadow names",
        topic
    );
    Err(DeviceShadowError)
}


#[cfg(test)]
mod tests {
    use super::*;
    use crate::crypto::DataEncryptionMethod;
    use serde_json::Value;

    /// Complete shadow document shared by all tests: a `Registered("98765")`
    /// status and an `EciesSecp256k1` configuration with a 65-byte public key
    /// whose first byte is 4 and last byte is 128.
    const FULL_SHADOW_JSON: &str = r#"
        {
            "iot_registration_status": {
                "Registered": "98765"
            },
            "data_encryption_config": {
                "method": "EciesSecp256k1",
                "public_key": [
                    4,89,117,155,81,243,172,179,
                    90,195,137,53,151,179,94,29,
                    83,81,109,41,239,43,231,104,
                    14,189,163,2,229,86,3,148,
                    164,194,250,198,166,60,62,162,
                    124,188,178,137,87,61,52,245,
                    18,210,207,175,130,234,120,161,
                    45,205,156,7,34,37,164,106,
                    128
                ]
            }
        }"#;

    /// Shorthand for the `Registered` status carrying `id`.
    fn registered(id: &str) -> IotRegistrationStatus {
        IotRegistrationStatus::Registered(id.to_string())
    }

    /// Checks that `is` holds the public key from `FULL_SHADOW_JSON`.
    fn assert_fixture_public_key(is: &IotState) {
        assert_eq!(is.data_encryption_config.public_key.len(), 65);
        assert_eq!(is.data_encryption_config.public_key[0], 4u8);
        assert_eq!(is.data_encryption_config.public_key[64], 128u8);
    }

    #[test]
    fn iot_registration_status_test() {
        let mut is = IotState::new();
        println!("IotState - NEW: {:?}", is);
        assert_eq!(is.iot_registration_status, registered(""));
        assert_eq!(is.data_encryption_config.public_key.len(), 0);

        is = serde_json::from_str(FULL_SHADOW_JSON).unwrap();
        println!("IotState - JSON loaded: {:?}", is);
        assert_eq!(is.iot_registration_status, registered("98765"));
        assert_fixture_public_key(&is);

        is.set_iot_registration_status(registered("12345"));
        println!("IotState - set_iot_registration_status: {:?}", is);
        assert_eq!(is.iot_registration_status, registered("12345"));

        let iot_registration_status = is.get_iot_registration_status();
        println!("IotState - get_iot_registration_status: {:?}", iot_registration_status);
        assert_eq!(*iot_registration_status, registered("12345"));
    }

    #[test]
    fn iot_status_update_test() {
        println!("\nTEST PART 1");
        let mut is = IotState::new();
        println!("IotState - NEW: {:?}", is);

        let val: Value = serde_json::from_str(FULL_SHADOW_JSON).unwrap();
        let iso: IotStateOption = serde_json::from_value(val).unwrap();

        is.update(&iso);
        println!("IotState - UPDATED: {:?}", is);
        assert_eq!(is.iot_registration_status, registered("98765"));
        assert_fixture_public_key(&is);

        println!("\nTEST PART 2");
        let data = r#"
        {
            "data_encryption_config": {
                "method": "NoEncryption"
            }
        }"#;

        let val: Value = serde_json::from_str(data).unwrap();
        let iso: IotStateOption = serde_json::from_value(val).unwrap();

        is.update(&iso);
        println!("IotState - UPDATED: {:?}", is);
        assert_eq!(is.data_encryption_config.method, DataEncryptionMethod::NoEncryption);
        // An attribute absent from the partial document must be left untouched.
        assert_eq!(is.iot_registration_status, registered("98765"));
    }

    #[test]
    fn on_receive_iot_shadow_test() {
        let mut is = IotState::new();
        println!("IotState - NEW: {:?}", is);

        println!("\nTEST PART 1");
        let val: Value = serde_json::from_str(FULL_SHADOW_JSON).unwrap();
        let res = is.on_receive_iot_shadow(&val);
        println!("IotState - UPDATED: {:?}", is);
        println!("IotState - on_receive_iot_shadow RESULT: {:?}", res);
        assert_eq!(res.unwrap(), registered("98765"));

        println!("\nTEST PART 2");
        let val: Value = serde_json::Value::Null;
        let res = is.on_receive_iot_shadow(&val);
        println!("IotState - UPDATED: {:?}", is);
        println!("IotState - on_receive_iot_shadow RESULT: {:?}", res);
        assert_eq!(res, Err(IoTError::SerializeError));

        println!("\nTEST PART 3");
        // A document of the wrong shape is rejected and leaves the state as it was.
        let val: Value = Value::String("not an object".to_string());
        let res = is.on_receive_iot_shadow(&val);
        println!("IotState - UPDATED: {:?}", is);
        println!("IotState - on_receive_iot_shadow RESULT: {:?}", res);
        assert_eq!(res, Err(IoTError::SerializeError));
        assert_eq!(is.iot_registration_status, registered("98765"));
    }
}