tx5_connection/
hub.rs

1pub use super::*;
2
3type HubMapT = HashMap<PubKey, (Weak<Conn>, CloseSend<ConnCmd>)>;
4struct HubMap(HubMapT);
5
6impl std::ops::Deref for HubMap {
7    type Target = HubMapT;
8
9    fn deref(&self) -> &Self::Target {
10        &self.0
11    }
12}
13
14impl std::ops::DerefMut for HubMap {
15    fn deref_mut(&mut self) -> &mut Self::Target {
16        &mut self.0
17    }
18}
19
20impl HubMap {
21    pub fn new() -> Self {
22        Self(HashMap::new())
23    }
24}
25
26async fn hub_map_assert(
27    webrtc_config: &Arc<Mutex<WebRtcConfig>>,
28    is_polite: bool,
29    pub_key: PubKey,
30    map: &mut HubMap,
31    client: &Arc<tx5_signal::SignalConnection>,
32    config: &Arc<HubConfig>,
33    hub_cmd_send: &tokio::sync::mpsc::Sender<HubCmd>,
34) -> Result<(Option<ConnRecv>, Arc<Conn>, CloseSend<ConnCmd>)> {
35    let mut found_during_prune = None;
36
37    map.retain(|_, c| {
38        if let Some(conn) = c.0.upgrade() {
39            let cmd_send = c.1.clone();
40            if conn.pub_key() == &pub_key {
41                found_during_prune = Some((conn, cmd_send));
42            }
43            true
44        } else {
45            false
46        }
47    });
48
49    if let Some((conn, cmd_send)) = found_during_prune {
50        return Ok((None, conn, cmd_send));
51    }
52
53    client.assert(&pub_key).await?;
54
55    // we're connected to the peer, create a connection
56
57    let (conn, recv, cmd_send) = Conn::priv_new(
58        webrtc_config.lock().unwrap().clone(),
59        is_polite,
60        pub_key.clone(),
61        Arc::downgrade(client),
62        config.clone(),
63        hub_cmd_send.clone(),
64    );
65
66    let weak_conn = Arc::downgrade(&conn);
67
68    let mut store_cmd_send = cmd_send.clone();
69    store_cmd_send.set_close_on_drop(true);
70
71    map.insert(pub_key, (weak_conn, store_cmd_send));
72
73    Ok((Some(recv), conn, cmd_send))
74}
75
76pub(crate) enum HubCmd {
77    CliRecv {
78        pub_key: PubKey,
79        msg: tx5_signal::SignalMessage,
80    },
81    Connect {
82        pub_key: PubKey,
83        resp:
84            tokio::sync::oneshot::Sender<Result<(Option<ConnRecv>, Arc<Conn>)>>,
85    },
86    Disconnect(PubKey),
87    Close,
88}
89
90/// A stream of incoming p2p connections.
91pub struct HubRecv(tokio::sync::mpsc::Receiver<(Arc<Conn>, ConnRecv)>);
92
93impl HubRecv {
94    /// Receive an incoming p2p connection.
95    pub async fn accept(&mut self) -> Option<(Arc<Conn>, ConnRecv)> {
96        self.0.recv().await
97    }
98}
99
100/// A signal server connection from which we can establish tx5 connections.
101pub struct Hub {
102    webrtc_config: Arc<Mutex<WebRtcConfig>>,
103    client: Arc<tx5_signal::SignalConnection>,
104    hub_cmd_send: tokio::sync::mpsc::Sender<HubCmd>,
105    task_list: Vec<tokio::task::JoinHandle<()>>,
106}
107
108impl Drop for Hub {
109    fn drop(&mut self) {
110        for task in self.task_list.iter() {
111            task.abort();
112        }
113    }
114}
115
116impl Hub {
117    /// Create a new Hub based off a connected tx5 signal client.
118    /// Note, if this is not a "listener" client,
119    /// you do not need to ever call accept.
120    pub async fn new(
121        webrtc_config: WebRtcConfig,
122        url: &str,
123        config: Arc<HubConfig>,
124    ) -> Result<(Self, HubRecv)> {
125        let webrtc_config = Arc::new(Mutex::new(webrtc_config));
126
127        let (client, mut recv) = tx5_signal::SignalConnection::connect(
128            url,
129            config.signal_config.clone(),
130        )
131        .await?;
132        let client = Arc::new(client);
133
134        tracing::debug!(%url, pub_key = ?client.pub_key(), "hub connected");
135
136        let mut task_list = Vec::new();
137
138        let (hub_cmd_send, mut cmd_recv) = tokio::sync::mpsc::channel(32);
139
140        let hub_cmd_send2 = hub_cmd_send.clone();
141        task_list.push(tokio::task::spawn(async move {
142            while let Some((pub_key, msg)) = recv.recv_message().await {
143                if hub_cmd_send2
144                    .send(HubCmd::CliRecv { pub_key, msg })
145                    .await
146                    .is_err()
147                {
148                    break;
149                }
150            }
151
152            let _ = hub_cmd_send2.send(HubCmd::Close).await;
153        }));
154
155        let webrtc_config2 = webrtc_config.clone();
156        let (conn_send, conn_recv) = tokio::sync::mpsc::channel(32);
157        let weak_client = Arc::downgrade(&client);
158        let url = url.to_string();
159        let this_pub_key = client.pub_key().clone();
160        let hub_cmd_send2 = hub_cmd_send.clone();
161        task_list.push(tokio::task::spawn(async move {
162            let mut map = HubMap::new();
163            while let Some(cmd) = cmd_recv.recv().await {
164                match cmd {
165                    HubCmd::CliRecv { pub_key, msg } => {
166                        if let Some(client) = weak_client.upgrade() {
167                            if pub_key == this_pub_key {
168                                // ignore self messages
169                                continue;
170                            }
171                            let is_polite = pub_key > this_pub_key;
172                            let (recv, conn, cmd_send) = match hub_map_assert(
173                                &webrtc_config2,
174                                is_polite,
175                                pub_key,
176                                &mut map,
177                                &client,
178                                &config,
179                                &hub_cmd_send2,
180                            )
181                            .await
182                            {
183                                Err(err) => {
184                                    tracing::debug!(
185                                        ?err,
186                                        "failed to accept incoming connection"
187                                    );
188                                    continue;
189                                }
190                                Ok(r) => r,
191                            };
192                            let _ = cmd_send.send(ConnCmd::SigRecv(msg)).await;
193                            if let Some(recv) = recv {
194                                let _ = conn_send.send((conn, recv)).await;
195                            }
196                        } else {
197                            break;
198                        }
199                    }
200                    HubCmd::Connect { pub_key, resp } => {
201                        if pub_key == this_pub_key {
202                            let _ = resp.send(Err(Error::other(
203                                "cannot connect to self",
204                            )));
205                            continue;
206                        }
207                        let is_polite = pub_key > this_pub_key;
208                        if let Some(client) = weak_client.upgrade() {
209                            let _ = resp.send(
210                                hub_map_assert(
211                                    &webrtc_config2,
212                                    is_polite,
213                                    pub_key,
214                                    &mut map,
215                                    &client,
216                                    &config,
217                                    &hub_cmd_send2,
218                                )
219                                .await
220                                .map(|(recv, conn, _)| (recv, conn)),
221                            );
222                        } else {
223                            break;
224                        }
225                    }
226                    HubCmd::Disconnect(pub_key) => {
227                        if let Some(client) = weak_client.upgrade() {
228                            let _ = client.close_peer(&pub_key).await;
229                        } else {
230                            break;
231                        }
232                        let _ = map.remove(&pub_key);
233                    }
234                    HubCmd::Close => break,
235                }
236            }
237
238            if let Some(client) = weak_client.upgrade() {
239                client.close().await;
240            }
241
242            tracing::debug!(%url, ?this_pub_key, "hub close");
243        }));
244
245        Ok((
246            Self {
247                webrtc_config,
248                client,
249                hub_cmd_send,
250                task_list,
251            },
252            HubRecv(conn_recv),
253        ))
254    }
255
256    /// Get the pub_key used by this hub.
257    pub fn pub_key(&self) -> &PubKey {
258        self.client.pub_key()
259    }
260
261    /// Establish a connection to a remote peer.
262    pub async fn connect(
263        &self,
264        pub_key: PubKey,
265    ) -> Result<(Arc<Conn>, ConnRecv)> {
266        let (s, r) = tokio::sync::oneshot::channel();
267        self.hub_cmd_send
268            .send(HubCmd::Connect { pub_key, resp: s })
269            .await
270            .map_err(|_| Error::other("closed"))?;
271        let (recv, conn) = r.await.map_err(|_| Error::other("closed"))??;
272        if let Some(recv) = recv {
273            Ok((conn, recv))
274        } else {
275            Err(Error::other("already connected"))
276        }
277    }
278
279    /// Alter the webrtc_config at runtime.
280    ///
281    /// This will affect all future new outgoing connections, and all connections accepted on the
282    /// receiver. It will not affect any connections already established.
283    pub fn set_webrtc_config(&self, webrtc_config: WebRtcConfig) {
284        *self.webrtc_config.lock().unwrap() = webrtc_config;
285    }
286}