switchgear_components/discovery/
memory.rs

1use crate::discovery::error::DiscoveryBackendStoreError;
2use async_trait::async_trait;
3use secp256k1::PublicKey;
4use std::collections::HashMap;
5use std::sync::atomic::{AtomicU64, Ordering};
6use std::sync::Arc;
7use switchgear_service_api::discovery::{
8    DiscoveryBackend, DiscoveryBackendPatch, DiscoveryBackendStore, DiscoveryBackends,
9};
10use tokio::sync::Mutex;
11
12#[derive(Clone, Debug)]
13struct DiscoveryBackendTimestamped {
14    created: chrono::DateTime<chrono::Utc>,
15    backend: DiscoveryBackend,
16}
17
18#[derive(Clone, Debug)]
19pub struct MemoryDiscoveryBackendStore {
20    store: Arc<Mutex<HashMap<PublicKey, DiscoveryBackendTimestamped>>>,
21    etag: Arc<AtomicU64>,
22}
23
24impl Default for MemoryDiscoveryBackendStore {
25    fn default() -> Self {
26        Self::new()
27    }
28}
29
30impl MemoryDiscoveryBackendStore {
31    pub fn new() -> Self {
32        Self {
33            store: Arc::new(Mutex::new(HashMap::new())),
34            etag: Arc::new(Default::default()),
35        }
36    }
37}
38
39#[async_trait]
40impl DiscoveryBackendStore for MemoryDiscoveryBackendStore {
41    type Error = DiscoveryBackendStoreError;
42
43    async fn get(&self, public_key: &PublicKey) -> Result<Option<DiscoveryBackend>, Self::Error> {
44        let store = self.store.lock().await;
45        Ok(store.get(public_key).map(|b| b.backend.clone()))
46    }
47
48    async fn get_all(&self, request_etag: Option<u64>) -> Result<DiscoveryBackends, Self::Error> {
49        let store = self.store.lock().await;
50        let mut backends: Vec<DiscoveryBackendTimestamped> = store.values().cloned().collect();
51
52        backends.sort_by(|a, b| {
53            a.created
54                .cmp(&b.created)
55                .then_with(|| a.backend.public_key.cmp(&b.backend.public_key))
56        });
57
58        let response_etag = self.etag.load(Ordering::Relaxed);
59
60        if request_etag == Some(response_etag) {
61            Ok(DiscoveryBackends {
62                etag: response_etag,
63                backends: None,
64            })
65        } else {
66            let backends = backends.into_iter().map(|b| b.backend).collect();
67            Ok(DiscoveryBackends {
68                etag: response_etag,
69                backends: Some(backends),
70            })
71        }
72    }
73
74    async fn post(&self, backend: DiscoveryBackend) -> Result<Option<PublicKey>, Self::Error> {
75        let mut store = self.store.lock().await;
76        if store.contains_key(&backend.public_key) {
77            return Ok(None);
78        }
79        let key = backend.public_key;
80        store.insert(
81            backend.public_key,
82            DiscoveryBackendTimestamped {
83                created: chrono::Utc::now(),
84                backend,
85            },
86        );
87        self.etag.fetch_add(1, Ordering::Relaxed);
88        Ok(Some(key))
89    }
90
91    async fn put(&self, backend: DiscoveryBackend) -> Result<bool, Self::Error> {
92        let mut store = self.store.lock().await;
93        let key = backend.public_key;
94        let (created, was_new) = match store.get(&key) {
95            Some(existing) => (existing.created, false),
96            None => (chrono::Utc::now(), true),
97        };
98        store.insert(key, DiscoveryBackendTimestamped { created, backend });
99        self.etag.fetch_add(1, Ordering::Relaxed);
100        Ok(was_new)
101    }
102
103    async fn patch(&self, backend: DiscoveryBackendPatch) -> Result<bool, Self::Error> {
104        let mut store = self.store.lock().await;
105        let entry = match store.get_mut(&backend.public_key) {
106            None => return Ok(false),
107            Some(entry) => entry,
108        };
109        if let Some(weight) = backend.backend.weight {
110            entry.backend.backend.weight = weight;
111        }
112        if let Some(enabled) = backend.backend.enabled {
113            entry.backend.backend.enabled = enabled;
114        }
115        if let Some(partitions) = backend.backend.partitions {
116            entry.backend.backend.partitions = partitions;
117        }
118        if let Some(name) = backend.backend.name {
119            entry.backend.backend.name = name;
120        }
121        self.etag.fetch_add(1, Ordering::Relaxed);
122        Ok(true)
123    }
124
125    async fn delete(&self, public_key: &PublicKey) -> Result<bool, Self::Error> {
126        let mut store = self.store.lock().await;
127        let was_found = store.remove(public_key).is_some();
128        if was_found {
129            self.etag.fetch_add(1, Ordering::Relaxed);
130        }
131        Ok(was_found)
132    }
133}