1use anyhow::Result;
13use futures::{SinkExt, StreamExt};
14use nostr::{nips::nip44, ClientMessage, EventBuilder, Filter, JsonUtil, Keys, Kind, PublicKey, RelayMessage, Tag};
15use std::collections::HashMap;
16use std::sync::Arc;
17use std::time::{Duration, Instant};
18use tokio::sync::{mpsc, Mutex, RwLock};
19use tokio_tungstenite::{connect_async, tungstenite::Message};
20use tracing::{debug, error, info, warn};
21
22use super::peer::{ContentStore, Peer, PendingRequest};
23use super::types::{
24 PeerDirection, PeerId, PeerPool, PeerStateEvent, PeerStatus, SignalingMessage, WebRTCConfig, WEBRTC_KIND, HELLO_TAG,
25};
26
27pub type PeerClassifier = Arc<dyn Fn(&str) -> PeerPool + Send + Sync>;
29
30#[derive(Debug, Clone, PartialEq)]
32pub enum ConnectionState {
33 Discovered,
34 Connecting,
35 Connected,
36 Failed,
37}
38
39impl std::fmt::Display for ConnectionState {
40 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
41 match self {
42 ConnectionState::Discovered => write!(f, "discovered"),
43 ConnectionState::Connecting => write!(f, "connecting"),
44 ConnectionState::Connected => write!(f, "connected"),
45 ConnectionState::Failed => write!(f, "failed"),
46 }
47 }
48}
49
50pub struct PeerEntry {
52 pub peer_id: PeerId,
53 pub direction: PeerDirection,
54 pub state: ConnectionState,
55 pub last_seen: Instant,
56 pub peer: Option<Peer>,
57 pub pool: PeerPool,
58}
59
60pub struct WebRTCState {
62 pub peers: RwLock<HashMap<String, PeerEntry>>,
63 pub connected_count: std::sync::atomic::AtomicUsize,
64}
65
66impl WebRTCState {
67 pub fn new() -> Self {
68 Self {
69 peers: RwLock::new(HashMap::new()),
70 connected_count: std::sync::atomic::AtomicUsize::new(0),
71 }
72 }
73
74 pub async fn request_from_peers(&self, hash_hex: &str) -> Option<Vec<u8>> {
78 use super::types::{DataRequest, MAX_HTL, encode_request};
79
80 let peers = self.peers.read().await;
81
82 let peer_refs: Vec<_> = peers
85 .values()
86 .filter(|p| p.state == ConnectionState::Connected && p.peer.is_some())
87 .filter_map(|p| {
88 p.peer.as_ref().map(|peer| {
89 (p.peer_id.short(), peer.data_channel.clone(), peer.pending_requests.clone())
90 })
91 })
92 .collect();
93
94 drop(peers); let mut connected_peers: Vec<(String, Arc<Mutex<HashMap<String, PendingRequest>>>, Arc<webrtc::data_channel::RTCDataChannel>)> = Vec::new();
98 for (peer_id, dc_mutex, pending) in peer_refs {
99 let dc_guard = dc_mutex.lock().await;
100 if let Some(dc) = dc_guard.as_ref() {
101 connected_peers.push((peer_id, pending, dc.clone()));
102 }
103 }
104
105 if connected_peers.is_empty() {
106 debug!("No connected peers to query for {}", &hash_hex[..8.min(hash_hex.len())]);
107 return None;
108 }
109
110 debug!(
111 "Querying {} connected peers for {} (sequential with 500ms delay)",
112 connected_peers.len(),
113 &hash_hex[..8.min(hash_hex.len())]
114 );
115
116 let hash_bytes = match hex::decode(hash_hex) {
118 Ok(b) => b,
119 Err(_) => return None,
120 };
121
122 for (_i, (peer_id, pending_requests, dc)) in connected_peers.into_iter().enumerate() {
124 debug!("Querying peer {} for {}", peer_id, &hash_hex[..8.min(hash_hex.len())]);
125
126 let (tx, rx) = tokio::sync::oneshot::channel();
128
129 {
131 let mut pending = pending_requests.lock().await;
132 pending.insert(
133 hash_hex.to_string(),
134 super::PendingRequest {
135 hash: hash_bytes.clone(),
136 response_tx: tx,
137 },
138 );
139 }
140
141 let req = DataRequest {
143 h: hash_bytes.clone(),
144 htl: MAX_HTL,
145 };
146 if let Ok(wire) = encode_request(&req) {
147 if dc.send(&bytes::Bytes::from(wire)).await.is_ok() {
148 match tokio::time::timeout(std::time::Duration::from_millis(500), rx).await {
150 Ok(Ok(Some(data))) => {
151 debug!("Got response from peer {} for {}", peer_id, &hash_hex[..8.min(hash_hex.len())]);
152 return Some(data);
153 }
154 _ => {
155 debug!("No response from peer {} for {}", peer_id, &hash_hex[..8.min(hash_hex.len())]);
157 }
158 }
159 }
160 }
161
162 let mut pending = pending_requests.lock().await;
164 pending.remove(hash_hex);
165 }
166
167 debug!("No peer had data for {}", &hash_hex[..8.min(hash_hex.len())]);
168 None
169 }
170}
171
172pub struct WebRTCManager {
174 config: WebRTCConfig,
175 my_peer_id: PeerId,
176 keys: Keys,
177 state: Arc<WebRTCState>,
178 shutdown: Arc<tokio::sync::watch::Sender<bool>>,
179 shutdown_rx: tokio::sync::watch::Receiver<bool>,
180 signaling_tx: mpsc::Sender<SignalingMessage>,
182 signaling_rx: Option<mpsc::Receiver<SignalingMessage>>,
183 store: Option<Arc<dyn ContentStore>>,
185 peer_classifier: PeerClassifier,
187 state_event_tx: mpsc::Sender<PeerStateEvent>,
189 state_event_rx: Option<mpsc::Receiver<PeerStateEvent>>,
190}
191
192impl WebRTCManager {
193 pub fn new(keys: Keys, config: WebRTCConfig) -> Self {
195 let pubkey = keys.public_key().to_hex();
196 let my_peer_id = PeerId::new(pubkey, None);
197 let (shutdown, shutdown_rx) = tokio::sync::watch::channel(false);
198 let (signaling_tx, signaling_rx) = mpsc::channel(100);
199 let (state_event_tx, state_event_rx) = mpsc::channel(100);
200
201 let peer_classifier: PeerClassifier = Arc::new(|_| PeerPool::Other);
203
204 Self {
205 config,
206 my_peer_id,
207 keys,
208 state: Arc::new(WebRTCState::new()),
209 shutdown: Arc::new(shutdown),
210 shutdown_rx,
211 signaling_tx,
212 signaling_rx: Some(signaling_rx),
213 store: None,
214 peer_classifier,
215 state_event_tx,
216 state_event_rx: Some(state_event_rx),
217 }
218 }
219
220 pub fn new_with_classifier(keys: Keys, config: WebRTCConfig, classifier: PeerClassifier) -> Self {
222 let mut manager = Self::new(keys, config);
223 manager.peer_classifier = classifier;
224 manager
225 }
226
227 pub fn new_with_store(keys: Keys, config: WebRTCConfig, store: Arc<dyn ContentStore>) -> Self {
229 let mut manager = Self::new(keys, config);
230 manager.store = Some(store);
231 manager
232 }
233
234 pub fn new_with_store_and_classifier(
236 keys: Keys,
237 config: WebRTCConfig,
238 store: Arc<dyn ContentStore>,
239 classifier: PeerClassifier,
240 ) -> Self {
241 let mut manager = Self::new(keys, config);
242 manager.store = Some(store);
243 manager.peer_classifier = classifier;
244 manager
245 }
246
247 pub fn set_store(&mut self, store: Arc<dyn ContentStore>) {
249 self.store = Some(store);
250 }
251
252 pub fn set_peer_classifier(&mut self, classifier: PeerClassifier) {
254 self.peer_classifier = classifier;
255 }
256
257 pub fn my_peer_id(&self) -> &PeerId {
259 &self.my_peer_id
260 }
261
262 pub fn state(&self) -> Arc<WebRTCState> {
264 self.state.clone()
265 }
266
267 pub fn shutdown(&self) {
269 let _ = self.shutdown.send(true);
270 }
271
272 pub async fn connected_count(&self) -> usize {
274 self.state
275 .connected_count
276 .load(std::sync::atomic::Ordering::Relaxed)
277 }
278
279 pub async fn peer_statuses(&self) -> Vec<PeerStatus> {
281 self.state
282 .peers
283 .read()
284 .await
285 .values()
286 .map(|p| PeerStatus {
287 peer_id: p.peer_id.to_string(),
288 pubkey: p.peer_id.pubkey.clone(),
289 state: p.state.to_string(),
290 direction: p.direction,
291 connected_at: Some(p.last_seen),
292 pool: p.pool,
293 })
294 .collect()
295 }
296
297 pub async fn get_pool_counts(&self) -> (usize, usize, usize, usize) {
301 let peers = self.state.peers.read().await;
302 let mut follows_connected = 0;
303 let mut follows_active = 0;
304 let mut other_connected = 0;
305 let mut other_active = 0;
306
307 for entry in peers.values() {
308 let is_active = entry.state == ConnectionState::Connected
311 || entry.state == ConnectionState::Connecting;
312
313 match entry.pool {
314 PeerPool::Follows => {
315 if is_active {
316 follows_active += 1;
317 }
318 if entry.state == ConnectionState::Connected {
319 follows_connected += 1;
320 }
321 }
322 PeerPool::Other => {
323 if is_active {
324 other_active += 1;
325 }
326 if entry.state == ConnectionState::Connected {
327 other_connected += 1;
328 }
329 }
330 }
331 }
332
333 (follows_connected, follows_active, other_connected, other_active)
334 }
335
336 fn can_accept_peer(&self, pool: PeerPool, pool_counts: &(usize, usize, usize, usize)) -> bool {
338 let (_, follows_active, _, other_active) = *pool_counts;
339 match pool {
340 PeerPool::Follows => follows_active < self.config.pools.follows.max_connections,
341 PeerPool::Other => other_active < self.config.pools.other.max_connections,
342 }
343 }
344
345 #[allow(dead_code)]
347 fn is_pool_satisfied(&self, pool: PeerPool, pool_counts: &(usize, usize, usize, usize)) -> bool {
348 let (follows_connected, _, other_connected, _) = *pool_counts;
349 match pool {
350 PeerPool::Follows => follows_connected >= self.config.pools.follows.satisfied_connections,
351 PeerPool::Other => other_connected >= self.config.pools.other.satisfied_connections,
352 }
353 }
354
355 #[allow(dead_code)]
357 fn is_satisfied(&self, pool_counts: &(usize, usize, usize, usize)) -> bool {
358 self.is_pool_satisfied(PeerPool::Follows, pool_counts)
359 && self.is_pool_satisfied(PeerPool::Other, pool_counts)
360 }
361
362 fn should_initiate(&self, their_uuid: &str) -> bool {
365 self.my_peer_id.uuid < their_uuid.to_string()
366 }
367
368 pub async fn run(&mut self) -> Result<()> {
370 info!(
371 "Starting WebRTC manager with peer ID: {}",
372 self.my_peer_id.short()
373 );
374
375 let (event_tx, mut event_rx) = mpsc::channel::<(String, nostr::Event)>(100);
376
377 let mut signaling_rx = self.signaling_rx.take().expect("signaling_rx already taken");
379
380 let mut state_event_rx = self.state_event_rx.take().expect("state_event_rx already taken");
382
383 let (relay_write_tx, _) = tokio::sync::broadcast::channel::<SignalingMessage>(100);
385
386 for relay_url in &self.config.relays {
388 let url = relay_url.clone();
389 let event_tx = event_tx.clone();
390 let shutdown_rx = self.shutdown_rx.clone();
391 let keys = self.keys.clone();
392 let my_peer_id = self.my_peer_id.clone();
393 let hello_interval = Duration::from_millis(self.config.hello_interval_ms);
394 let relay_write_rx = relay_write_tx.subscribe();
395
396 tokio::spawn(async move {
397 if let Err(e) = Self::relay_task(
398 url.clone(),
399 event_tx,
400 shutdown_rx,
401 keys,
402 my_peer_id,
403 hello_interval,
404 relay_write_rx,
405 )
406 .await
407 {
408 error!("Relay {} error: {}", url, e);
409 }
410 });
411 }
412
413 let mut shutdown_rx = self.shutdown_rx.clone();
415 let mut cleanup_interval = tokio::time::interval(Duration::from_secs(30));
417 loop {
418 tokio::select! {
419 _ = shutdown_rx.changed() => {
420 if *shutdown_rx.borrow() {
421 info!("WebRTC manager shutting down");
422 break;
423 }
424 }
425 Some((relay, event)) = event_rx.recv() => {
426 if let Err(e) = self.handle_event(&relay, &event, &relay_write_tx).await {
427 debug!("Error handling event from {}: {}", relay, e);
428 }
429 }
430 Some(msg) = signaling_rx.recv() => {
431 let _ = relay_write_tx.send(msg);
433 }
434 Some(event) = state_event_rx.recv() => {
435 self.handle_peer_state_event(event).await;
437 }
438 _ = cleanup_interval.tick() => {
439 self.cleanup_stale_peers().await;
441 }
442 }
443 }
444
445 Ok(())
446 }
447
448 async fn relay_task(
450 url: String,
451 event_tx: mpsc::Sender<(String, nostr::Event)>,
452 mut shutdown_rx: tokio::sync::watch::Receiver<bool>,
453 keys: Keys,
454 my_peer_id: PeerId,
455 hello_interval: Duration,
456 mut signaling_rx: tokio::sync::broadcast::Receiver<SignalingMessage>,
457 ) -> Result<()> {
458 info!("Connecting to relay: {}", url);
459
460 let (ws_stream, _) = connect_async(&url).await?;
461 let (mut write, mut read) = ws_stream.split();
462
463 let hello_filter = Filter::new()
467 .kind(Kind::Ephemeral(WEBRTC_KIND as u16))
468 .custom_tag(
469 nostr::SingleLetterTag::lowercase(nostr::Alphabet::L),
470 vec![HELLO_TAG],
471 )
472 .since(nostr::Timestamp::now() - Duration::from_secs(60));
473
474 let directed_filter = Filter::new()
475 .kind(Kind::Ephemeral(WEBRTC_KIND as u16))
476 .custom_tag(
477 nostr::SingleLetterTag::lowercase(nostr::Alphabet::P),
478 vec![keys.public_key().to_hex()],
479 )
480 .since(nostr::Timestamp::now() - Duration::from_secs(60));
481
482 let sub_id = nostr::SubscriptionId::generate();
483 let sub_msg = ClientMessage::req(sub_id.clone(), vec![hello_filter, directed_filter]);
484 write.send(Message::Text(sub_msg.as_json().into())).await?;
485
486 info!("Subscribed to {} for WebRTC events (kind {})", url, WEBRTC_KIND);
487
488 let mut last_hello = Instant::now() - hello_interval; let mut hello_ticker = tokio::time::interval(Duration::from_secs(1));
490
491 loop {
492 tokio::select! {
493 _ = shutdown_rx.changed() => {
494 if *shutdown_rx.borrow() {
495 break;
496 }
497 }
498 _ = hello_ticker.tick() => {
499 if last_hello.elapsed() >= hello_interval {
501 let hello = SignalingMessage::hello(&my_peer_id.uuid);
502 if let Ok(event) = Self::create_signaling_event(&keys, &hello).await {
503 let msg = ClientMessage::event(event);
504 if write.send(Message::Text(msg.as_json().into())).await.is_ok() {
505 debug!("Sent hello to {}", url);
506 }
507 }
508 last_hello = Instant::now();
509 }
510 }
511 Ok(signaling_msg) = signaling_rx.recv() => {
513 info!("Sending {} via {}", signaling_msg.msg_type(), url);
514 if let Ok(event) = Self::create_signaling_event(&keys, &signaling_msg).await {
515 let event_id = event.id.to_string();
516 let msg = ClientMessage::event(event);
517 if write.send(Message::Text(msg.as_json().into())).await.is_ok() {
518 info!("Sent {} to {} (event id: {})", signaling_msg.msg_type(), url, &event_id[..16]);
519 }
520 }
521 }
522 msg = read.next() => {
523 match msg {
524 Some(Ok(Message::Text(text))) => {
525 if let Ok(relay_msg) = RelayMessage::from_json(&text) {
526 if let RelayMessage::Event { event, .. } = relay_msg {
527 let _ = event_tx.send((url.clone(), *event)).await;
528 }
529 }
530 }
531 Some(Err(e)) => {
532 error!("WebSocket error from {}: {}", url, e);
533 break;
534 }
535 None => {
536 warn!("WebSocket closed: {}", url);
537 break;
538 }
539 _ => {}
540 }
541 }
542 }
543 }
544
545 Ok(())
546 }
547
548 async fn create_signaling_event(keys: &Keys, msg: &SignalingMessage) -> Result<nostr::Event> {
554 if let Some(recipient_str) = msg.recipient() {
556 if let Some(peer_id) = PeerId::from_string(recipient_str) {
558 let recipient_pubkey = PublicKey::from_hex(&peer_id.pubkey)?;
559
560 let seal = serde_json::json!({
562 "pubkey": keys.public_key().to_hex(),
563 "kind": WEBRTC_KIND,
564 "content": serde_json::to_string(msg)?,
565 "tags": []
566 });
567
568 let ephemeral_keys = Keys::generate();
570
571 let encrypted_content = nip44::encrypt(
573 ephemeral_keys.secret_key(),
574 &recipient_pubkey,
575 &seal.to_string(),
576 nip44::Version::V2
577 )?;
578
579 let created_at = nostr::Timestamp::now();
581 let expiration = created_at + Duration::from_secs(5 * 60); let tags = vec![
584 Tag::parse(&["p", &recipient_pubkey.to_hex()])?,
585 Tag::parse(&["expiration", &expiration.as_u64().to_string()])?,
586 ];
587
588 let event = EventBuilder::new(Kind::Ephemeral(WEBRTC_KIND as u16), encrypted_content, tags)
589 .to_event(&ephemeral_keys)?;
590
591 return Ok(event);
592 }
593 }
594
595 let tags = vec![
597 Tag::parse(&["l", HELLO_TAG])?,
598 Tag::parse(&["peerId", msg.peer_id()])?,
599 ];
600
601 let event = EventBuilder::new(Kind::Ephemeral(WEBRTC_KIND as u16), "", tags)
602 .to_event(keys)?;
603
604 Ok(event)
605 }
606
607 async fn handle_event(
613 &self,
614 relay: &str,
615 event: &nostr::Event,
616 relay_write_tx: &tokio::sync::broadcast::Sender<SignalingMessage>,
617 ) -> Result<()> {
618 if event.kind != Kind::Ephemeral(WEBRTC_KIND as u16) {
620 return Ok(());
621 }
622
623 let get_tag = |name: &str| -> Option<String> {
625 event.tags.iter().find_map(|tag| {
626 let v: Vec<String> = tag.clone().to_vec();
627 if v.len() >= 2 && v[0] == name {
628 Some(v[1].clone())
629 } else {
630 None
631 }
632 })
633 };
634
635 let l_tag = get_tag("l");
637 if l_tag.as_deref() == Some(HELLO_TAG) {
638 let sender_pubkey = event.pubkey.to_hex();
639
640 if sender_pubkey == self.my_peer_id.pubkey {
642 return Ok(());
643 }
644
645 if let Some(their_uuid) = get_tag("peerId") {
646 debug!("Received hello from {} via {}", &sender_pubkey[..8], relay);
647 self.handle_hello(&sender_pubkey, &their_uuid, relay_write_tx)
648 .await?;
649 }
650 return Ok(());
651 }
652
653 let p_tag = get_tag("p");
655 if p_tag.as_deref() != Some(&self.keys.public_key().to_hex()) {
656 return Ok(());
658 }
659
660 if event.content.is_empty() {
662 return Ok(());
663 }
664
665 let seal: serde_json::Value = match nip44::decrypt(self.keys.secret_key(), &event.pubkey, &event.content) {
667 Ok(plaintext) => {
668 match serde_json::from_str(&plaintext) {
669 Ok(v) => v,
670 Err(_) => return Ok(()),
671 }
672 }
673 Err(_) => {
674 return Ok(());
676 }
677 };
678
679 let sender_pubkey = seal.get("pubkey")
681 .and_then(|v| v.as_str())
682 .ok_or_else(|| anyhow::anyhow!("Missing pubkey in seal"))?;
683
684 if sender_pubkey == self.my_peer_id.pubkey {
686 return Ok(());
687 }
688
689 let content = seal.get("content")
690 .and_then(|v| v.as_str())
691 .ok_or_else(|| anyhow::anyhow!("Missing content in seal"))?;
692
693 let msg: SignalingMessage = serde_json::from_str(content)?;
694
695 debug!(
696 "Received {} from {} via {} (gift-wrapped)",
697 msg.msg_type(),
698 &sender_pubkey[..8],
699 relay
700 );
701
702 match msg {
703 SignalingMessage::Hello { .. } => {
704 return Ok(());
706 }
707 SignalingMessage::Offer {
708 recipient,
709 peer_id: their_uuid,
710 offer,
711 } => {
712 if recipient != self.my_peer_id.to_string() {
713 return Ok(()); }
715 self.handle_offer(&sender_pubkey, &their_uuid, offer, relay_write_tx)
716 .await?;
717 }
718 SignalingMessage::Answer {
719 recipient,
720 peer_id: their_uuid,
721 answer,
722 } => {
723 if recipient != self.my_peer_id.to_string() {
724 return Ok(());
725 }
726 self.handle_answer(&sender_pubkey, &their_uuid, answer)
727 .await?;
728 }
729 SignalingMessage::Candidate {
730 recipient,
731 peer_id: their_uuid,
732 candidate,
733 } => {
734 if recipient != self.my_peer_id.to_string() {
735 return Ok(());
736 }
737 self.handle_candidate(&sender_pubkey, &their_uuid, candidate)
738 .await?;
739 }
740 SignalingMessage::Candidates {
741 recipient,
742 peer_id: their_uuid,
743 candidates,
744 } => {
745 if recipient != self.my_peer_id.to_string() {
746 return Ok(());
747 }
748 self.handle_candidates(&sender_pubkey, &their_uuid, candidates)
749 .await?;
750 }
751 }
752
753 Ok(())
754 }
755
756 async fn handle_hello(
758 &self,
759 sender_pubkey: &str,
760 their_uuid: &str,
761 relay_write_tx: &tokio::sync::broadcast::Sender<SignalingMessage>,
762 ) -> Result<()> {
763 let full_peer_id = PeerId::new(sender_pubkey.to_string(), Some(their_uuid.to_string()));
764 let peer_key = full_peer_id.to_string();
765
766 {
768 let peers = self.state.peers.read().await;
769 if let Some(entry) = peers.get(&peer_key) {
770 if entry.state == ConnectionState::Connected
772 || entry.state == ConnectionState::Connecting
773 {
774 return Ok(());
775 }
776 }
777 }
778
779 let pool = (self.peer_classifier)(sender_pubkey);
781
782 let pool_counts = self.get_pool_counts().await;
784 if !self.can_accept_peer(pool, &pool_counts) {
785 debug!("Ignoring hello from {} - pool {:?} is full", full_peer_id.short(), pool);
786 return Ok(());
787 }
788
789 let should_initiate = self.should_initiate(their_uuid);
791
792 let pool_satisfied = self.is_pool_satisfied(pool, &pool_counts);
795 let will_initiate = should_initiate && !pool_satisfied;
796
797 info!(
798 "Discovered peer: {} (pool: {:?}, initiate: {}, pool_satisfied: {})",
799 full_peer_id.short(),
800 pool,
801 will_initiate,
802 pool_satisfied
803 );
804
805 if !will_initiate && pool_satisfied {
808 debug!("Pool {:?} is satisfied, not tracking peer {}", pool, full_peer_id.short());
809 return Ok(());
810 }
811
812 {
814 let mut peers = self.state.peers.write().await;
815 peers.insert(
816 peer_key.clone(),
817 PeerEntry {
818 peer_id: full_peer_id.clone(),
819 direction: if will_initiate {
820 PeerDirection::Outbound
821 } else {
822 PeerDirection::Inbound
823 },
824 state: ConnectionState::Discovered,
825 last_seen: Instant::now(),
826 peer: None,
827 pool,
828 },
829 );
830 }
831
832 if will_initiate {
834 self.initiate_connection(&full_peer_id, pool, relay_write_tx)
835 .await?;
836 }
837
838 Ok(())
839 }
840
841 async fn initiate_connection(
843 &self,
844 peer_id: &PeerId,
845 pool: PeerPool,
846 relay_write_tx: &tokio::sync::broadcast::Sender<SignalingMessage>,
847 ) -> Result<()> {
848 let peer_key = peer_id.to_string();
849
850 info!("Initiating connection to {} (pool: {:?})", peer_id.short(), pool);
851
852 let mut peer = Peer::new_with_store_and_events(
854 peer_id.clone(),
855 PeerDirection::Outbound,
856 self.my_peer_id.clone(),
857 self.signaling_tx.clone(),
858 self.config.stun_servers.clone(),
859 self.store.clone(),
860 Some(self.state_event_tx.clone()),
861 )
862 .await?;
863
864 peer.setup_handlers().await?;
865
866 let offer = peer.connect().await?;
868
869 {
871 let mut peers = self.state.peers.write().await;
872 if let Some(entry) = peers.get_mut(&peer_key) {
873 entry.state = ConnectionState::Connecting;
874 entry.peer = Some(peer);
875 entry.pool = pool;
876 }
877 }
878
879 let offer_msg = SignalingMessage::Offer {
881 offer,
882 recipient: peer_id.to_string(),
883 peer_id: self.my_peer_id.uuid.clone(),
884 };
885 if relay_write_tx.send(offer_msg).is_err() {
886 warn!("Failed to broadcast offer to {}", peer_id.short());
887 }
888
889 info!("Sent offer to {}", peer_id.short());
890
891 Ok(())
892 }
893
894 async fn handle_offer(
896 &self,
897 sender_pubkey: &str,
898 their_uuid: &str,
899 offer: serde_json::Value,
900 relay_write_tx: &tokio::sync::broadcast::Sender<SignalingMessage>,
901 ) -> Result<()> {
902 let full_peer_id = PeerId::new(sender_pubkey.to_string(), Some(their_uuid.to_string()));
903 let peer_key = full_peer_id.to_string();
904
905 let pool = (self.peer_classifier)(sender_pubkey);
907
908 info!("Received offer from {} (pool: {:?})", full_peer_id.short(), pool);
909
910 {
912 let peers = self.state.peers.read().await;
913 if let Some(entry) = peers.get(&peer_key) {
914 if entry.peer.is_some() {
916 debug!("Already have peer {} with connection, skipping offer", full_peer_id.short());
917 return Ok(());
918 }
919 }
920 }
921
922 let pool_counts = self.get_pool_counts().await;
924 if !self.can_accept_peer(pool, &pool_counts) {
925 warn!("Rejecting offer from {} - pool {:?} is full", full_peer_id.short(), pool);
926 return Ok(());
927 }
928 let mut peer = Peer::new_with_store_and_events(
930 full_peer_id.clone(),
931 PeerDirection::Inbound,
932 self.my_peer_id.clone(),
933 self.signaling_tx.clone(),
934 self.config.stun_servers.clone(),
935 self.store.clone(),
936 Some(self.state_event_tx.clone()),
937 )
938 .await?;
939
940 peer.setup_handlers().await?;
941
942 let answer = peer.handle_offer(offer).await?;
944
945 {
947 let mut peers = self.state.peers.write().await;
948 peers.insert(
949 peer_key,
950 PeerEntry {
951 peer_id: full_peer_id.clone(),
952 direction: PeerDirection::Inbound,
953 state: ConnectionState::Connecting,
954 last_seen: Instant::now(),
955 peer: Some(peer),
956 pool,
957 },
958 );
959 }
960
961 let answer_msg = SignalingMessage::Answer {
963 answer,
964 recipient: full_peer_id.to_string(),
965 peer_id: self.my_peer_id.to_string(),
966 };
967 if relay_write_tx.send(answer_msg).is_err() {
968 warn!("Failed to send answer to {}", full_peer_id.short());
969 }
970 info!("Sent answer to {}", full_peer_id.short());
971
972 Ok(())
973 }
974
975 async fn handle_answer(
977 &self,
978 sender_pubkey: &str,
979 their_uuid: &str,
980 answer: serde_json::Value,
981 ) -> Result<()> {
982 let full_peer_id = PeerId::new(sender_pubkey.to_string(), Some(their_uuid.to_string()));
983 let peer_key = full_peer_id.to_string();
984
985 info!("Received answer from {}", full_peer_id.short());
986
987 let mut peers = self.state.peers.write().await;
988 if let Some(entry) = peers.get_mut(&peer_key) {
989 if entry.state == ConnectionState::Connected {
991 debug!("Ignoring duplicate answer from {} - already connected", full_peer_id.short());
992 return Ok(());
993 }
994 if let Some(ref mut peer) = entry.peer {
995 use webrtc::peer_connection::signaling_state::RTCSignalingState;
997 let signaling_state = peer.signaling_state();
998 if signaling_state != RTCSignalingState::HaveLocalOffer {
999 debug!(
1000 "Ignoring answer from {} - signaling state is {:?}, not HaveLocalOffer",
1001 full_peer_id.short(),
1002 signaling_state
1003 );
1004 return Ok(());
1005 }
1006 peer.handle_answer(answer).await?;
1007 info!("Applied answer from {}", full_peer_id.short());
1008 }
1009 }
1010
1011 Ok(())
1012 }
1013
1014 async fn handle_candidate(
1016 &self,
1017 sender_pubkey: &str,
1018 their_uuid: &str,
1019 candidate: serde_json::Value,
1020 ) -> Result<()> {
1021 let full_peer_id = PeerId::new(sender_pubkey.to_string(), Some(their_uuid.to_string()));
1022 let peer_key = full_peer_id.to_string();
1023
1024 info!("Received ICE candidate from {}", full_peer_id.short());
1025
1026 let mut peers = self.state.peers.write().await;
1027 if let Some(entry) = peers.get_mut(&peer_key) {
1028 if let Some(ref mut peer) = entry.peer {
1029 peer.handle_candidate(candidate).await?;
1030 }
1031 }
1032
1033 Ok(())
1034 }
1035
1036 async fn handle_candidates(
1038 &self,
1039 sender_pubkey: &str,
1040 their_uuid: &str,
1041 candidates: Vec<serde_json::Value>,
1042 ) -> Result<()> {
1043 let full_peer_id = PeerId::new(sender_pubkey.to_string(), Some(their_uuid.to_string()));
1044 let peer_key = full_peer_id.to_string();
1045
1046 debug!(
1047 "Received {} candidates from {}",
1048 candidates.len(),
1049 full_peer_id.short()
1050 );
1051
1052 let mut peers = self.state.peers.write().await;
1053 if let Some(entry) = peers.get_mut(&peer_key) {
1054 if let Some(ref mut peer) = entry.peer {
1055 for candidate in candidates {
1056 if let Err(e) = peer.handle_candidate(candidate).await {
1057 debug!("Failed to add candidate: {}", e);
1058 }
1059 }
1060 }
1061 }
1062
1063 Ok(())
1064 }
1065
1066 async fn handle_peer_state_event(&self, event: PeerStateEvent) {
1068 match event {
1069 PeerStateEvent::Connected(peer_id) => {
1070 let peer_key = peer_id.to_string();
1071 let mut peers = self.state.peers.write().await;
1072 if let Some(entry) = peers.get_mut(&peer_key) {
1073 if entry.state != ConnectionState::Connected {
1074 info!("Peer {} connected (via state event)", peer_id.short());
1075 entry.state = ConnectionState::Connected;
1076 self.state.connected_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1078 }
1079 }
1080 }
1081 PeerStateEvent::Failed(peer_id) => {
1082 let peer_key = peer_id.to_string();
1083 info!("Peer {} connection failed - removing from pool", peer_id.short());
1084 let mut peers = self.state.peers.write().await;
1085 if let Some(entry) = peers.remove(&peer_key) {
1086 if entry.state == ConnectionState::Connected {
1088 self.state.connected_count.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
1089 }
1090 if let Some(peer) = entry.peer {
1092 let _ = peer.close().await;
1093 }
1094 }
1095 }
1096 PeerStateEvent::Disconnected(peer_id) => {
1097 let peer_key = peer_id.to_string();
1098 info!("Peer {} disconnected - removing from pool", peer_id.short());
1099 let mut peers = self.state.peers.write().await;
1100 if let Some(entry) = peers.remove(&peer_key) {
1101 if entry.state == ConnectionState::Connected {
1103 self.state.connected_count.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
1104 }
1105 if let Some(peer) = entry.peer {
1107 let _ = peer.close().await;
1108 }
1109 }
1110 }
1111 }
1112 }
1113
1114 async fn cleanup_stale_peers(&self) {
1116 let mut peers = self.state.peers.write().await;
1117 let mut connected_count = 0;
1118 let mut to_remove = Vec::new();
1119 let stale_timeout = Duration::from_secs(60); for (key, entry) in peers.iter_mut() {
1122 if let Some(ref peer) = entry.peer {
1123 if peer.is_connected() {
1125 if entry.state != ConnectionState::Connected {
1126 info!("Peer {} is now connected (sync fallback)", entry.peer_id.short());
1127 entry.state = ConnectionState::Connected;
1128 }
1129 connected_count += 1;
1130 } else if entry.state == ConnectionState::Connecting && entry.last_seen.elapsed() > stale_timeout {
1131 info!("Removing stale peer {} (stuck in Connecting for {:?})", entry.peer_id.short(), entry.last_seen.elapsed());
1133 to_remove.push(key.clone());
1134 }
1135 } else if entry.state == ConnectionState::Discovered && entry.last_seen.elapsed() > stale_timeout {
1136 debug!("Removing stale discovered peer {}", entry.peer_id.short());
1138 to_remove.push(key.clone());
1139 }
1140 }
1141
1142 for key in to_remove {
1144 if let Some(entry) = peers.remove(&key) {
1145 if let Some(peer) = entry.peer {
1146 let _ = peer.close().await;
1147 }
1148 }
1149 }
1150
1151 self.state
1152 .connected_count
1153 .store(connected_count, std::sync::atomic::Ordering::Relaxed);
1154 }
1155}
1156
1157#[allow(dead_code)]
1159#[derive(Debug, Clone)]
1160pub struct PeerState {
1161 pub peer_id: PeerId,
1162 pub direction: PeerDirection,
1163 pub state: String,
1164 pub last_seen: Instant,
1165}