tx5_connection/
hub.rs

1pub use super::*;
2
3type HubMapT = HashMap<PubKey, (Weak<Conn>, CloseSend<ConnCmd>)>;
4struct HubMap(HubMapT);
5
6impl std::ops::Deref for HubMap {
7    type Target = HubMapT;
8
9    fn deref(&self) -> &Self::Target {
10        &self.0
11    }
12}
13
14impl std::ops::DerefMut for HubMap {
15    fn deref_mut(&mut self) -> &mut Self::Target {
16        &mut self.0
17    }
18}
19
20impl HubMap {
21    pub fn new() -> Self {
22        Self(HashMap::new())
23    }
24}
25
26/// Insert or update a peer connection in the hub.
27async fn hub_map_assert(
28    webrtc_config: &Arc<Mutex<WebRtcConfig>>,
29    is_polite: bool,
30    pub_key: PubKey,
31    map: &mut HubMap,
32    client: &Arc<tx5_signal::SignalConnection>,
33    config: &Arc<HubConfig>,
34    hub_cmd_send: &tokio::sync::mpsc::Sender<HubCmd>,
35) -> Result<(Option<ConnRecv>, Arc<Conn>, CloseSend<ConnCmd>)> {
36    let mut found_during_prune = None;
37
38    // remove anything that has been dropped
39    map.retain(|_, c| {
40        if let Some(conn) = c.0.upgrade() {
41            let cmd_send = c.1.clone();
42            if conn.pub_key() == &pub_key {
43                found_during_prune = Some((conn, cmd_send));
44            }
45            true
46        } else {
47            false
48        }
49    });
50
51    // if we happened to find what we were looking for
52    // (that is still valid) in the prune loop above, short circuit here.
53    if let Some((conn, cmd_send)) = found_during_prune {
54        return Ok((None, conn, cmd_send));
55    }
56
57    client.assert(&pub_key).await?;
58
59    // we're not connected to the peer, create a connection
60
61    let (conn, recv, cmd_send) = Conn::priv_new(
62        webrtc_config.lock().unwrap().clone(),
63        is_polite,
64        pub_key.clone(),
65        Arc::downgrade(client),
66        config.clone(),
67        hub_cmd_send.clone(),
68    );
69
70    let weak_conn = Arc::downgrade(&conn);
71
72    let mut store_cmd_send = cmd_send.clone();
73    store_cmd_send.set_close_on_drop(true);
74
75    map.insert(pub_key, (weak_conn, store_cmd_send));
76
77    Ok((Some(recv), conn, cmd_send))
78}
79
80pub(crate) enum HubCmd {
81    CliRecv {
82        pub_key: PubKey,
83        msg: tx5_signal::SignalMessage,
84    },
85    Connect {
86        pub_key: PubKey,
87        resp:
88            tokio::sync::oneshot::Sender<Result<(Option<ConnRecv>, Arc<Conn>)>>,
89    },
90    Disconnect(PubKey),
91    Close,
92}
93
94/// A stream of incoming p2p connections.
95pub struct HubRecv(tokio::sync::mpsc::Receiver<(Arc<Conn>, ConnRecv)>);
96
97impl HubRecv {
98    /// Receive an incoming p2p connection.
99    pub async fn accept(&mut self) -> Option<(Arc<Conn>, ConnRecv)> {
100        self.0.recv().await
101    }
102}
103
104/// A signal server connection from which we can establish tx5 connections.
105pub struct Hub {
106    webrtc_config: Arc<Mutex<WebRtcConfig>>,
107    client: Arc<tx5_signal::SignalConnection>,
108    hub_cmd_send: tokio::sync::mpsc::Sender<HubCmd>,
109    task_list: Vec<tokio::task::JoinHandle<()>>,
110}
111
112impl Drop for Hub {
113    fn drop(&mut self) {
114        for task in self.task_list.iter() {
115            task.abort();
116        }
117    }
118}
119
120impl Hub {
121    /// Create a new Hub based off a connected tx5 signal client.
122    /// Note, if this is not a "listener" client,
123    /// you do not need to ever call accept.
124    pub async fn new(
125        webrtc_config: WebRtcConfig,
126        url: &str,
127        config: Arc<HubConfig>,
128    ) -> Result<(Self, HubRecv)> {
129        let webrtc_config = Arc::new(Mutex::new(webrtc_config));
130
131        let (client, mut recv) = tx5_signal::SignalConnection::connect(
132            url,
133            config.signal_config.clone(),
134        )
135        .await?;
136        let client = Arc::new(client);
137
138        tracing::debug!(%url, pub_key = ?client.pub_key(), "hub connected");
139
140        let mut task_list = Vec::new();
141
142        let (hub_cmd_send, mut cmd_recv) = tokio::sync::mpsc::channel(32);
143
144        // forward received messages to the cmd task
145        let hub_cmd_send2 = hub_cmd_send.clone();
146        task_list.push(tokio::task::spawn(async move {
147            while let Some((pub_key, msg)) = recv.recv_message().await {
148                if hub_cmd_send2
149                    .send(HubCmd::CliRecv { pub_key, msg })
150                    .await
151                    .is_err()
152                {
153                    break;
154                }
155            }
156
157            let _ = hub_cmd_send2.send(HubCmd::Close).await;
158        }));
159
160        // the cmd task is the main event loop of the hub logic
161        let webrtc_config2 = webrtc_config.clone();
162        let (conn_send, conn_recv) = tokio::sync::mpsc::channel(32);
163        let weak_client = Arc::downgrade(&client);
164        let url = url.to_string();
165        let this_pub_key = client.pub_key().clone();
166        let hub_cmd_send2 = hub_cmd_send.clone();
167        task_list.push(tokio::task::spawn(async move {
168            let mut map = HubMap::new();
169            while let Some(cmd) = cmd_recv.recv().await {
170                match cmd {
171                    HubCmd::CliRecv { pub_key, msg } => {
172                        if let Some(client) = weak_client.upgrade() {
173                            if pub_key == this_pub_key {
174                                // ignore self messages
175                                continue;
176                            }
177
178                            // assert a connection for this message
179                            let is_polite = pub_key > this_pub_key;
180                            let (recv, conn, cmd_send) = match hub_map_assert(
181                                &webrtc_config2,
182                                is_polite,
183                                pub_key,
184                                &mut map,
185                                &client,
186                                &config,
187                                &hub_cmd_send2,
188                            )
189                            .await
190                            {
191                                Err(err) => {
192                                    tracing::debug!(
193                                        ?err,
194                                        "failed to accept incoming connection"
195                                    );
196                                    continue;
197                                }
198                                Ok(r) => r,
199                            };
200
201                            // then forward it along
202                            let _ = cmd_send.send(ConnCmd::SigRecv(msg)).await;
203
204                            // if this opened a new connection,
205                            // send that event too
206                            if let Some(recv) = recv {
207                                let _ = conn_send.send((conn, recv)).await;
208                            }
209                        } else {
210                            break;
211                        }
212                    }
213                    HubCmd::Connect { pub_key, resp } => {
214                        // user requested a connect without a message to send
215
216                        if pub_key == this_pub_key {
217                            let _ = resp.send(Err(Error::other(
218                                "cannot connect to self",
219                            )));
220                            continue;
221                        }
222                        let is_polite = pub_key > this_pub_key;
223                        if let Some(client) = weak_client.upgrade() {
224                            let _ = resp.send(
225                                hub_map_assert(
226                                    &webrtc_config2,
227                                    is_polite,
228                                    pub_key,
229                                    &mut map,
230                                    &client,
231                                    &config,
232                                    &hub_cmd_send2,
233                                )
234                                .await
235                                .map(|(recv, conn, _)| (recv, conn)),
236                            );
237                        } else {
238                            break;
239                        }
240                    }
241                    HubCmd::Disconnect(pub_key) => {
242                        // disconnect from a remote peer
243
244                        if let Some(client) = weak_client.upgrade() {
245                            let _ = client.close_peer(&pub_key).await;
246                        } else {
247                            break;
248                        }
249                        let _ = map.remove(&pub_key);
250                    }
251                    HubCmd::Close => break,
252                }
253            }
254
255            if let Some(client) = weak_client.upgrade() {
256                client.close().await;
257            }
258
259            tracing::debug!(%url, ?this_pub_key, "hub close");
260        }));
261
262        Ok((
263            Self {
264                webrtc_config,
265                client,
266                hub_cmd_send,
267                task_list,
268            },
269            HubRecv(conn_recv),
270        ))
271    }
272
273    /// Get the pub_key used by this hub.
274    pub fn pub_key(&self) -> &PubKey {
275        self.client.pub_key()
276    }
277
278    /// Establish a connection to a remote peer.
279    pub async fn connect(
280        &self,
281        pub_key: PubKey,
282    ) -> Result<(Arc<Conn>, ConnRecv)> {
283        let (s, r) = tokio::sync::oneshot::channel();
284        self.hub_cmd_send
285            .send(HubCmd::Connect { pub_key, resp: s })
286            .await
287            .map_err(|_| Error::other("closed"))?;
288        let (recv, conn) = r.await.map_err(|_| Error::other("closed"))??;
289        if let Some(recv) = recv {
290            Ok((conn, recv))
291        } else {
292            Err(Error::other("already connected"))
293        }
294    }
295
296    /// Alter the webrtc_config at runtime.
297    ///
298    /// This will affect all future new outgoing connections, and all connections accepted on the
299    /// receiver. It will not affect any connections already established.
300    pub fn set_webrtc_config(&self, webrtc_config: WebRtcConfig) {
301        *self.webrtc_config.lock().unwrap() = webrtc_config;
302    }
303}