pub mod dao;
pub mod data_types;
mod error;
mod shadow_diff;
pub mod topics;
use mqttrust::{Mqtt, QoS};
pub use data_types::Patch;
pub use error::Error;
use serde::de::DeserializeOwned;
pub use shadow_derive as derive;
pub use shadow_diff::ShadowPatch;
use data_types::{AcceptedResponse, DeltaResponse, ErrorResponse};
use topics::{Direction, Subscribe, Topic, Unsubscribe};
use self::dao::ShadowDAO;
const MAX_TOPIC_LEN: usize = 128;
const MAX_PAYLOAD_SIZE: usize = 512;
const PARTIAL_REQUEST_OVERHEAD: usize = 64;
pub trait ShadowState: ShadowPatch {
const NAME: Option<&'static str>;
const MAX_PAYLOAD_SIZE: usize = MAX_PAYLOAD_SIZE;
}
pub struct PersistedShadow<'a, S: ShadowState + DeserializeOwned, M: Mqtt, D: ShadowDAO<S>> {
shadow: Shadow<'a, S, M>,
pub(crate) dao: D,
}
impl<'a, S, M, D> PersistedShadow<'a, S, M, D>
where
S: ShadowState + DeserializeOwned,
M: Mqtt,
D: ShadowDAO<S>,
{
pub fn new(
initial_state: S,
mqtt: &'a M,
mut dao: D,
auto_subscribe: bool,
) -> Result<Self, Error> {
let state = dao.read().unwrap_or(initial_state);
Ok(Self {
shadow: Shadow::new(state, mqtt, auto_subscribe)?,
dao,
})
}
pub fn subscribe(&self) -> Result<(), Error> {
self.shadow.subscribe()
}
pub fn unsubscribe(&self) -> Result<(), Error> {
self.shadow.unsubscribe()
}
pub fn should_handle_topic(&mut self, topic: &str) -> bool {
self.shadow.should_handle_topic(topic)
}
#[must_use]
pub fn handle_message(
&mut self,
topic: &str,
payload: &[u8],
) -> Result<(&S, Option<S::PatchState>), Error> {
let result = self.shadow.handle_message(topic, payload);
if let Ok((state, Some(_))) = result {
self.dao.write(state)?;
}
result
}
pub fn get(&self) -> &S {
self.shadow.get()
}
pub fn get_shadow(&self) -> Result<(), Error> {
self.shadow.get_shadow()
}
pub fn report_shadow(&mut self) -> Result<(), Error> {
self.shadow.report_shadow()
}
pub fn update<F: FnOnce(&S, &mut S::PatchState)>(&mut self, f: F) -> Result<(), Error> {
self.shadow.update(f)?;
self.dao.write(self.shadow.get())?;
Ok(())
}
pub fn delete_shadow(&mut self) -> Result<(), Error> {
self.shadow.delete_shadow()
}
}
pub struct Shadow<'a, S: ShadowState, M: Mqtt> {
state: S,
mqtt: &'a M,
}
impl<'a, S, M> Shadow<'a, S, M>
where
S: ShadowState,
M: Mqtt,
{
pub fn new(state: S, mqtt: &'a M, auto_subscribe: bool) -> Result<Self, Error> {
let handler = Shadow { mqtt, state };
if auto_subscribe {
handler.subscribe()?;
}
Ok(handler)
}
pub fn subscribe(&self) -> Result<(), Error> {
Subscribe::<7>::new()
.topic(Topic::GetAccepted, QoS::AtLeastOnce)
.topic(Topic::GetRejected, QoS::AtLeastOnce)
.topic(Topic::DeleteAccepted, QoS::AtLeastOnce)
.topic(Topic::DeleteRejected, QoS::AtLeastOnce)
.topic(Topic::UpdateAccepted, QoS::AtLeastOnce)
.topic(Topic::UpdateRejected, QoS::AtLeastOnce)
.topic(Topic::UpdateDelta, QoS::AtLeastOnce)
.send(self.mqtt, S::NAME)?;
Ok(())
}
pub fn unsubscribe(&self) -> Result<(), Error> {
Unsubscribe::<7>::new()
.topic(Topic::GetAccepted)
.topic(Topic::GetRejected)
.topic(Topic::DeleteAccepted)
.topic(Topic::DeleteRejected)
.topic(Topic::UpdateAccepted)
.topic(Topic::UpdateRejected)
.topic(Topic::UpdateDelta)
.send(self.mqtt, S::NAME)?;
Ok(())
}
fn change_shadow_value(
&mut self,
delta: Option<S::PatchState>,
update_desired: Option<bool>,
) -> Result<(), Error> {
if let Some(ref delta) = delta {
self.state.apply_patch(delta.clone());
}
debug!(
"[{:?}] Updating reported shadow value. Update_desired: {:?}",
S::NAME.unwrap_or_else(|| "Classic"),
update_desired
);
if let Some(update_desired) = update_desired {
let desired = if update_desired {
Some(&self.state)
} else {
None
};
let request = data_types::Request {
state: data_types::State {
reported: Some(&self.state),
desired,
},
client_token: None,
version: None,
};
let payload = serde_json_core::to_vec::<
_,
{ MAX_PAYLOAD_SIZE + PARTIAL_REQUEST_OVERHEAD },
>(&request)
.map_err(|_| Error::Overflow)?;
let update_topic =
Topic::Update.format::<MAX_TOPIC_LEN>(self.mqtt.client_id(), S::NAME)?;
self.mqtt
.publish(update_topic.as_str(), &payload, QoS::AtLeastOnce)?;
}
Ok(())
}
pub fn should_handle_topic(&mut self, topic: &str) -> bool {
if let Some((_, thing_name, shadow_name)) = Topic::from_str(topic) {
return thing_name == self.mqtt.client_id() && shadow_name == S::NAME;
}
false
}
#[must_use]
pub fn handle_message(
&mut self,
topic: &str,
payload: &[u8],
) -> Result<(&S, Option<S::PatchState>), Error> {
let (topic, thing_name, shadow_name) =
Topic::from_str(topic).ok_or(Error::WrongShadowName)?;
assert_eq!(thing_name, self.mqtt.client_id());
assert_eq!(topic.direction(), Direction::Incoming);
if shadow_name != S::NAME {
return Err(Error::WrongShadowName);
}
let delta = match topic {
Topic::GetAccepted => {
serde_json_core::from_slice::<AcceptedResponse<S::PatchState>>(payload)
.map_err(|_| Error::InvalidPayload)
.and_then(|(response, _)| {
if let Some(_) = response.state.delta {
debug!(
"[{:?}] Received delta state",
S::NAME.unwrap_or_else(|| "Classic")
);
self.change_shadow_value(response.state.delta.clone(), Some(false))?;
} else if let Some(_) = response.state.reported {
self.change_shadow_value(response.state.reported, None)?;
}
Ok(response.state.delta)
})?
}
Topic::GetRejected | Topic::UpdateRejected => {
if let Ok((error, _)) = serde_json_core::from_slice::<ErrorResponse>(payload) {
if error.code == 404 && matches!(topic, Topic::GetRejected) {
debug!(
"[{:?}] Thing has no shadow document. Creating with defaults...",
S::NAME.unwrap_or_else(|| "Classic")
);
self.report_shadow()?;
} else {
error!(
"{:?} request was rejected. code: {:?} message:'{:?}'",
if matches!(topic, Topic::GetRejected) {
"Get"
} else {
"Update"
},
error.code,
error.message
);
}
}
None
}
Topic::UpdateDelta => {
debug!(
"[{:?}] Received shadow delta event.",
S::NAME.unwrap_or_else(|| "Classic"),
);
serde_json_core::from_slice::<DeltaResponse<S::PatchState>>(payload)
.map_err(|_| Error::InvalidPayload)
.and_then(|(delta, _)| {
if let Some(_) = delta.state {
debug!(
"[{:?}] Delta reports new desired value. Changing local value...",
S::NAME.unwrap_or_else(|| "Classic"),
);
}
self.change_shadow_value(delta.state.clone(), Some(false))?;
Ok(delta.state)
})?
}
Topic::UpdateAccepted => {
debug!(
"[{:?}] Finished updating reported shadow value.",
S::NAME.unwrap_or_else(|| "Classic")
);
None
}
_ => None,
};
Ok((self.get(), delta))
}
pub fn get(&self) -> &S {
&self.state
}
pub fn get_shadow(&self) -> Result<(), Error> {
let get_topic = Topic::Get.format::<MAX_TOPIC_LEN>(self.mqtt.client_id(), S::NAME)?;
self.mqtt
.publish(get_topic.as_str(), b"", QoS::AtLeastOnce)?;
Ok(())
}
pub fn report_shadow(&mut self) -> Result<(), Error> {
self.change_shadow_value(None, Some(false))?;
Ok(())
}
pub fn update<F: FnOnce(&S, &mut S::PatchState)>(&mut self, f: F) -> Result<(), Error> {
let mut desired = S::PatchState::default();
f(&self.state, &mut desired);
self.change_shadow_value(Some(desired), Some(false))?;
Ok(())
}
pub fn delete_shadow(&mut self) -> Result<(), Error> {
let delete_topic = Topic::Delete.format::<MAX_TOPIC_LEN>(self.mqtt.client_id(), S::NAME)?;
self.mqtt
.publish(delete_topic.as_str(), b"", QoS::AtLeastOnce)?;
Ok(())
}
}
impl<'a, S, M> core::fmt::Debug for Shadow<'a, S, M>
where
S: ShadowState + core::fmt::Debug,
M: Mqtt,
{
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
write!(
f,
"[{:?}] = {:?}",
S::NAME.unwrap_or_else(|| "Classic"),
self.get()
)
}
}
#[cfg(feature = "defmt")]
impl<'a, S, M> defmt::Format for Shadow<'a, S, M>
where
S: ShadowState + defmt::Format,
M: Mqtt,
{
fn format(&self, fmt: defmt::Formatter) {
defmt::write!(
fmt,
"[{:?}] = {:?}",
S::NAME.unwrap_or_else(|| "Classic"),
self.get()
)
}
}
impl<'a, S, M> Drop for Shadow<'a, S, M>
where
S: ShadowState,
M: Mqtt,
{
fn drop(&mut self) {
self.unsubscribe().ok();
}
}