enigma_node_registry/
store.rs

1use std::collections::{HashMap, HashSet};
2use std::sync::Arc;
3
4use blake3::Hasher;
5use enigma_node_types::{NodeInfo, Presence, PublicIdentity, UserId};
6use tokio::sync::RwLock;
7
8use crate::error::{RegistryError, RegistryResult};
9
10#[cfg(feature = "persistence")]
11use persistent::PersistentStore;
12use volatile::VolatileStore;
13
14pub enum StoreBackend {
15    Volatile(VolatileStore),
16    #[cfg(feature = "persistence")]
17    Persistent(PersistentStore),
18}
19
20#[derive(Clone)]
21pub struct Store {
22    backend: Arc<StoreBackend>,
23    pepper: [u8; 32],
24}
25
26impl Store {
27    pub fn new_in_memory(pepper: [u8; 32], max_nodes: usize) -> Self {
28        Store {
29            backend: Arc::new(StoreBackend::Volatile(VolatileStore::new(
30                pepper, max_nodes,
31            ))),
32            pepper,
33        }
34    }
35
36    #[cfg(feature = "persistence")]
37    pub fn new_persistent(pepper: [u8; 32], path: &str, max_nodes: usize) -> RegistryResult<Self> {
38        Ok(Store {
39            backend: Arc::new(StoreBackend::Persistent(PersistentStore::new(
40                pepper, path, max_nodes,
41            )?)),
42            pepper,
43        })
44    }
45
46    pub fn pepper(&self) -> [u8; 32] {
47        self.pepper
48    }
49
50    pub async fn register(&self, identity: PublicIdentity) -> RegistryResult<()> {
51        match self.backend.as_ref() {
52            StoreBackend::Volatile(store) => store.register(identity).await,
53            #[cfg(feature = "persistence")]
54            StoreBackend::Persistent(store) => store.register(identity).await,
55        }
56    }
57
58    pub async fn resolve(&self, user_id: &UserId) -> RegistryResult<Option<PublicIdentity>> {
59        match self.backend.as_ref() {
60            StoreBackend::Volatile(store) => store.resolve(user_id).await,
61            #[cfg(feature = "persistence")]
62            StoreBackend::Persistent(store) => store.resolve(user_id).await,
63        }
64    }
65
66    pub async fn check_user(&self, user_id: &UserId) -> RegistryResult<bool> {
67        match self.backend.as_ref() {
68            StoreBackend::Volatile(store) => store.check_user(user_id).await,
69            #[cfg(feature = "persistence")]
70            StoreBackend::Persistent(store) => store.check_user(user_id).await,
71        }
72    }
73
74    pub async fn announce(&self, presence: Presence) -> RegistryResult<()> {
75        match self.backend.as_ref() {
76            StoreBackend::Volatile(store) => store.announce(presence).await,
77            #[cfg(feature = "persistence")]
78            StoreBackend::Persistent(store) => store.announce(presence).await,
79        }
80    }
81
82    pub async fn sync_identities(&self, identities: Vec<PublicIdentity>) -> RegistryResult<usize> {
83        match self.backend.as_ref() {
84            StoreBackend::Volatile(store) => store.sync_identities(identities).await,
85            #[cfg(feature = "persistence")]
86            StoreBackend::Persistent(store) => store.sync_identities(identities).await,
87        }
88    }
89
90    pub async fn list_nodes(&self) -> RegistryResult<Vec<NodeInfo>> {
91        match self.backend.as_ref() {
92            StoreBackend::Volatile(store) => store.list_nodes().await,
93            #[cfg(feature = "persistence")]
94            StoreBackend::Persistent(store) => store.list_nodes().await,
95        }
96    }
97
98    pub async fn add_nodes(&self, nodes: Vec<NodeInfo>) -> RegistryResult<usize> {
99        match self.backend.as_ref() {
100            StoreBackend::Volatile(store) => store.add_nodes(nodes).await,
101            #[cfg(feature = "persistence")]
102            StoreBackend::Persistent(store) => store.add_nodes(nodes).await,
103        }
104    }
105
106    pub async fn purge_presences(&self, now_ms: u64, ttl_secs: u64) -> RegistryResult<usize> {
107        match self.backend.as_ref() {
108            StoreBackend::Volatile(store) => store.purge_presences(now_ms, ttl_secs).await,
109            #[cfg(feature = "persistence")]
110            StoreBackend::Persistent(store) => store.purge_presences(now_ms, ttl_secs).await,
111        }
112    }
113}
114
115fn blind_index(pepper: [u8; 32], user_id: &UserId) -> [u8; 32] {
116    let mut hasher = Hasher::new();
117    hasher.update(b"enigma:registry:blind:v1");
118    hasher.update(&pepper);
119    hasher.update(user_id.as_bytes());
120    let digest = hasher.finalize();
121    let mut out = [0u8; 32];
122    out.copy_from_slice(digest.as_bytes());
123    out
124}
125
126#[derive(Clone)]
127struct StoredIdentity {
128    identity: PublicIdentity,
129    _blind: [u8; 32],
130}
131
132struct NodeState {
133    entries: Vec<NodeInfo>,
134    max: usize,
135}
136
137impl NodeState {
138    fn new(max: usize) -> Self {
139        NodeState {
140            entries: Vec::new(),
141            max: max.max(1),
142        }
143    }
144
145    fn list(&self) -> Vec<NodeInfo> {
146        self.entries.clone()
147    }
148
149    fn add(&mut self, nodes: Vec<NodeInfo>) -> RegistryResult<usize> {
150        let mut validated = Vec::new();
151        for node in nodes {
152            node.validate()
153                .map_err(|_| RegistryError::InvalidInput("node".to_string()))?;
154            validated.push(NodeInfo {
155                base_url: node.base_url.trim().to_string(),
156            });
157        }
158        let mut existing: HashSet<String> =
159            self.entries.iter().map(|n| n.base_url.clone()).collect();
160        let mut inserted = 0usize;
161        for node in validated {
162            if self.entries.len() >= self.max {
163                break;
164            }
165            if existing.contains(&node.base_url) {
166                continue;
167            }
168            existing.insert(node.base_url.clone());
169            self.entries.push(node);
170            inserted = inserted.saturating_add(1);
171            if self.entries.len() >= self.max {
172                break;
173            }
174        }
175        Ok(inserted)
176    }
177}
178
179fn validate_identity(identity: &PublicIdentity) -> RegistryResult<()> {
180    identity
181        .validate()
182        .map_err(|_| RegistryError::InvalidInput("identity".to_string()))
183}
184
185fn ensure_handle_matches(identity: &PublicIdentity) -> RegistryResult<()> {
186    let derived = identity.user_id;
187    let recomputed = UserId::from_hex(&identity.user_id.to_hex())
188        .map_err(|_| RegistryError::InvalidInput("user_id".to_string()))?;
189    if derived != recomputed {
190        return Err(RegistryError::InvalidInput(
191            "identity user_id mismatch".to_string(),
192        ));
193    }
194    Ok(())
195}
196
197mod volatile {
198    use super::*;
199
200    struct VolatileState {
201        identities: HashMap<UserId, StoredIdentity>,
202        blind: HashMap<[u8; 32], UserId>,
203        presences: HashMap<UserId, Presence>,
204        nodes: NodeState,
205    }
206
207    #[derive(Clone)]
208    pub struct VolatileStore {
209        state: Arc<RwLock<VolatileState>>,
210        pepper: [u8; 32],
211    }
212
213    impl VolatileStore {
214        pub fn new(pepper: [u8; 32], max_nodes: usize) -> Self {
215            VolatileStore {
216                state: Arc::new(RwLock::new(VolatileState {
217                    identities: HashMap::new(),
218                    blind: HashMap::new(),
219                    presences: HashMap::new(),
220                    nodes: NodeState::new(max_nodes),
221                })),
222                pepper,
223            }
224        }
225
226        pub async fn register(&self, identity: PublicIdentity) -> RegistryResult<()> {
227            validate_identity(&identity)?;
228            ensure_handle_matches(&identity)?;
229            let blind = blind_index(self.pepper, &identity.user_id);
230            let mut guard = self.state.write().await;
231            if guard.identities.contains_key(&identity.user_id) {
232                return Err(RegistryError::Conflict);
233            }
234            guard.blind.insert(blind, identity.user_id);
235            guard.identities.insert(
236                identity.user_id,
237                StoredIdentity {
238                    identity: identity.clone(),
239                    _blind: blind,
240                },
241            );
242            Ok(())
243        }
244
245        pub async fn resolve(&self, user_id: &UserId) -> RegistryResult<Option<PublicIdentity>> {
246            let guard = self.state.read().await;
247            Ok(guard.identities.get(user_id).map(|s| s.identity.clone()))
248        }
249
250        pub async fn check_user(&self, user_id: &UserId) -> RegistryResult<bool> {
251            let guard = self.state.read().await;
252            Ok(guard.identities.contains_key(user_id))
253        }
254
255        pub async fn announce(&self, presence: Presence) -> RegistryResult<()> {
256            presence
257                .validate()
258                .map_err(|_| RegistryError::InvalidInput("presence".to_string()))?;
259            let mut guard = self.state.write().await;
260            guard.presences.insert(presence.user_id, presence);
261            Ok(())
262        }
263
264        pub async fn sync_identities(
265            &self,
266            identities: Vec<PublicIdentity>,
267        ) -> RegistryResult<usize> {
268            let mut inserted = 0usize;
269            let mut guard = self.state.write().await;
270            for identity in identities {
271                if guard.identities.contains_key(&identity.user_id) {
272                    continue;
273                }
274                if validate_identity(&identity).is_err() {
275                    continue;
276                }
277                ensure_handle_matches(&identity)?;
278                let blind = blind_index(self.pepper, &identity.user_id);
279                guard.blind.insert(blind, identity.user_id);
280                guard.identities.insert(
281                    identity.user_id,
282                    StoredIdentity {
283                        identity: identity.clone(),
284                        _blind: blind,
285                    },
286                );
287                inserted = inserted.saturating_add(1);
288            }
289            Ok(inserted)
290        }
291
292        pub async fn list_nodes(&self) -> RegistryResult<Vec<NodeInfo>> {
293            let guard = self.state.read().await;
294            Ok(guard.nodes.list())
295        }
296
297        pub async fn add_nodes(&self, nodes: Vec<NodeInfo>) -> RegistryResult<usize> {
298            let mut guard = self.state.write().await;
299            guard.nodes.add(nodes)
300        }
301
302        pub async fn purge_presences(&self, now_ms: u64, ttl_secs: u64) -> RegistryResult<usize> {
303            let ttl_ms = ttl_secs.saturating_mul(1000);
304            let mut guard = self.state.write().await;
305            let before = guard.presences.len();
306            guard.presences.retain(|_, presence| {
307                let age = now_ms.saturating_sub(presence.ts_ms);
308                age < ttl_ms
309            });
310            Ok(before.saturating_sub(guard.presences.len()))
311        }
312    }
313}
314
315#[cfg(feature = "persistence")]
316mod persistent {
317    use super::*;
318    use tokio::task::spawn_blocking;
319
320    #[derive(Clone)]
321    pub struct PersistentStore {
322        db: sled::Db,
323        pepper: [u8; 32],
324        max_nodes: usize,
325    }
326
327    impl PersistentStore {
328        pub fn new(pepper: [u8; 32], path: &str, max_nodes: usize) -> RegistryResult<Self> {
329            let db = sled::open(path)
330                .map_err(|e| RegistryError::Config(format!("failed to open sled db: {}", e)))?;
331            Ok(PersistentStore {
332                db,
333                pepper,
334                max_nodes: max_nodes.max(1),
335            })
336        }
337
338        pub async fn register(&self, identity: PublicIdentity) -> RegistryResult<()> {
339            validate_identity(&identity)?;
340            ensure_handle_matches(&identity)?;
341            let handle = identity.user_id;
342            let blind = blind_index(self.pepper, &handle);
343            let db = self.db.clone();
344            let value = serde_json::to_vec(&identity)
345                .map_err(|_| RegistryError::InvalidInput("identity".to_string()))?;
346            spawn_blocking(move || {
347                let identities = db.open_tree("identities")?;
348                let blinds = db.open_tree("blind")?;
349                let key = handle.as_bytes();
350                match identities.compare_and_swap(key, None as Option<&[u8]>, Some(value)) {
351                    Ok(Ok(_)) => {
352                        blinds.insert(blind, key)?;
353                        Ok(())
354                    }
355                    Ok(Err(_)) => Err(RegistryError::Conflict),
356                    Err(e) => Err(RegistryError::Internal
357                        .with_details(serde_json::json!({ "error": e.to_string() }))),
358                }
359            })
360            .await
361            .map_err(|_| RegistryError::Internal)?
362        }
363
364        pub async fn resolve(&self, user_id: &UserId) -> RegistryResult<Option<PublicIdentity>> {
365            let db = self.db.clone();
366            let key = *user_id.as_bytes();
367            spawn_blocking(move || {
368                let identities = db.open_tree("identities")?;
369                Ok(match identities.get(key)? {
370                    Some(value) => Some(
371                        serde_json::from_slice::<PublicIdentity>(&value)
372                            .map_err(|_| RegistryError::Internal)?,
373                    ),
374                    None => None,
375                })
376            })
377            .await
378            .map_err(|_| RegistryError::Internal)?
379        }
380
381        pub async fn check_user(&self, user_id: &UserId) -> RegistryResult<bool> {
382            let db = self.db.clone();
383            let key = *user_id.as_bytes();
384            spawn_blocking(move || {
385                let identities = db.open_tree("identities")?;
386                Ok(identities.contains_key(key)?)
387            })
388            .await
389            .map_err(|_| RegistryError::Internal)?
390        }
391
392        pub async fn announce(&self, presence: Presence) -> RegistryResult<()> {
393            presence
394                .validate()
395                .map_err(|_| RegistryError::InvalidInput("presence".to_string()))?;
396            let db = self.db.clone();
397            let key = *presence.user_id.as_bytes();
398            let value = serde_json::to_vec(&presence)
399                .map_err(|_| RegistryError::InvalidInput("presence".to_string()))?;
400            spawn_blocking(move || {
401                let presences = db.open_tree("presences")?;
402                presences.insert(key, value)?;
403                Ok(())
404            })
405            .await
406            .map_err(|_| RegistryError::Internal)?
407        }
408
409        pub async fn sync_identities(
410            &self,
411            identities: Vec<PublicIdentity>,
412        ) -> RegistryResult<usize> {
413            let db = self.db.clone();
414            let pepper = self.pepper;
415            spawn_blocking(move || {
416                let identities_tree = db.open_tree("identities")?;
417                let blinds = db.open_tree("blind")?;
418                let mut inserted = 0usize;
419                for identity in identities {
420                    if identities_tree.contains_key(identity.user_id.as_bytes())? {
421                        continue;
422                    }
423                    if validate_identity(&identity).is_err() {
424                        continue;
425                    }
426                    ensure_handle_matches(&identity)?;
427                    let value = match serde_json::to_vec(&identity) {
428                        Ok(v) => v,
429                        Err(_) => continue,
430                    };
431                    identities_tree.insert(identity.user_id.as_bytes(), value)?;
432                    let blind = blind_index(pepper, &identity.user_id);
433                    blinds.insert(blind, identity.user_id.as_bytes())?;
434                    inserted = inserted.saturating_add(1);
435                }
436                Ok(inserted)
437            })
438            .await
439            .map_err(|_| RegistryError::Internal)?
440        }
441
442        pub async fn list_nodes(&self) -> RegistryResult<Vec<NodeInfo>> {
443            let db = self.db.clone();
444            spawn_blocking(move || {
445                let nodes = db.open_tree("nodes")?;
446                let mut out = Vec::new();
447                for item in nodes.iter() {
448                    let (_, value) = item?;
449                    if let Ok(node) = serde_json::from_slice::<NodeInfo>(&value) {
450                        out.push(node);
451                    }
452                }
453                Ok(out)
454            })
455            .await
456            .map_err(|_| RegistryError::Internal)?
457        }
458
459        pub async fn add_nodes(&self, nodes: Vec<NodeInfo>) -> RegistryResult<usize> {
460            let db = self.db.clone();
461            let max_nodes = self.max_nodes;
462            spawn_blocking(move || {
463                let tree = db.open_tree("nodes")?;
464                let mut current = tree.len();
465                let mut inserted = 0usize;
466                for node in nodes {
467                    node.validate()
468                        .map_err(|_| RegistryError::InvalidInput("node".to_string()))?;
469                    if current as usize >= max_nodes {
470                        break;
471                    }
472                    let key = node.base_url.as_bytes();
473                    if tree.contains_key(key)? {
474                        continue;
475                    }
476                    let value = serde_json::to_vec(&node).map_err(|_| RegistryError::Internal)?;
477                    tree.insert(key, value)?;
478                    current = current.saturating_add(1);
479                    inserted = inserted.saturating_add(1);
480                }
481                Ok(inserted)
482            })
483            .await
484            .map_err(|_| RegistryError::Internal)?
485        }
486
487        pub async fn purge_presences(&self, now_ms: u64, ttl_secs: u64) -> RegistryResult<usize> {
488            let db = self.db.clone();
489            spawn_blocking(move || {
490                let ttl_ms = ttl_secs.saturating_mul(1000);
491                let presences = db.open_tree("presences")?;
492                let mut removed = 0usize;
493                for item in presences.iter() {
494                    let (key, value) = item?;
495                    let presence: Presence = match serde_json::from_slice(&value) {
496                        Ok(p) => p,
497                        Err(_) => {
498                            presences.remove(key)?;
499                            removed = removed.saturating_add(1);
500                            continue;
501                        }
502                    };
503                    let age = now_ms.saturating_sub(presence.ts_ms);
504                    if age >= ttl_ms {
505                        presences.remove(key)?;
506                        removed = removed.saturating_add(1);
507                    }
508                }
509                Ok(removed)
510            })
511            .await
512            .map_err(|_| RegistryError::Internal)?
513        }
514    }
515}