1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
use std::{collections::HashMap, error::Error};

use log::{info, trace, warn};
use tokio::sync::mpsc;

use moonramp_core::{
    log, tokio, NetworkTunnelReceiver, NetworkTunnelSender, TunnelName, TunnelTopic,
};

/// A single backend known to the [`Registry`].
struct RegistryEntry {
    /// Sender that messages routed to this backend are forwarded on.
    service_tx: NetworkTunnelSender,
}

/// Routes incoming tunnel messages to the backend registered for their topic.
pub struct Registry {
    /// Receiving half of the channel created in `Registry::new`; every message
    /// to be routed arrives here.
    registry_rx: NetworkTunnelReceiver,
    /// Registered backends, keyed by the tunnel name they serve.
    registry: HashMap<TunnelName, RegistryEntry>,
}

impl Registry {
    /// Creates an empty registry together with the sender used to feed it.
    ///
    /// The returned sender is the sending half of a bounded channel
    /// (capacity 1024) whose receiving half is owned by the registry.
    pub fn new() -> (NetworkTunnelSender, Self) {
        let (registry_tx, registry_rx) = mpsc::channel(1024);
        (
            registry_tx,
            Registry {
                registry_rx,
                registry: HashMap::new(),
            },
        )
    }

    /// Registers `service_tx` as the backend for `service_name`.
    ///
    /// An existing entry under the same name is replaced.
    pub fn register(&mut self, service_name: TunnelName, service_tx: NetworkTunnelSender) {
        info!("Adding {} to registry", service_name);
        self.registry
            .insert(service_name, RegistryEntry { service_tx });
    }

    /// Routes messages until the registry channel is closed.
    ///
    /// Returns `Ok(())` once every sender has been dropped and the queue has
    /// been drained, rather than polling the closed channel forever.
    ///
    /// # Errors
    ///
    /// Returns the send error if forwarding a message to a registered backend
    /// fails.
    pub async fn run(&mut self) -> Result<(), Box<dyn Error>> {
        info!("Starting registry");
        while self.route_next().await? {}
        info!("Registry channel closed, stopping registry");
        Ok(())
    }

    /// Waits for one message and routes it.
    ///
    /// Also returns `Ok(())` when the channel is closed and no message was
    /// received.
    ///
    /// # Errors
    ///
    /// Returns the send error if forwarding to the backend fails.
    async fn inner_run(&mut self) -> Result<(), Box<dyn Error>> {
        self.route_next().await.map(|_| ())
    }

    /// Receives the next message and forwards it to the matching backend.
    ///
    /// Returns `Ok(true)` after a message was handled and `Ok(false)` when the
    /// channel yielded `None`, i.e. there is nothing left to route.
    async fn route_next(&mut self) -> Result<bool, Box<dyn Error>> {
        let (response_tx, msg) = match self.registry_rx.recv().await {
            Some(request) => request,
            None => return Ok(false),
        };
        trace!(
            "Registry received {} {} bytes",
            msg.topic,
            msg.tunnel_data.len()
        );
        match msg.topic {
            TunnelTopic::Public(name) => match self.registry.get(&name) {
                Some(backend) => backend.service_tx.send((response_tx, msg)).await?,
                // Surface the lookup miss instead of silently dropping the message.
                None => warn!("Backend for topic {} not found", msg.topic),
            },
            // Only public topics are routed by the registry.
            _ => warn!("Backend for topic {} not found", msg.topic),
        }
        Ok(true)
    }
}

/// Sends one public wallet message into the registry and checks that a single
/// routing step delivers it to the registered wallet backend.
#[tokio::test]
async fn test_registry() {
    use moonramp_core::{NetworkTunnel, NetworkTunnelChannel};

    // Registry with a single backend registered under the wallet name.
    let (registry_tx, mut registry) = Registry::new();
    let (wallet_tx, mut wallet_rx) = mpsc::channel(1);
    registry.register(TunnelName::Wallet, wallet_tx);

    // The response receiver is kept alive for the duration of the test.
    let (response_tx, _response_rx) = mpsc::channel(1);
    let request = (
        NetworkTunnelChannel::Mpsc(response_tx),
        NetworkTunnel {
            topic: TunnelTopic::Public(TunnelName::Wallet),
            tunnel_data: vec![0x01, 0x03, 0x03, 0x07],
        },
    );

    let queued = registry_tx.send(request).await;
    assert!(queued.is_ok());

    let routed = registry.inner_run().await;
    assert!(routed.is_ok());

    let delivered = wallet_rx.recv().await;
    assert!(delivered.is_some());
}