Skip to main content

astarte_device_sdk/store/
memory.rs

1// This file is part of Astarte.
2//
3// Copyright 2023-2026 SECO Mind Srl
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9//    http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16//
17// SPDX-License-Identifier: Apache-2.0
18
//! In-memory store for the properties.
20
21use std::{collections::HashMap, fmt::Display, sync::Arc};
22
23use astarte_interfaces::schema::Ownership;
24use astarte_interfaces::{Properties, Schema};
25use tokio::sync::RwLock;
26use tracing::error;
27
28use super::{OptStoredProp, PropertyMapping, PropertyStore, StoreCapabilities, StoredProp};
29use crate::store::{MissingCapability, PropertyState};
30use crate::types::AstarteData;
31
32/// Error from the memory store.
33///
34/// This error has no variants, but it is defined to allow for future changes.
35#[non_exhaustive]
36#[derive(Debug, Clone, thiserror::Error)]
37pub enum MemoryError {}
38
39/// Data structure providing an implementation of an in memory Key Value Store.
40///
41/// Can be used by an Astarte device to store variables while the device is running.
42#[derive(Debug, Clone, Default)]
43pub struct MemoryStore {
44    // Store the properties in memory
45    store: Arc<RwLock<HashMap<Key, Value>>>,
46}
47
48impl MemoryStore {
49    /// Creates an in memory Key Value Store for the Astarte device.
50    pub fn new() -> Self {
51        MemoryStore {
52            store: Arc::new(RwLock::new(HashMap::new())),
53        }
54    }
55}
56
57impl StoreCapabilities for MemoryStore {
58    type Retention = MissingCapability;
59    type Session = MissingCapability;
60
61    fn get_retention(&self) -> Option<&Self::Retention> {
62        None
63    }
64
65    fn get_session(&self) -> Option<&Self::Session> {
66        None
67    }
68}
69
70impl PropertyStore for MemoryStore {
71    type Err = MemoryError;
72
73    async fn store_prop(
74        &self,
75        StoredProp {
76            interface,
77            path,
78            value,
79            interface_major,
80            ownership,
81        }: StoredProp<&str, &AstarteData>,
82    ) -> Result<(), Self::Err> {
83        let key = Key::new(interface, path);
84        let value = Value {
85            value: Some(value.clone()),
86            interface_major,
87            ownership,
88            state: PropertyState::Changed,
89        };
90
91        let mut store = self.store.write().await;
92
93        store.insert(key, value);
94
95        Ok(())
96    }
97
98    async fn update_state(
99        &self,
100        property: &PropertyMapping<'_>,
101        state: PropertyState,
102        expected: Option<AstarteData>,
103    ) -> Result<bool, Self::Err> {
104        let key = Key::new(property.interface_name(), property.path());
105
106        if let Some(val) = self.store.write().await.get_mut(&key)
107            && val.value == expected
108        {
109            val.state = state;
110
111            Ok(true)
112        } else {
113            Ok(false)
114        }
115    }
116
117    async fn load_prop(
118        &self,
119        property: &PropertyMapping<'_>,
120    ) -> Result<Option<AstarteData>, Self::Err> {
121        let key = Key::new(property.interface_name(), property.path());
122
123        // We need to drop the lock before calling delete_prop
124        let opt_val = {
125            let store = self.store.read().await;
126
127            store.get(&key).cloned()
128        };
129
130        match opt_val {
131            Some(value) if value.interface_major != property.version_major() => {
132                error!(
133                    "Version mismatch for property {}{} (stored {}, interface {}). Deleting.",
134                    property.interface_name(),
135                    property.path(),
136                    value.interface_major,
137                    property.version_major()
138                );
139
140                self.delete_prop(property).await?;
141
142                Ok(None)
143            }
144            Some(value) => Ok(value.value),
145            None => Ok(None),
146        }
147    }
148
149    async fn unset_prop(&self, property: &PropertyMapping<'_>) -> Result<(), Self::Err> {
150        let key = Key::new(property.interface_name(), property.path());
151
152        let mut writer = self.store.write().await;
153
154        if let Some(value) = writer.get_mut(&key) {
155            value.value = None;
156        }
157
158        Ok(())
159    }
160
161    async fn delete_prop(&self, property: &PropertyMapping<'_>) -> Result<(), Self::Err> {
162        let key = Key::new(property.interface_name(), property.path());
163
164        let mut store = self.store.write().await;
165
166        store.remove(&key);
167
168        Ok(())
169    }
170
171    async fn delete_expected_prop(
172        &self,
173        property: &PropertyMapping<'_>,
174        expected: Option<AstarteData>,
175    ) -> Result<bool, Self::Err> {
176        let key = Key::new(property.interface_name(), property.path());
177
178        let mut store = self.store.write().await;
179
180        if let Some(val) = store.get_mut(&key)
181            && val.value == expected
182        {
183            store.remove(&key);
184
185            Ok(true)
186        } else {
187            Ok(false)
188        }
189    }
190
191    async fn clear(&self) -> Result<(), Self::Err> {
192        let mut store = self.store.write().await;
193
194        store.clear();
195
196        Ok(())
197    }
198
199    async fn load_all_props(&self) -> Result<Vec<StoredProp>, Self::Err> {
200        let store = self.store.read().await;
201
202        let props = store.iter().filter_map(|(k, v)| v.as_prop(k)).collect();
203
204        Ok(props)
205    }
206
207    async fn server_props(&self) -> Result<Vec<StoredProp>, Self::Err> {
208        let store = self.store.read().await;
209
210        let props = store
211            .iter()
212            .filter_map(|(k, v)| match v.ownership {
213                Ownership::Device => None,
214                Ownership::Server => v.as_prop(k),
215            })
216            .collect();
217
218        Ok(props)
219    }
220
221    async fn device_props(&self) -> Result<Vec<StoredProp>, Self::Err> {
222        let store = self.store.read().await;
223
224        let props = store
225            .iter()
226            .filter_map(|(k, v)| match v.ownership {
227                Ownership::Device => v.as_prop(k),
228                Ownership::Server => None,
229            })
230            .collect();
231
232        Ok(props)
233    }
234
235    async fn interface_props(&self, interface: &Properties) -> Result<Vec<StoredProp>, Self::Err> {
236        Ok(self
237            .store
238            .read()
239            .await
240            .iter()
241            .filter_map(|(k, v)| {
242                if k.interface == interface.name() {
243                    v.as_prop(k)
244                } else {
245                    None
246                }
247            })
248            .collect())
249    }
250
251    async fn delete_interface(&self, interface: &Properties) -> Result<(), Self::Err> {
252        self.store
253            .write()
254            .await
255            .retain(|k, _v| k.interface != interface.name());
256
257        Ok(())
258    }
259
260    async fn device_props_with_unset(
261        &self,
262        state: PropertyState,
263        limit: usize,
264        offset: usize,
265    ) -> Result<Vec<OptStoredProp>, Self::Err> {
266        let store = self.store.read().await;
267
268        let props = store
269            .iter()
270            .filter_map(|(k, v)| {
271                if v.state != state {
272                    return None;
273                }
274
275                match v.ownership {
276                    Ownership::Device => Some(OptStoredProp::from((k, v))),
277                    Ownership::Server => None,
278                }
279            })
280            .skip(offset)
281            .take(limit)
282            .collect();
283
284        Ok(props)
285    }
286
287    async fn reset_state(&self, ownership: Ownership) -> Result<(), Self::Err> {
288        self.store
289            .write()
290            .await
291            .values_mut()
292            .filter(|v| v.ownership == ownership)
293            .for_each(|v| v.state = PropertyState::Changed);
294
295        Ok(())
296    }
297}
298
/// Key of the in-memory store, identifying a property by its interface name and path.
///
/// Hash and equality are derived over both fields. Lookups build an owned key through
/// [`Key::new`].
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct Key {
    /// Name of the interface the property belongs to.
    interface: String,
    /// Path of the property inside the interface.
    path: String,
}

