1pub use super::*;
2
3type HubMapT = HashMap<PubKey, (Weak<Conn>, CloseSend<ConnCmd>)>;
4struct HubMap(HubMapT);
5
6impl std::ops::Deref for HubMap {
7 type Target = HubMapT;
8
9 fn deref(&self) -> &Self::Target {
10 &self.0
11 }
12}
13
14impl std::ops::DerefMut for HubMap {
15 fn deref_mut(&mut self) -> &mut Self::Target {
16 &mut self.0
17 }
18}
19
20impl HubMap {
21 pub fn new() -> Self {
22 Self(HashMap::new())
23 }
24}
25
26async fn hub_map_assert(
27 webrtc_config: &Arc<Mutex<WebRtcConfig>>,
28 is_polite: bool,
29 pub_key: PubKey,
30 map: &mut HubMap,
31 client: &Arc<tx5_signal::SignalConnection>,
32 config: &Arc<HubConfig>,
33 hub_cmd_send: &tokio::sync::mpsc::Sender<HubCmd>,
34) -> Result<(Option<ConnRecv>, Arc<Conn>, CloseSend<ConnCmd>)> {
35 let mut found_during_prune = None;
36
37 map.retain(|_, c| {
38 if let Some(conn) = c.0.upgrade() {
39 let cmd_send = c.1.clone();
40 if conn.pub_key() == &pub_key {
41 found_during_prune = Some((conn, cmd_send));
42 }
43 true
44 } else {
45 false
46 }
47 });
48
49 if let Some((conn, cmd_send)) = found_during_prune {
50 return Ok((None, conn, cmd_send));
51 }
52
53 client.assert(&pub_key).await?;
54
55 let (conn, recv, cmd_send) = Conn::priv_new(
58 webrtc_config.lock().unwrap().clone(),
59 is_polite,
60 pub_key.clone(),
61 Arc::downgrade(client),
62 config.clone(),
63 hub_cmd_send.clone(),
64 );
65
66 let weak_conn = Arc::downgrade(&conn);
67
68 let mut store_cmd_send = cmd_send.clone();
69 store_cmd_send.set_close_on_drop(true);
70
71 map.insert(pub_key, (weak_conn, store_cmd_send));
72
73 Ok((Some(recv), conn, cmd_send))
74}
75
76pub(crate) enum HubCmd {
77 CliRecv {
78 pub_key: PubKey,
79 msg: tx5_signal::SignalMessage,
80 },
81 Connect {
82 pub_key: PubKey,
83 resp:
84 tokio::sync::oneshot::Sender<Result<(Option<ConnRecv>, Arc<Conn>)>>,
85 },
86 Disconnect(PubKey),
87 Close,
88}
89
90pub struct HubRecv(tokio::sync::mpsc::Receiver<(Arc<Conn>, ConnRecv)>);
92
93impl HubRecv {
94 pub async fn accept(&mut self) -> Option<(Arc<Conn>, ConnRecv)> {
96 self.0.recv().await
97 }
98}
99
100pub struct Hub {
102 webrtc_config: Arc<Mutex<WebRtcConfig>>,
103 client: Arc<tx5_signal::SignalConnection>,
104 hub_cmd_send: tokio::sync::mpsc::Sender<HubCmd>,
105 task_list: Vec<tokio::task::JoinHandle<()>>,
106}
107
108impl Drop for Hub {
109 fn drop(&mut self) {
110 for task in self.task_list.iter() {
111 task.abort();
112 }
113 }
114}
115
116impl Hub {
117 pub async fn new(
121 webrtc_config: WebRtcConfig,
122 url: &str,
123 config: Arc<HubConfig>,
124 ) -> Result<(Self, HubRecv)> {
125 let webrtc_config = Arc::new(Mutex::new(webrtc_config));
126
127 let (client, mut recv) = tx5_signal::SignalConnection::connect(
128 url,
129 config.signal_config.clone(),
130 )
131 .await?;
132 let client = Arc::new(client);
133
134 tracing::debug!(%url, pub_key = ?client.pub_key(), "hub connected");
135
136 let mut task_list = Vec::new();
137
138 let (hub_cmd_send, mut cmd_recv) = tokio::sync::mpsc::channel(32);
139
140 let hub_cmd_send2 = hub_cmd_send.clone();
141 task_list.push(tokio::task::spawn(async move {
142 while let Some((pub_key, msg)) = recv.recv_message().await {
143 if hub_cmd_send2
144 .send(HubCmd::CliRecv { pub_key, msg })
145 .await
146 .is_err()
147 {
148 break;
149 }
150 }
151
152 let _ = hub_cmd_send2.send(HubCmd::Close).await;
153 }));
154
155 let webrtc_config2 = webrtc_config.clone();
156 let (conn_send, conn_recv) = tokio::sync::mpsc::channel(32);
157 let weak_client = Arc::downgrade(&client);
158 let url = url.to_string();
159 let this_pub_key = client.pub_key().clone();
160 let hub_cmd_send2 = hub_cmd_send.clone();
161 task_list.push(tokio::task::spawn(async move {
162 let mut map = HubMap::new();
163 while let Some(cmd) = cmd_recv.recv().await {
164 match cmd {
165 HubCmd::CliRecv { pub_key, msg } => {
166 if let Some(client) = weak_client.upgrade() {
167 if pub_key == this_pub_key {
168 continue;
170 }
171 let is_polite = pub_key > this_pub_key;
172 let (recv, conn, cmd_send) = match hub_map_assert(
173 &webrtc_config2,
174 is_polite,
175 pub_key,
176 &mut map,
177 &client,
178 &config,
179 &hub_cmd_send2,
180 )
181 .await
182 {
183 Err(err) => {
184 tracing::debug!(
185 ?err,
186 "failed to accept incoming connection"
187 );
188 continue;
189 }
190 Ok(r) => r,
191 };
192 let _ = cmd_send.send(ConnCmd::SigRecv(msg)).await;
193 if let Some(recv) = recv {
194 let _ = conn_send.send((conn, recv)).await;
195 }
196 } else {
197 break;
198 }
199 }
200 HubCmd::Connect { pub_key, resp } => {
201 if pub_key == this_pub_key {
202 let _ = resp.send(Err(Error::other(
203 "cannot connect to self",
204 )));
205 continue;
206 }
207 let is_polite = pub_key > this_pub_key;
208 if let Some(client) = weak_client.upgrade() {
209 let _ = resp.send(
210 hub_map_assert(
211 &webrtc_config2,
212 is_polite,
213 pub_key,
214 &mut map,
215 &client,
216 &config,
217 &hub_cmd_send2,
218 )
219 .await
220 .map(|(recv, conn, _)| (recv, conn)),
221 );
222 } else {
223 break;
224 }
225 }
226 HubCmd::Disconnect(pub_key) => {
227 if let Some(client) = weak_client.upgrade() {
228 let _ = client.close_peer(&pub_key).await;
229 } else {
230 break;
231 }
232 let _ = map.remove(&pub_key);
233 }
234 HubCmd::Close => break,
235 }
236 }
237
238 if let Some(client) = weak_client.upgrade() {
239 client.close().await;
240 }
241
242 tracing::debug!(%url, ?this_pub_key, "hub close");
243 }));
244
245 Ok((
246 Self {
247 webrtc_config,
248 client,
249 hub_cmd_send,
250 task_list,
251 },
252 HubRecv(conn_recv),
253 ))
254 }
255
256 pub fn pub_key(&self) -> &PubKey {
258 self.client.pub_key()
259 }
260
261 pub async fn connect(
263 &self,
264 pub_key: PubKey,
265 ) -> Result<(Arc<Conn>, ConnRecv)> {
266 let (s, r) = tokio::sync::oneshot::channel();
267 self.hub_cmd_send
268 .send(HubCmd::Connect { pub_key, resp: s })
269 .await
270 .map_err(|_| Error::other("closed"))?;
271 let (recv, conn) = r.await.map_err(|_| Error::other("closed"))??;
272 if let Some(recv) = recv {
273 Ok((conn, recv))
274 } else {
275 Err(Error::other("already connected"))
276 }
277 }
278
279 pub fn set_webrtc_config(&self, webrtc_config: WebRtcConfig) {
284 *self.webrtc_config.lock().unwrap() = webrtc_config;
285 }
286}