ntp_daemon/spawn/
nts.rs

1use thiserror::Error;
2use tokio::sync::mpsc;
3use tracing::warn;
4
5use crate::{config::NtsPeerConfig, keyexchange::key_exchange_client};
6
7use super::{BasicSpawner, PeerId, PeerRemovedEvent, SpawnAction, SpawnEvent, SpawnerId};
8
9pub struct NtsSpawner {
10    config: NtsPeerConfig,
11    network_wait_period: std::time::Duration,
12    id: SpawnerId,
13}
14
15#[derive(Error, Debug)]
16pub enum NtsSpawnError {
17    #[error("Channel send error: {0}")]
18    SendError(#[from] mpsc::error::SendError<SpawnEvent>),
19}
20
21impl NtsSpawner {
22    pub fn new(config: NtsPeerConfig, network_wait_period: std::time::Duration) -> NtsSpawner {
23        NtsSpawner {
24            config,
25            network_wait_period,
26            id: Default::default(),
27        }
28    }
29
30    async fn spawn(&mut self, action_tx: &mpsc::Sender<SpawnEvent>) -> Result<(), NtsSpawnError> {
31        let ke = loop {
32            match key_exchange_client(
33                self.config.ke_addr.server_name.clone(),
34                self.config.ke_addr.port,
35                &self.config.certificates,
36            )
37            .await
38            {
39                Ok(res) => break res,
40                Err(e) => {
41                    warn!(error = ?e, "error while attempting key exchange");
42                    tokio::time::sleep(self.network_wait_period).await;
43                }
44            };
45        };
46
47        let addr = loop {
48            let address = (ke.remote.as_str(), ke.port);
49            match tokio::net::lookup_host(address).await {
50                Ok(mut addresses) => match addresses.next() {
51                    None => {
52                        warn!("Could not resolve peer address, retrying");
53                        tokio::time::sleep(self.network_wait_period).await
54                    }
55                    Some(first) => {
56                        break first;
57                    }
58                },
59                Err(e) => {
60                    warn!(error = ?e, "error while resolving peer address, retrying");
61                    tokio::time::sleep(self.network_wait_period).await
62                }
63            }
64        };
65
66        action_tx
67            .send(SpawnEvent::new(
68                self.id,
69                SpawnAction::create(
70                    PeerId::new(),
71                    addr,
72                    self.config.ke_addr.clone(),
73                    Some(ke.nts),
74                ),
75            ))
76            .await?;
77        Ok(())
78    }
79}
80
81#[async_trait::async_trait]
82impl BasicSpawner for NtsSpawner {
83    type Error = NtsSpawnError;
84
85    async fn handle_init(
86        &mut self,
87        action_tx: &mpsc::Sender<SpawnEvent>,
88    ) -> Result<(), NtsSpawnError> {
89        self.spawn(action_tx).await
90    }
91
92    async fn handle_peer_removed(
93        &mut self,
94        _removed_peer: PeerRemovedEvent,
95        action_tx: &mpsc::Sender<SpawnEvent>,
96    ) -> Result<(), NtsSpawnError> {
97        self.handle_init(action_tx).await
98    }
99
100    fn get_id(&self) -> SpawnerId {
101        self.id
102    }
103
104    fn get_addr_description(&self) -> String {
105        self.config.ke_addr.to_string()
106    }
107
108    fn get_description(&self) -> &str {
109        "nts"
110    }
111}