impl Key {
    /// Builds a key by copying the interface name and the path into owned strings.
    fn new(interface: &str, path: &str) -> Self {
        Self {
            interface: interface.to_owned(),
            path: path.to_owned(),
        }
    }
}

impl Display for Key {
    /// Writes the interface name immediately followed by the path.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(&self.interface)?;
        f.write_str(&self.path)
    }
}
322
323/// Value for the memory store
324#[derive(Debug, Clone)]
325struct Value {
326    value: Option<AstarteData>,
327    interface_major: i32,
328    ownership: Ownership,
329    state: PropertyState,
330}
331
332impl Value {
333    fn as_prop(&self, key: &Key) -> Option<StoredProp> {
334        self.value.as_ref().map(|value| StoredProp {
335            interface: key.interface.clone(),
336            path: key.path.clone(),
337            value: value.clone(),
338            interface_major: self.interface_major,
339            ownership: self.ownership,
340        })
341    }
342}
343
344impl From<(&Key, &Value)> for OptStoredProp {
345    fn from((key, value): (&Key, &Value)) -> Self {
346        Self {
347            interface: key.interface.clone(),
348            path: key.path.clone(),
349            value: value.value.clone(),
350            interface_major: value.interface_major,
351            ownership: value.ownership,
352        }
353    }
354}
355
#[cfg(test)]
mod tests {
    use super::*;
    use crate::store::tests::test_property_store;

    /// Runs `test_property_store` from the parent module's tests against a fresh
    /// [`MemoryStore`].
    #[tokio::test]
    async fn test_memory_store() {
        let store = MemoryStore::new();

        test_property_store(store).await;
    }
}