1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
use thiserror::Error;
use tokio::sync::mpsc;
use tracing::warn;

use crate::{config::NtsPeerConfig, keyexchange::key_exchange_client};

use super::{BasicSpawner, PeerId, PeerRemovedEvent, SpawnAction, SpawnEvent, SpawnerId};

pub struct NtsSpawner {
    config: NtsPeerConfig,
    network_wait_period: std::time::Duration,
    id: SpawnerId,
}

#[derive(Error, Debug)]
pub enum NtsSpawnError {
    #[error("Channel send error: {0}")]
    SendError(#[from] mpsc::error::SendError<SpawnEvent>),
}

impl NtsSpawner {
    pub fn new(config: NtsPeerConfig, network_wait_period: std::time::Duration) -> NtsSpawner {
        NtsSpawner {
            config,
            network_wait_period,
            id: Default::default(),
        }
    }

    async fn spawn(&mut self, action_tx: &mpsc::Sender<SpawnEvent>) -> Result<(), NtsSpawnError> {
        let ke = loop {
            match key_exchange_client(
                self.config.ke_addr.server_name.clone(),
                self.config.ke_addr.port,
                &self.config.certificates,
            )
            .await
            {
                Ok(res) => break res,
                Err(e) => {
                    warn!(error = ?e, "error while attempting key exchange");
                    tokio::time::sleep(self.network_wait_period).await;
                }
            };
        };

        let addr = loop {
            let address = (ke.remote.as_str(), ke.port);
            match tokio::net::lookup_host(address).await {
                Ok(mut addresses) => match addresses.next() {
                    None => {
                        warn!("Could not resolve peer address, retrying");
                        tokio::time::sleep(self.network_wait_period).await
                    }
                    Some(first) => {
                        break first;
                    }
                },
                Err(e) => {
                    warn!(error = ?e, "error while resolving peer address, retrying");
                    tokio::time::sleep(self.network_wait_period).await
                }
            }
        };

        action_tx
            .send(SpawnEvent::new(
                self.id,
                SpawnAction::create(
                    PeerId::new(),
                    addr,
                    self.config.ke_addr.clone(),
                    Some(ke.nts),
                ),
            ))
            .await?;
        Ok(())
    }
}

#[async_trait::async_trait]
impl BasicSpawner for NtsSpawner {
    type Error = NtsSpawnError;

    async fn handle_init(
        &mut self,
        action_tx: &mpsc::Sender<SpawnEvent>,
    ) -> Result<(), NtsSpawnError> {
        self.spawn(action_tx).await
    }

    async fn handle_peer_removed(
        &mut self,
        _removed_peer: PeerRemovedEvent,
        action_tx: &mpsc::Sender<SpawnEvent>,
    ) -> Result<(), NtsSpawnError> {
        self.handle_init(action_tx).await
    }

    fn get_id(&self) -> SpawnerId {
        self.id
    }

    fn get_addr_description(&self) -> String {
        self.config.ke_addr.to_string()
    }

    fn get_description(&self) -> &str {
        "nts"
    }
}