Skip to main content

winterbaume_iotdataplane/
views.rs

1//! Serde-compatible view types for IoT Data Plane state snapshots.
2
3use std::collections::HashMap;
4
5use chrono::{DateTime, Utc};
6use serde::{Deserialize, Serialize};
7use winterbaume_core::{StateChangeNotifier, StateViewError, StatefulService};
8
9use crate::handlers::IotDataPlaneService;
10use crate::state::IotDataPlaneState;
11use crate::types::{PublishedMessage, RetainedMessage, ShadowKey, ThingShadow};
12
13#[derive(Debug, Clone, Serialize, Deserialize, Default)]
14pub struct IotDataPlaneStateView {
15    /// Shadows keyed by "{thing_name}:{shadow_name}" or "{thing_name}:" for classic.
16    #[serde(default)]
17    pub shadows: HashMap<String, ThingShadowView>,
18    /// Retained MQTT messages keyed by topic.
19    #[serde(default)]
20    pub retained_messages: HashMap<String, RetainedMessageView>,
21    /// MQTT publish history (newest at the end).
22    #[serde(default)]
23    pub published_messages: Vec<PublishedMessageView>,
24}
25
26#[derive(Debug, Clone, Serialize, Deserialize)]
27pub struct ThingShadowView {
28    pub thing_name: String,
29    pub shadow_name: Option<String>,
30    /// Payload stored as hex-encoded bytes.
31    pub payload_hex: String,
32    pub version: i64,
33    pub last_modified: String,
34}
35
36#[derive(Debug, Clone, Serialize, Deserialize)]
37pub struct RetainedMessageView {
38    pub topic: String,
39    /// Payload stored as hex-encoded bytes.
40    pub payload_hex: String,
41    pub qos: i32,
42    pub last_modified: String,
43}
44
45/// Serializable view of a single MQTT publish event.
46#[derive(Debug, Clone, Serialize, Deserialize)]
47pub struct PublishedMessageView {
48    pub topic: String,
49    /// Payload stored as hex-encoded bytes.
50    pub payload_hex: String,
51    pub qos: i32,
52    pub retain: bool,
53    pub published_at: String,
54}
55
56fn shadow_key_str(key: &ShadowKey) -> String {
57    format!(
58        "{}:{}",
59        key.thing_name,
60        key.shadow_name.as_deref().unwrap_or("")
61    )
62}
63
64fn parse_dt(s: &str) -> DateTime<Utc> {
65    DateTime::parse_from_rfc3339(s)
66        .map(|dt| dt.with_timezone(&Utc))
67        .unwrap_or_else(|_| Utc::now())
68}
69
/// Encodes `bytes` as a lower-case hexadecimal string, two digits per byte.
///
/// The result is exactly `2 * bytes.len()` characters long; an empty slice
/// produces an empty string.
fn bytes_to_hex(bytes: &[u8]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";

    // Fill one pre-sized buffer rather than allocating a temporary `String`
    // per byte through `format!`.
    let mut hex = String::with_capacity(bytes.len() * 2);
    for &byte in bytes {
        hex.push(char::from(DIGITS[usize::from(byte >> 4)]));
        hex.push(char::from(DIGITS[usize::from(byte & 0x0f)]));
    }
    hex
}
73
/// Decodes a hex string into bytes, two hex digits per byte.
///
/// Decoding is best-effort and never panics:
/// - both upper- and lower-case digits are accepted;
/// - any pair that is not two hex digits (including non-ASCII input or a
///   sign character such as `+`) is skipped;
/// - a trailing unpaired character on odd-length input is ignored.
fn hex_to_bytes(hex: &str) -> Vec<u8> {
    /// Value of a single ASCII hex digit, or `None` for any other byte.
    fn nibble(digit: u8) -> Option<u8> {
        match digit {
            b'0'..=b'9' => Some(digit - b'0'),
            b'a'..=b'f' => Some(digit - b'a' + 10),
            b'A'..=b'F' => Some(digit - b'A' + 10),
            _ => None,
        }
    }

    // Work on raw bytes: slicing the `&str` at fixed offsets would panic on
    // odd lengths and on offsets that fall inside a multi-byte character.
    hex.as_bytes()
        .chunks_exact(2)
        .filter_map(|pair| Some((nibble(pair[0])? << 4) | nibble(pair[1])?))
        .collect()
}
80
81// --- From internal types to view types ---
82
83impl From<&IotDataPlaneState> for IotDataPlaneStateView {
84    fn from(state: &IotDataPlaneState) -> Self {
85        IotDataPlaneStateView {
86            shadows: state
87                .shadows
88                .iter()
89                .map(|(key, shadow)| {
90                    (
91                        shadow_key_str(key),
92                        ThingShadowView {
93                            thing_name: shadow.thing_name.clone(),
94                            shadow_name: shadow.shadow_name.clone(),
95                            payload_hex: bytes_to_hex(&shadow.payload),
96                            version: shadow.version,
97                            last_modified: shadow.last_modified.to_rfc3339(),
98                        },
99                    )
100                })
101                .collect(),
102            retained_messages: state
103                .retained_messages
104                .iter()
105                .map(|(topic, msg)| {
106                    (
107                        topic.clone(),
108                        RetainedMessageView {
109                            topic: msg.topic.clone(),
110                            payload_hex: bytes_to_hex(&msg.payload),
111                            qos: msg.qos,
112                            last_modified: msg.last_modified.to_rfc3339(),
113                        },
114                    )
115                })
116                .collect(),
117            published_messages: state
118                .published_messages
119                .iter()
120                .map(|msg| PublishedMessageView {
121                    topic: msg.topic.clone(),
122                    payload_hex: bytes_to_hex(&msg.payload),
123                    qos: msg.qos,
124                    retain: msg.retain,
125                    published_at: msg.published_at.to_rfc3339(),
126                })
127                .collect(),
128        }
129    }
130}
131
132// --- From view types to internal types ---
133
134impl From<IotDataPlaneStateView> for IotDataPlaneState {
135    fn from(view: IotDataPlaneStateView) -> Self {
136        let shadows = view
137            .shadows
138            .into_values()
139            .map(|sv| {
140                let key = match sv.shadow_name.as_deref() {
141                    Some(name) if !name.is_empty() => ShadowKey::named(&sv.thing_name, name),
142                    _ => ShadowKey::classic(&sv.thing_name),
143                };
144                let payload = hex_to_bytes(&sv.payload_hex);
145                let shadow = ThingShadow {
146                    thing_name: sv.thing_name,
147                    shadow_name: sv.shadow_name,
148                    payload,
149                    version: sv.version,
150                    last_modified: parse_dt(&sv.last_modified),
151                };
152                (key, shadow)
153            })
154            .collect();
155
156        let retained_messages = view
157            .retained_messages
158            .into_values()
159            .map(|rv| {
160                let msg = RetainedMessage {
161                    topic: rv.topic.clone(),
162                    payload: hex_to_bytes(&rv.payload_hex),
163                    qos: rv.qos,
164                    last_modified: parse_dt(&rv.last_modified),
165                };
166                (rv.topic, msg)
167            })
168            .collect();
169
170        let published_messages = view
171            .published_messages
172            .into_iter()
173            .map(|pv| PublishedMessage {
174                topic: pv.topic,
175                payload: hex_to_bytes(&pv.payload_hex),
176                qos: pv.qos,
177                retain: pv.retain,
178                published_at: parse_dt(&pv.published_at),
179            })
180            .collect();
181
182        IotDataPlaneState {
183            shadows,
184            published_messages,
185            retained_messages,
186        }
187    }
188}
189
190// --- StatefulService implementation ---
191
192impl StatefulService for IotDataPlaneService {
193    type StateView = IotDataPlaneStateView;
194
195    async fn snapshot(&self, account_id: &str, region: &str) -> Self::StateView {
196        let state = self.state.get(account_id, region);
197        let guard = state.read().await;
198        IotDataPlaneStateView::from(&*guard)
199    }
200
201    async fn restore(
202        &self,
203        account_id: &str,
204        region: &str,
205        view: Self::StateView,
206    ) -> Result<(), StateViewError> {
207        let state = self.state.get(account_id, region);
208        {
209            let mut guard = state.write().await;
210            *guard = IotDataPlaneState::from(view);
211        }
212        self.notify_state_changed(account_id, region).await;
213        Ok(())
214    }
215
216    async fn merge(
217        &self,
218        account_id: &str,
219        region: &str,
220        view: Self::StateView,
221    ) -> Result<(), StateViewError> {
222        let state = self.state.get(account_id, region);
223        {
224            let mut guard = state.write().await;
225            let incoming = IotDataPlaneState::from(view);
226            for (k, v) in incoming.shadows {
227                guard.shadows.insert(k, v);
228            }
229            for (k, v) in incoming.retained_messages {
230                guard.retained_messages.insert(k, v);
231            }
232            guard.published_messages.extend(incoming.published_messages);
233        }
234        self.notify_state_changed(account_id, region).await;
235        Ok(())
236    }
237
238    fn notifier(&self) -> &StateChangeNotifier<Self::StateView> {
239        &self.notifier
240    }
241}