1pub use super::*;
2
3type HubMapT = HashMap<PubKey, (Weak<Conn>, CloseSend<ConnCmd>)>;
4struct HubMap(HubMapT);
5
6impl std::ops::Deref for HubMap {
7 type Target = HubMapT;
8
9 fn deref(&self) -> &Self::Target {
10 &self.0
11 }
12}
13
14impl std::ops::DerefMut for HubMap {
15 fn deref_mut(&mut self) -> &mut Self::Target {
16 &mut self.0
17 }
18}
19
20impl HubMap {
21 pub fn new() -> Self {
22 Self(HashMap::new())
23 }
24}
25
26async fn hub_map_assert(
28 webrtc_config: &Arc<Mutex<WebRtcConfig>>,
29 is_polite: bool,
30 pub_key: PubKey,
31 map: &mut HubMap,
32 client: &Arc<tx5_signal::SignalConnection>,
33 config: &Arc<HubConfig>,
34 hub_cmd_send: &tokio::sync::mpsc::Sender<HubCmd>,
35) -> Result<(Option<ConnRecv>, Arc<Conn>, CloseSend<ConnCmd>)> {
36 let mut found_during_prune = None;
37
38 map.retain(|_, c| {
40 if let Some(conn) = c.0.upgrade() {
41 let cmd_send = c.1.clone();
42 if conn.pub_key() == &pub_key {
43 found_during_prune = Some((conn, cmd_send));
44 }
45 true
46 } else {
47 false
48 }
49 });
50
51 if let Some((conn, cmd_send)) = found_during_prune {
54 return Ok((None, conn, cmd_send));
55 }
56
57 client.assert(&pub_key).await?;
58
59 let (conn, recv, cmd_send) = Conn::priv_new(
62 webrtc_config.lock().unwrap().clone(),
63 is_polite,
64 pub_key.clone(),
65 Arc::downgrade(client),
66 config.clone(),
67 hub_cmd_send.clone(),
68 );
69
70 let weak_conn = Arc::downgrade(&conn);
71
72 let mut store_cmd_send = cmd_send.clone();
73 store_cmd_send.set_close_on_drop(true);
74
75 map.insert(pub_key, (weak_conn, store_cmd_send));
76
77 Ok((Some(recv), conn, cmd_send))
78}
79
80pub(crate) enum HubCmd {
81 CliRecv {
82 pub_key: PubKey,
83 msg: tx5_signal::SignalMessage,
84 },
85 Connect {
86 pub_key: PubKey,
87 resp:
88 tokio::sync::oneshot::Sender<Result<(Option<ConnRecv>, Arc<Conn>)>>,
89 },
90 Disconnect(PubKey),
91 Close,
92}
93
94pub struct HubRecv(tokio::sync::mpsc::Receiver<(Arc<Conn>, ConnRecv)>);
96
97impl HubRecv {
98 pub async fn accept(&mut self) -> Option<(Arc<Conn>, ConnRecv)> {
100 self.0.recv().await
101 }
102}
103
104pub struct Hub {
106 webrtc_config: Arc<Mutex<WebRtcConfig>>,
107 client: Arc<tx5_signal::SignalConnection>,
108 hub_cmd_send: tokio::sync::mpsc::Sender<HubCmd>,
109 task_list: Vec<tokio::task::JoinHandle<()>>,
110}
111
112impl Drop for Hub {
113 fn drop(&mut self) {
114 for task in self.task_list.iter() {
115 task.abort();
116 }
117 }
118}
119
120impl Hub {
121 pub async fn new(
125 webrtc_config: WebRtcConfig,
126 url: &str,
127 config: Arc<HubConfig>,
128 ) -> Result<(Self, HubRecv)> {
129 let webrtc_config = Arc::new(Mutex::new(webrtc_config));
130
131 let (client, mut recv) = tx5_signal::SignalConnection::connect(
132 url,
133 config.signal_config.clone(),
134 )
135 .await?;
136 let client = Arc::new(client);
137
138 tracing::debug!(%url, pub_key = ?client.pub_key(), "hub connected");
139
140 let mut task_list = Vec::new();
141
142 let (hub_cmd_send, mut cmd_recv) = tokio::sync::mpsc::channel(32);
143
144 let hub_cmd_send2 = hub_cmd_send.clone();
146 task_list.push(tokio::task::spawn(async move {
147 while let Some((pub_key, msg)) = recv.recv_message().await {
148 if hub_cmd_send2
149 .send(HubCmd::CliRecv { pub_key, msg })
150 .await
151 .is_err()
152 {
153 break;
154 }
155 }
156
157 let _ = hub_cmd_send2.send(HubCmd::Close).await;
158 }));
159
160 let webrtc_config2 = webrtc_config.clone();
162 let (conn_send, conn_recv) = tokio::sync::mpsc::channel(32);
163 let weak_client = Arc::downgrade(&client);
164 let url = url.to_string();
165 let this_pub_key = client.pub_key().clone();
166 let hub_cmd_send2 = hub_cmd_send.clone();
167 task_list.push(tokio::task::spawn(async move {
168 let mut map = HubMap::new();
169 while let Some(cmd) = cmd_recv.recv().await {
170 match cmd {
171 HubCmd::CliRecv { pub_key, msg } => {
172 if let Some(client) = weak_client.upgrade() {
173 if pub_key == this_pub_key {
174 continue;
176 }
177
178 let is_polite = pub_key > this_pub_key;
180 let (recv, conn, cmd_send) = match hub_map_assert(
181 &webrtc_config2,
182 is_polite,
183 pub_key,
184 &mut map,
185 &client,
186 &config,
187 &hub_cmd_send2,
188 )
189 .await
190 {
191 Err(err) => {
192 tracing::debug!(
193 ?err,
194 "failed to accept incoming connection"
195 );
196 continue;
197 }
198 Ok(r) => r,
199 };
200
201 let _ = cmd_send.send(ConnCmd::SigRecv(msg)).await;
203
204 if let Some(recv) = recv {
207 let _ = conn_send.send((conn, recv)).await;
208 }
209 } else {
210 break;
211 }
212 }
213 HubCmd::Connect { pub_key, resp } => {
214 if pub_key == this_pub_key {
217 let _ = resp.send(Err(Error::other(
218 "cannot connect to self",
219 )));
220 continue;
221 }
222 let is_polite = pub_key > this_pub_key;
223 if let Some(client) = weak_client.upgrade() {
224 let _ = resp.send(
225 hub_map_assert(
226 &webrtc_config2,
227 is_polite,
228 pub_key,
229 &mut map,
230 &client,
231 &config,
232 &hub_cmd_send2,
233 )
234 .await
235 .map(|(recv, conn, _)| (recv, conn)),
236 );
237 } else {
238 break;
239 }
240 }
241 HubCmd::Disconnect(pub_key) => {
242 if let Some(client) = weak_client.upgrade() {
245 let _ = client.close_peer(&pub_key).await;
246 } else {
247 break;
248 }
249 let _ = map.remove(&pub_key);
250 }
251 HubCmd::Close => break,
252 }
253 }
254
255 if let Some(client) = weak_client.upgrade() {
256 client.close().await;
257 }
258
259 tracing::debug!(%url, ?this_pub_key, "hub close");
260 }));
261
262 Ok((
263 Self {
264 webrtc_config,
265 client,
266 hub_cmd_send,
267 task_list,
268 },
269 HubRecv(conn_recv),
270 ))
271 }
272
273 pub fn pub_key(&self) -> &PubKey {
275 self.client.pub_key()
276 }
277
278 pub async fn connect(
280 &self,
281 pub_key: PubKey,
282 ) -> Result<(Arc<Conn>, ConnRecv)> {
283 let (s, r) = tokio::sync::oneshot::channel();
284 self.hub_cmd_send
285 .send(HubCmd::Connect { pub_key, resp: s })
286 .await
287 .map_err(|_| Error::other("closed"))?;
288 let (recv, conn) = r.await.map_err(|_| Error::other("closed"))??;
289 if let Some(recv) = recv {
290 Ok((conn, recv))
291 } else {
292 Err(Error::other("already connected"))
293 }
294 }
295
296 pub fn set_webrtc_config(&self, webrtc_config: WebRtcConfig) {
301 *self.webrtc_config.lock().unwrap() = webrtc_config;
302 }
303}