winterbaume_iotdataplane/
views.rs1use std::collections::HashMap;
4
5use chrono::{DateTime, Utc};
6use serde::{Deserialize, Serialize};
7use winterbaume_core::{StateChangeNotifier, StateViewError, StatefulService};
8
9use crate::handlers::IotDataPlaneService;
10use crate::state::IotDataPlaneState;
11use crate::types::{PublishedMessage, RetainedMessage, ShadowKey, ThingShadow};
12
13#[derive(Debug, Clone, Serialize, Deserialize, Default)]
14pub struct IotDataPlaneStateView {
15 #[serde(default)]
17 pub shadows: HashMap<String, ThingShadowView>,
18 #[serde(default)]
20 pub retained_messages: HashMap<String, RetainedMessageView>,
21 #[serde(default)]
23 pub published_messages: Vec<PublishedMessageView>,
24}
25
26#[derive(Debug, Clone, Serialize, Deserialize)]
27pub struct ThingShadowView {
28 pub thing_name: String,
29 pub shadow_name: Option<String>,
30 pub payload_hex: String,
32 pub version: i64,
33 pub last_modified: String,
34}
35
36#[derive(Debug, Clone, Serialize, Deserialize)]
37pub struct RetainedMessageView {
38 pub topic: String,
39 pub payload_hex: String,
41 pub qos: i32,
42 pub last_modified: String,
43}
44
45#[derive(Debug, Clone, Serialize, Deserialize)]
47pub struct PublishedMessageView {
48 pub topic: String,
49 pub payload_hex: String,
51 pub qos: i32,
52 pub retain: bool,
53 pub published_at: String,
54}
55
56fn shadow_key_str(key: &ShadowKey) -> String {
57 format!(
58 "{}:{}",
59 key.thing_name,
60 key.shadow_name.as_deref().unwrap_or("")
61 )
62}
63
64fn parse_dt(s: &str) -> DateTime<Utc> {
65 DateTime::parse_from_rfc3339(s)
66 .map(|dt| dt.with_timezone(&Utc))
67 .unwrap_or_else(|_| Utc::now())
68}
69
/// Encodes `bytes` as a lowercase hexadecimal string, two digits per byte.
fn bytes_to_hex(bytes: &[u8]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";

    let mut encoded = String::with_capacity(bytes.len() * 2);
    for &byte in bytes {
        // High nibble first, then low nibble, matching `{:02x}` formatting.
        encoded.push(char::from(DIGITS[usize::from(byte >> 4)]));
        encoded.push(char::from(DIGITS[usize::from(byte & 0x0f)]));
    }
    encoded
}
73
/// Decodes a hexadecimal string into bytes, two characters per byte.
///
/// Decoding is best-effort and never panics: a two-character group that is not
/// valid hex is skipped, a trailing unpaired character is ignored, and groups
/// that would split a multi-byte UTF-8 character are skipped as well.
fn hex_to_bytes(hex: &str) -> Vec<u8> {
    (0..hex.len())
        .step_by(2)
        .filter_map(|start| {
            // `str::get` yields `None` where slicing would panic: when the
            // range runs past the end (odd length) or is not on a character
            // boundary (non-ASCII input).
            let pair = hex.get(start..start + 2)?;
            u8::from_str_radix(pair, 16).ok()
        })
        .collect()
}
80
81impl From<&IotDataPlaneState> for IotDataPlaneStateView {
84 fn from(state: &IotDataPlaneState) -> Self {
85 IotDataPlaneStateView {
86 shadows: state
87 .shadows
88 .iter()
89 .map(|(key, shadow)| {
90 (
91 shadow_key_str(key),
92 ThingShadowView {
93 thing_name: shadow.thing_name.clone(),
94 shadow_name: shadow.shadow_name.clone(),
95 payload_hex: bytes_to_hex(&shadow.payload),
96 version: shadow.version,
97 last_modified: shadow.last_modified.to_rfc3339(),
98 },
99 )
100 })
101 .collect(),
102 retained_messages: state
103 .retained_messages
104 .iter()
105 .map(|(topic, msg)| {
106 (
107 topic.clone(),
108 RetainedMessageView {
109 topic: msg.topic.clone(),
110 payload_hex: bytes_to_hex(&msg.payload),
111 qos: msg.qos,
112 last_modified: msg.last_modified.to_rfc3339(),
113 },
114 )
115 })
116 .collect(),
117 published_messages: state
118 .published_messages
119 .iter()
120 .map(|msg| PublishedMessageView {
121 topic: msg.topic.clone(),
122 payload_hex: bytes_to_hex(&msg.payload),
123 qos: msg.qos,
124 retain: msg.retain,
125 published_at: msg.published_at.to_rfc3339(),
126 })
127 .collect(),
128 }
129 }
130}
131
132impl From<IotDataPlaneStateView> for IotDataPlaneState {
135 fn from(view: IotDataPlaneStateView) -> Self {
136 let shadows = view
137 .shadows
138 .into_values()
139 .map(|sv| {
140 let key = match sv.shadow_name.as_deref() {
141 Some(name) if !name.is_empty() => ShadowKey::named(&sv.thing_name, name),
142 _ => ShadowKey::classic(&sv.thing_name),
143 };
144 let payload = hex_to_bytes(&sv.payload_hex);
145 let shadow = ThingShadow {
146 thing_name: sv.thing_name,
147 shadow_name: sv.shadow_name,
148 payload,
149 version: sv.version,
150 last_modified: parse_dt(&sv.last_modified),
151 };
152 (key, shadow)
153 })
154 .collect();
155
156 let retained_messages = view
157 .retained_messages
158 .into_values()
159 .map(|rv| {
160 let msg = RetainedMessage {
161 topic: rv.topic.clone(),
162 payload: hex_to_bytes(&rv.payload_hex),
163 qos: rv.qos,
164 last_modified: parse_dt(&rv.last_modified),
165 };
166 (rv.topic, msg)
167 })
168 .collect();
169
170 let published_messages = view
171 .published_messages
172 .into_iter()
173 .map(|pv| PublishedMessage {
174 topic: pv.topic,
175 payload: hex_to_bytes(&pv.payload_hex),
176 qos: pv.qos,
177 retain: pv.retain,
178 published_at: parse_dt(&pv.published_at),
179 })
180 .collect();
181
182 IotDataPlaneState {
183 shadows,
184 published_messages,
185 retained_messages,
186 }
187 }
188}
189
190impl StatefulService for IotDataPlaneService {
193 type StateView = IotDataPlaneStateView;
194
195 async fn snapshot(&self, account_id: &str, region: &str) -> Self::StateView {
196 let state = self.state.get(account_id, region);
197 let guard = state.read().await;
198 IotDataPlaneStateView::from(&*guard)
199 }
200
201 async fn restore(
202 &self,
203 account_id: &str,
204 region: &str,
205 view: Self::StateView,
206 ) -> Result<(), StateViewError> {
207 let state = self.state.get(account_id, region);
208 {
209 let mut guard = state.write().await;
210 *guard = IotDataPlaneState::from(view);
211 }
212 self.notify_state_changed(account_id, region).await;
213 Ok(())
214 }
215
216 async fn merge(
217 &self,
218 account_id: &str,
219 region: &str,
220 view: Self::StateView,
221 ) -> Result<(), StateViewError> {
222 let state = self.state.get(account_id, region);
223 {
224 let mut guard = state.write().await;
225 let incoming = IotDataPlaneState::from(view);
226 for (k, v) in incoming.shadows {
227 guard.shadows.insert(k, v);
228 }
229 for (k, v) in incoming.retained_messages {
230 guard.retained_messages.insert(k, v);
231 }
232 guard.published_messages.extend(incoming.published_messages);
233 }
234 self.notify_state_changed(account_id, region).await;
235 Ok(())
236 }
237
238 fn notifier(&self) -> &StateChangeNotifier<Self::StateView> {
239 &self.notifier
240 }
241}