tx5/
sig.rs

1use crate::*;
2
3enum MaybeReady {
4    Ready(DynBackEp),
5    Wait(Arc<tokio::sync::Semaphore>),
6}
7
8/// Manage a backend signal connection. This is actually represented
9/// by the tx5-connection "Hub" type.
10pub(crate) struct Sig {
11    pub(crate) listener: bool,
12    pub(crate) sig_url: SigUrl,
13    ready: Arc<Mutex<MaybeReady>>,
14    task: tokio::task::JoinHandle<()>,
15}
16
17impl Drop for Sig {
18    fn drop(&mut self) {
19        self.task.abort();
20    }
21}
22
23impl Sig {
24    pub fn new(
25        ep: Weak<Mutex<EpInner>>,
26        config: Arc<Config>,
27        sig_url: SigUrl,
28        listener: bool,
29        evt_send: tokio::sync::mpsc::Sender<EndpointEvent>,
30        resp_url: Option<tokio::sync::oneshot::Sender<PeerUrl>>,
31    ) -> Arc<Self> {
32        Arc::new_cyclic(|this| {
33            let wait = Arc::new(tokio::sync::Semaphore::new(0));
34            let ready = Arc::new(Mutex::new(MaybeReady::Wait(wait)));
35
36            // spawn the main event-loop for the signal connection
37            let task = tokio::task::spawn(task(
38                ep,
39                this.clone(),
40                config,
41                sig_url.clone(),
42                listener,
43                evt_send,
44                ready.clone(),
45                resp_url,
46            ));
47
48            Self {
49                listener,
50                sig_url,
51                ready,
52                task,
53            }
54        })
55    }
56
57    /// This future resolves when the signal connection is ready to be used.
58    pub async fn ready(&self) {
59        let w = match &*self.ready.lock().unwrap() {
60            MaybeReady::Ready(_) => return,
61            MaybeReady::Wait(w) => w.clone(),
62        };
63
64        let _ = w.acquire().await;
65    }
66
67    /// Get the address at which this peer will be reachable.
68    pub fn get_peer_url(&self) -> Option<PeerUrl> {
69        match &*self.ready.lock().unwrap() {
70            MaybeReady::Wait(_) => None,
71            MaybeReady::Ready(hub) => {
72                Some(self.sig_url.to_peer(hub.pub_key().clone()))
73            }
74        }
75    }
76
77    /// Attempt to establish a connection to a remote peer.
78    pub async fn connect(&self, pub_key: PubKey) -> Result<DynBackWaitCon> {
79        let ep = match &*self.ready.lock().unwrap() {
80            MaybeReady::Ready(h) => h.clone(),
81            _ => return Err(Error::other("not ready")),
82        };
83        ep.connect(pub_key).await
84    }
85}
86
87/// Loop attempting to establish a signal connection.
88async fn connect_loop(
89    config: Arc<Config>,
90    sig_url: SigUrl,
91    listener: bool,
92    mut resp_url: Option<tokio::sync::oneshot::Sender<PeerUrl>>,
93) -> (DynBackEp, DynBackEpRecvCon) {
94    tracing::debug!(
95        target: "NETAUDIT",
96        ?config,
97        ?sig_url,
98        ?listener,
99        m = "tx5",
100        t = "signal",
101        a = "try_connect",
102    );
103
104    let mut wait = config.backoff_start;
105
106    loop {
107        match tokio::time::timeout(
108            config.timeout,
109            config.backend_module.connect(&sig_url, listener, &config),
110        )
111        .await
112        .map_err(Error::other)
113        {
114            Ok(Ok(r)) => {
115                if let Some(resp_url) = resp_url.take() {
116                    let _ =
117                        resp_url.send(sig_url.to_peer(r.0.pub_key().clone()));
118                }
119                return r;
120            }
121            Err(err) | Ok(Err(err)) => {
122                // drop the response so we can proceed without a peer_url
123                let _ = resp_url.take();
124                tracing::debug!(
125                    target: "NETAUDIT",
126                    ?err,
127                    m = "tx5",
128                    t = "signal",
129                    a = "connect_error",
130                );
131            }
132        }
133
134        wait *= 2;
135        if wait > config.backoff_max {
136            wait = config.backoff_max;
137        }
138        tokio::time::sleep(wait).await;
139    }
140}
141
142/// Helper guard for performing cleanup if the signal event loop task closes.
143struct DropSig {
144    inner: Weak<Mutex<EpInner>>,
145    sig_url: SigUrl,
146    local_url: Option<PeerUrl>,
147    sig: Weak<Sig>,
148}
149
150impl Drop for DropSig {
151    fn drop(&mut self) {
152        tracing::debug!(
153            target: "NETAUDIT",
154            sig_url = ?self.sig_url,
155            local_url = ?self.local_url,
156            m = "tx5",
157            t = "signal",
158            a = "drop",
159        );
160
161        if let Some(inner) = self.inner.upgrade() {
162            if let Some(sig) = self.sig.upgrade() {
163                inner.lock().unwrap().drop_sig(sig);
164            }
165        }
166    }
167}
168
169/// This is the main event-loop task for the signal connection
170#[allow(clippy::too_many_arguments)]
171async fn task(
172    inner: Weak<Mutex<EpInner>>,
173    this: Weak<Sig>,
174    config: Arc<Config>,
175    sig_url: SigUrl,
176    listener: bool,
177    evt_send: tokio::sync::mpsc::Sender<EndpointEvent>,
178    ready: Arc<Mutex<MaybeReady>>,
179    resp_url: Option<tokio::sync::oneshot::Sender<PeerUrl>>,
180) {
181    // establish our drop-guard incase this task is aborted or exits
182    let mut drop_g = DropSig {
183        inner: inner.clone(),
184        sig_url: sig_url.clone(),
185        local_url: None,
186        sig: this,
187    };
188
189    // create a connection to the signal server (tx5-connection "Hub").
190    let (ep, mut ep_recv) =
191        connect_loop(config.clone(), sig_url.clone(), listener, resp_url).await;
192
193    // get our url
194    let local_url = sig_url.to_peer(ep.pub_key().clone());
195    drop_g.local_url = Some(local_url.clone());
196
197    // mark ourselves as ready and store a handle
198    // that will be used for establishing outgoing connections
199    {
200        let mut lock = ready.lock().unwrap();
201        if let MaybeReady::Wait(w) = &*lock {
202            w.close();
203        }
204        *lock = MaybeReady::Ready(ep);
205    }
206
207    // now notify that we are ready
208    drop(ready);
209
210    if listener {
211        // if we are a listener, emit the listening event
212        let _ = evt_send
213            .send(EndpointEvent::ListeningAddressOpen {
214                local_url: local_url.clone(),
215            })
216            .await;
217    }
218
219    tracing::debug!(
220        target: "NETAUDIT",
221        ?local_url,
222        m = "tx5",
223        t = "signal",
224        a = "connected",
225    );
226
227    // now, our only task is to listen for incoming connections
228    // and process them
229    while let Some(wc) = ep_recv.recv().await {
230        if let Some(inner) = inner.upgrade() {
231            let peer_url = sig_url.to_peer(wc.pub_key().clone());
232            inner.lock().unwrap().accept_peer(peer_url, wc);
233        }
234    }
235
236    tracing::debug!(
237        target: "NETAUDIT",
238        ?local_url,
239        m = "tx5",
240        t = "signal",
241        a = "closing",
242    );
243
244    // wait at the end to account for a delay before the next try
245    tokio::time::sleep(config.backoff_start).await;
246
247    if listener {
248        // we are closing. If we are a listener, notify the fact
249        let _ = evt_send
250            .send(EndpointEvent::ListeningAddressClosed {
251                local_url: local_url.clone(),
252            })
253            .await;
254    }
255
256    tracing::debug!(
257        target: "NETAUDIT",
258        ?local_url,
259        m = "tx5",
260        t = "signal",
261        a = "closed",
262    );
263
264    // all other cleanup is handled by the drop guard
265}