winterbaume_iotdataplane/
state.rs1use std::collections::HashMap;
2
3use chrono::Utc;
4use thiserror::Error;
5
6use crate::types::*;
7
8#[derive(Debug, Default)]
9pub struct IotDataPlaneState {
10 pub shadows: HashMap<ShadowKey, ThingShadow>,
11 pub published_messages: Vec<PublishedMessage>,
12 pub retained_messages: HashMap<String, RetainedMessage>,
13}
14
15#[derive(Debug, Error)]
16pub enum IotDataPlaneError {
17 #[error("No shadow exists with name: '{shadow_name}'")]
18 ShadowNotFound { shadow_name: String },
19 #[error("No retained message found for topic: '{topic}'")]
20 RetainedMessageNotFound { topic: String },
21 #[error("Invalid JSON in shadow document")]
22 InvalidShadowDocument,
23}
24
25impl IotDataPlaneState {
26 pub fn get_thing_shadow(
27 &self,
28 thing_name: &str,
29 shadow_name: Option<&str>,
30 ) -> Result<&ThingShadow, IotDataPlaneError> {
31 let key = match shadow_name {
32 Some(name) => ShadowKey::named(thing_name, name),
33 None => ShadowKey::classic(thing_name),
34 };
35 self.shadows
36 .get(&key)
37 .ok_or_else(|| IotDataPlaneError::ShadowNotFound {
38 shadow_name: shadow_name.unwrap_or("classic").to_string(),
39 })
40 }
41
42 pub fn update_thing_shadow(
43 &mut self,
44 thing_name: &str,
45 shadow_name: Option<&str>,
46 payload: Vec<u8>,
47 ) -> Result<&ThingShadow, IotDataPlaneError> {
48 let incoming: serde_json::Value = serde_json::from_slice(&payload)
49 .map_err(|_| IotDataPlaneError::InvalidShadowDocument)?;
50
51 let new_state = incoming
52 .get("state")
53 .cloned()
54 .unwrap_or(serde_json::Value::Object(Default::default()));
55
56 let key = match shadow_name {
57 Some(name) => ShadowKey::named(thing_name, name),
58 None => ShadowKey::classic(thing_name),
59 };
60
61 let version = self.shadows.get(&key).map(|s| s.version + 1).unwrap_or(1);
62 let now = Utc::now();
63
64 let full_doc = serde_json::json!({
65 "state": new_state,
66 "metadata": {},
67 "version": version,
68 "timestamp": now.timestamp(),
69 });
70
71 let shadow = ThingShadow {
72 thing_name: thing_name.to_string(),
73 shadow_name: shadow_name.map(|s| s.to_string()),
74 payload: full_doc.to_string().into_bytes(),
75 version,
76 last_modified: now,
77 };
78
79 self.shadows.insert(key.clone(), shadow);
80 Ok(self.shadows.get(&key).unwrap())
81 }
82
83 pub fn delete_thing_shadow(
84 &mut self,
85 thing_name: &str,
86 shadow_name: Option<&str>,
87 ) -> Result<ThingShadow, IotDataPlaneError> {
88 let key = match shadow_name {
89 Some(name) => ShadowKey::named(thing_name, name),
90 None => ShadowKey::classic(thing_name),
91 };
92 self.shadows
93 .remove(&key)
94 .ok_or_else(|| IotDataPlaneError::ShadowNotFound {
95 shadow_name: shadow_name.unwrap_or("classic").to_string(),
96 })
97 }
98
99 pub fn list_named_shadows_for_thing(&self, thing_name: &str) -> Vec<String> {
100 self.shadows
101 .keys()
102 .filter(|k| k.thing_name == thing_name && k.shadow_name.is_some())
103 .filter_map(|k| k.shadow_name.clone())
104 .collect()
105 }
106
107 pub fn publish(&mut self, topic: &str, payload: Vec<u8>, qos: i32, retain: bool) {
108 let now = Utc::now();
109 if retain {
110 let retained = RetainedMessage {
111 topic: topic.to_string(),
112 payload: payload.clone(),
113 qos,
114 last_modified: now,
115 };
116 self.retained_messages.insert(topic.to_string(), retained);
117 }
118 let msg = PublishedMessage {
119 topic: topic.to_string(),
120 payload,
121 qos,
122 retain,
123 published_at: now,
124 };
125 self.published_messages.push(msg);
126 }
127
128 pub fn get_retained_message(&self, topic: &str) -> Result<&RetainedMessage, IotDataPlaneError> {
129 self.retained_messages.get(topic).ok_or_else(|| {
130 IotDataPlaneError::RetainedMessageNotFound {
131 topic: topic.to_string(),
132 }
133 })
134 }
135
136 pub fn list_retained_messages(&self) -> Vec<&RetainedMessage> {
137 self.retained_messages.values().collect()
138 }
139}