use crate::connector_aws::{self, IotMessage};
use crate::error::IoTError::{self, AWSResponseError, ChannelSendError};
use log::debug;
use rumqttc::{self, Event, EventLoop, Packet, QoS, Request, Sender, Subscribe, SubscribeFilter};
use serde_json::Value;
use tokio::sync::broadcast::Sender as Tx;
use tokio::sync::mpsc::Sender as Xmit;
use tokio::sync::RwLock;
pub const UPDATE_ACCEPTED: &str = "update/accepted";
const UPDATE_REJECTED: &str = "update/rejected";
pub const UPDATE_DELTA: &str = "update/delta";
pub const UPDATE_DOCUMENTS: &str = "update/documents";
pub const GET_ACCEPTED: &str = "get/accepted";
const GET_REJECTED: &str = "get/rejected";
const DELETE_ACCEPTED: &str = "delete/accepted";
const DELETE_REJECTED: &str = "delete/rejected";
pub const GET: &str = "get";
pub const UPDATE: &str = "update";
pub const SHADOW_TOPICS_NUM: usize = 8;
const SHADOW_SUB_TOPICS: [&'static str; SHADOW_TOPICS_NUM] = [
UPDATE_ACCEPTED,
UPDATE_REJECTED,
UPDATE_DELTA,
UPDATE_DOCUMENTS,
GET_ACCEPTED,
GET_REJECTED,
DELETE_ACCEPTED,
DELETE_REJECTED,
];
const TOPIC_STRING_CAPACITY: usize = 100;
#[derive(Debug)]
pub struct ShadowDescriptors {
pub device: DeviceShadowDescriptor,
pub iot: DeviceShadowDescriptor,
}
impl ShadowDescriptors {
pub fn new(device_descr: DeviceShadowDescriptor, iot_descr: DeviceShadowDescriptor) -> Self {
ShadowDescriptors {
device: device_descr,
iot: iot_descr,
}
}
}
#[derive(Debug)]
pub struct DeviceShadowDescriptor {
name: String,
shadow: DeviceShadow,
tx: Tx<serde_json::Value>,
}
impl DeviceShadowDescriptor {
pub fn new(name: String, shadow: DeviceShadow, tx: Tx<serde_json::Value>) -> Self {
DeviceShadowDescriptor {
name: name,
shadow: shadow,
tx: tx,
}
}
pub fn get_name(&self) -> String {
self.name.clone()
}
pub fn get_shadow_ref(&mut self) -> &mut DeviceShadow {
&mut self.shadow
}
pub fn get_tx(&self) -> Tx<serde_json::Value> {
self.tx.clone()
}
}
#[derive(Debug, Clone)]
pub struct ShadowTopic {
prefix: String,
name: String,
}
impl ShadowTopic {
fn new(thing_name: String, shadow_type: ShadowType) -> Self {
let (shadow_topic_prefix, shadow_name) = match shadow_type {
ShadowType::Classic => (format!("$aws/things/{}/shadow/", thing_name), String::new()),
ShadowType::Named(shadow_name) => (
format!("$aws/things/{}/shadow/name/{}/", thing_name, shadow_name),
shadow_name,
),
};
let mut prefix = String::with_capacity(TOPIC_STRING_CAPACITY);
prefix.push_str(&shadow_topic_prefix);
return ShadowTopic {
prefix: prefix,
name: shadow_name,
};
}
pub fn build(&self, postfix: &str) -> String {
let mut full_topic = self.prefix.to_string();
full_topic.push_str(postfix);
full_topic
}
fn get_prefix(&self) -> String {
return self.prefix.clone();
}
fn get_name(&self) -> String {
return self.name.clone();
}
}
pub enum ShadowType {
Classic,
Named(String),
}
impl std::fmt::Display for ShadowType {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
ShadowType::Classic => write!(f, "CLASSIC"),
ShadowType::Named(t) => write!(f, "NAMED:{}", t),
}
}
}
#[derive(Debug)]
pub struct DeviceShadow {
shadow_value: RwLock<serde_json::Value>,
eventloop_handle: Sender<Request>,
shadow_topic: ShadowTopic,
}
impl DeviceShadow {
pub fn new(
thing_name: String,
shadow_type: ShadowType,
eventloop_handle: Sender<Request>,
) -> Self {
let shadow_topic = ShadowTopic::new(thing_name, shadow_type);
let device_shadow = DeviceShadow {
shadow_value: RwLock::new(serde_json::Value::Null),
eventloop_handle: eventloop_handle,
shadow_topic: shadow_topic.clone(),
};
device_shadow
}
pub fn get_name(&self) -> String {
return self.shadow_topic.get_name();
}
pub fn get_prefix(&self) -> String {
return self.shadow_topic.get_prefix();
}
pub fn get_shadow_topic(&self) -> ShadowTopic {
return self.shadow_topic.clone();
}
pub fn get_shadow_name_from_aws_topic(&self, aws_topic: &String) -> String {
let v: Vec<&str> = aws_topic.split('/').collect();
return v[5].to_string();
}
pub fn build_shadow_message_update(
&self,
shadow_value: String,
reported_only: bool,
) -> IotMessage {
IotMessage::new(
self.get_shadow_topic().build(UPDATE).to_string(),
build_shadow_message_payload(shadow_value, reported_only),
)
}
async fn put_local_shadow(&self, value: &serde_json::Value) {
let mut shadow_value = self.shadow_value.write().await;
*shadow_value = value.clone();
}
pub async fn retrieve_shadow_delivery(
&mut self,
event: &rumqttc::Publish,
) -> Result<(&str, Value), IoTError> {
let prefix = self.get_prefix();
let topic = event.topic.to_string();
let shadow_payload: Value = serde_json::from_slice(&(event.payload.clone())).unwrap();
let mut shadow_value: Value = serde_json::Value::Null;
let shadow_msg: &str;
match topic.strip_prefix(&prefix) {
Some(UPDATE_ACCEPTED) => {
debug!("Shadow Received: UPDATE ACCEPTED");
self.put_local_shadow(&shadow_payload).await;
(shadow_msg, shadow_value) = (
UPDATE_ACCEPTED,
(shadow_payload["state"]["desired"]).clone(),
);
}
Some(UPDATE_REJECTED) => {
debug!("Shadow Received: UPDATE Rejected");
shadow_msg = UPDATE_REJECTED
}
Some(UPDATE_DELTA) => {
debug!("Shadow Received: UPDATE DELTA");
(shadow_msg, shadow_value) = (UPDATE_DELTA, (shadow_payload["state"]).clone());
}
Some(UPDATE_DOCUMENTS) => {
debug!("Shadow Received: UPDATE DOCUMENTS");
self.put_local_shadow(&shadow_payload).await;
(shadow_msg, shadow_value) = (
UPDATE_DOCUMENTS,
(shadow_payload["current"]["state"]["desired"]).clone(),
);
}
Some(GET_ACCEPTED) => {
debug!("Shadow Received: GET ACCEPTED");
self.put_local_shadow(&shadow_payload).await;
(shadow_msg, shadow_value) =
(GET_ACCEPTED, (shadow_payload["state"]["desired"]).clone());
}
Some(GET_REJECTED) => {
debug!("Shadow Received: GET Rejected");
shadow_msg = GET_REJECTED
}
Some(DELETE_ACCEPTED) => {
debug!("Shadow Received: DELETE Rejected");
self.put_local_shadow(&serde_json::Value::Null).await;
shadow_msg = DELETE_ACCEPTED
}
Some(DELETE_REJECTED) => {
debug!("Shadow Received: DELETE Rejected");
shadow_msg = DELETE_REJECTED
}
Some(_) => {
debug!("Not DeviceShadow topic: {}", topic);
return Err(IoTError::NotDeviceShadowTopicError);
}
None => {
debug!("Prefix not found: {}", prefix);
return Err(IoTError::DeviceShadowError);
}
};
Ok((shadow_msg, shadow_value))
}
pub async fn initiate_shadow(
&self,
eventloop: &mut EventLoop,
xmit: Xmit<IotMessage>,
shadow_initial_update: String,
) -> Result<(), IoTError> {
let mut sub_topics: Vec<SubscribeFilter> = vec![];
for topic in SHADOW_SUB_TOPICS.iter() {
sub_topics.push(SubscribeFilter::new(
self.shadow_topic.build(topic).to_string(),
QoS::AtMostOnce,
));
}
let topic_list = Request::Subscribe(Subscribe::new_many(sub_topics)?);
self.eventloop_handle.send_async(topic_list).await.unwrap();
loop {
match eventloop.poll().await? {
Event::Incoming(event) => {
match event {
Packet::SubAck(r) => {
debug!("SubAck event on iot_receiver: {:?}", r);
if r.return_codes.len() == SHADOW_TOPICS_NUM {
if !shadow_initial_update.is_empty() {
let update_remote_shadow_msg: IotMessage =
connector_aws::IotMessage::new(
self.shadow_topic.build(UPDATE).to_string(),
shadow_initial_update,
);
match xmit.send(update_remote_shadow_msg).await {
Ok(_) => return Ok::<_, IoTError>(()),
Err(_) => return Err::<_, IoTError>(ChannelSendError),
}
}
let get_remote_shadow_msg: IotMessage =
connector_aws::IotMessage::new(
self.shadow_topic.build(GET).to_string(),
serde_json::Value::Null.to_string(),
);
match xmit.send(get_remote_shadow_msg).await {
Ok(_) => return Ok::<_, IoTError>(()),
Err(_) => return Err::<_, IoTError>(ChannelSendError),
}
} else {
return Err::<_, IoTError>(AWSResponseError);
}
}
_ => {
debug!("SHADOW: Got event on iot_receiver: {:?}", event);
}
}
}
_ => (),
}
}
}
}
pub fn build_shadow_message_payload(shadow_value: String, reported_only: bool) -> String {
return match reported_only {
true => format!("{{\"state\":{{\"reported\":{}}}}}", shadow_value), false => format!(
"{{\"state\":{{\"desired\":{},\"reported\":{}}}}}",
shadow_value, shadow_value
),
};
}
#[cfg(test)]
mod tests {
use super::*;
use crate::config::CONFIG_DIRNAME;
use crate::mqtt_client;
use find_folder::Search;
use std::env;
fn find_config_dir() -> String {
let mut exe_folder = env::current_exe().unwrap();
println!("EXE_FOLDER: {:#?}", exe_folder);
exe_folder.pop(); let pb: std::path::PathBuf = Search::ParentsThenKids(5, 5)
.of(exe_folder)
.for_folder(CONFIG_DIRNAME)
.expect("Config directory not found");
return pb.into_os_string().into_string().unwrap();
}
#[tokio::test]
async fn get_shadow_elements_test() {
let thing_name: String = "iot_client_id".to_string();
let shadow_name: String = "iot_shadow".to_string();
let aws_topic: String = format!("$aws/things/{}/shadow/name/{}/", thing_name, shadow_name);
let config_dir: String = find_config_dir();
let aws_settings = mqtt_client::ConnectionSettings::new_tls(
thing_name.clone(),
"ENDPOINTID-ats.iot.eu-central-1.amazonaws.com".to_string(),
8883,
format!("{}{}", config_dir, "/certs/AmazonRootCA1.pem"),
format!("{}{}", config_dir, "/certs/IotCertificate.pem"),
format!("{}{}", config_dir, "/certs/IotPrivateKey.pem"),
None,
);
let (iot_client, _) = mqtt_client::AsyncClient::new(aws_settings).await.unwrap();
let device_shadow: DeviceShadow;
device_shadow = DeviceShadow::new(
thing_name,
ShadowType::Named(shadow_name),
iot_client.get_eventloop_handle(),
);
let device_shadow_prefix = device_shadow.get_prefix();
println!("device_shadow_prefix: {:?}", device_shadow_prefix);
assert_eq!(
device_shadow_prefix,
"$aws/things/iot_client_id/shadow/name/iot_shadow/"
);
let device_shadow_topic = device_shadow.get_shadow_topic();
println!("device_shadow_topic: {:?}", device_shadow_topic);
assert_eq!(
device_shadow_topic.prefix,
"$aws/things/iot_client_id/shadow/name/iot_shadow/"
);
let device_shadow_name = device_shadow.get_shadow_name_from_aws_topic(&aws_topic);
println!("device_shadow_name: {:?}", device_shadow_name);
assert_eq!(device_shadow_name, "iot_shadow");
}
#[test]
fn build_shadow_message_payload_test() {
let shadow_value = "{\"iot_registration_status\":\"REGISTERED\"}".to_string();
let payload = build_shadow_message_payload(shadow_value, true);
println!("payload for reported_only=true: {}", payload);
assert_eq!(
payload,
"{\"state\":{\"reported\":{\"iot_registration_status\":\"REGISTERED\"}}}"
);
let shadow_value =
"{\"iot_registration_status\":\"CERTIFICATE_ROTATION_REQUESTED\"}".to_string();
let payload = build_shadow_message_payload(shadow_value, false);
println!("payload for reported_only=false: {}", payload);
assert_eq!(payload, "{\"state\":{\"desired\":{\"iot_registration_status\":\"CERTIFICATE_ROTATION_REQUESTED\"},\"reported\":{\"iot_registration_status\":\"CERTIFICATE_ROTATION_REQUESTED\"}}}");
}
}