1use anyhow::Result;
13use futures::{SinkExt, StreamExt};
14use nostr::{nips::nip44, ClientMessage, EventBuilder, Filter, JsonUtil, Keys, Kind, PublicKey, RelayMessage, Tag};
15use std::collections::HashMap;
16use std::sync::Arc;
17use std::time::{Duration, Instant};
18use tokio::sync::{mpsc, Mutex, RwLock};
19use tokio_tungstenite::{connect_async, tungstenite::Message};
20use tracing::{debug, error, info, warn};
21
22use super::peer::{ContentStore, Peer, PendingRequest};
23use super::types::{
24 PeerDirection, PeerId, PeerPool, PeerStateEvent, PeerStatus, SignalingMessage, WebRTCConfig, WEBRTC_KIND, HELLO_TAG,
25};
26
/// Shared, thread-safe callback that assigns a peer to a [`PeerPool`].
///
/// The manager calls it with the sender's public key string when it handles a
/// hello or an offer. The default installed by `WebRTCManager::new` maps every
/// peer to `PeerPool::Other`.
pub type PeerClassifier = Arc<dyn Fn(&str) -> PeerPool + Send + Sync>;
29
/// Lifecycle stage of a tracked peer connection.
///
/// The type is a plain tag with no payload, so it is `Copy`, fully `Eq` and
/// `Hash`; callers can pass it by value and use it as a map/set key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ConnectionState {
    /// The peer was seen through a hello message; no connection object exists yet.
    Discovered,
    /// An offer has been created or answered and the connection is being set up.
    Connecting,
    /// The connection has been reported as established.
    Connected,
    /// The connection attempt failed.
    Failed,
}
38
39impl std::fmt::Display for ConnectionState {
40 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
41 match self {
42 ConnectionState::Discovered => write!(f, "discovered"),
43 ConnectionState::Connecting => write!(f, "connecting"),
44 ConnectionState::Connected => write!(f, "connected"),
45 ConnectionState::Failed => write!(f, "failed"),
46 }
47 }
48}
49
/// Bookkeeping record for one remote peer tracked in `WebRTCState::peers`.
pub struct PeerEntry {
    /// Identity of the remote peer (public key plus uuid).
    pub peer_id: PeerId,
    /// `Outbound` when this node sends the offer, `Inbound` otherwise.
    pub direction: PeerDirection,
    /// Current lifecycle stage of the connection.
    pub state: ConnectionState,
    /// Set to `Instant::now()` when the entry is inserted; the stale-peer
    /// cleanup compares it against its timeout.
    pub last_seen: Instant,
    /// The WebRTC peer object; `None` while the peer is only discovered.
    pub peer: Option<Peer>,
    /// Pool the peer was classified into by the `PeerClassifier`.
    pub pool: PeerPool,
}
59
/// Peer table shared between the manager and other holders of the `Arc`
/// returned by `WebRTCManager::state`.
pub struct WebRTCState {
    /// Tracked peers, keyed by the string form of their `PeerId`.
    pub peers: RwLock<HashMap<String, PeerEntry>>,
    /// Number of peers currently counted as connected. Adjusted on peer state
    /// events and recomputed by the periodic stale-peer cleanup.
    pub connected_count: std::sync::atomic::AtomicUsize,
}
65
66impl WebRTCState {
67 pub fn new() -> Self {
68 Self {
69 peers: RwLock::new(HashMap::new()),
70 connected_count: std::sync::atomic::AtomicUsize::new(0),
71 }
72 }
73
74 pub async fn request_from_peers(&self, hash_hex: &str) -> Option<Vec<u8>> {
78 use super::types::{DataRequest, MAX_HTL, encode_request};
79
80 let peers = self.peers.read().await;
81
82 let peer_refs: Vec<_> = peers
85 .values()
86 .filter(|p| p.state == ConnectionState::Connected && p.peer.is_some())
87 .filter_map(|p| {
88 p.peer.as_ref().map(|peer| {
89 (p.peer_id.short(), peer.data_channel.clone(), peer.pending_requests.clone())
90 })
91 })
92 .collect();
93
94 drop(peers); let mut connected_peers: Vec<(String, Arc<Mutex<HashMap<String, PendingRequest>>>, Arc<webrtc::data_channel::RTCDataChannel>)> = Vec::new();
98 for (peer_id, dc_mutex, pending) in peer_refs {
99 let dc_guard = dc_mutex.lock().await;
100 if let Some(dc) = dc_guard.as_ref() {
101 connected_peers.push((peer_id, pending, dc.clone()));
102 }
103 }
104
105 if connected_peers.is_empty() {
106 debug!("No connected peers to query for {}", &hash_hex[..8.min(hash_hex.len())]);
107 return None;
108 }
109
110 debug!(
111 "Querying {} connected peers for {} (sequential with 500ms delay)",
112 connected_peers.len(),
113 &hash_hex[..8.min(hash_hex.len())]
114 );
115
116 let hash_bytes = match hex::decode(hash_hex) {
118 Ok(b) => b,
119 Err(_) => return None,
120 };
121
122 for (_i, (peer_id, pending_requests, dc)) in connected_peers.into_iter().enumerate() {
124 debug!("Querying peer {} for {}", peer_id, &hash_hex[..8.min(hash_hex.len())]);
125
126 let (tx, rx) = tokio::sync::oneshot::channel();
128
129 {
131 let mut pending = pending_requests.lock().await;
132 pending.insert(
133 hash_hex.to_string(),
134 super::PendingRequest {
135 hash: hash_bytes.clone(),
136 response_tx: tx,
137 },
138 );
139 }
140
141 let req = DataRequest {
143 h: hash_bytes.clone(),
144 htl: MAX_HTL,
145 };
146 if let Ok(wire) = encode_request(&req) {
147 if dc.send(&bytes::Bytes::from(wire)).await.is_ok() {
148 match tokio::time::timeout(std::time::Duration::from_millis(500), rx).await {
150 Ok(Ok(Some(data))) => {
151 debug!("Got response from peer {} for {}", peer_id, &hash_hex[..8.min(hash_hex.len())]);
152 return Some(data);
153 }
154 _ => {
155 debug!("No response from peer {} for {}", peer_id, &hash_hex[..8.min(hash_hex.len())]);
157 }
158 }
159 }
160 }
161
162 let mut pending = pending_requests.lock().await;
164 pending.remove(hash_hex);
165 }
166
167 debug!("No peer had data for {}", &hash_hex[..8.min(hash_hex.len())]);
168 None
169 }
170}
171
/// Drives peer discovery and WebRTC signaling over nostr relays.
pub struct WebRTCManager {
    /// Relay list, hello interval, STUN servers and pool limits.
    config: WebRTCConfig,
    /// This node's identity, built from the hex public key of `keys`.
    my_peer_id: PeerId,
    /// Keys used to sign hello events and to decrypt directed messages.
    keys: Keys,
    /// Peer table and connected counter, shared via `state()`.
    state: Arc<WebRTCState>,
    /// Sender half of the shutdown flag; `shutdown()` sends `true`.
    shutdown: Arc<tokio::sync::watch::Sender<bool>>,
    /// Receiver half of the shutdown flag; cloned for the main loop and each relay task.
    shutdown_rx: tokio::sync::watch::Receiver<bool>,
    /// Handed to every `Peer` so it can emit signaling messages.
    signaling_tx: mpsc::Sender<SignalingMessage>,
    /// Receiving end of `signaling_tx`; taken by `run`.
    signaling_rx: Option<mpsc::Receiver<SignalingMessage>>,
    /// Optional content store passed on to every `Peer`.
    store: Option<Arc<dyn ContentStore>>,
    /// Decides which pool a peer's public key belongs to.
    peer_classifier: PeerClassifier,
    /// Handed to every `Peer` so it can report connection state changes.
    state_event_tx: mpsc::Sender<PeerStateEvent>,
    /// Receiving end of `state_event_tx`; taken by `run`.
    state_event_rx: Option<mpsc::Receiver<PeerStateEvent>>,
}
191
192impl WebRTCManager {
193 pub fn new(keys: Keys, config: WebRTCConfig) -> Self {
195 let pubkey = keys.public_key().to_hex();
196 let my_peer_id = PeerId::new(pubkey, None);
197 let (shutdown, shutdown_rx) = tokio::sync::watch::channel(false);
198 let (signaling_tx, signaling_rx) = mpsc::channel(100);
199 let (state_event_tx, state_event_rx) = mpsc::channel(100);
200
201 let peer_classifier: PeerClassifier = Arc::new(|_| PeerPool::Other);
203
204 Self {
205 config,
206 my_peer_id,
207 keys,
208 state: Arc::new(WebRTCState::new()),
209 shutdown: Arc::new(shutdown),
210 shutdown_rx,
211 signaling_tx,
212 signaling_rx: Some(signaling_rx),
213 store: None,
214 peer_classifier,
215 state_event_tx,
216 state_event_rx: Some(state_event_rx),
217 }
218 }
219
220 pub fn new_with_classifier(keys: Keys, config: WebRTCConfig, classifier: PeerClassifier) -> Self {
222 let mut manager = Self::new(keys, config);
223 manager.peer_classifier = classifier;
224 manager
225 }
226
227 pub fn new_with_store(keys: Keys, config: WebRTCConfig, store: Arc<dyn ContentStore>) -> Self {
229 let mut manager = Self::new(keys, config);
230 manager.store = Some(store);
231 manager
232 }
233
234 pub fn new_with_store_and_classifier(
236 keys: Keys,
237 config: WebRTCConfig,
238 store: Arc<dyn ContentStore>,
239 classifier: PeerClassifier,
240 ) -> Self {
241 let mut manager = Self::new(keys, config);
242 manager.store = Some(store);
243 manager.peer_classifier = classifier;
244 manager
245 }
246
/// Replaces the content store that is passed to peers created from now on.
pub fn set_store(&mut self, store: Arc<dyn ContentStore>) {
    self.store = Some(store);
}
251
/// Replaces the callback used to assign peers to a pool.
pub fn set_peer_classifier(&mut self, classifier: PeerClassifier) {
    self.peer_classifier = classifier;
}
256
/// Returns this node's own peer id.
pub fn my_peer_id(&self) -> &PeerId {
    &self.my_peer_id
}
261
262 pub fn state(&self) -> Arc<WebRTCState> {
264 self.state.clone()
265 }
266
/// Signals the main loop and all relay tasks to stop by setting the shutdown
/// flag to `true`. A send error is deliberately ignored.
pub fn shutdown(&self) {
    let _ = self.shutdown.send(true);
}
271
272 pub async fn connected_count(&self) -> usize {
274 self.state
275 .connected_count
276 .load(std::sync::atomic::Ordering::Relaxed)
277 }
278
279 pub async fn peer_statuses(&self) -> Vec<PeerStatus> {
281 self.state
282 .peers
283 .read()
284 .await
285 .values()
286 .map(|p| PeerStatus {
287 peer_id: p.peer_id.to_string(),
288 pubkey: p.peer_id.pubkey.clone(),
289 state: p.state.to_string(),
290 direction: p.direction,
291 connected_at: Some(p.last_seen),
292 pool: p.pool,
293 })
294 .collect()
295 }
296
297 pub async fn get_pool_counts(&self) -> (usize, usize, usize, usize) {
301 let peers = self.state.peers.read().await;
302 let mut follows_connected = 0;
303 let mut follows_active = 0;
304 let mut other_connected = 0;
305 let mut other_active = 0;
306
307 for entry in peers.values() {
308 let is_active = entry.state == ConnectionState::Connected
311 || entry.state == ConnectionState::Connecting;
312
313 match entry.pool {
314 PeerPool::Follows => {
315 if is_active {
316 follows_active += 1;
317 }
318 if entry.state == ConnectionState::Connected {
319 follows_connected += 1;
320 }
321 }
322 PeerPool::Other => {
323 if is_active {
324 other_active += 1;
325 }
326 if entry.state == ConnectionState::Connected {
327 other_connected += 1;
328 }
329 }
330 }
331 }
332
333 (follows_connected, follows_active, other_connected, other_active)
334 }
335
336 fn can_accept_peer(&self, pool: PeerPool, pool_counts: &(usize, usize, usize, usize)) -> bool {
338 let (_, follows_active, _, other_active) = *pool_counts;
339 match pool {
340 PeerPool::Follows => follows_active < self.config.pools.follows.max_connections,
341 PeerPool::Other => other_active < self.config.pools.other.max_connections,
342 }
343 }
344
345 #[allow(dead_code)]
347 fn is_pool_satisfied(&self, pool: PeerPool, pool_counts: &(usize, usize, usize, usize)) -> bool {
348 let (follows_connected, _, other_connected, _) = *pool_counts;
349 match pool {
350 PeerPool::Follows => follows_connected >= self.config.pools.follows.satisfied_connections,
351 PeerPool::Other => other_connected >= self.config.pools.other.satisfied_connections,
352 }
353 }
354
355 #[allow(dead_code)]
357 fn is_satisfied(&self, pool_counts: &(usize, usize, usize, usize)) -> bool {
358 self.is_pool_satisfied(PeerPool::Follows, pool_counts)
359 && self.is_pool_satisfied(PeerPool::Other, pool_counts)
360 }
361
362 fn should_initiate(&self, their_uuid: &str) -> bool {
365 self.my_peer_id.uuid < their_uuid.to_string()
366 }
367
368 pub async fn run(&mut self) -> Result<()> {
370 info!(
371 "Starting WebRTC manager with peer ID: {}",
372 self.my_peer_id.short()
373 );
374
375 let (event_tx, mut event_rx) = mpsc::channel::<(String, nostr::Event)>(100);
376
377 let mut signaling_rx = self.signaling_rx.take().expect("signaling_rx already taken");
379
380 let mut state_event_rx = self.state_event_rx.take().expect("state_event_rx already taken");
382
383 let (relay_write_tx, _) = tokio::sync::broadcast::channel::<SignalingMessage>(100);
385
386 for relay_url in &self.config.relays {
388 let url = relay_url.clone();
389 let event_tx = event_tx.clone();
390 let shutdown_rx = self.shutdown_rx.clone();
391 let keys = self.keys.clone();
392 let my_peer_id = self.my_peer_id.clone();
393 let hello_interval = Duration::from_millis(self.config.hello_interval_ms);
394 let relay_write_rx = relay_write_tx.subscribe();
395
396 tokio::spawn(async move {
397 if let Err(e) = Self::relay_task(
398 url.clone(),
399 event_tx,
400 shutdown_rx,
401 keys,
402 my_peer_id,
403 hello_interval,
404 relay_write_rx,
405 )
406 .await
407 {
408 error!("Relay {} error: {}", url, e);
409 }
410 });
411 }
412
413 let mut shutdown_rx = self.shutdown_rx.clone();
415 let mut cleanup_interval = tokio::time::interval(Duration::from_secs(30));
417 loop {
418 tokio::select! {
419 _ = shutdown_rx.changed() => {
420 if *shutdown_rx.borrow() {
421 info!("WebRTC manager shutting down");
422 break;
423 }
424 }
425 Some((relay, event)) = event_rx.recv() => {
426 if let Err(e) = self.handle_event(&relay, &event, &relay_write_tx).await {
427 debug!("Error handling event from {}: {}", relay, e);
428 }
429 }
430 Some(msg) = signaling_rx.recv() => {
431 let _ = relay_write_tx.send(msg);
433 }
434 Some(event) = state_event_rx.recv() => {
435 self.handle_peer_state_event(event).await;
437 }
438 _ = cleanup_interval.tick() => {
439 self.cleanup_stale_peers().await;
441 }
442 }
443 }
444
445 Ok(())
446 }
447
448 async fn relay_task(
450 url: String,
451 event_tx: mpsc::Sender<(String, nostr::Event)>,
452 mut shutdown_rx: tokio::sync::watch::Receiver<bool>,
453 keys: Keys,
454 my_peer_id: PeerId,
455 hello_interval: Duration,
456 mut signaling_rx: tokio::sync::broadcast::Receiver<SignalingMessage>,
457 ) -> Result<()> {
458 info!("Connecting to relay: {}", url);
459
460 let (ws_stream, _) = connect_async(&url).await?;
461 let (mut write, mut read) = ws_stream.split();
462
463 let hello_filter = Filter::new()
467 .kind(Kind::Ephemeral(WEBRTC_KIND as u16))
468 .custom_tag(
469 nostr::SingleLetterTag::lowercase(nostr::Alphabet::L),
470 vec![HELLO_TAG],
471 )
472 .since(nostr::Timestamp::now() - Duration::from_secs(60));
473
474 let directed_filter = Filter::new()
475 .kind(Kind::Ephemeral(WEBRTC_KIND as u16))
476 .custom_tag(
477 nostr::SingleLetterTag::lowercase(nostr::Alphabet::P),
478 vec![keys.public_key().to_hex()],
479 )
480 .since(nostr::Timestamp::now() - Duration::from_secs(60));
481
482 let sub_id = nostr::SubscriptionId::generate();
483 let sub_msg = ClientMessage::req(sub_id.clone(), vec![hello_filter, directed_filter]);
484 write.send(Message::Text(sub_msg.as_json().into())).await?;
485
486 info!("Subscribed to {} for WebRTC events (kind {})", url, WEBRTC_KIND);
487
488 let mut last_hello = Instant::now() - hello_interval; let mut hello_ticker = tokio::time::interval(Duration::from_secs(1));
490
491 loop {
492 tokio::select! {
493 _ = shutdown_rx.changed() => {
494 if *shutdown_rx.borrow() {
495 break;
496 }
497 }
498 _ = hello_ticker.tick() => {
499 if last_hello.elapsed() >= hello_interval {
501 let hello = SignalingMessage::hello(&my_peer_id.uuid);
502 if let Ok(event) = Self::create_signaling_event(&keys, &hello).await {
503 let msg = ClientMessage::event(event);
504 if write.send(Message::Text(msg.as_json().into())).await.is_ok() {
505 debug!("Sent hello to {}", url);
506 }
507 }
508 last_hello = Instant::now();
509 }
510 }
511 Ok(signaling_msg) = signaling_rx.recv() => {
513 info!("Sending {} via {}", signaling_msg.msg_type(), url);
514 if let Ok(event) = Self::create_signaling_event(&keys, &signaling_msg).await {
515 let event_id = event.id.to_string();
516 let msg = ClientMessage::event(event);
517 if write.send(Message::Text(msg.as_json().into())).await.is_ok() {
518 info!("Sent {} to {} (event id: {})", signaling_msg.msg_type(), url, &event_id[..16]);
519 }
520 }
521 }
522 msg = read.next() => {
523 match msg {
524 Some(Ok(Message::Text(text))) => {
525 if let Ok(relay_msg) = RelayMessage::from_json(&text) {
526 if let RelayMessage::Event { event, .. } = relay_msg {
527 let _ = event_tx.send((url.clone(), *event)).await;
528 }
529 }
530 }
531 Some(Err(e)) => {
532 error!("WebSocket error from {}: {}", url, e);
533 break;
534 }
535 None => {
536 warn!("WebSocket closed: {}", url);
537 break;
538 }
539 _ => {}
540 }
541 }
542 }
543 }
544
545 Ok(())
546 }
547
548 async fn create_signaling_event(keys: &Keys, msg: &SignalingMessage) -> Result<nostr::Event> {
554 if let Some(recipient_str) = msg.recipient() {
556 if let Some(peer_id) = PeerId::from_string(recipient_str) {
558 let recipient_pubkey = PublicKey::from_hex(&peer_id.pubkey)?;
559
560 let seal = serde_json::json!({
562 "pubkey": keys.public_key().to_hex(),
563 "kind": WEBRTC_KIND,
564 "content": serde_json::to_string(msg)?,
565 "tags": []
566 });
567
568 let ephemeral_keys = Keys::generate();
570
571 let encrypted_content = nip44::encrypt(
573 ephemeral_keys.secret_key(),
574 &recipient_pubkey,
575 &seal.to_string(),
576 nip44::Version::V2
577 )?;
578
579 let created_at = nostr::Timestamp::now();
581 let expiration = created_at + Duration::from_secs(5 * 60); let tags = vec![
584 Tag::parse(["p", &recipient_pubkey.to_hex()])?,
585 Tag::parse(["expiration", &expiration.as_u64().to_string()])?,
586 ];
587
588 let event = EventBuilder::new(Kind::Ephemeral(WEBRTC_KIND as u16), encrypted_content)
589 .tags(tags)
590 .sign(&ephemeral_keys)
591 .await?;
592
593 return Ok(event);
594 }
595 }
596
597 let tags = vec![
599 Tag::parse(["l", HELLO_TAG])?,
600 Tag::parse(["peerId", msg.peer_id()])?,
601 ];
602
603 let event = EventBuilder::new(Kind::Ephemeral(WEBRTC_KIND as u16), "")
604 .tags(tags)
605 .sign(keys)
606 .await?;
607
608 Ok(event)
609 }
610
611 async fn handle_event(
617 &self,
618 relay: &str,
619 event: &nostr::Event,
620 relay_write_tx: &tokio::sync::broadcast::Sender<SignalingMessage>,
621 ) -> Result<()> {
622 if event.kind != Kind::Ephemeral(WEBRTC_KIND as u16) {
624 return Ok(());
625 }
626
627 let get_tag = |name: &str| -> Option<String> {
629 event.tags.iter().find_map(|tag| {
630 let v: Vec<String> = tag.clone().to_vec();
631 if v.len() >= 2 && v[0] == name {
632 Some(v[1].clone())
633 } else {
634 None
635 }
636 })
637 };
638
639 let l_tag = get_tag("l");
641 if l_tag.as_deref() == Some(HELLO_TAG) {
642 let sender_pubkey = event.pubkey.to_hex();
643
644 if sender_pubkey == self.my_peer_id.pubkey {
646 return Ok(());
647 }
648
649 if let Some(their_uuid) = get_tag("peerId") {
650 debug!("Received hello from {} via {}", &sender_pubkey[..8], relay);
651 self.handle_hello(&sender_pubkey, &their_uuid, relay_write_tx)
652 .await?;
653 }
654 return Ok(());
655 }
656
657 let p_tag = get_tag("p");
659 if p_tag.as_deref() != Some(&self.keys.public_key().to_hex()) {
660 return Ok(());
662 }
663
664 if event.content.is_empty() {
666 return Ok(());
667 }
668
669 let seal: serde_json::Value = match nip44::decrypt(self.keys.secret_key(), &event.pubkey, &event.content) {
671 Ok(plaintext) => {
672 match serde_json::from_str(&plaintext) {
673 Ok(v) => v,
674 Err(_) => return Ok(()),
675 }
676 }
677 Err(_) => {
678 return Ok(());
680 }
681 };
682
683 let sender_pubkey = seal.get("pubkey")
685 .and_then(|v| v.as_str())
686 .ok_or_else(|| anyhow::anyhow!("Missing pubkey in seal"))?;
687
688 if sender_pubkey == self.my_peer_id.pubkey {
690 return Ok(());
691 }
692
693 let content = seal.get("content")
694 .and_then(|v| v.as_str())
695 .ok_or_else(|| anyhow::anyhow!("Missing content in seal"))?;
696
697 let msg: SignalingMessage = serde_json::from_str(content)?;
698
699 debug!(
700 "Received {} from {} via {} (gift-wrapped)",
701 msg.msg_type(),
702 &sender_pubkey[..8],
703 relay
704 );
705
706 match msg {
707 SignalingMessage::Hello { .. } => {
708 return Ok(());
710 }
711 SignalingMessage::Offer {
712 recipient,
713 peer_id: their_uuid,
714 offer,
715 } => {
716 if recipient != self.my_peer_id.to_string() {
717 return Ok(()); }
719 self.handle_offer(&sender_pubkey, &their_uuid, offer, relay_write_tx)
720 .await?;
721 }
722 SignalingMessage::Answer {
723 recipient,
724 peer_id: their_uuid,
725 answer,
726 } => {
727 if recipient != self.my_peer_id.to_string() {
728 return Ok(());
729 }
730 self.handle_answer(&sender_pubkey, &their_uuid, answer)
731 .await?;
732 }
733 SignalingMessage::Candidate {
734 recipient,
735 peer_id: their_uuid,
736 candidate,
737 } => {
738 if recipient != self.my_peer_id.to_string() {
739 return Ok(());
740 }
741 self.handle_candidate(&sender_pubkey, &their_uuid, candidate)
742 .await?;
743 }
744 SignalingMessage::Candidates {
745 recipient,
746 peer_id: their_uuid,
747 candidates,
748 } => {
749 if recipient != self.my_peer_id.to_string() {
750 return Ok(());
751 }
752 self.handle_candidates(&sender_pubkey, &their_uuid, candidates)
753 .await?;
754 }
755 }
756
757 Ok(())
758 }
759
760 async fn handle_hello(
762 &self,
763 sender_pubkey: &str,
764 their_uuid: &str,
765 relay_write_tx: &tokio::sync::broadcast::Sender<SignalingMessage>,
766 ) -> Result<()> {
767 let full_peer_id = PeerId::new(sender_pubkey.to_string(), Some(their_uuid.to_string()));
768 let peer_key = full_peer_id.to_string();
769
770 {
772 let peers = self.state.peers.read().await;
773 if let Some(entry) = peers.get(&peer_key) {
774 if entry.state == ConnectionState::Connected
776 || entry.state == ConnectionState::Connecting
777 {
778 return Ok(());
779 }
780 }
781 }
782
783 let pool = (self.peer_classifier)(sender_pubkey);
785
786 let pool_counts = self.get_pool_counts().await;
788 if !self.can_accept_peer(pool, &pool_counts) {
789 debug!("Ignoring hello from {} - pool {:?} is full", full_peer_id.short(), pool);
790 return Ok(());
791 }
792
793 let should_initiate = self.should_initiate(their_uuid);
795
796 let pool_satisfied = self.is_pool_satisfied(pool, &pool_counts);
799 let will_initiate = should_initiate && !pool_satisfied;
800
801 info!(
802 "Discovered peer: {} (pool: {:?}, initiate: {}, pool_satisfied: {})",
803 full_peer_id.short(),
804 pool,
805 will_initiate,
806 pool_satisfied
807 );
808
809 if !will_initiate && pool_satisfied {
812 debug!("Pool {:?} is satisfied, not tracking peer {}", pool, full_peer_id.short());
813 return Ok(());
814 }
815
816 {
818 let mut peers = self.state.peers.write().await;
819 peers.insert(
820 peer_key.clone(),
821 PeerEntry {
822 peer_id: full_peer_id.clone(),
823 direction: if will_initiate {
824 PeerDirection::Outbound
825 } else {
826 PeerDirection::Inbound
827 },
828 state: ConnectionState::Discovered,
829 last_seen: Instant::now(),
830 peer: None,
831 pool,
832 },
833 );
834 }
835
836 if will_initiate {
838 self.initiate_connection(&full_peer_id, pool, relay_write_tx)
839 .await?;
840 }
841
842 Ok(())
843 }
844
845 async fn initiate_connection(
847 &self,
848 peer_id: &PeerId,
849 pool: PeerPool,
850 relay_write_tx: &tokio::sync::broadcast::Sender<SignalingMessage>,
851 ) -> Result<()> {
852 let peer_key = peer_id.to_string();
853
854 info!("Initiating connection to {} (pool: {:?})", peer_id.short(), pool);
855
856 let mut peer = Peer::new_with_store_and_events(
858 peer_id.clone(),
859 PeerDirection::Outbound,
860 self.my_peer_id.clone(),
861 self.signaling_tx.clone(),
862 self.config.stun_servers.clone(),
863 self.store.clone(),
864 Some(self.state_event_tx.clone()),
865 )
866 .await?;
867
868 peer.setup_handlers().await?;
869
870 let offer = peer.connect().await?;
872
873 {
875 let mut peers = self.state.peers.write().await;
876 if let Some(entry) = peers.get_mut(&peer_key) {
877 entry.state = ConnectionState::Connecting;
878 entry.peer = Some(peer);
879 entry.pool = pool;
880 }
881 }
882
883 let offer_msg = SignalingMessage::Offer {
885 offer,
886 recipient: peer_id.to_string(),
887 peer_id: self.my_peer_id.uuid.clone(),
888 };
889 if relay_write_tx.send(offer_msg).is_err() {
890 warn!("Failed to broadcast offer to {}", peer_id.short());
891 }
892
893 info!("Sent offer to {}", peer_id.short());
894
895 Ok(())
896 }
897
898 async fn handle_offer(
900 &self,
901 sender_pubkey: &str,
902 their_uuid: &str,
903 offer: serde_json::Value,
904 relay_write_tx: &tokio::sync::broadcast::Sender<SignalingMessage>,
905 ) -> Result<()> {
906 let full_peer_id = PeerId::new(sender_pubkey.to_string(), Some(their_uuid.to_string()));
907 let peer_key = full_peer_id.to_string();
908
909 let pool = (self.peer_classifier)(sender_pubkey);
911
912 info!("Received offer from {} (pool: {:?})", full_peer_id.short(), pool);
913
914 {
916 let peers = self.state.peers.read().await;
917 if let Some(entry) = peers.get(&peer_key) {
918 if entry.peer.is_some() {
920 debug!("Already have peer {} with connection, skipping offer", full_peer_id.short());
921 return Ok(());
922 }
923 }
924 }
925
926 let pool_counts = self.get_pool_counts().await;
928 if !self.can_accept_peer(pool, &pool_counts) {
929 warn!("Rejecting offer from {} - pool {:?} is full", full_peer_id.short(), pool);
930 return Ok(());
931 }
932 let mut peer = Peer::new_with_store_and_events(
934 full_peer_id.clone(),
935 PeerDirection::Inbound,
936 self.my_peer_id.clone(),
937 self.signaling_tx.clone(),
938 self.config.stun_servers.clone(),
939 self.store.clone(),
940 Some(self.state_event_tx.clone()),
941 )
942 .await?;
943
944 peer.setup_handlers().await?;
945
946 let answer = peer.handle_offer(offer).await?;
948
949 {
951 let mut peers = self.state.peers.write().await;
952 peers.insert(
953 peer_key,
954 PeerEntry {
955 peer_id: full_peer_id.clone(),
956 direction: PeerDirection::Inbound,
957 state: ConnectionState::Connecting,
958 last_seen: Instant::now(),
959 peer: Some(peer),
960 pool,
961 },
962 );
963 }
964
965 let answer_msg = SignalingMessage::Answer {
967 answer,
968 recipient: full_peer_id.to_string(),
969 peer_id: self.my_peer_id.to_string(),
970 };
971 if relay_write_tx.send(answer_msg).is_err() {
972 warn!("Failed to send answer to {}", full_peer_id.short());
973 }
974 info!("Sent answer to {}", full_peer_id.short());
975
976 Ok(())
977 }
978
979 async fn handle_answer(
981 &self,
982 sender_pubkey: &str,
983 their_uuid: &str,
984 answer: serde_json::Value,
985 ) -> Result<()> {
986 let full_peer_id = PeerId::new(sender_pubkey.to_string(), Some(their_uuid.to_string()));
987 let peer_key = full_peer_id.to_string();
988
989 info!("Received answer from {}", full_peer_id.short());
990
991 let mut peers = self.state.peers.write().await;
992 if let Some(entry) = peers.get_mut(&peer_key) {
993 if entry.state == ConnectionState::Connected {
995 debug!("Ignoring duplicate answer from {} - already connected", full_peer_id.short());
996 return Ok(());
997 }
998 if let Some(ref mut peer) = entry.peer {
999 use webrtc::peer_connection::signaling_state::RTCSignalingState;
1001 let signaling_state = peer.signaling_state();
1002 if signaling_state != RTCSignalingState::HaveLocalOffer {
1003 debug!(
1004 "Ignoring answer from {} - signaling state is {:?}, not HaveLocalOffer",
1005 full_peer_id.short(),
1006 signaling_state
1007 );
1008 return Ok(());
1009 }
1010 peer.handle_answer(answer).await?;
1011 info!("Applied answer from {}", full_peer_id.short());
1012 }
1013 }
1014
1015 Ok(())
1016 }
1017
1018 async fn handle_candidate(
1020 &self,
1021 sender_pubkey: &str,
1022 their_uuid: &str,
1023 candidate: serde_json::Value,
1024 ) -> Result<()> {
1025 let full_peer_id = PeerId::new(sender_pubkey.to_string(), Some(their_uuid.to_string()));
1026 let peer_key = full_peer_id.to_string();
1027
1028 info!("Received ICE candidate from {}", full_peer_id.short());
1029
1030 let mut peers = self.state.peers.write().await;
1031 if let Some(entry) = peers.get_mut(&peer_key) {
1032 if let Some(ref mut peer) = entry.peer {
1033 peer.handle_candidate(candidate).await?;
1034 }
1035 }
1036
1037 Ok(())
1038 }
1039
1040 async fn handle_candidates(
1042 &self,
1043 sender_pubkey: &str,
1044 their_uuid: &str,
1045 candidates: Vec<serde_json::Value>,
1046 ) -> Result<()> {
1047 let full_peer_id = PeerId::new(sender_pubkey.to_string(), Some(their_uuid.to_string()));
1048 let peer_key = full_peer_id.to_string();
1049
1050 debug!(
1051 "Received {} candidates from {}",
1052 candidates.len(),
1053 full_peer_id.short()
1054 );
1055
1056 let mut peers = self.state.peers.write().await;
1057 if let Some(entry) = peers.get_mut(&peer_key) {
1058 if let Some(ref mut peer) = entry.peer {
1059 for candidate in candidates {
1060 if let Err(e) = peer.handle_candidate(candidate).await {
1061 debug!("Failed to add candidate: {}", e);
1062 }
1063 }
1064 }
1065 }
1066
1067 Ok(())
1068 }
1069
1070 async fn handle_peer_state_event(&self, event: PeerStateEvent) {
1072 match event {
1073 PeerStateEvent::Connected(peer_id) => {
1074 let peer_key = peer_id.to_string();
1075 let mut peers = self.state.peers.write().await;
1076 if let Some(entry) = peers.get_mut(&peer_key) {
1077 if entry.state != ConnectionState::Connected {
1078 info!("Peer {} connected (via state event)", peer_id.short());
1079 entry.state = ConnectionState::Connected;
1080 self.state.connected_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1082 }
1083 }
1084 }
1085 PeerStateEvent::Failed(peer_id) => {
1086 let peer_key = peer_id.to_string();
1087 info!("Peer {} connection failed - removing from pool", peer_id.short());
1088 let mut peers = self.state.peers.write().await;
1089 if let Some(entry) = peers.remove(&peer_key) {
1090 if entry.state == ConnectionState::Connected {
1092 self.state.connected_count.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
1093 }
1094 if let Some(peer) = entry.peer {
1096 let _ = peer.close().await;
1097 }
1098 }
1099 }
1100 PeerStateEvent::Disconnected(peer_id) => {
1101 let peer_key = peer_id.to_string();
1102 info!("Peer {} disconnected - removing from pool", peer_id.short());
1103 let mut peers = self.state.peers.write().await;
1104 if let Some(entry) = peers.remove(&peer_key) {
1105 if entry.state == ConnectionState::Connected {
1107 self.state.connected_count.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
1108 }
1109 if let Some(peer) = entry.peer {
1111 let _ = peer.close().await;
1112 }
1113 }
1114 }
1115 }
1116 }
1117
1118 async fn cleanup_stale_peers(&self) {
1120 let mut peers = self.state.peers.write().await;
1121 let mut connected_count = 0;
1122 let mut to_remove = Vec::new();
1123 let stale_timeout = Duration::from_secs(60); for (key, entry) in peers.iter_mut() {
1126 if let Some(ref peer) = entry.peer {
1127 if peer.is_connected() {
1129 if entry.state != ConnectionState::Connected {
1130 info!("Peer {} is now connected (sync fallback)", entry.peer_id.short());
1131 entry.state = ConnectionState::Connected;
1132 }
1133 connected_count += 1;
1134 } else if entry.state == ConnectionState::Connecting && entry.last_seen.elapsed() > stale_timeout {
1135 info!("Removing stale peer {} (stuck in Connecting for {:?})", entry.peer_id.short(), entry.last_seen.elapsed());
1137 to_remove.push(key.clone());
1138 }
1139 } else if entry.state == ConnectionState::Discovered && entry.last_seen.elapsed() > stale_timeout {
1140 debug!("Removing stale discovered peer {}", entry.peer_id.short());
1142 to_remove.push(key.clone());
1143 }
1144 }
1145
1146 for key in to_remove {
1148 if let Some(entry) = peers.remove(&key) {
1149 if let Some(peer) = entry.peer {
1150 let _ = peer.close().await;
1151 }
1152 }
1153 }
1154
1155 self.state
1156 .connected_count
1157 .store(connected_count, std::sync::atomic::Ordering::Relaxed);
1158 }
1159}
1160
/// Plain snapshot of a peer's identity, direction, state label and last-seen time.
///
/// Nothing in this file constructs or reads it, hence the `dead_code` exemption.
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub struct PeerState {
    /// Identity of the peer.
    pub peer_id: PeerId,
    /// Whether the connection is inbound or outbound.
    pub direction: PeerDirection,
    /// Connection state as text.
    pub state: String,
    /// Timestamp associated with the peer.
    pub last_seen: Instant,
}