use crate::config::{IotRegistrationStatus};
use crate::device_shadow::{
DeviceShadowDescriptor, ShadowDescriptors, GET_ACCEPTED, UPDATE_ACCEPTED, UPDATE_DELTA,
UPDATE_DOCUMENTS,
};
use crate::error::IoTError::{self, ChannelSendError, DeviceShadowError};
use crate::crypto::{DataEncryptionConfig, DataEncryptionConfigOption};
use tokio::sync::mpsc::Sender as Xmit;
use log::debug;
use serde::{self, Deserialize, Serialize};
#[derive(Debug)]
pub struct IotMessage {
pub topic: String,
pub message: String,
}
impl std::fmt::Display for IotMessage {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "topic: {} message: {}", self.topic, self.message)
}
}
impl IotMessage {
pub fn new(topic: String, message: String) -> Self {
IotMessage {
topic: topic,
message: message,
}
}
}
#[derive(Serialize, Deserialize, Debug)]
pub struct IotState {
pub iot_registration_status: IotRegistrationStatus,
pub data_encryption_config: DataEncryptionConfig,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct IotStateOption {
iot_registration_status: Option<IotRegistrationStatus>,
pub data_encryption_config: Option<DataEncryptionConfigOption>,
}
impl Default for IotState {
fn default() -> IotState {
IotState {
iot_registration_status: IotRegistrationStatus::Registered("".to_string()),
data_encryption_config: DataEncryptionConfig::new(),
}
}
}
impl IotState {
pub fn new() -> IotState {
Default::default()
}
pub fn set_iot_registration_status(&mut self, iot_registration_status: IotRegistrationStatus) {
self.iot_registration_status = iot_registration_status;
}
pub fn get_iot_registration_status(&self) -> &IotRegistrationStatus {
&self.iot_registration_status
}
pub fn update(&mut self, iot_state_option: &IotStateOption) {
if let Some(x) = &iot_state_option.iot_registration_status {
self.iot_registration_status = x.clone();
}
if let Some(x) = &iot_state_option.data_encryption_config {
self.data_encryption_config.update(x);
}
}
pub fn on_receive_iot_shadow(
&mut self,
shadow_value: &serde_json::Value,
) -> Result<IotRegistrationStatus, IoTError> {
let val = match shadow_value {
serde_json::Value::Null => {
debug!("Empty IoT Shadow ");
return Err::<IotRegistrationStatus, IoTError>(
IoTError::SerializeError
);
}
_ => shadow_value.clone(),
};
let iso: IotStateOption = match serde_json::from_value(val) {
Ok(i) => i,
Err(e) => {
debug!("Serialization error to IotState: {}", e);
return Err(IoTError::SerializeError);
}
};
self.update(&iso);
return Ok::<IotRegistrationStatus, IoTError>(
self.get_iot_registration_status().clone(),
);
}
}
async fn process_shadow_response(
event: &rumqttc::Publish,
shadow_descriptor: &mut DeviceShadowDescriptor,
xmit: &mut Xmit<IotMessage>,
) -> Result<(), IoTError> {
match shadow_descriptor
.get_shadow_ref()
.retrieve_shadow_delivery(&event)
.await
{
Ok((shadow_msg, shadow_value)) => {
debug!("Shadow_msg: {}", shadow_msg);
let iot_message: IotMessage;
match shadow_msg {
UPDATE_DELTA => {
debug!("Shadow_value: {:?}", shadow_value);
if shadow_value != serde_json::Value::Null {
shadow_descriptor
.get_tx()
.send(shadow_value.clone())
.unwrap();
};
iot_message = shadow_descriptor
.get_shadow_ref()
.build_shadow_message_update(shadow_value.to_string(), true);
match xmit.send(iot_message).await {
Ok(_) => return Ok::<_, IoTError>(()),
Err(_) => return Err::<_, IoTError>(ChannelSendError),
}
}
GET_ACCEPTED => {
debug!("Shadow_value: {:?}", shadow_value);
if shadow_value != serde_json::Value::Null {
shadow_descriptor
.get_tx()
.send(shadow_value.clone())
.unwrap();
};
return Ok::<_, IoTError>(());
}
UPDATE_ACCEPTED | UPDATE_DOCUMENTS => return Ok::<_, IoTError>(()),
_ => {
debug!("UNEXPECTED Shadow Message ==> DeviceShadowError");
return Err::<_, IoTError>(DeviceShadowError);
}
};
}
Err(IoTError::NotDeviceShadowTopicError) => {
debug!(
"{} for shadow: {}",
IoTError::NotDeviceShadowTopicError,
shadow_descriptor.get_name()
);
return Ok::<_, IoTError>(());
}
Err(e) => {
debug!("Shadow event processing error: {}", e);
return Ok::<_, IoTError>(());
}
};
}
pub async fn on_iot_event(
event: rumqttc::Publish,
shadow_descriptors: &mut ShadowDescriptors,
xmit: &mut Xmit<IotMessage>,
) -> Result<(), IoTError> {
debug!("Received IoT event: {:?}", event);
let event_topic = event.topic.to_string();
if event_topic.contains(&shadow_descriptors.device.get_name()) {
return process_shadow_response(&event, &mut shadow_descriptors.device, xmit).await;
} else if event_topic.contains(&shadow_descriptors.iot.get_name()) {
return process_shadow_response(&event, &mut shadow_descriptors.iot, xmit).await;
} else {
debug!(
"Shadow event topic {} does not contain shadow names",
event_topic
);
return Err::<_, IoTError>(DeviceShadowError);
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::crypto::{DataEncryptionMethod};
#[test]
fn iot_registration_status_test() {
let data = r#"
{
"iot_registration_status": {
"Registered": "98765"
},
"data_encryption_config": {
"method": "EciesSecp256k1",
"public_key": [
4,89,117,155,81,243,172,179,
90,195,137,53,151,179,94,29,
83,81,109,41,239,43,231,104,
14,189,163,2,229,86,3,148,
164,194,250,198,166,60,62,162,
124,188,178,137,87,61,52,245,
18,210,207,175,130,234,120,161,
45,205,156,7,34,37,164,106,
128
]
}
}"#;
let mut is = IotState::new();
println!("IotState - NEW: {:?}", is);
assert_eq!(is.iot_registration_status, IotRegistrationStatus::Registered("".to_string()));
assert_eq!(is.data_encryption_config.public_key.len(), 0);
is = serde_json::from_str(data).unwrap();
println!("IotState - JSON loaded: {:?}", is);
assert_eq!(is.iot_registration_status, IotRegistrationStatus::Registered("98765".to_string()));
assert_eq!(is.data_encryption_config.public_key.len(), 65);
assert_eq!(is.data_encryption_config.public_key[0], 4u8);
assert_eq!(is.data_encryption_config.public_key[64], 128u8);
is.set_iot_registration_status(IotRegistrationStatus::Registered("12345".to_string()));
println!("IotState - set_iot_registration_status: {:?}", is);
assert_eq!(is.iot_registration_status, IotRegistrationStatus::Registered("12345".to_string()));
let iot_registration_status = is.get_iot_registration_status();
println!("IotState - get_iot_registration_status: {:?}", iot_registration_status);
assert_eq!(*iot_registration_status, IotRegistrationStatus::Registered("12345".to_string()));
}
use serde_json::Value;
#[test]
fn iot_status_update_test() {
println!("\nTEST PART 1");
let data = r#"
{
"iot_registration_status": {
"Registered": "98765"
},
"data_encryption_config": {
"method": "EciesSecp256k1",
"public_key": [
4,89,117,155,81,243,172,179,
90,195,137,53,151,179,94,29,
83,81,109,41,239,43,231,104,
14,189,163,2,229,86,3,148,
164,194,250,198,166,60,62,162,
124,188,178,137,87,61,52,245,
18,210,207,175,130,234,120,161,
45,205,156,7,34,37,164,106,
128
]
}
}"#;
let mut is = IotState::new();
println!("IotState - NEW: {:?}", is);
let val: Value = serde_json::from_str(data).unwrap();
let iso: IotStateOption = serde_json::from_value(val).unwrap();
is.update(&iso);
println!("IotState - UPDATED: {:?}", is);
assert_eq!(is.iot_registration_status, IotRegistrationStatus::Registered("98765".to_string()));
assert_eq!(is.data_encryption_config.public_key.len(), 65);
assert_eq!(is.data_encryption_config.public_key[0], 4u8);
assert_eq!(is.data_encryption_config.public_key[64], 128u8);
println!("\nTEST PART 2");
let data = r#"
{
"data_encryption_config": {
"method": "NoEncryption"
}
}"#;
let val: Value = serde_json::from_str(data).unwrap();
let iso: IotStateOption = serde_json::from_value(val).unwrap();
is.update(&iso);
println!("IotState - UPDATED: {:?}", is);
assert_eq!(is.data_encryption_config.method, DataEncryptionMethod::NoEncryption);
}
#[test]
fn on_receive_iot_shadow_test() {
let data = r#"
{
"iot_registration_status": {
"Registered": "98765"
},
"data_encryption_config": {
"method": "EciesSecp256k1",
"public_key": [
4,89,117,155,81,243,172,179,
90,195,137,53,151,179,94,29,
83,81,109,41,239,43,231,104,
14,189,163,2,229,86,3,148,
164,194,250,198,166,60,62,162,
124,188,178,137,87,61,52,245,
18,210,207,175,130,234,120,161,
45,205,156,7,34,37,164,106,
128
]
}
}"#;
let mut is = IotState::new();
println!("IotState - NEW: {:?}", is);
println!("\nTEST PART 1");
let val: Value = serde_json::from_str(data).unwrap();
let res = is.on_receive_iot_shadow(&val);
println!("IotState - UPDATED: {:?}", is);
println!("IotState - on_receive_iot_shadow RESULT: {:?}", res);
assert_eq!(res.unwrap(), IotRegistrationStatus::Registered("98765".to_string()));
println!("\nTEST PART 2");
let val: Value = serde_json::Value::Null;
let res = is.on_receive_iot_shadow(&val);
println!("IotState - UPDATED: {:?}", is);
println!("IotState - on_receive_iot_shadow RESULT: {:?}", res);
assert_eq!(res, Err(IoTError::SerializeError));
}
}