use crate::config::Config;
use crate::connector_aws::{IotMessage, IotState};
use crate::error::IoTError;
use crate::crypto::{DataEncryptionMethod};
use chrono::Utc;
use log::debug;
use serde::{Deserialize, Serialize};
use serde_json::{json, Map, Value};
use std::collections::HashMap;
use std::sync::Arc;
use uuid::Uuid;
/// Filtering strategy applied to event topics by `EventFilter`.
#[derive(Debug)]
pub enum FilterType {
    /// Every topic is forwarded unchanged.
    Passthrough,
    /// Only topics contained in the filter's whitelist are forwarded.
    Whitelist,
}

/// Renders the variant as the upper-case label that `EventFilter` stores in
/// its `filter_type` field.
impl std::fmt::Display for FilterType {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Passthrough => "PASSTHROUGH",
            Self::Whitelist => "WHITELIST",
        };
        f.write_str(label)
    }
}
/// Selects where the serial number used in the message source id comes from
/// (see `create_device_iot_message`).
#[derive(Deserialize, Serialize, Debug, PartialEq, Clone)]
pub enum SourceIdType {
/// Second dot-separated segment of the payload's `origin` field, with
/// leading `'0'` characters removed.
DeviceNoLeadZeros,
/// Second dot-separated segment of the payload's `origin` field, unchanged.
Device,
/// `instrument_serial_number` taken from the device section of the config.
Config,
}
impl std::fmt::Display for SourceIdType {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
SourceIdType::DeviceNoLeadZeros => write!(f, "DEVICE_NO_LEAD_ZEROS"),
SourceIdType::Device => write!(f, "DEVICE"),
SourceIdType::Config => write!(f, "CONFIG"),
}
}
}
/// Configuration that controls how device events are renamed, filtered and
/// attributed to a source id.
#[derive(Deserialize, Serialize, Debug)]
pub struct DeviceStateConfig {
/// Maps an incoming topic to the event type to publish; topics without an
/// entry are used unchanged (see `map_event_type`).
event_type_mapping: HashMap<String, String>,
/// Filter deciding which event types are forwarded.
event_filter: EventFilter,
/// Source of the serial number placed in outgoing messages.
source_id_type: SourceIdType,
}
/// Partial form of `DeviceStateConfig` deserialized from a device shadow.
/// Fields that are `None` leave the current value untouched when applied via
/// `DeviceStateConfig::update`.
#[derive(Deserialize, Debug)]
struct DeviceStateConfigOption {
/// Replacement for the whole topic-to-event-type map, if present.
event_type_mapping: Option<HashMap<String, String>>,
/// Partial update of the event filter, if present.
event_filter: Option<EventFilterOption>,
/// Replacement for the source id type, if present.
source_id_type: Option<SourceIdType>,
}
/// Default configuration: no event-type mappings, the default `EventFilter`
/// and the serial number taken from the config (`SourceIdType::Config`).
impl Default for DeviceStateConfig {
    fn default() -> Self {
        Self {
            event_type_mapping: HashMap::new(),
            event_filter: EventFilter::default(),
            source_id_type: SourceIdType::Config,
        }
    }
}
impl DeviceStateConfig {
pub fn new() -> DeviceStateConfig {
Default::default()
}
fn update(&mut self, device_state_config_option: &DeviceStateConfigOption) {
if let Some(x) = &device_state_config_option.event_type_mapping {
self.event_type_mapping = x.clone();
}
if let Some(x) = &device_state_config_option.event_filter {
self.event_filter.update(&x);
}
if let Some(x) = &device_state_config_option.source_id_type {
self.source_id_type = x.clone();
}
}
fn map_event_type(&self, topic: String) -> String {
match self.event_type_mapping.get(&topic) {
Some(t) => return t.to_string(),
None => return topic,
}
}
pub fn on_receive_device_shadow(
&mut self,
shadow_value: &serde_json::Value,
) -> Result<(), IoTError> {
let val = match shadow_value {
serde_json::Value::Null => {
debug!("Empty branch of Device Shadow ");
return Ok(());
}
_ => shadow_value.clone(),
};
let ds: DeviceStateConfigOption = match serde_json::from_value(val) {
Ok(d) => d,
Err(e) => {
debug!("Serialization error to Device State: {}", e);
return Err(IoTError::DeviceAdapterError);
}
};
self.update(&ds);
Ok(())
}
}
/// Device-side state; it currently consists of the configuration only.
#[derive(Deserialize, Serialize, Debug)]
pub struct DeviceState {
/// Event mapping, filtering and source-id settings.
config: DeviceStateConfig,
}
/// The default state wraps a freshly created `DeviceStateConfig`.
impl Default for DeviceState {
    fn default() -> Self {
        Self {
            config: DeviceStateConfig::new(),
        }
    }
}
impl DeviceState {
pub fn new() -> DeviceState {
Default::default()
}
pub fn get_config(&self) -> &DeviceStateConfig {
&self.config
}
fn map_event_type(&self, topic: String) -> String {
self.config.map_event_type(topic)
}
pub fn on_receive_device_shadow(
&mut self,
shadow_value: &serde_json::Value,
) -> Result<(), IoTError> {
self.config
.on_receive_device_shadow(&shadow_value["config"])
}
}
/// Decides which event topics are forwarded.
#[derive(Deserialize, Serialize, Debug)]
pub struct EventFilter {
/// Filter mode as text. `filter` lets everything through when this equals
/// the `Display` form of `FilterType::Passthrough` ("PASSTHROUGH"); any
/// other value makes it consult `whitelist`.
filter_type: String,
/// Topics that are forwarded when the filter is not in passthrough mode.
whitelist: Vec<String>,
}
/// Partial form of `EventFilter`; `None` fields leave the current value
/// untouched when applied via `EventFilter::update`.
#[derive(Deserialize, Debug)]
struct EventFilterOption {
/// Replacement filter mode, if present.
filter_type: Option<String>,
/// Replacement for the whole whitelist, if present.
whitelist: Option<Vec<String>>,
}
/// Default filter: passthrough mode with an empty whitelist.
impl Default for EventFilter {
    fn default() -> Self {
        Self {
            filter_type: FilterType::Passthrough.to_string(),
            whitelist: Vec::new(),
        }
    }
}
impl EventFilter {
pub fn new() -> EventFilter {
Default::default()
}
fn update(&mut self, event_filter_option: &EventFilterOption) {
if let Some(x) = &event_filter_option.filter_type {
self.filter_type = x.clone();
}
if let Some(x) = &event_filter_option.whitelist {
self.whitelist = x.clone();
}
}
pub fn filter(&self, topic: String) -> Option<String> {
if self.filter_type == FilterType::Passthrough.to_string() {
return Some(topic);
} else {
if self.whitelist.contains(&topic.to_string()) {
return Some(topic.to_string());
} else {
debug!("Filtered OUT topic: {}", topic);
return None;
};
}
}
pub fn enable_whitelist_filtering(&mut self) {
self.filter_type = FilterType::Whitelist.to_string();
}
pub fn disable_filtering(&mut self) {
self.filter_type = FilterType::Passthrough.to_string();
}
}
/// Wraps a device payload into the standardized event envelope and returns it
/// together with the topic it should be published on.
///
/// The serial number used in the `source` and `correlation` fields depends on
/// the configured `SourceIdType`:
/// * `Config` - `config.device.instrument_serial_number`;
/// * `Device` - the second dot-separated segment of the payload's `origin`;
/// * `DeviceNoLeadZeros` - same as `Device`, with leading `'0'`s stripped.
///
/// The payload is embedded as-is or encrypted, depending on
/// `iot_state.data_encryption_config.method`.
///
/// Returns `None` when the serial number has to come from the payload but the
/// payload is not a JSON object, has no `origin` field, or its `origin` has no
/// second dot-separated segment. Previously the latter two cases panicked.
fn create_device_iot_message(
    event_type: &String,
    payload: String,
    config: &Arc<Config>,
    device_state: &DeviceState,
    iot_state: &IotState,
) -> Option<IotMessage> {
    let source_id_type = &device_state.get_config().source_id_type;
    let ser_num: String = match source_id_type {
        SourceIdType::Config => config.device.instrument_serial_number.clone(),
        SourceIdType::Device | SourceIdType::DeviceNoLeadZeros => {
            let hm_payload = serde_json::from_str::<Map<String, Value>>(&payload).ok()?;
            // `Value::to_string` renders JSON text, so the surrounding quotes of
            // a string value have to be removed.
            let id_str = match hm_payload.get("origin") {
                Some(origin) => origin.to_string().replace('"', ""),
                None => {
                    debug!("Payload has no \"origin\" field; serial number cannot be derived");
                    return None;
                }
            };
            // The serial number is the segment after the first '.'.
            let sn = match id_str.split('.').nth(1) {
                Some(segment) => segment,
                None => {
                    debug!("Payload \"origin\" has no '.'-separated serial number part");
                    return None;
                }
            };
            if *source_id_type == SourceIdType::DeviceNoLeadZeros {
                sn.trim_start_matches('0').to_string()
            } else {
                sn.to_string()
            }
        }
    };

    let rudi = format!("urn:rudi:GTIN^{}^{}", config.device.rudi_gtin, ser_num);
    let current_time = Utc::now();

    let (payload, encrypted) = match iot_state.data_encryption_config.method {
        DataEncryptionMethod::NoEncryption => (payload, false),
        DataEncryptionMethod::EciesSecp256k1 => {
            match iot_state.data_encryption_config.encrypt_b64(payload) {
                Ok(ciphertext) => (ciphertext, true),
                Err(e) => {
                    // On encryption failure the error text is sent in place of the
                    // data, still flagged as confidential.
                    let error_info = format!("{} : {}", IoTError::ECIESError, e);
                    debug!("{}", error_info);
                    (error_info, true)
                }
            }
        }
    };

    let standardized_event: serde_json::Value = json!({
        "id": Uuid::new_v4().to_string(),
        "source": rudi.to_string(),
        "specversion": config.device.spec_version.to_string(),
        "type": event_type.to_string(),
        "time": current_time.to_rfc3339().to_string(),
        "privacyrelevant": config.device.privacy,
        "confidentialityrelevant": encrypted,
        "data": payload,
        "correlation": [
            json!({"gtin": config.device.rudi_gtin.to_string()}),
            json!({"ref": config.device.rudi_ref.to_string()}),
            json!({"instrumenttype": config.device.instrument_type.to_string()}),
            json!({"instrumentname": config.device.instrument_name.to_string()}),
            json!({"serialnumber": ser_num.to_string()}),
        ]
    });

    Some(IotMessage {
        topic: format!("{}{}", config.iot.iot_topic_prefix, event_type),
        message: standardized_event.to_string(),
    })
}
pub fn on_device_event(
event: rumqttc::Publish,
config: &Arc<Config>,
device_state: &DeviceState,
iot_state: &IotState,
) -> Option<IotMessage> {
let stripped_topic: String = match event
.topic
.to_string()
.strip_prefix(config.device.device_topic_prefix.as_str())
{
Some(t) => t.to_string(),
None => event.topic.to_string(),
};
let mut event_type = device_state.map_event_type(stripped_topic);
event_type = match device_state.get_config().event_filter.filter(event_type) {
Some(t) => t,
None => return None,
};
let payload = String::from_utf8(event.payload.to_vec()).unwrap();
let iot_message = create_device_iot_message(&event_type, payload, config, device_state, iot_state)?;
return Some(iot_message);
}
#[cfg(test)]
mod tests {
use super::*;
/// A desired-state shadow document must replace the mapping, the filter
/// settings and the source id type of a default `DeviceState`.
#[test]
fn on_receive_device_shadow_test() {
    let mut device_state = DeviceState::new();
    // A fresh state starts in passthrough mode.
    assert_eq!(
        device_state.get_config().event_filter.filter_type,
        FilterType::Passthrough.to_string()
    );
    println!("\nDeviceState DEFAULT: {:?}", device_state);
    let data = r#"
{
"state": {
"reported": {},
"desired": {
"config": {
"event_type_mapping": {
"ProcessExecution/Run/Started": "ProcessExecution/Run/StartedAsRocket"
},
"event_filter": {
"filter_type": "WHITELIST",
"whitelist": [
"ProcessExecution/Run/Started",
"ProcessExecution/Run/Finished",
"ProcessExecution/Run/Aborted",
"Infrastructure/AuditableEventOccurred",
"InstrumentState/InstrumentStateChanged"
]
},
"source_id_type": "Config"
}
}
}
}"#;
    let v: Value = serde_json::from_str(data).unwrap();
    if let Err(err) = device_state.on_receive_device_shadow(&v["state"]["desired"]) {
        panic!("Device shadow update failed: {}", err);
    }
    println!("\nDeviceState updated: {:?}", device_state);

    let config = device_state.get_config();
    assert_eq!(
        config.event_filter.filter_type,
        FilterType::Whitelist.to_string()
    );
    assert_eq!(config.event_filter.whitelist.len(), 5);
    assert!(config
        .event_filter
        .whitelist
        .contains(&"InstrumentState/InstrumentStateChanged".to_string()));
    assert_eq!(config.source_id_type, SourceIdType::Config);
    assert_eq!(
        device_state.map_event_type("ProcessExecution/Run/Started".to_string()),
        "ProcessExecution/Run/StartedAsRocket"
    );
}
/// In whitelist mode a listed topic is returned unchanged and an unlisted
/// topic is rejected.
#[test]
fn filter_test() {
    let device_state = DeviceStateConfig {
        event_type_mapping: [(
            "Infrastructure/SystemInformation/SystemInformationAvailable".to_string(),
            "Infrastructure/SystemInformation".to_string(),
        )]
        .into_iter()
        .collect(),
        event_filter: EventFilter {
            filter_type: FilterType::Whitelist.to_string(),
            whitelist: [
                "ProcessExecution/Run/Started",
                "ProcessExecution/Run/Finished",
                "ProcessExecution/Run/Aborted",
                "Infrastructure/AuditableEventOccurred",
                "InstrumentState/InstrumentStateChanged",
            ]
            .iter()
            .map(|s| s.to_string())
            .collect(),
        },
        source_id_type: SourceIdType::Config,
    };

    // Asserting on the whole `Option` makes a wrongly filtered topic fail the
    // test instead of silently skipping the check.
    assert_eq!(
        device_state
            .event_filter
            .filter("Infrastructure/AuditableEventOccurred".to_string()),
        Some("Infrastructure/AuditableEventOccurred".to_string())
    );
    println!("\nFOUND as expected");

    assert!(
        device_state
            .event_filter
            .filter("InstrumentState/SOMETHING".to_string())
            .is_none(),
        "topic missing from the whitelist must be filtered out"
    );
    println!("\nNOT FOUND as expected");

    println!("\nDeviceState use in FILTER: {:?}", device_state);
}
/// A mapped topic is translated; an unmapped topic is returned unchanged.
#[test]
fn map_event_type_test() {
    let mut mapping = HashMap::new();
    mapping.insert(
        "Infrastructure/SystemInformation/SystemInformationAvailable".to_string(),
        "Infrastructure/SystemInformation".to_string(),
    );
    let allowed: Vec<String> = vec![
        "ProcessExecution/Run/Started",
        "ProcessExecution/Run/Finished",
        "ProcessExecution/Run/Aborted",
        "Infrastructure/AuditableEventOccurred",
        "InstrumentState/InstrumentStateChanged",
    ]
    .into_iter()
    .map(String::from)
    .collect();
    let device_state = DeviceStateConfig {
        event_type_mapping: mapping,
        event_filter: EventFilter {
            filter_type: FilterType::Whitelist.to_string(),
            whitelist: allowed,
        },
        source_id_type: SourceIdType::Config,
    };

    let mapped = device_state.map_event_type(
        "Infrastructure/SystemInformation/SystemInformationAvailable".to_string(),
    );
    assert_eq!("Infrastructure/SystemInformation", mapped.to_string());
    println!("\nMAPPED FOUND as expected");

    let unmapped =
        device_state.map_event_type("InstrumentState/InstrumentStateChanged".to_string());
    assert_eq!("InstrumentState/InstrumentStateChanged", unmapped.to_string());
    println!("\nMAPPED NOT FOUND as expected");

    println!("\nDeviceState use in MAPPING {:?}", device_state);
}
use crate::connector_aws::{IotStateOption};
/// The source id (`rudi`) of the generated message must follow the configured
/// `SourceIdType`: config serial, device serial, device serial without
/// leading zeros.
#[test]
fn create_device_iot_message_source_id_test() {
    use std::{thread, time};

    let iot_state = IotState::new();
    let mut device_state = DeviceState::new();
    let event_type = "ProcessExecution/Run/Finished".to_string();
    let payload = "{\"schema\":\"http://digitaldevice.link/rmd/X320/messages/RunFinished.json\",\"origin\":\"POKE.001234\",\"timestamp\":\"2020-03-09T12:37:27.925308200Z\",\"runId\":\"1-20200301-1609\"}".to_string();

    // Reading the config may fail transiently; try up to three times, pausing
    // between attempts, and fail the test when it never succeeds.
    let duration = time::Duration::from_secs(1);
    let mut attempt = 0;
    let config: Arc<Config> = loop {
        match Config::get_config_from_yaml() {
            Ok(config_data) => break Arc::new(config_data),
            Err(error) => {
                if attempt < 2 {
                    attempt += 1;
                    thread::sleep(duration);
                } else {
                    panic!("Problem reading \"config.yaml\": {}", error);
                }
            }
        }
    };
    let expected_topic = format!("{}{}", config.iot.iot_topic_prefix, event_type);

    device_state.config.source_id_type = SourceIdType::Config;
    let rudi = format!(
        "{}{}{}{}",
        "urn:rudi:GTIN^",
        config.device.rudi_gtin,
        "^",
        config.device.instrument_serial_number
    );
    println!("\nTEST PART 1 - Config based rudi: {}", rudi);
    let t = create_device_iot_message(
        &event_type,
        payload.clone(),
        &config,
        &device_state,
        &iot_state,
    )
    .expect("Error in `create_device_iot_message`");
    println!("IoT message content: {}", t);
    assert_eq!(t.topic, expected_topic);
    assert!(t.message.contains(&rudi));

    device_state.config.source_id_type = SourceIdType::Device;
    let rudi = format!(
        "{}{}{}",
        "urn:rudi:GTIN^", config.device.rudi_gtin, "^001234"
    );
    println!("\nTEST PART 2 - Device based rudi: {}", rudi);
    let t = create_device_iot_message(
        &event_type,
        payload.clone(),
        &config,
        &device_state,
        &iot_state,
    )
    .expect("Error in `create_device_iot_message`");
    println!("IoT message content: {}", t);
    assert_eq!(t.topic, expected_topic);
    assert!(t.message.contains(&rudi));

    device_state.config.source_id_type = SourceIdType::DeviceNoLeadZeros;
    let rudi = format!("{}{}{}", "urn:rudi:GTIN^", config.device.rudi_gtin, "^1234");
    println!("\nTEST PART 3 - DeviceNoLeadZeros based rudi: {}", rudi);
    let t = create_device_iot_message(&event_type, payload, &config, &device_state, &iot_state)
        .expect("Error in `create_device_iot_message`");
    println!("IoT message content: {}", t);
    assert_eq!(t.topic, expected_topic);
    assert!(t.message.contains(&rudi));
}
/// Without encryption the payload is visible in the message; once an ECIES
/// public key is configured the plain payload must no longer appear.
#[test]
fn create_device_iot_message_encryption_test() {
    use std::{thread, time};

    let data = r#"
{
"iot_registration_status": {
"Registered": "98765"
},
"data_encryption_config": {
"method": "EciesSecp256k1",
"public_key": [
4,89,117,155,81,243,172,179,
90,195,137,53,151,179,94,29,
83,81,109,41,239,43,231,104,
14,189,163,2,229,86,3,148,
164,194,250,198,166,60,62,162,
124,188,178,137,87,61,52,245,
18,210,207,175,130,234,120,161,
45,205,156,7,34,37,164,106,
128
]
}
}"#;
    let mut iot_state = IotState::new();
    let device_state = DeviceState::new();
    let event_type = "ProcessExecution/Run/Finished".to_string();
    let payload = "{\"schema\":\"http://digitaldevice.link/rmd/X320/messages/RunFinished.json\",\"origin\":\"POKE.001234\",\"timestamp\":\"2020-03-09T12:37:27.925308200Z\",\"runId\":\"1-20200301-1609\"}".to_string();

    // Reading the config may fail transiently; try up to three times, pausing
    // between attempts, and fail the test when it never succeeds.
    let duration = time::Duration::from_secs(1);
    let mut attempt = 0;
    let config: Arc<Config> = loop {
        match Config::get_config_from_yaml() {
            Ok(config_data) => break Arc::new(config_data),
            Err(error) => {
                if attempt < 2 {
                    attempt += 1;
                    thread::sleep(duration);
                } else {
                    panic!("Problem reading \"config.yaml\": {}", error);
                }
            }
        }
    };
    let expected_topic =
        (vec![config.iot.iot_topic_prefix.as_str(), event_type.as_str()]).concat();

    println!("\nTEST PART 1 - NO Encryption");
    let t = create_device_iot_message(
        &event_type,
        payload.clone(),
        &config,
        &device_state,
        &iot_state,
    )
    .expect("Error in `create_device_iot_message`");
    println!("IoT message content: {}", t);
    assert_eq!(t.topic, expected_topic);
    assert!(t.message.contains("RunFinished.json"));

    println!("\nTEST PART 2 - Encryption");
    let val: Value = serde_json::from_str(data).unwrap();
    let iot_state_option: IotStateOption = serde_json::from_value(val).unwrap();
    iot_state.update(&iot_state_option);
    let t = create_device_iot_message(
        &event_type,
        payload.clone(),
        &config,
        &device_state,
        &iot_state,
    )
    .expect("Error in `create_device_iot_message`");
    println!("IoT message content: {}", t);
    assert_eq!(t.topic, expected_topic);
    assert!(!t.message.contains("RunFinished.json"));
}
}