1use thiserror::Error;
2use tokio::sync::mpsc;
3use tracing::warn;
4
5use crate::{config::NtsPeerConfig, keyexchange::key_exchange_client};
6
7use super::{BasicSpawner, PeerId, PeerRemovedEvent, SpawnAction, SpawnEvent, SpawnerId};
8
9pub struct NtsSpawner {
10 config: NtsPeerConfig,
11 network_wait_period: std::time::Duration,
12 id: SpawnerId,
13}
14
15#[derive(Error, Debug)]
16pub enum NtsSpawnError {
17 #[error("Channel send error: {0}")]
18 SendError(#[from] mpsc::error::SendError<SpawnEvent>),
19}
20
21impl NtsSpawner {
22 pub fn new(config: NtsPeerConfig, network_wait_period: std::time::Duration) -> NtsSpawner {
23 NtsSpawner {
24 config,
25 network_wait_period,
26 id: Default::default(),
27 }
28 }
29
30 async fn spawn(&mut self, action_tx: &mpsc::Sender<SpawnEvent>) -> Result<(), NtsSpawnError> {
31 let ke = loop {
32 match key_exchange_client(
33 self.config.ke_addr.server_name.clone(),
34 self.config.ke_addr.port,
35 &self.config.certificates,
36 )
37 .await
38 {
39 Ok(res) => break res,
40 Err(e) => {
41 warn!(error = ?e, "error while attempting key exchange");
42 tokio::time::sleep(self.network_wait_period).await;
43 }
44 };
45 };
46
47 let addr = loop {
48 let address = (ke.remote.as_str(), ke.port);
49 match tokio::net::lookup_host(address).await {
50 Ok(mut addresses) => match addresses.next() {
51 None => {
52 warn!("Could not resolve peer address, retrying");
53 tokio::time::sleep(self.network_wait_period).await
54 }
55 Some(first) => {
56 break first;
57 }
58 },
59 Err(e) => {
60 warn!(error = ?e, "error while resolving peer address, retrying");
61 tokio::time::sleep(self.network_wait_period).await
62 }
63 }
64 };
65
66 action_tx
67 .send(SpawnEvent::new(
68 self.id,
69 SpawnAction::create(
70 PeerId::new(),
71 addr,
72 self.config.ke_addr.clone(),
73 Some(ke.nts),
74 ),
75 ))
76 .await?;
77 Ok(())
78 }
79}
80
81#[async_trait::async_trait]
82impl BasicSpawner for NtsSpawner {
83 type Error = NtsSpawnError;
84
85 async fn handle_init(
86 &mut self,
87 action_tx: &mpsc::Sender<SpawnEvent>,
88 ) -> Result<(), NtsSpawnError> {
89 self.spawn(action_tx).await
90 }
91
92 async fn handle_peer_removed(
93 &mut self,
94 _removed_peer: PeerRemovedEvent,
95 action_tx: &mpsc::Sender<SpawnEvent>,
96 ) -> Result<(), NtsSpawnError> {
97 self.handle_init(action_tx).await
98 }
99
100 fn get_id(&self) -> SpawnerId {
101 self.id
102 }
103
104 fn get_addr_description(&self) -> String {
105 self.config.ke_addr.to_string()
106 }
107
108 fn get_description(&self) -> &str {
109 "nts"
110 }
111}