astarte_device_sdk/store/
memory.rs1use std::{collections::HashMap, fmt::Display, sync::Arc};
22
23use astarte_interfaces::schema::Ownership;
24use astarte_interfaces::{Properties, Schema};
25use tokio::sync::RwLock;
26use tracing::error;
27
28use super::{OptStoredProp, PropertyMapping, PropertyStore, StoreCapabilities, StoredProp};
29use crate::store::{MissingCapability, PropertyState};
30use crate::types::AstarteData;
31
32#[non_exhaustive]
36#[derive(Debug, Clone, thiserror::Error)]
37pub enum MemoryError {}
38
39#[derive(Debug, Clone, Default)]
43pub struct MemoryStore {
44 store: Arc<RwLock<HashMap<Key, Value>>>,
46}
47
48impl MemoryStore {
49 pub fn new() -> Self {
51 MemoryStore {
52 store: Arc::new(RwLock::new(HashMap::new())),
53 }
54 }
55}
56
57impl StoreCapabilities for MemoryStore {
58 type Retention = MissingCapability;
59 type Session = MissingCapability;
60
61 fn get_retention(&self) -> Option<&Self::Retention> {
62 None
63 }
64
65 fn get_session(&self) -> Option<&Self::Session> {
66 None
67 }
68}
69
70impl PropertyStore for MemoryStore {
71 type Err = MemoryError;
72
73 async fn store_prop(
74 &self,
75 StoredProp {
76 interface,
77 path,
78 value,
79 interface_major,
80 ownership,
81 }: StoredProp<&str, &AstarteData>,
82 ) -> Result<(), Self::Err> {
83 let key = Key::new(interface, path);
84 let value = Value {
85 value: Some(value.clone()),
86 interface_major,
87 ownership,
88 state: PropertyState::Changed,
89 };
90
91 let mut store = self.store.write().await;
92
93 store.insert(key, value);
94
95 Ok(())
96 }
97
98 async fn update_state(
99 &self,
100 property: &PropertyMapping<'_>,
101 state: PropertyState,
102 expected: Option<AstarteData>,
103 ) -> Result<bool, Self::Err> {
104 let key = Key::new(property.interface_name(), property.path());
105
106 if let Some(val) = self.store.write().await.get_mut(&key)
107 && val.value == expected
108 {
109 val.state = state;
110
111 Ok(true)
112 } else {
113 Ok(false)
114 }
115 }
116
117 async fn load_prop(
118 &self,
119 property: &PropertyMapping<'_>,
120 ) -> Result<Option<AstarteData>, Self::Err> {
121 let key = Key::new(property.interface_name(), property.path());
122
123 let opt_val = {
125 let store = self.store.read().await;
126
127 store.get(&key).cloned()
128 };
129
130 match opt_val {
131 Some(value) if value.interface_major != property.version_major() => {
132 error!(
133 "Version mismatch for property {}{} (stored {}, interface {}). Deleting.",
134 property.interface_name(),
135 property.path(),
136 value.interface_major,
137 property.version_major()
138 );
139
140 self.delete_prop(property).await?;
141
142 Ok(None)
143 }
144 Some(value) => Ok(value.value),
145 None => Ok(None),
146 }
147 }
148
149 async fn unset_prop(&self, property: &PropertyMapping<'_>) -> Result<(), Self::Err> {
150 let key = Key::new(property.interface_name(), property.path());
151
152 let mut writer = self.store.write().await;
153
154 if let Some(value) = writer.get_mut(&key) {
155 value.value = None;
156 }
157
158 Ok(())
159 }
160
161 async fn delete_prop(&self, property: &PropertyMapping<'_>) -> Result<(), Self::Err> {
162 let key = Key::new(property.interface_name(), property.path());
163
164 let mut store = self.store.write().await;
165
166 store.remove(&key);
167
168 Ok(())
169 }
170
171 async fn delete_expected_prop(
172 &self,
173 property: &PropertyMapping<'_>,
174 expected: Option<AstarteData>,
175 ) -> Result<bool, Self::Err> {
176 let key = Key::new(property.interface_name(), property.path());
177
178 let mut store = self.store.write().await;
179
180 if let Some(val) = store.get_mut(&key)
181 && val.value == expected
182 {
183 store.remove(&key);
184
185 Ok(true)
186 } else {
187 Ok(false)
188 }
189 }
190
191 async fn clear(&self) -> Result<(), Self::Err> {
192 let mut store = self.store.write().await;
193
194 store.clear();
195
196 Ok(())
197 }
198
199 async fn load_all_props(&self) -> Result<Vec<StoredProp>, Self::Err> {
200 let store = self.store.read().await;
201
202 let props = store.iter().filter_map(|(k, v)| v.as_prop(k)).collect();
203
204 Ok(props)
205 }
206
207 async fn server_props(&self) -> Result<Vec<StoredProp>, Self::Err> {
208 let store = self.store.read().await;
209
210 let props = store
211 .iter()
212 .filter_map(|(k, v)| match v.ownership {
213 Ownership::Device => None,
214 Ownership::Server => v.as_prop(k),
215 })
216 .collect();
217
218 Ok(props)
219 }
220
221 async fn device_props(&self) -> Result<Vec<StoredProp>, Self::Err> {
222 let store = self.store.read().await;
223
224 let props = store
225 .iter()
226 .filter_map(|(k, v)| match v.ownership {
227 Ownership::Device => v.as_prop(k),
228 Ownership::Server => None,
229 })
230 .collect();
231
232 Ok(props)
233 }
234
235 async fn interface_props(&self, interface: &Properties) -> Result<Vec<StoredProp>, Self::Err> {
236 Ok(self
237 .store
238 .read()
239 .await
240 .iter()
241 .filter_map(|(k, v)| {
242 if k.interface == interface.name() {
243 v.as_prop(k)
244 } else {
245 None
246 }
247 })
248 .collect())
249 }
250
251 async fn delete_interface(&self, interface: &Properties) -> Result<(), Self::Err> {
252 self.store
253 .write()
254 .await
255 .retain(|k, _v| k.interface != interface.name());
256
257 Ok(())
258 }
259
260 async fn device_props_with_unset(
261 &self,
262 state: PropertyState,
263 limit: usize,
264 offset: usize,
265 ) -> Result<Vec<OptStoredProp>, Self::Err> {
266 let store = self.store.read().await;
267
268 let props = store
269 .iter()
270 .filter_map(|(k, v)| {
271 if v.state != state {
272 return None;
273 }
274
275 match v.ownership {
276 Ownership::Device => Some(OptStoredProp::from((k, v))),
277 Ownership::Server => None,
278 }
279 })
280 .skip(offset)
281 .take(limit)
282 .collect();
283
284 Ok(props)
285 }
286
287 async fn reset_state(&self, ownership: Ownership) -> Result<(), Self::Err> {
288 self.store
289 .write()
290 .await
291 .values_mut()
292 .filter(|v| v.ownership == ownership)
293 .for_each(|v| v.state = PropertyState::Changed);
294
295 Ok(())
296 }
297}
298
/// Map key identifying a property by its interface name and endpoint path.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct Key {
    /// Name of the interface the property belongs to.
    interface: String,
    /// Path of the property inside the interface.
    path: String,
}

