1use crate::*;
2
/// Connection state of a `Sig`, shared between the handle and its
/// background task.
enum MaybeReady {
    /// The backend endpoint is available and can be used to connect to peers.
    Ready(DynBackEp),
    /// The backend endpoint is not available yet. The semaphore is created
    /// with zero permits and is closed by the background task right before
    /// the state is replaced with `Ready`, which wakes every waiter.
    Wait(Arc<tokio::sync::Semaphore>),
}
7
/// Handle to a single signal-server connection.
///
/// The connection itself is driven by a spawned background task, which is
/// aborted when this handle is dropped.
pub(crate) struct Sig {
    /// Value forwarded to the backend `connect` call; when `true` the
    /// background task also emits `ListeningAddressOpen` /
    /// `ListeningAddressClosed` events.
    pub(crate) listener: bool,
    /// Signal server url this connection targets.
    pub(crate) sig_url: SigUrl,
    /// Readiness state shared with the background task.
    ready: Arc<Mutex<MaybeReady>>,
    /// Handle of the spawned background task.
    task: tokio::task::JoinHandle<()>,
}
16
impl Drop for Sig {
    /// Aborts the background task so it does not keep running once this
    /// handle is gone.
    fn drop(&mut self) {
        self.task.abort();
    }
}
22
23impl Sig {
24 pub fn new(
25 ep: Weak<Mutex<EpInner>>,
26 config: Arc<Config>,
27 sig_url: SigUrl,
28 listener: bool,
29 evt_send: tokio::sync::mpsc::Sender<EndpointEvent>,
30 resp_url: Option<tokio::sync::oneshot::Sender<PeerUrl>>,
31 ) -> Arc<Self> {
32 Arc::new_cyclic(|this| {
33 let wait = Arc::new(tokio::sync::Semaphore::new(0));
34 let ready = Arc::new(Mutex::new(MaybeReady::Wait(wait)));
35
36 let task = tokio::task::spawn(task(
38 ep,
39 this.clone(),
40 config,
41 sig_url.clone(),
42 listener,
43 evt_send,
44 ready.clone(),
45 resp_url,
46 ));
47
48 Self {
49 listener,
50 sig_url,
51 ready,
52 task,
53 }
54 })
55 }
56
57 pub async fn ready(&self) {
59 let w = match &*self.ready.lock().unwrap() {
60 MaybeReady::Ready(_) => return,
61 MaybeReady::Wait(w) => w.clone(),
62 };
63
64 let _ = w.acquire().await;
65 }
66
67 pub fn get_peer_url(&self) -> Option<PeerUrl> {
69 match &*self.ready.lock().unwrap() {
70 MaybeReady::Wait(_) => None,
71 MaybeReady::Ready(hub) => {
72 Some(self.sig_url.to_peer(hub.pub_key().clone()))
73 }
74 }
75 }
76
77 pub async fn connect(&self, pub_key: PubKey) -> Result<DynBackWaitCon> {
79 let ep = match &*self.ready.lock().unwrap() {
80 MaybeReady::Ready(h) => h.clone(),
81 _ => return Err(Error::other("not ready")),
82 };
83 ep.connect(pub_key).await
84 }
85}
86
87async fn connect_loop(
89 config: Arc<Config>,
90 sig_url: SigUrl,
91 listener: bool,
92 mut resp_url: Option<tokio::sync::oneshot::Sender<PeerUrl>>,
93) -> (DynBackEp, DynBackEpRecvCon) {
94 tracing::debug!(
95 target: "NETAUDIT",
96 ?config,
97 ?sig_url,
98 ?listener,
99 m = "tx5",
100 t = "signal",
101 a = "try_connect",
102 );
103
104 let mut wait = config.backoff_start;
105
106 loop {
107 match tokio::time::timeout(
108 config.timeout,
109 config.backend_module.connect(&sig_url, listener, &config),
110 )
111 .await
112 .map_err(Error::other)
113 {
114 Ok(Ok(r)) => {
115 if let Some(resp_url) = resp_url.take() {
116 let _ =
117 resp_url.send(sig_url.to_peer(r.0.pub_key().clone()));
118 }
119 return r;
120 }
121 Err(err) | Ok(Err(err)) => {
122 let _ = resp_url.take();
124 tracing::debug!(
125 target: "NETAUDIT",
126 ?err,
127 m = "tx5",
128 t = "signal",
129 a = "connect_error",
130 );
131 }
132 }
133
134 wait *= 2;
135 if wait > config.backoff_max {
136 wait = config.backoff_max;
137 }
138 tokio::time::sleep(wait).await;
139 }
140}
141
/// Guard held by the background task. Its `Drop` impl logs the shutdown and
/// asks the endpoint to drop the associated `Sig`.
struct DropSig {
    /// Weak reference to the endpoint state that is asked to drop the `Sig`.
    inner: Weak<Mutex<EpInner>>,
    /// Signal server url, included in the drop log entry.
    sig_url: SigUrl,
    /// Local peer url, included in the drop log entry; `None` until the
    /// connection has been established.
    local_url: Option<PeerUrl>,
    /// Weak reference to the `Sig` this guard belongs to.
    sig: Weak<Sig>,
}
149
150impl Drop for DropSig {
151 fn drop(&mut self) {
152 tracing::debug!(
153 target: "NETAUDIT",
154 sig_url = ?self.sig_url,
155 local_url = ?self.local_url,
156 m = "tx5",
157 t = "signal",
158 a = "drop",
159 );
160
161 if let Some(inner) = self.inner.upgrade() {
162 if let Some(sig) = self.sig.upgrade() {
163 inner.lock().unwrap().drop_sig(sig);
164 }
165 }
166 }
167}
168
169#[allow(clippy::too_many_arguments)]
171async fn task(
172 inner: Weak<Mutex<EpInner>>,
173 this: Weak<Sig>,
174 config: Arc<Config>,
175 sig_url: SigUrl,
176 listener: bool,
177 evt_send: tokio::sync::mpsc::Sender<EndpointEvent>,
178 ready: Arc<Mutex<MaybeReady>>,
179 resp_url: Option<tokio::sync::oneshot::Sender<PeerUrl>>,
180) {
181 let mut drop_g = DropSig {
183 inner: inner.clone(),
184 sig_url: sig_url.clone(),
185 local_url: None,
186 sig: this,
187 };
188
189 let (ep, mut ep_recv) =
191 connect_loop(config.clone(), sig_url.clone(), listener, resp_url).await;
192
193 let local_url = sig_url.to_peer(ep.pub_key().clone());
195 drop_g.local_url = Some(local_url.clone());
196
197 {
200 let mut lock = ready.lock().unwrap();
201 if let MaybeReady::Wait(w) = &*lock {
202 w.close();
203 }
204 *lock = MaybeReady::Ready(ep);
205 }
206
207 drop(ready);
209
210 if listener {
211 let _ = evt_send
213 .send(EndpointEvent::ListeningAddressOpen {
214 local_url: local_url.clone(),
215 })
216 .await;
217 }
218
219 tracing::debug!(
220 target: "NETAUDIT",
221 ?local_url,
222 m = "tx5",
223 t = "signal",
224 a = "connected",
225 );
226
227 while let Some(wc) = ep_recv.recv().await {
230 if let Some(inner) = inner.upgrade() {
231 let peer_url = sig_url.to_peer(wc.pub_key().clone());
232 inner.lock().unwrap().accept_peer(peer_url, wc);
233 }
234 }
235
236 tracing::debug!(
237 target: "NETAUDIT",
238 ?local_url,
239 m = "tx5",
240 t = "signal",
241 a = "closing",
242 );
243
244 tokio::time::sleep(config.backoff_start).await;
246
247 if listener {
248 let _ = evt_send
250 .send(EndpointEvent::ListeningAddressClosed {
251 local_url: local_url.clone(),
252 })
253 .await;
254 }
255
256 tracing::debug!(
257 target: "NETAUDIT",
258 ?local_url,
259 m = "tx5",
260 t = "signal",
261 a = "closed",
262 );
263
264 }