1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
use std::{collections::HashMap, error::Error};
use log::{info, trace, warn};
use tokio::sync::mpsc;
use moonramp_core::{
log, tokio, NetworkTunnelReceiver, NetworkTunnelSender, TunnelName, TunnelTopic,
};
/// Registry record describing one registered backend service.
struct RegistryEntry {
/// Sender used to forward incoming tunnel messages to the backend service.
service_tx: NetworkTunnelSender,
}
/// Message router: receives tunnel messages on its own channel and forwards
/// each one to the backend service registered under the message's tunnel name.
pub struct Registry {
/// Receiving half of the channel created in `Registry::new`; every message
/// to be routed arrives here.
registry_rx: NetworkTunnelReceiver,
/// Registered backends, keyed by the tunnel name they serve.
registry: HashMap<TunnelName, RegistryEntry>,
}
impl Registry {
    /// Creates an empty registry together with the sender used to feed it.
    ///
    /// The inbound channel is bounded: at most 1024 messages can be queued
    /// before senders have to wait.
    pub fn new() -> (NetworkTunnelSender, Self) {
        let (registry_tx, registry_rx) = mpsc::channel(1024);
        (
            registry_tx,
            Registry {
                registry_rx,
                registry: HashMap::new(),
            },
        )
    }

    /// Registers `service_tx` as the backend for `service_name`.
    ///
    /// A backend previously registered under the same name is replaced.
    pub fn register(&mut self, service_name: TunnelName, service_tx: NetworkTunnelSender) {
        info!("Adding {} to registry", service_name);
        self.registry
            .insert(service_name, RegistryEntry { service_tx });
    }

    /// Routes incoming messages until the inbound channel is closed.
    ///
    /// Returns `Ok(())` once every sender for the registry channel has been
    /// dropped and all queued messages have been handled.
    ///
    /// # Errors
    ///
    /// Returns an error as soon as forwarding a message to a registered
    /// backend fails (the backend's receiver is gone).
    pub async fn run(&mut self) -> Result<(), Box<dyn Error>> {
        info!("Starting registry");
        // `inner_run` reports `false` once the channel is closed. Stopping here
        // avoids spinning forever on a receiver that can only yield `None`.
        while self.inner_run().await? {}
        info!("Registry channel closed, stopping registry");
        Ok(())
    }

    /// Receives a single message and forwards it to the matching backend.
    ///
    /// Returns `Ok(true)` when a message was received (whether or not a
    /// backend was found for it) and `Ok(false)` when the inbound channel is
    /// closed and no further message can arrive.
    ///
    /// # Errors
    ///
    /// Returns an error if sending to the selected backend fails.
    async fn inner_run(&mut self) -> Result<bool, Box<dyn Error>> {
        let (response_tx, msg) = match self.registry_rx.recv().await {
            Some(received) => received,
            None => return Ok(false),
        };
        trace!(
            "Registry received {} {} bytes",
            msg.topic,
            msg.tunnel_data.len()
        );
        match msg.topic {
            TunnelTopic::Public(name) => match self.registry.get(&name) {
                Some(backend) => backend.service_tx.send((response_tx, msg)).await?,
                // A public topic with no registered backend used to be dropped
                // without any trace; report it like every other unroutable topic.
                None => warn!("Backend for topic {} not found", msg.topic),
            },
            _ => warn!("Backend for topic {} not found", msg.topic),
        }
        Ok(true)
    }
}
/// Verifies that a message with a public wallet topic is forwarded, payload
/// intact, to the backend registered under `TunnelName::Wallet`.
#[tokio::test]
async fn test_registry() {
    use moonramp_core::{NetworkTunnel, NetworkTunnelChannel};

    let (wallet_tx, mut wallet_rx) = mpsc::channel(1);
    let (registry_tx, mut registry) = Registry::new();
    registry.register(TunnelName::Wallet, wallet_tx);

    // The receiving half is never read, but binding it to a named variable
    // keeps it alive until the end of the test.
    let (res_tx, _res_rx) = mpsc::channel(1);
    let payload = vec![0x01, 0x03, 0x03, 0x07];
    let msg = NetworkTunnel {
        topic: TunnelTopic::Public(TunnelName::Wallet),
        tunnel_data: payload.clone(),
    };

    assert!(registry_tx
        .send((NetworkTunnelChannel::Mpsc(res_tx), msg))
        .await
        .is_ok());
    assert!(registry.inner_run().await.is_ok());

    // Check the routed message itself, not merely that something arrived.
    let (_response_tx, forwarded) = wallet_rx
        .recv()
        .await
        .expect("wallet backend should receive the routed message");
    assert_eq!(forwarded.tunnel_data, payload);
}