impl Key {
    /// Builds a key by copying the interface name and the path into owned strings.
    fn new(interface: &str, path: &str) -> Self {
        Self {
            interface: interface.to_owned(),
            path: path.to_owned(),
        }
    }
}

impl Display for Key {
    /// Writes the interface name immediately followed by the path, with no separator.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(&self.interface)?;
        f.write_str(&self.path)
    }
}
322
323#[derive(Debug, Clone)]
325struct Value {
326 value: Option<AstarteData>,
327 interface_major: i32,
328 ownership: Ownership,
329 state: PropertyState,
330}
331
332impl Value {
333 fn as_prop(&self, key: &Key) -> Option<StoredProp> {
334 self.value.as_ref().map(|value| StoredProp {
335 interface: key.interface.clone(),
336 path: key.path.clone(),
337 value: value.clone(),
338 interface_major: self.interface_major,
339 ownership: self.ownership,
340 })
341 }
342}
343
344impl From<(&Key, &Value)> for OptStoredProp {
345 fn from((key, value): (&Key, &Value)) -> Self {
346 Self {
347 interface: key.interface.clone(),
348 path: key.path.clone(),
349 value: value.value.clone(),
350 interface_major: value.interface_major,
351 ownership: value.ownership,
352 }
353 }
354}
355
#[cfg(test)]
mod tests {
    use super::*;
    use crate::store::tests::test_property_store;

    /// Passes a freshly created [`MemoryStore`] to `test_property_store`.
    #[tokio::test]
    async fn test_memory_store() {
        test_property_store(MemoryStore::new()).await;
    }
}
367}