1use std::collections::{HashMap, HashSet};
2use std::sync::Arc;
3
4use blake3::Hasher;
5use enigma_node_types::{NodeInfo, Presence, PublicIdentity, UserId};
6use tokio::sync::RwLock;
7
8use crate::error::{RegistryError, RegistryResult};
9
10#[cfg(feature = "persistence")]
11use persistent::PersistentStore;
12use volatile::VolatileStore;
13
14pub enum StoreBackend {
15 Volatile(VolatileStore),
16 #[cfg(feature = "persistence")]
17 Persistent(PersistentStore),
18}
19
20#[derive(Clone)]
21pub struct Store {
22 backend: Arc<StoreBackend>,
23 pepper: [u8; 32],
24}
25
26impl Store {
27 pub fn new_in_memory(pepper: [u8; 32], max_nodes: usize) -> Self {
28 Store {
29 backend: Arc::new(StoreBackend::Volatile(VolatileStore::new(
30 pepper, max_nodes,
31 ))),
32 pepper,
33 }
34 }
35
36 #[cfg(feature = "persistence")]
37 pub fn new_persistent(pepper: [u8; 32], path: &str, max_nodes: usize) -> RegistryResult<Self> {
38 Ok(Store {
39 backend: Arc::new(StoreBackend::Persistent(PersistentStore::new(
40 pepper, path, max_nodes,
41 )?)),
42 pepper,
43 })
44 }
45
46 pub fn pepper(&self) -> [u8; 32] {
47 self.pepper
48 }
49
50 pub async fn register(&self, identity: PublicIdentity) -> RegistryResult<()> {
51 match self.backend.as_ref() {
52 StoreBackend::Volatile(store) => store.register(identity).await,
53 #[cfg(feature = "persistence")]
54 StoreBackend::Persistent(store) => store.register(identity).await,
55 }
56 }
57
58 pub async fn resolve(&self, user_id: &UserId) -> RegistryResult<Option<PublicIdentity>> {
59 match self.backend.as_ref() {
60 StoreBackend::Volatile(store) => store.resolve(user_id).await,
61 #[cfg(feature = "persistence")]
62 StoreBackend::Persistent(store) => store.resolve(user_id).await,
63 }
64 }
65
66 pub async fn check_user(&self, user_id: &UserId) -> RegistryResult<bool> {
67 match self.backend.as_ref() {
68 StoreBackend::Volatile(store) => store.check_user(user_id).await,
69 #[cfg(feature = "persistence")]
70 StoreBackend::Persistent(store) => store.check_user(user_id).await,
71 }
72 }
73
74 pub async fn announce(&self, presence: Presence) -> RegistryResult<()> {
75 match self.backend.as_ref() {
76 StoreBackend::Volatile(store) => store.announce(presence).await,
77 #[cfg(feature = "persistence")]
78 StoreBackend::Persistent(store) => store.announce(presence).await,
79 }
80 }
81
82 pub async fn sync_identities(&self, identities: Vec<PublicIdentity>) -> RegistryResult<usize> {
83 match self.backend.as_ref() {
84 StoreBackend::Volatile(store) => store.sync_identities(identities).await,
85 #[cfg(feature = "persistence")]
86 StoreBackend::Persistent(store) => store.sync_identities(identities).await,
87 }
88 }
89
90 pub async fn list_nodes(&self) -> RegistryResult<Vec<NodeInfo>> {
91 match self.backend.as_ref() {
92 StoreBackend::Volatile(store) => store.list_nodes().await,
93 #[cfg(feature = "persistence")]
94 StoreBackend::Persistent(store) => store.list_nodes().await,
95 }
96 }
97
98 pub async fn add_nodes(&self, nodes: Vec<NodeInfo>) -> RegistryResult<usize> {
99 match self.backend.as_ref() {
100 StoreBackend::Volatile(store) => store.add_nodes(nodes).await,
101 #[cfg(feature = "persistence")]
102 StoreBackend::Persistent(store) => store.add_nodes(nodes).await,
103 }
104 }
105
106 pub async fn purge_presences(&self, now_ms: u64, ttl_secs: u64) -> RegistryResult<usize> {
107 match self.backend.as_ref() {
108 StoreBackend::Volatile(store) => store.purge_presences(now_ms, ttl_secs).await,
109 #[cfg(feature = "persistence")]
110 StoreBackend::Persistent(store) => store.purge_presences(now_ms, ttl_secs).await,
111 }
112 }
113}
114
115fn blind_index(pepper: [u8; 32], user_id: &UserId) -> [u8; 32] {
116 let mut hasher = Hasher::new();
117 hasher.update(b"enigma:registry:blind:v1");
118 hasher.update(&pepper);
119 hasher.update(user_id.as_bytes());
120 let digest = hasher.finalize();
121 let mut out = [0u8; 32];
122 out.copy_from_slice(digest.as_bytes());
123 out
124}
125
126#[derive(Clone)]
127struct StoredIdentity {
128 identity: PublicIdentity,
129 _blind: [u8; 32],
130}
131
132struct NodeState {
133 entries: Vec<NodeInfo>,
134 max: usize,
135}
136
137impl NodeState {
138 fn new(max: usize) -> Self {
139 NodeState {
140 entries: Vec::new(),
141 max: max.max(1),
142 }
143 }
144
145 fn list(&self) -> Vec<NodeInfo> {
146 self.entries.clone()
147 }
148
149 fn add(&mut self, nodes: Vec<NodeInfo>) -> RegistryResult<usize> {
150 let mut validated = Vec::new();
151 for node in nodes {
152 node.validate()
153 .map_err(|_| RegistryError::InvalidInput("node".to_string()))?;
154 validated.push(NodeInfo {
155 base_url: node.base_url.trim().to_string(),
156 });
157 }
158 let mut existing: HashSet<String> =
159 self.entries.iter().map(|n| n.base_url.clone()).collect();
160 let mut inserted = 0usize;
161 for node in validated {
162 if self.entries.len() >= self.max {
163 break;
164 }
165 if existing.contains(&node.base_url) {
166 continue;
167 }
168 existing.insert(node.base_url.clone());
169 self.entries.push(node);
170 inserted = inserted.saturating_add(1);
171 if self.entries.len() >= self.max {
172 break;
173 }
174 }
175 Ok(inserted)
176 }
177}
178
179fn validate_identity(identity: &PublicIdentity) -> RegistryResult<()> {
180 identity
181 .validate()
182 .map_err(|_| RegistryError::InvalidInput("identity".to_string()))
183}
184
185fn ensure_handle_matches(identity: &PublicIdentity) -> RegistryResult<()> {
186 let derived = identity.user_id;
187 let recomputed = UserId::from_hex(&identity.user_id.to_hex())
188 .map_err(|_| RegistryError::InvalidInput("user_id".to_string()))?;
189 if derived != recomputed {
190 return Err(RegistryError::InvalidInput(
191 "identity user_id mismatch".to_string(),
192 ));
193 }
194 Ok(())
195}
196
197mod volatile {
198 use super::*;
199
200 struct VolatileState {
201 identities: HashMap<UserId, StoredIdentity>,
202 blind: HashMap<[u8; 32], UserId>,
203 presences: HashMap<UserId, Presence>,
204 nodes: NodeState,
205 }
206
207 #[derive(Clone)]
208 pub struct VolatileStore {
209 state: Arc<RwLock<VolatileState>>,
210 pepper: [u8; 32],
211 }
212
213 impl VolatileStore {
214 pub fn new(pepper: [u8; 32], max_nodes: usize) -> Self {
215 VolatileStore {
216 state: Arc::new(RwLock::new(VolatileState {
217 identities: HashMap::new(),
218 blind: HashMap::new(),
219 presences: HashMap::new(),
220 nodes: NodeState::new(max_nodes),
221 })),
222 pepper,
223 }
224 }
225
226 pub async fn register(&self, identity: PublicIdentity) -> RegistryResult<()> {
227 validate_identity(&identity)?;
228 ensure_handle_matches(&identity)?;
229 let blind = blind_index(self.pepper, &identity.user_id);
230 let mut guard = self.state.write().await;
231 if guard.identities.contains_key(&identity.user_id) {
232 return Err(RegistryError::Conflict);
233 }
234 guard.blind.insert(blind, identity.user_id);
235 guard.identities.insert(
236 identity.user_id,
237 StoredIdentity {
238 identity: identity.clone(),
239 _blind: blind,
240 },
241 );
242 Ok(())
243 }
244
245 pub async fn resolve(&self, user_id: &UserId) -> RegistryResult<Option<PublicIdentity>> {
246 let guard = self.state.read().await;
247 Ok(guard.identities.get(user_id).map(|s| s.identity.clone()))
248 }
249
250 pub async fn check_user(&self, user_id: &UserId) -> RegistryResult<bool> {
251 let guard = self.state.read().await;
252 Ok(guard.identities.contains_key(user_id))
253 }
254
255 pub async fn announce(&self, presence: Presence) -> RegistryResult<()> {
256 presence
257 .validate()
258 .map_err(|_| RegistryError::InvalidInput("presence".to_string()))?;
259 let mut guard = self.state.write().await;
260 guard.presences.insert(presence.user_id, presence);
261 Ok(())
262 }
263
264 pub async fn sync_identities(
265 &self,
266 identities: Vec<PublicIdentity>,
267 ) -> RegistryResult<usize> {
268 let mut inserted = 0usize;
269 let mut guard = self.state.write().await;
270 for identity in identities {
271 if guard.identities.contains_key(&identity.user_id) {
272 continue;
273 }
274 if validate_identity(&identity).is_err() {
275 continue;
276 }
277 ensure_handle_matches(&identity)?;
278 let blind = blind_index(self.pepper, &identity.user_id);
279 guard.blind.insert(blind, identity.user_id);
280 guard.identities.insert(
281 identity.user_id,
282 StoredIdentity {
283 identity: identity.clone(),
284 _blind: blind,
285 },
286 );
287 inserted = inserted.saturating_add(1);
288 }
289 Ok(inserted)
290 }
291
292 pub async fn list_nodes(&self) -> RegistryResult<Vec<NodeInfo>> {
293 let guard = self.state.read().await;
294 Ok(guard.nodes.list())
295 }
296
297 pub async fn add_nodes(&self, nodes: Vec<NodeInfo>) -> RegistryResult<usize> {
298 let mut guard = self.state.write().await;
299 guard.nodes.add(nodes)
300 }
301
302 pub async fn purge_presences(&self, now_ms: u64, ttl_secs: u64) -> RegistryResult<usize> {
303 let ttl_ms = ttl_secs.saturating_mul(1000);
304 let mut guard = self.state.write().await;
305 let before = guard.presences.len();
306 guard.presences.retain(|_, presence| {
307 let age = now_ms.saturating_sub(presence.ts_ms);
308 age < ttl_ms
309 });
310 Ok(before.saturating_sub(guard.presences.len()))
311 }
312 }
313}
314
315#[cfg(feature = "persistence")]
316mod persistent {
317 use super::*;
318 use tokio::task::spawn_blocking;
319
320 #[derive(Clone)]
321 pub struct PersistentStore {
322 db: sled::Db,
323 pepper: [u8; 32],
324 max_nodes: usize,
325 }
326
327 impl PersistentStore {
328 pub fn new(pepper: [u8; 32], path: &str, max_nodes: usize) -> RegistryResult<Self> {
329 let db = sled::open(path)
330 .map_err(|e| RegistryError::Config(format!("failed to open sled db: {}", e)))?;
331 Ok(PersistentStore {
332 db,
333 pepper,
334 max_nodes: max_nodes.max(1),
335 })
336 }
337
338 pub async fn register(&self, identity: PublicIdentity) -> RegistryResult<()> {
339 validate_identity(&identity)?;
340 ensure_handle_matches(&identity)?;
341 let handle = identity.user_id;
342 let blind = blind_index(self.pepper, &handle);
343 let db = self.db.clone();
344 let value = serde_json::to_vec(&identity)
345 .map_err(|_| RegistryError::InvalidInput("identity".to_string()))?;
346 spawn_blocking(move || {
347 let identities = db.open_tree("identities")?;
348 let blinds = db.open_tree("blind")?;
349 let key = handle.as_bytes();
350 match identities.compare_and_swap(key, None as Option<&[u8]>, Some(value)) {
351 Ok(Ok(_)) => {
352 blinds.insert(blind, key)?;
353 Ok(())
354 }
355 Ok(Err(_)) => Err(RegistryError::Conflict),
356 Err(e) => Err(RegistryError::Internal
357 .with_details(serde_json::json!({ "error": e.to_string() }))),
358 }
359 })
360 .await
361 .map_err(|_| RegistryError::Internal)?
362 }
363
364 pub async fn resolve(&self, user_id: &UserId) -> RegistryResult<Option<PublicIdentity>> {
365 let db = self.db.clone();
366 let key = *user_id.as_bytes();
367 spawn_blocking(move || {
368 let identities = db.open_tree("identities")?;
369 Ok(match identities.get(key)? {
370 Some(value) => Some(
371 serde_json::from_slice::<PublicIdentity>(&value)
372 .map_err(|_| RegistryError::Internal)?,
373 ),
374 None => None,
375 })
376 })
377 .await
378 .map_err(|_| RegistryError::Internal)?
379 }
380
381 pub async fn check_user(&self, user_id: &UserId) -> RegistryResult<bool> {
382 let db = self.db.clone();
383 let key = *user_id.as_bytes();
384 spawn_blocking(move || {
385 let identities = db.open_tree("identities")?;
386 Ok(identities.contains_key(key)?)
387 })
388 .await
389 .map_err(|_| RegistryError::Internal)?
390 }
391
392 pub async fn announce(&self, presence: Presence) -> RegistryResult<()> {
393 presence
394 .validate()
395 .map_err(|_| RegistryError::InvalidInput("presence".to_string()))?;
396 let db = self.db.clone();
397 let key = *presence.user_id.as_bytes();
398 let value = serde_json::to_vec(&presence)
399 .map_err(|_| RegistryError::InvalidInput("presence".to_string()))?;
400 spawn_blocking(move || {
401 let presences = db.open_tree("presences")?;
402 presences.insert(key, value)?;
403 Ok(())
404 })
405 .await
406 .map_err(|_| RegistryError::Internal)?
407 }
408
409 pub async fn sync_identities(
410 &self,
411 identities: Vec<PublicIdentity>,
412 ) -> RegistryResult<usize> {
413 let db = self.db.clone();
414 let pepper = self.pepper;
415 spawn_blocking(move || {
416 let identities_tree = db.open_tree("identities")?;
417 let blinds = db.open_tree("blind")?;
418 let mut inserted = 0usize;
419 for identity in identities {
420 if identities_tree.contains_key(identity.user_id.as_bytes())? {
421 continue;
422 }
423 if validate_identity(&identity).is_err() {
424 continue;
425 }
426 ensure_handle_matches(&identity)?;
427 let value = match serde_json::to_vec(&identity) {
428 Ok(v) => v,
429 Err(_) => continue,
430 };
431 identities_tree.insert(identity.user_id.as_bytes(), value)?;
432 let blind = blind_index(pepper, &identity.user_id);
433 blinds.insert(blind, identity.user_id.as_bytes())?;
434 inserted = inserted.saturating_add(1);
435 }
436 Ok(inserted)
437 })
438 .await
439 .map_err(|_| RegistryError::Internal)?
440 }
441
442 pub async fn list_nodes(&self) -> RegistryResult<Vec<NodeInfo>> {
443 let db = self.db.clone();
444 spawn_blocking(move || {
445 let nodes = db.open_tree("nodes")?;
446 let mut out = Vec::new();
447 for item in nodes.iter() {
448 let (_, value) = item?;
449 if let Ok(node) = serde_json::from_slice::<NodeInfo>(&value) {
450 out.push(node);
451 }
452 }
453 Ok(out)
454 })
455 .await
456 .map_err(|_| RegistryError::Internal)?
457 }
458
459 pub async fn add_nodes(&self, nodes: Vec<NodeInfo>) -> RegistryResult<usize> {
460 let db = self.db.clone();
461 let max_nodes = self.max_nodes;
462 spawn_blocking(move || {
463 let tree = db.open_tree("nodes")?;
464 let mut current = tree.len();
465 let mut inserted = 0usize;
466 for node in nodes {
467 node.validate()
468 .map_err(|_| RegistryError::InvalidInput("node".to_string()))?;
469 if current as usize >= max_nodes {
470 break;
471 }
472 let key = node.base_url.as_bytes();
473 if tree.contains_key(key)? {
474 continue;
475 }
476 let value = serde_json::to_vec(&node).map_err(|_| RegistryError::Internal)?;
477 tree.insert(key, value)?;
478 current = current.saturating_add(1);
479 inserted = inserted.saturating_add(1);
480 }
481 Ok(inserted)
482 })
483 .await
484 .map_err(|_| RegistryError::Internal)?
485 }
486
487 pub async fn purge_presences(&self, now_ms: u64, ttl_secs: u64) -> RegistryResult<usize> {
488 let db = self.db.clone();
489 spawn_blocking(move || {
490 let ttl_ms = ttl_secs.saturating_mul(1000);
491 let presences = db.open_tree("presences")?;
492 let mut removed = 0usize;
493 for item in presences.iter() {
494 let (key, value) = item?;
495 let presence: Presence = match serde_json::from_slice(&value) {
496 Ok(p) => p,
497 Err(_) => {
498 presences.remove(key)?;
499 removed = removed.saturating_add(1);
500 continue;
501 }
502 };
503 let age = now_ms.saturating_sub(presence.ts_ms);
504 if age >= ttl_ms {
505 presences.remove(key)?;
506 removed = removed.saturating_add(1);
507 }
508 }
509 Ok(removed)
510 })
511 .await
512 .map_err(|_| RegistryError::Internal)?
513 }
514 }
515}