// winterbaume_iotdataplane/state.rs

use std::collections::hash_map::Entry;
use std::collections::HashMap;

use chrono::Utc;
use thiserror::Error;

use crate::types::*;
7
8#[derive(Debug, Default)]
9pub struct IotDataPlaneState {
10    pub shadows: HashMap<ShadowKey, ThingShadow>,
11    pub published_messages: Vec<PublishedMessage>,
12    pub retained_messages: HashMap<String, RetainedMessage>,
13}
14
15#[derive(Debug, Error)]
16pub enum IotDataPlaneError {
17    #[error("No shadow exists with name: '{shadow_name}'")]
18    ShadowNotFound { shadow_name: String },
19    #[error("No retained message found for topic: '{topic}'")]
20    RetainedMessageNotFound { topic: String },
21    #[error("Invalid JSON in shadow document")]
22    InvalidShadowDocument,
23}
24
25impl IotDataPlaneState {
26    pub fn get_thing_shadow(
27        &self,
28        thing_name: &str,
29        shadow_name: Option<&str>,
30    ) -> Result<&ThingShadow, IotDataPlaneError> {
31        let key = match shadow_name {
32            Some(name) => ShadowKey::named(thing_name, name),
33            None => ShadowKey::classic(thing_name),
34        };
35        self.shadows
36            .get(&key)
37            .ok_or_else(|| IotDataPlaneError::ShadowNotFound {
38                shadow_name: shadow_name.unwrap_or("classic").to_string(),
39            })
40    }
41
42    pub fn update_thing_shadow(
43        &mut self,
44        thing_name: &str,
45        shadow_name: Option<&str>,
46        payload: Vec<u8>,
47    ) -> Result<&ThingShadow, IotDataPlaneError> {
48        let incoming: serde_json::Value = serde_json::from_slice(&payload)
49            .map_err(|_| IotDataPlaneError::InvalidShadowDocument)?;
50
51        let new_state = incoming
52            .get("state")
53            .cloned()
54            .unwrap_or(serde_json::Value::Object(Default::default()));
55
56        let key = match shadow_name {
57            Some(name) => ShadowKey::named(thing_name, name),
58            None => ShadowKey::classic(thing_name),
59        };
60
61        let version = self.shadows.get(&key).map(|s| s.version + 1).unwrap_or(1);
62        let now = Utc::now();
63
64        let full_doc = serde_json::json!({
65            "state": new_state,
66            "metadata": {},
67            "version": version,
68            "timestamp": now.timestamp(),
69        });
70
71        let shadow = ThingShadow {
72            thing_name: thing_name.to_string(),
73            shadow_name: shadow_name.map(|s| s.to_string()),
74            payload: full_doc.to_string().into_bytes(),
75            version,
76            last_modified: now,
77        };
78
79        self.shadows.insert(key.clone(), shadow);
80        Ok(self.shadows.get(&key).unwrap())
81    }
82
83    pub fn delete_thing_shadow(
84        &mut self,
85        thing_name: &str,
86        shadow_name: Option<&str>,
87    ) -> Result<ThingShadow, IotDataPlaneError> {
88        let key = match shadow_name {
89            Some(name) => ShadowKey::named(thing_name, name),
90            None => ShadowKey::classic(thing_name),
91        };
92        self.shadows
93            .remove(&key)
94            .ok_or_else(|| IotDataPlaneError::ShadowNotFound {
95                shadow_name: shadow_name.unwrap_or("classic").to_string(),
96            })
97    }
98
99    pub fn list_named_shadows_for_thing(&self, thing_name: &str) -> Vec<String> {
100        self.shadows
101            .keys()
102            .filter(|k| k.thing_name == thing_name && k.shadow_name.is_some())
103            .filter_map(|k| k.shadow_name.clone())
104            .collect()
105    }
106
107    pub fn publish(&mut self, topic: &str, payload: Vec<u8>, qos: i32, retain: bool) {
108        let now = Utc::now();
109        if retain {
110            let retained = RetainedMessage {
111                topic: topic.to_string(),
112                payload: payload.clone(),
113                qos,
114                last_modified: now,
115            };
116            self.retained_messages.insert(topic.to_string(), retained);
117        }
118        let msg = PublishedMessage {
119            topic: topic.to_string(),
120            payload,
121            qos,
122            retain,
123            published_at: now,
124        };
125        self.published_messages.push(msg);
126    }
127
128    pub fn get_retained_message(&self, topic: &str) -> Result<&RetainedMessage, IotDataPlaneError> {
129        self.retained_messages.get(topic).ok_or_else(|| {
130            IotDataPlaneError::RetainedMessageNotFound {
131                topic: topic.to_string(),
132            }
133        })
134    }
135
136    pub fn list_retained_messages(&self) -> Vec<&RetainedMessage> {
137        self.retained_messages.values().collect()
138    }
139}