// hashtree_cli/webrtc/signaling.rs

1//! WebRTC signaling over Nostr relays
2//!
3//! Protocol (compatible with hashtree-ts):
4//! - All signaling uses ephemeral kind 25050
5//! - Hello messages: #l: "hello" tag, broadcast for peer discovery (unencrypted)
6//! - Directed signaling (offer, answer, candidate, candidates): NIP-17 style
7//!   gift wrap for privacy - wrapped with ephemeral key, #p tag with recipient
8//!
9//! Security: Directed messages use gift wrapping with ephemeral keys so that
10//! relays cannot see the actual sender or correlate messages.
11
12use anyhow::Result;
13use futures::{SinkExt, StreamExt};
14use nostr::{
15    nips::nip44, ClientMessage, EventBuilder, Filter, JsonUtil, Keys, Kind, PublicKey,
16    RelayMessage, Tag,
17};
18use std::collections::HashMap;
19use std::sync::Arc;
20use std::time::{Duration, Instant};
21use tokio::sync::{mpsc, Mutex, RwLock};
22use tokio_tungstenite::{connect_async, tungstenite::Message};
23use tracing::{debug, error, info, warn};
24
25use super::peer::{ContentStore, Peer, PendingRequest};
26use super::types::{
27    PeerDirection, PeerId, PeerPool, PeerStateEvent, PeerStatus, SignalingMessage, WebRTCConfig,
28    HELLO_TAG, WEBRTC_KIND,
29};
30use crate::nostr_relay::NostrRelay;
31
/// Callback type for classifying peers into pools.
///
/// Takes a peer identifier string (presumably the peer's pubkey — confirm at
/// the call site, which is outside this view) and returns the [`PeerPool`]
/// the peer belongs to. `Send + Sync` because it is shared across async tasks.
pub type PeerClassifier = Arc<dyn Fn(&str) -> PeerPool + Send + Sync>;
34
/// Connection state for a peer
#[derive(Debug, Clone, PartialEq)]
pub enum ConnectionState {
    /// Peer was seen via a hello broadcast; no connection attempt yet.
    Discovered,
    /// A connection attempt (offer/answer exchange) is in progress.
    Connecting,
    /// Connection established.
    Connected,
    /// Connection attempt failed.
    Failed,
}
43
44impl std::fmt::Display for ConnectionState {
45    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
46        match self {
47            ConnectionState::Discovered => write!(f, "discovered"),
48            ConnectionState::Connecting => write!(f, "connecting"),
49            ConnectionState::Connected => write!(f, "connected"),
50            ConnectionState::Failed => write!(f, "failed"),
51        }
52    }
53}
54
/// Peer entry in the manager
pub struct PeerEntry {
    /// Identity of the peer (pubkey plus session uuid).
    pub peer_id: PeerId,
    /// Whether we initiated the connection or they did.
    pub direction: PeerDirection,
    /// Current lifecycle state of the connection.
    pub state: ConnectionState,
    /// Timestamp of last activity; also reported as `connected_at` in
    /// `peer_statuses` snapshots.
    pub last_seen: Instant,
    /// Live connection handle; `None` while only discovered — TODO confirm
    /// exactly when it is populated (assignment happens outside this view).
    pub peer: Option<Peer>,
    /// Pool this peer is counted under (follows vs. other).
    pub pool: PeerPool,
    /// Bytes sent to this peer (cumulative).
    pub bytes_sent: u64,
    /// Bytes received from this peer (cumulative).
    pub bytes_received: u64,
}
66
/// Shared state for WebRTC manager
///
/// Shared behind an `Arc` between the manager and external callers
/// (see `WebRTCManager::state`).
pub struct WebRTCState {
    /// All known peers, keyed by a peer-id string. NOTE(review): `record_sent`
    /// is called with `PeerId::short()` values, so the map key appears to be
    /// the short form — confirm against the insertion site.
    pub peers: RwLock<HashMap<String, PeerEntry>>,
    /// Count of currently connected peers (atomics: updated from async tasks).
    pub connected_count: std::sync::atomic::AtomicUsize,
    /// Total bytes sent across all peers (cumulative)
    pub bytes_sent: std::sync::atomic::AtomicU64,
    /// Total bytes received across all peers (cumulative)
    pub bytes_received: std::sync::atomic::AtomicU64,
}
76
77impl WebRTCState {
78    pub fn new() -> Self {
79        Self {
80            peers: RwLock::new(HashMap::new()),
81            connected_count: std::sync::atomic::AtomicUsize::new(0),
82            bytes_sent: std::sync::atomic::AtomicU64::new(0),
83            bytes_received: std::sync::atomic::AtomicU64::new(0),
84        }
85    }
86
87    /// Get current bandwidth stats (bytes sent/received)
88    pub fn get_bandwidth(&self) -> (u64, u64) {
89        (
90            self.bytes_sent.load(std::sync::atomic::Ordering::Relaxed),
91            self.bytes_received
92                .load(std::sync::atomic::Ordering::Relaxed),
93        )
94    }
95
96    /// Record bytes sent (global + per-peer)
97    pub async fn record_sent(&self, peer_id: &str, bytes: u64) {
98        self.bytes_sent
99            .fetch_add(bytes, std::sync::atomic::Ordering::Relaxed);
100        if let Some(entry) = self.peers.write().await.get_mut(peer_id) {
101            entry.bytes_sent += bytes;
102        }
103    }
104
105    /// Record bytes received (global + per-peer)
106    pub async fn record_received(&self, peer_id: &str, bytes: u64) {
107        self.bytes_received
108            .fetch_add(bytes, std::sync::atomic::Ordering::Relaxed);
109        if let Some(entry) = self.peers.write().await.get_mut(peer_id) {
110            entry.bytes_received += bytes;
111        }
112    }
113
114    /// Request content by hash from connected peers
115    /// Queries peers sequentially with 500ms intervals until one responds
116    /// Returns the first successful response, or None if no peer has it
117    pub async fn request_from_peers(&self, hash_hex: &str) -> Option<Vec<u8>> {
118        use super::types::{encode_request, DataRequest, MAX_HTL};
119
120        let peers = self.peers.read().await;
121
122        // Collect connected peers with data channels
123        // We need to collect the Arc references first, then acquire locks outside the iterator
124        let peer_refs: Vec<_> = peers
125            .values()
126            .filter(|p| p.state == ConnectionState::Connected && p.peer.is_some())
127            .filter_map(|p| {
128                p.peer.as_ref().map(|peer| {
129                    (
130                        p.peer_id.short(),
131                        peer.data_channel.clone(),
132                        peer.pending_requests.clone(),
133                    )
134                })
135            })
136            .collect();
137
138        drop(peers); // Release the read lock
139
140        // Now acquire locks and filter to peers with active data channels
141        let mut connected_peers: Vec<(
142            String,
143            Arc<Mutex<HashMap<String, PendingRequest>>>,
144            Arc<webrtc::data_channel::RTCDataChannel>,
145        )> = Vec::new();
146        for (peer_id, dc_mutex, pending) in peer_refs {
147            let dc_guard = dc_mutex.lock().await;
148            if let Some(dc) = dc_guard.as_ref() {
149                connected_peers.push((peer_id, pending, dc.clone()));
150            }
151        }
152
153        if connected_peers.is_empty() {
154            debug!(
155                "No connected peers to query for {}",
156                &hash_hex[..8.min(hash_hex.len())]
157            );
158            return None;
159        }
160
161        debug!(
162            "Querying {} connected peers for {} (sequential with 500ms delay)",
163            connected_peers.len(),
164            &hash_hex[..8.min(hash_hex.len())]
165        );
166
167        // Convert hex to binary hash once
168        let hash_bytes = match hex::decode(hash_hex) {
169            Ok(b) => b,
170            Err(_) => return None,
171        };
172
173        // Query peers sequentially with 500ms delay between each
174        for (_i, (peer_id, pending_requests, dc)) in connected_peers.into_iter().enumerate() {
175            debug!(
176                "Querying peer {} for {}",
177                peer_id,
178                &hash_hex[..8.min(hash_hex.len())]
179            );
180
181            // Create response channel
182            let (tx, rx) = tokio::sync::oneshot::channel();
183
184            // Store pending request
185            {
186                let mut pending = pending_requests.lock().await;
187                pending.insert(
188                    hash_hex.to_string(),
189                    super::PendingRequest {
190                        hash: hash_bytes.clone(),
191                        response_tx: tx,
192                    },
193                );
194            }
195
196            // Send request
197            let req = DataRequest {
198                h: hash_bytes.clone(),
199                htl: MAX_HTL,
200            };
201            if let Ok(wire) = encode_request(&req) {
202                let wire_len = wire.len() as u64;
203                if dc.send(&bytes::Bytes::from(wire)).await.is_ok() {
204                    self.record_sent(&peer_id, wire_len).await;
205                    // Wait 500ms for response from this peer
206                    match tokio::time::timeout(std::time::Duration::from_millis(500), rx).await {
207                        Ok(Ok(Some(data))) => {
208                            self.record_received(&peer_id, data.len() as u64).await;
209                            debug!(
210                                "Got response from peer {} for {}",
211                                peer_id,
212                                &hash_hex[..8.min(hash_hex.len())]
213                            );
214                            return Some(data);
215                        }
216                        _ => {
217                            // Timeout or no data - clean up and try next peer
218                            debug!(
219                                "No response from peer {} for {}",
220                                peer_id,
221                                &hash_hex[..8.min(hash_hex.len())]
222                            );
223                        }
224                    }
225                }
226            }
227
228            // Clean up pending request
229            let mut pending = pending_requests.lock().await;
230            pending.remove(hash_hex);
231        }
232
233        debug!(
234            "No peer had data for {}",
235            &hash_hex[..8.min(hash_hex.len())]
236        );
237        None
238    }
239}
240
/// WebRTC manager handles peer discovery and connection management
pub struct WebRTCManager {
    /// Static configuration: relay URLs, pool limits, hello interval.
    config: WebRTCConfig,
    /// Our own identity (pubkey + per-session uuid).
    my_peer_id: PeerId,
    /// Nostr keys: sign hello events, decrypt gift-wrapped directed messages.
    keys: Keys,
    /// Shared runtime state (peers, counters); exposed via `state()`.
    state: Arc<WebRTCState>,
    /// Shutdown broadcast; `shutdown()` flips this to `true`.
    shutdown: Arc<tokio::sync::watch::Sender<bool>>,
    /// Receiver half of the shutdown watch, cloned into spawned tasks.
    shutdown_rx: tokio::sync::watch::Receiver<bool>,
    /// Channel to send signaling messages to relays
    signaling_tx: mpsc::Sender<SignalingMessage>,
    /// Receiver half; `take()`n exactly once by `run()`.
    signaling_rx: Option<mpsc::Receiver<SignalingMessage>>,
    /// Optional content store for serving hash requests
    store: Option<Arc<dyn ContentStore>>,
    /// Peer classifier for pool assignment
    peer_classifier: PeerClassifier,
    /// Optional Nostr relay for data-channel relay messages
    nostr_relay: Option<Arc<NostrRelay>>,
    /// Channel for peer state events (connection success/failure)
    state_event_tx: mpsc::Sender<PeerStateEvent>,
    /// Receiver half; `take()`n exactly once by `run()`.
    state_event_rx: Option<mpsc::Receiver<PeerStateEvent>>,
}
262
263impl WebRTCManager {
264    /// Create a new WebRTC manager
265    pub fn new(keys: Keys, config: WebRTCConfig) -> Self {
266        let pubkey = keys.public_key().to_hex();
267        let my_peer_id = PeerId::new(pubkey, None);
268        let (shutdown, shutdown_rx) = tokio::sync::watch::channel(false);
269        let (signaling_tx, signaling_rx) = mpsc::channel(100);
270        let (state_event_tx, state_event_rx) = mpsc::channel(100);
271
272        // Default classifier: all peers go to 'other' pool
273        let peer_classifier: PeerClassifier = Arc::new(|_| PeerPool::Other);
274
275        Self {
276            config,
277            my_peer_id,
278            keys,
279            state: Arc::new(WebRTCState::new()),
280            shutdown: Arc::new(shutdown),
281            shutdown_rx,
282            signaling_tx,
283            signaling_rx: Some(signaling_rx),
284            store: None,
285            peer_classifier,
286            nostr_relay: None,
287            state_event_tx,
288            state_event_rx: Some(state_event_rx),
289        }
290    }
291
292    /// Create a new WebRTC manager with a peer classifier
293    pub fn new_with_classifier(
294        keys: Keys,
295        config: WebRTCConfig,
296        classifier: PeerClassifier,
297    ) -> Self {
298        let mut manager = Self::new(keys, config);
299        manager.peer_classifier = classifier;
300        manager
301    }
302
303    /// Create a new WebRTC manager with a content store for serving hash requests
304    pub fn new_with_store(keys: Keys, config: WebRTCConfig, store: Arc<dyn ContentStore>) -> Self {
305        let mut manager = Self::new(keys, config);
306        manager.store = Some(store);
307        manager
308    }
309
310    /// Create a new WebRTC manager with store and classifier
311    pub fn new_with_store_and_classifier(
312        keys: Keys,
313        config: WebRTCConfig,
314        store: Arc<dyn ContentStore>,
315        classifier: PeerClassifier,
316    ) -> Self {
317        let mut manager = Self::new(keys, config);
318        manager.store = Some(store);
319        manager.peer_classifier = classifier;
320        manager
321    }
322
323    /// Set the content store for serving hash requests
324    pub fn set_store(&mut self, store: Arc<dyn ContentStore>) {
325        self.store = Some(store);
326    }
327
    /// Set the peer classifier
    ///
    /// Replaces the function used to assign newly seen peers to pools.
    /// Peers already classified keep their existing pool assignment.
    pub fn set_peer_classifier(&mut self, classifier: PeerClassifier) {
        self.peer_classifier = classifier;
    }
332
333    /// Set the Nostr relay for data-channel relay messages
334    pub fn set_nostr_relay(&mut self, relay: Arc<NostrRelay>) {
335        self.nostr_relay = Some(relay);
336    }
337
    /// Get my peer ID
    ///
    /// Borrowed view of our own identity (pubkey + session uuid).
    pub fn my_peer_id(&self) -> &PeerId {
        &self.my_peer_id
    }
342
    /// Get shared state for external access
    ///
    /// Returns a cheap `Arc` clone; callers can hold it past the manager's
    /// lifetime to read peer/bandwidth stats.
    pub fn state(&self) -> Arc<WebRTCState> {
        self.state.clone()
    }
347
348    /// Signal shutdown
349    pub fn shutdown(&self) {
350        let _ = self.shutdown.send(true);
351    }
352
353    /// Get connected peer count
354    pub async fn connected_count(&self) -> usize {
355        self.state
356            .connected_count
357            .load(std::sync::atomic::Ordering::Relaxed)
358    }
359
360    /// Get all peer statuses
361    pub async fn peer_statuses(&self) -> Vec<PeerStatus> {
362        self.state
363            .peers
364            .read()
365            .await
366            .values()
367            .map(|p| PeerStatus {
368                peer_id: p.peer_id.to_string(),
369                pubkey: p.peer_id.pubkey.clone(),
370                state: p.state.to_string(),
371                direction: p.direction,
372                connected_at: Some(p.last_seen),
373                pool: p.pool,
374            })
375            .collect()
376    }
377
378    /// Get pool counts
379    /// Returns (follows_connected, follows_active, other_connected, other_active)
380    /// "active" = Connected or Connecting (excludes Discovered and Failed)
381    pub async fn get_pool_counts(&self) -> (usize, usize, usize, usize) {
382        let peers = self.state.peers.read().await;
383        let mut follows_connected = 0;
384        let mut follows_active = 0;
385        let mut other_connected = 0;
386        let mut other_active = 0;
387
388        for entry in peers.values() {
389            // Only count Connected or Connecting as "active" connections
390            // Discovered peers are just seen hellos, not real connections
391            let is_active = entry.state == ConnectionState::Connected
392                || entry.state == ConnectionState::Connecting;
393
394            match entry.pool {
395                PeerPool::Follows => {
396                    if is_active {
397                        follows_active += 1;
398                    }
399                    if entry.state == ConnectionState::Connected {
400                        follows_connected += 1;
401                    }
402                }
403                PeerPool::Other => {
404                    if is_active {
405                        other_active += 1;
406                    }
407                    if entry.state == ConnectionState::Connected {
408                        other_connected += 1;
409                    }
410                }
411            }
412        }
413
414        (
415            follows_connected,
416            follows_active,
417            other_connected,
418            other_active,
419        )
420    }
421
422    /// Check if we can accept a peer in a given pool
423    fn can_accept_peer(&self, pool: PeerPool, pool_counts: &(usize, usize, usize, usize)) -> bool {
424        let (_, follows_active, _, other_active) = *pool_counts;
425        match pool {
426            PeerPool::Follows => follows_active < self.config.pools.follows.max_connections,
427            PeerPool::Other => other_active < self.config.pools.other.max_connections,
428        }
429    }
430
431    /// Check if a pool is satisfied
432    #[allow(dead_code)]
433    fn is_pool_satisfied(
434        &self,
435        pool: PeerPool,
436        pool_counts: &(usize, usize, usize, usize),
437    ) -> bool {
438        let (follows_connected, _, other_connected, _) = *pool_counts;
439        match pool {
440            PeerPool::Follows => {
441                follows_connected >= self.config.pools.follows.satisfied_connections
442            }
443            PeerPool::Other => other_connected >= self.config.pools.other.satisfied_connections,
444        }
445    }
446
447    /// Check if both pools are satisfied
448    #[allow(dead_code)]
449    fn is_satisfied(&self, pool_counts: &(usize, usize, usize, usize)) -> bool {
450        self.is_pool_satisfied(PeerPool::Follows, pool_counts)
451            && self.is_pool_satisfied(PeerPool::Other, pool_counts)
452    }
453
454    /// Check if we should initiate connection (tie-breaking)
455    /// Lower UUID initiates - same as iris-client/hashtree-ts
456    fn should_initiate(&self, their_uuid: &str) -> bool {
457        self.my_peer_id.uuid < their_uuid.to_string()
458    }
459
    /// Start the WebRTC manager - connects to relays and handles signaling
    ///
    /// Spawns one `relay_task` per configured relay, then runs the main event
    /// loop until `shutdown()` is signalled: incoming relay events are handed
    /// to `handle_event`, outgoing signaling messages are fanned out to all
    /// relay tasks, peer state events are applied, and a 30s interval runs
    /// stale-peer cleanup as a fallback.
    ///
    /// # Panics
    ///
    /// Panics if called more than once: the signaling and state-event
    /// receivers are `take()`n out of `self` on the first call.
    pub async fn run(&mut self) -> Result<()> {
        info!(
            "Starting WebRTC manager with peer ID: {}",
            self.my_peer_id.short()
        );

        // Relay tasks forward decoded events here as (relay_url, event).
        let (event_tx, mut event_rx) = mpsc::channel::<(String, nostr::Event)>(100);

        // Take the signaling receiver
        let mut signaling_rx = self
            .signaling_rx
            .take()
            .expect("signaling_rx already taken");

        // Take the state event receiver
        let mut state_event_rx = self
            .state_event_rx
            .take()
            .expect("state_event_rx already taken");

        // Create a shared write channel for all relay tasks
        // (broadcast: each relay task gets a copy of every outgoing message)
        let (relay_write_tx, _) = tokio::sync::broadcast::channel::<SignalingMessage>(100);

        // Spawn relay connections
        for relay_url in &self.config.relays {
            let url = relay_url.clone();
            let event_tx = event_tx.clone();
            let shutdown_rx = self.shutdown_rx.clone();
            let keys = self.keys.clone();
            let my_peer_id = self.my_peer_id.clone();
            let hello_interval = Duration::from_millis(self.config.hello_interval_ms);
            let relay_write_rx = relay_write_tx.subscribe();

            // NOTE(review): relay_task returns when its socket closes/errors
            // and nothing here restarts it, so a dropped relay stays down
            // until the process restarts — confirm this is intended.
            tokio::spawn(async move {
                if let Err(e) = Self::relay_task(
                    url.clone(),
                    event_tx,
                    shutdown_rx,
                    keys,
                    my_peer_id,
                    hello_interval,
                    relay_write_rx,
                )
                .await
                {
                    error!("Relay {} error: {}", url, e);
                }
            });
        }

        // Process incoming events and outgoing signaling messages
        let mut shutdown_rx = self.shutdown_rx.clone();
        // Cleanup interval - run every 30 seconds as a fallback (not for real-time sync)
        let mut cleanup_interval = tokio::time::interval(Duration::from_secs(30));
        loop {
            tokio::select! {
                _ = shutdown_rx.changed() => {
                    if *shutdown_rx.borrow() {
                        info!("WebRTC manager shutting down");
                        break;
                    }
                }
                Some((relay, event)) = event_rx.recv() => {
                    if let Err(e) = self.handle_event(&relay, &event, &relay_write_tx).await {
                        debug!("Error handling event from {}: {}", relay, e);
                    }
                }
                Some(msg) = signaling_rx.recv() => {
                    // Forward signaling messages to relay broadcast
                    let _ = relay_write_tx.send(msg);
                }
                Some(event) = state_event_rx.recv() => {
                    // Handle peer state events (connected, failed, disconnected)
                    self.handle_peer_state_event(event).await;
                }
                _ = cleanup_interval.tick() => {
                    // Periodic cleanup of stale peers and state sync (fallback)
                    self.cleanup_stale_peers().await;
                }
            }
        }

        Ok(())
    }
545
    /// Connect to a single relay and handle messages
    ///
    /// One instance runs per configured relay. It:
    /// - subscribes to kind-25050 hello broadcasts and directed (#p) events,
    /// - broadcasts our own hello every `hello_interval`,
    /// - sends outgoing signaling messages received on `signaling_rx`,
    /// - forwards incoming relay events into `event_tx` for the manager loop.
    ///
    /// Returns when shutdown is signalled or the WebSocket errors/closes;
    /// this function itself performs no reconnection.
    async fn relay_task(
        url: String,
        event_tx: mpsc::Sender<(String, nostr::Event)>,
        mut shutdown_rx: tokio::sync::watch::Receiver<bool>,
        keys: Keys,
        my_peer_id: PeerId,
        hello_interval: Duration,
        mut signaling_rx: tokio::sync::broadcast::Receiver<SignalingMessage>,
    ) -> Result<()> {
        info!("Connecting to relay: {}", url);

        let (ws_stream, _) = connect_async(&url).await?;
        let (mut write, mut read) = ws_stream.split();

        // Subscribe to webrtc events - two filters:
        // 1. Hello messages: kind 25050 with #l: "hello" tag
        // 2. Directed messages: kind 25050 with #p tag (our pubkey)
        // Both limited to the last 60s so stale signaling is not replayed.
        let hello_filter = Filter::new()
            .kind(Kind::Ephemeral(WEBRTC_KIND as u16))
            .custom_tag(
                nostr::SingleLetterTag::lowercase(nostr::Alphabet::L),
                vec![HELLO_TAG],
            )
            .since(nostr::Timestamp::now() - Duration::from_secs(60));

        let directed_filter = Filter::new()
            .kind(Kind::Ephemeral(WEBRTC_KIND as u16))
            .custom_tag(
                nostr::SingleLetterTag::lowercase(nostr::Alphabet::P),
                vec![keys.public_key().to_hex()],
            )
            .since(nostr::Timestamp::now() - Duration::from_secs(60));

        let sub_id = nostr::SubscriptionId::generate();
        let sub_msg = ClientMessage::req(sub_id.clone(), vec![hello_filter, directed_filter]);
        write.send(Message::Text(sub_msg.as_json().into())).await?;

        info!(
            "Subscribed to {} for WebRTC events (kind {})",
            url, WEBRTC_KIND
        );

        // last_hello starts one interval in the past so the first ticker
        // tick fires a hello right away.
        let mut last_hello = Instant::now() - hello_interval; // Send immediately
        let mut hello_ticker = tokio::time::interval(Duration::from_secs(1));

        loop {
            tokio::select! {
                _ = shutdown_rx.changed() => {
                    if *shutdown_rx.borrow() {
                        break;
                    }
                }
                _ = hello_ticker.tick() => {
                    // Send hello periodically
                    // (1s ticker + elapsed check approximates hello_interval)
                    if last_hello.elapsed() >= hello_interval {
                        let hello = SignalingMessage::hello(&my_peer_id.uuid);
                        if let Ok(event) = Self::create_signaling_event(&keys, &hello).await {
                            let msg = ClientMessage::event(event);
                            if write.send(Message::Text(msg.as_json().into())).await.is_ok() {
                                debug!("Sent hello to {}", url);
                            }
                        }
                        last_hello = Instant::now();
                    }
                }
                // Handle outgoing signaling messages
                // NOTE(review): a broadcast recv error (lagged/closed) ends
                // this select arm silently - confirm lag is acceptable here.
                Ok(signaling_msg) = signaling_rx.recv() => {
                    info!("Sending {} via {}", signaling_msg.msg_type(), url);
                    if let Ok(event) = Self::create_signaling_event(&keys, &signaling_msg).await {
                        let event_id = event.id.to_string();
                        let msg = ClientMessage::event(event);
                        if write.send(Message::Text(msg.as_json().into())).await.is_ok() {
                            info!("Sent {} to {} (event id: {})", signaling_msg.msg_type(), url, &event_id[..16]);
                        }
                    }
                }
                msg = read.next() => {
                    match msg {
                        Some(Ok(Message::Text(text))) => {
                            // Only EVENT messages are forwarded; OK/EOSE/NOTICE are dropped.
                            if let Ok(relay_msg) = RelayMessage::from_json(&text) {
                                if let RelayMessage::Event { event, .. } = relay_msg {
                                    let _ = event_tx.send((url.clone(), *event)).await;
                                }
                            }
                        }
                        Some(Err(e)) => {
                            error!("WebSocket error from {}: {}", url, e);
                            break;
                        }
                        None => {
                            warn!("WebSocket closed: {}", url);
                            break;
                        }
                        _ => {}
                    }
                }
            }
        }

        Ok(())
    }
648
    /// Create a signaling event
    ///
    /// For directed messages (offer, answer, candidate, candidates), use NIP-17 style
    /// gift wrapping with ephemeral keys for privacy.
    /// Hello messages use kind 25050 with #l: "hello" tag and peerId.
    ///
    /// # Errors
    ///
    /// Fails if the recipient pubkey is invalid hex, serialization fails,
    /// NIP-44 encryption fails, or event building/signing fails.
    async fn create_signaling_event(keys: &Keys, msg: &SignalingMessage) -> Result<nostr::Event> {
        // Check if message has a recipient (needs gift wrapping)
        if let Some(recipient_str) = msg.recipient() {
            // Parse recipient to get their pubkey
            if let Some(peer_id) = PeerId::from_string(recipient_str) {
                let recipient_pubkey = PublicKey::from_hex(&peer_id.pubkey)?;

                // Create seal with sender's actual pubkey (the "rumor")
                // NOTE(review): this is a plain unsigned JSON rumor, not a
                // signed NIP-59 seal - presumably matching the hashtree-ts
                // counterpart; confirm compatibility.
                let seal = serde_json::json!({
                    "pubkey": keys.public_key().to_hex(),
                    "kind": WEBRTC_KIND,
                    "content": serde_json::to_string(msg)?,
                    "tags": []
                });

                // Generate ephemeral keypair for the wrapper
                // (relays only ever see this throwaway key as the sender)
                let ephemeral_keys = Keys::generate();

                // Encrypt the seal for the recipient using ephemeral key (NIP-44)
                let encrypted_content = nip44::encrypt(
                    ephemeral_keys.secret_key(),
                    &recipient_pubkey,
                    &seal.to_string(),
                    nip44::Version::V2,
                )?;

                // Create wrapper event with ephemeral key
                let created_at = nostr::Timestamp::now();
                // Expiration tag lets relays drop stale signaling events.
                let expiration = created_at + Duration::from_secs(5 * 60); // 5 minutes

                let tags = vec![
                    Tag::parse(&["p", &recipient_pubkey.to_hex()])?,
                    Tag::parse(&["expiration", &expiration.as_u64().to_string()])?,
                ];

                let event =
                    EventBuilder::new(Kind::Ephemeral(WEBRTC_KIND as u16), encrypted_content, tags)
                        .to_event(&ephemeral_keys)?;

                return Ok(event);
            }
        }

        // Hello messages - kind 25050 with #l: "hello" tag and peerId
        // (unencrypted broadcast, signed with our real key for discovery)
        let tags = vec![
            Tag::parse(&["l", HELLO_TAG])?,
            Tag::parse(&["peerId", msg.peer_id()])?,
        ];

        let event =
            EventBuilder::new(Kind::Ephemeral(WEBRTC_KIND as u16), "", tags).to_event(keys)?;

        Ok(event)
    }
708
709    /// Handle an incoming event
710    ///
711    /// Messages may be:
712    /// 1. Hello messages: kind 25050 with #l: "hello" tag and peerId
713    /// 2. Gift-wrapped directed messages: kind 25050 with #p tag, encrypted with ephemeral key
714    async fn handle_event(
715        &self,
716        relay: &str,
717        event: &nostr::Event,
718        relay_write_tx: &tokio::sync::broadcast::Sender<SignalingMessage>,
719    ) -> Result<()> {
720        // Must be kind 25050
721        if event.kind != Kind::Ephemeral(WEBRTC_KIND as u16) {
722            return Ok(());
723        }
724
725        // Helper to get tag value
726        let get_tag = |name: &str| -> Option<String> {
727            event.tags.iter().find_map(|tag| {
728                let v: Vec<String> = tag.clone().to_vec();
729                if v.len() >= 2 && v[0] == name {
730                    Some(v[1].clone())
731                } else {
732                    None
733                }
734            })
735        };
736
737        // Check if this is a hello message (#l: "hello" tag)
738        let l_tag = get_tag("l");
739        if l_tag.as_deref() == Some(HELLO_TAG) {
740            let sender_pubkey = event.pubkey.to_hex();
741
742            // Skip our own hello messages
743            if sender_pubkey == self.my_peer_id.pubkey {
744                return Ok(());
745            }
746
747            if let Some(their_uuid) = get_tag("peerId") {
748                debug!("Received hello from {} via {}", &sender_pubkey[..8], relay);
749                self.handle_hello(&sender_pubkey, &their_uuid, relay_write_tx)
750                    .await?;
751            }
752            return Ok(());
753        }
754
755        // Check if this is a directed message for us (#p tag with our pubkey)
756        let p_tag = get_tag("p");
757        if p_tag.as_deref() != Some(&self.keys.public_key().to_hex()) {
758            // Not for us - ignore silently
759            return Ok(());
760        }
761
762        // Gift-wrapped directed message - decrypt using our key and ephemeral sender's pubkey
763        if event.content.is_empty() {
764            return Ok(());
765        }
766
767        // Try to unwrap the gift - decrypt with our key and the ephemeral sender's pubkey
768        let seal: serde_json::Value =
769            match nip44::decrypt(self.keys.secret_key(), &event.pubkey, &event.content) {
770                Ok(plaintext) => match serde_json::from_str(&plaintext) {
771                    Ok(v) => v,
772                    Err(_) => return Ok(()),
773                },
774                Err(_) => {
775                    // Can't decrypt - not for us or invalid
776                    return Ok(());
777                }
778            };
779
780        // Extract the actual sender's pubkey and content from the seal
781        let sender_pubkey = seal
782            .get("pubkey")
783            .and_then(|v| v.as_str())
784            .ok_or_else(|| anyhow::anyhow!("Missing pubkey in seal"))?;
785
786        // Skip our own messages
787        if sender_pubkey == self.my_peer_id.pubkey {
788            return Ok(());
789        }
790
791        let content = seal
792            .get("content")
793            .and_then(|v| v.as_str())
794            .ok_or_else(|| anyhow::anyhow!("Missing content in seal"))?;
795
796        let raw_msg: serde_json::Value = serde_json::from_str(content)?;
797        let msg_type = raw_msg.get("type").and_then(|v| v.as_str()).unwrap_or("");
798
799        // Support hashtree-ts format: { type, peerId, targetPeerId, sdp/candidate/candidates }
800        if raw_msg.get("targetPeerId").is_some() {
801            let target_peer = raw_msg
802                .get("targetPeerId")
803                .and_then(|v| v.as_str())
804                .unwrap_or("");
805            if target_peer != self.my_peer_id.to_string() {
806                return Ok(());
807            }
808
809            let peer_id = raw_msg.get("peerId").and_then(|v| v.as_str()).unwrap_or("");
810            let their_uuid = peer_id.split(':').nth(1).unwrap_or(peer_id);
811
812            match msg_type {
813                "offer" => {
814                    let sdp = raw_msg
815                        .get("sdp")
816                        .and_then(|v| v.as_str())
817                        .ok_or_else(|| anyhow::anyhow!("Missing SDP in offer"))?;
818                    let offer = serde_json::json!({ "type": "offer", "sdp": sdp });
819                    self.handle_offer(&sender_pubkey, their_uuid, offer, relay_write_tx)
820                        .await?;
821                }
822                "answer" => {
823                    let sdp = raw_msg
824                        .get("sdp")
825                        .and_then(|v| v.as_str())
826                        .ok_or_else(|| anyhow::anyhow!("Missing SDP in answer"))?;
827                    let answer = serde_json::json!({ "type": "answer", "sdp": sdp });
828                    self.handle_answer(&sender_pubkey, their_uuid, answer)
829                        .await?;
830                }
831                "candidate" => {
832                    let candidate = raw_msg
833                        .get("candidate")
834                        .and_then(|v| v.as_str())
835                        .unwrap_or("");
836                    if !candidate.is_empty() {
837                        let candidate_json = serde_json::json!({
838                            "candidate": candidate,
839                            "sdpMid": raw_msg.get("sdpMid"),
840                            "sdpMLineIndex": raw_msg.get("sdpMLineIndex"),
841                        });
842                        self.handle_candidate(&sender_pubkey, their_uuid, candidate_json)
843                            .await?;
844                    }
845                }
846                "candidates" => {
847                    let candidates = raw_msg
848                        .get("candidates")
849                        .and_then(|v| v.as_array())
850                        .map(|entries| {
851                            entries
852                                .iter()
853                                .filter_map(|entry| {
854                                    if let Some(candidate_str) = entry
855                                        .get("candidate")
856                                        .and_then(|v| v.as_str())
857                                        .or_else(|| entry.as_str())
858                                    {
859                                        Some(serde_json::json!({
860                                            "candidate": candidate_str,
861                                            "sdpMid": entry.get("sdpMid"),
862                                            "sdpMLineIndex": entry.get("sdpMLineIndex"),
863                                        }))
864                                    } else {
865                                        None
866                                    }
867                                })
868                                .collect::<Vec<_>>()
869                        })
870                        .unwrap_or_default();
871                    self.handle_candidates(&sender_pubkey, their_uuid, candidates)
872                        .await?;
873                }
874                _ => {}
875            }
876
877            return Ok(());
878        }
879
880        let msg: SignalingMessage = serde_json::from_value(raw_msg)?;
881
882        debug!(
883            "Received {} from {} via {} (gift-wrapped)",
884            msg.msg_type(),
885            &sender_pubkey[..8],
886            relay
887        );
888
889        match msg {
890            SignalingMessage::Hello { .. } => {
891                // Hello messages should come via tags, not gift wrap
892                return Ok(());
893            }
894            SignalingMessage::Offer {
895                recipient,
896                peer_id: their_uuid,
897                offer,
898            } => {
899                if recipient != self.my_peer_id.to_string() {
900                    return Ok(()); // Not for us
901                }
902                if let Err(e) = self
903                    .handle_offer(&sender_pubkey, &their_uuid, offer, relay_write_tx)
904                    .await
905                {
906                    error!(
907                        "handle_offer FAILED: sender={}, uuid={}, error={:?}",
908                        &sender_pubkey[..8.min(sender_pubkey.len())],
909                        their_uuid,
910                        e
911                    );
912                    return Err(e);
913                }
914            }
915            SignalingMessage::Answer {
916                recipient,
917                peer_id: their_uuid,
918                answer,
919            } => {
920                if recipient != self.my_peer_id.to_string() {
921                    return Ok(());
922                }
923                self.handle_answer(&sender_pubkey, &their_uuid, answer)
924                    .await?;
925            }
926            SignalingMessage::Candidate {
927                recipient,
928                peer_id: their_uuid,
929                candidate,
930            } => {
931                if recipient != self.my_peer_id.to_string() {
932                    return Ok(());
933                }
934                self.handle_candidate(&sender_pubkey, &their_uuid, candidate)
935                    .await?;
936            }
937            SignalingMessage::Candidates {
938                recipient,
939                peer_id: their_uuid,
940                candidates,
941            } => {
942                if recipient != self.my_peer_id.to_string() {
943                    return Ok(());
944                }
945                self.handle_candidates(&sender_pubkey, &their_uuid, candidates)
946                    .await?;
947            }
948        }
949
950        Ok(())
951    }
952
953    /// Handle incoming hello message
954    async fn handle_hello(
955        &self,
956        sender_pubkey: &str,
957        their_uuid: &str,
958        relay_write_tx: &tokio::sync::broadcast::Sender<SignalingMessage>,
959    ) -> Result<()> {
960        let full_peer_id = PeerId::new(sender_pubkey.to_string(), Some(their_uuid.to_string()));
961        let peer_key = full_peer_id.to_string();
962
963        // Check if we already have this peer
964        {
965            let peers = self.state.peers.read().await;
966            if let Some(entry) = peers.get(&peer_key) {
967                // Already connected or connecting, just update last_seen
968                if entry.state == ConnectionState::Connected
969                    || entry.state == ConnectionState::Connecting
970                {
971                    return Ok(());
972                }
973            }
974        }
975
976        // Classify the peer into a pool
977        let pool = (self.peer_classifier)(sender_pubkey);
978
979        // Check pool limits
980        let pool_counts = self.get_pool_counts().await;
981        if !self.can_accept_peer(pool, &pool_counts) {
982            debug!(
983                "Ignoring hello from {} - pool {:?} is full",
984                full_peer_id.short(),
985                pool
986            );
987            return Ok(());
988        }
989
990        // Decide if we should initiate based on tie-breaking
991        let should_initiate = self.should_initiate(their_uuid);
992
993        // If pool is already satisfied, don't initiate new outbound connections
994        // This reserves space for inbound connections
995        let pool_satisfied = self.is_pool_satisfied(pool, &pool_counts);
996        let will_initiate = should_initiate && !pool_satisfied;
997
998        info!(
999            "Discovered peer: {} (pool: {:?}, initiate: {}, pool_satisfied: {})",
1000            full_peer_id.short(),
1001            pool,
1002            will_initiate,
1003            pool_satisfied
1004        );
1005
1006        // If we're not initiating and pool is satisfied, don't even add to discovered
1007        // (we won't accept their offer either since pool check happens in handle_offer)
1008        if !will_initiate && pool_satisfied {
1009            debug!(
1010                "Pool {:?} is satisfied, not tracking peer {}",
1011                pool,
1012                full_peer_id.short()
1013            );
1014            return Ok(());
1015        }
1016
1017        // Create peer entry with pool assignment
1018        {
1019            let mut peers = self.state.peers.write().await;
1020            peers.insert(
1021                peer_key.clone(),
1022                PeerEntry {
1023                    peer_id: full_peer_id.clone(),
1024                    direction: if will_initiate {
1025                        PeerDirection::Outbound
1026                    } else {
1027                        PeerDirection::Inbound
1028                    },
1029                    state: ConnectionState::Discovered,
1030                    last_seen: Instant::now(),
1031                    peer: None,
1032                    pool,
1033                    bytes_sent: 0,
1034                    bytes_received: 0,
1035                },
1036            );
1037        }
1038
1039        // If we should initiate, create offer
1040        if will_initiate {
1041            self.initiate_connection(&full_peer_id, pool, relay_write_tx)
1042                .await?;
1043        }
1044
1045        Ok(())
1046    }
1047
1048    /// Initiate a connection to a peer (create and send offer)
1049    async fn initiate_connection(
1050        &self,
1051        peer_id: &PeerId,
1052        pool: PeerPool,
1053        relay_write_tx: &tokio::sync::broadcast::Sender<SignalingMessage>,
1054    ) -> Result<()> {
1055        let peer_key = peer_id.to_string();
1056
1057        info!(
1058            "Initiating connection to {} (pool: {:?})",
1059            peer_id.short(),
1060            pool
1061        );
1062
1063        // Create peer connection with content store and state events
1064        let mut peer = Peer::new_with_store_and_events(
1065            peer_id.clone(),
1066            PeerDirection::Outbound,
1067            self.my_peer_id.clone(),
1068            self.signaling_tx.clone(),
1069            self.config.stun_servers.clone(),
1070            self.store.clone(),
1071            Some(self.state_event_tx.clone()),
1072            self.nostr_relay.clone(),
1073        )
1074        .await?;
1075
1076        peer.setup_handlers().await?;
1077
1078        // Create offer
1079        let offer = peer.connect().await?;
1080
1081        // Update state
1082        {
1083            let mut peers = self.state.peers.write().await;
1084            if let Some(entry) = peers.get_mut(&peer_key) {
1085                entry.state = ConnectionState::Connecting;
1086                entry.peer = Some(peer);
1087                entry.pool = pool;
1088            }
1089        }
1090
1091        // Send offer
1092        let offer_msg = SignalingMessage::Offer {
1093            offer,
1094            recipient: peer_id.to_string(),
1095            peer_id: self.my_peer_id.uuid.clone(),
1096        };
1097        if relay_write_tx.send(offer_msg).is_err() {
1098            warn!("Failed to broadcast offer to {}", peer_id.short());
1099        }
1100
1101        info!("Sent offer to {}", peer_id.short());
1102
1103        Ok(())
1104    }
1105
1106    /// Handle incoming offer
1107    async fn handle_offer(
1108        &self,
1109        sender_pubkey: &str,
1110        their_uuid: &str,
1111        offer: serde_json::Value,
1112        relay_write_tx: &tokio::sync::broadcast::Sender<SignalingMessage>,
1113    ) -> Result<()> {
1114        debug!(
1115            "handle_offer ENTRY: sender={}, uuid={}",
1116            &sender_pubkey[..8.min(sender_pubkey.len())],
1117            their_uuid
1118        );
1119        let full_peer_id = PeerId::new(sender_pubkey.to_string(), Some(their_uuid.to_string()));
1120        let peer_key = full_peer_id.to_string();
1121
1122        // Classify the peer into a pool
1123        let pool = (self.peer_classifier)(sender_pubkey);
1124
1125        info!(
1126            "Received offer from {} (pool: {:?})",
1127            full_peer_id.short(),
1128            pool
1129        );
1130
1131        // Check if we already have this peer with an actual connection
1132        {
1133            let peers = self.state.peers.read().await;
1134            debug!(
1135                "Checking for existing peer, peer_key: {}, known_peers: {}",
1136                peer_key,
1137                peers.len()
1138            );
1139            if let Some(entry) = peers.get(&peer_key) {
1140                // Only skip if we have an actual peer connection (not just discovered)
1141                if entry.peer.is_some() {
1142                    debug!(
1143                        "Already have peer {} with connection, skipping offer",
1144                        full_peer_id.short()
1145                    );
1146                    return Ok(());
1147                }
1148                debug!(
1149                    "Peer {} exists but has no connection, proceeding",
1150                    full_peer_id.short()
1151                );
1152            } else {
1153                debug!(
1154                    "Peer {} not found in peers map, will create new entry",
1155                    full_peer_id.short()
1156                );
1157            }
1158        }
1159
1160        // Check pool limits
1161        let pool_counts = self.get_pool_counts().await;
1162        debug!(
1163            "Pool counts: {:?}, checking can_accept_peer for {:?}",
1164            pool_counts, pool
1165        );
1166        if !self.can_accept_peer(pool, &pool_counts) {
1167            warn!(
1168                "Rejecting offer from {} - pool {:?} is full",
1169                full_peer_id.short(),
1170                pool
1171            );
1172            return Ok(());
1173        }
1174        debug!("Pool check passed for {}", full_peer_id.short());
1175
1176        // Create peer connection with content store and state events
1177        debug!("Creating peer connection for {}", full_peer_id.short());
1178        let mut peer = Peer::new_with_store_and_events(
1179            full_peer_id.clone(),
1180            PeerDirection::Inbound,
1181            self.my_peer_id.clone(),
1182            self.signaling_tx.clone(),
1183            self.config.stun_servers.clone(),
1184            self.store.clone(),
1185            Some(self.state_event_tx.clone()),
1186            self.nostr_relay.clone(),
1187        )
1188        .await?;
1189        debug!("Peer connection created for {}", full_peer_id.short());
1190
1191        peer.setup_handlers().await?;
1192        debug!("Handlers set up for {}", full_peer_id.short());
1193
1194        // Handle offer and create answer
1195        let answer = peer.handle_offer(offer).await?;
1196        debug!("Answer created for {}", full_peer_id.short());
1197
1198        // Update state
1199        {
1200            let mut peers = self.state.peers.write().await;
1201            peers.insert(
1202                peer_key,
1203                PeerEntry {
1204                    peer_id: full_peer_id.clone(),
1205                    direction: PeerDirection::Inbound,
1206                    state: ConnectionState::Connecting,
1207                    last_seen: Instant::now(),
1208                    peer: Some(peer),
1209                    pool,
1210                    bytes_sent: 0,
1211                    bytes_received: 0,
1212                },
1213            );
1214        }
1215
1216        // Send answer
1217        // Note: peer_id is just the UUID, not full pubkey:uuid
1218        // The recipient will construct full peer_id from sender pubkey + this UUID
1219        let answer_msg = SignalingMessage::Answer {
1220            answer,
1221            recipient: full_peer_id.to_string(),
1222            peer_id: self.my_peer_id.uuid.clone(),
1223        };
1224        if relay_write_tx.send(answer_msg).is_err() {
1225            warn!("Failed to send answer to {}", full_peer_id.short());
1226        }
1227        info!("Sent answer to {}", full_peer_id.short());
1228
1229        Ok(())
1230    }
1231
1232    /// Handle incoming answer
1233    async fn handle_answer(
1234        &self,
1235        sender_pubkey: &str,
1236        their_uuid: &str,
1237        answer: serde_json::Value,
1238    ) -> Result<()> {
1239        let full_peer_id = PeerId::new(sender_pubkey.to_string(), Some(their_uuid.to_string()));
1240        let peer_key = full_peer_id.to_string();
1241
1242        info!("Received answer from {}", full_peer_id.short());
1243
1244        let mut peers = self.state.peers.write().await;
1245        if let Some(entry) = peers.get_mut(&peer_key) {
1246            // Skip if already connected - duplicate answers from multiple relays
1247            if entry.state == ConnectionState::Connected {
1248                debug!(
1249                    "Ignoring duplicate answer from {} - already connected",
1250                    full_peer_id.short()
1251                );
1252                return Ok(());
1253            }
1254            if let Some(ref mut peer) = entry.peer {
1255                // Check WebRTC signaling state before applying answer
1256                use webrtc::peer_connection::signaling_state::RTCSignalingState;
1257                let signaling_state = peer.signaling_state();
1258                if signaling_state != RTCSignalingState::HaveLocalOffer {
1259                    debug!(
1260                        "Ignoring answer from {} - signaling state is {:?}, not HaveLocalOffer",
1261                        full_peer_id.short(),
1262                        signaling_state
1263                    );
1264                    return Ok(());
1265                }
1266                peer.handle_answer(answer).await?;
1267                info!("Applied answer from {}", full_peer_id.short());
1268            } else {
1269                debug!("Peer {} has no connection object", full_peer_id.short());
1270            }
1271        } else {
1272            debug!("No peer found for key: {}", peer_key);
1273        }
1274
1275        Ok(())
1276    }
1277
1278    /// Handle incoming ICE candidate
1279    async fn handle_candidate(
1280        &self,
1281        sender_pubkey: &str,
1282        their_uuid: &str,
1283        candidate: serde_json::Value,
1284    ) -> Result<()> {
1285        let full_peer_id = PeerId::new(sender_pubkey.to_string(), Some(their_uuid.to_string()));
1286        let peer_key = full_peer_id.to_string();
1287
1288        info!("Received ICE candidate from {}", full_peer_id.short());
1289
1290        let mut peers = self.state.peers.write().await;
1291        if let Some(entry) = peers.get_mut(&peer_key) {
1292            if let Some(ref mut peer) = entry.peer {
1293                peer.handle_candidate(candidate).await?;
1294            }
1295        }
1296
1297        Ok(())
1298    }
1299
1300    /// Handle batched ICE candidates
1301    async fn handle_candidates(
1302        &self,
1303        sender_pubkey: &str,
1304        their_uuid: &str,
1305        candidates: Vec<serde_json::Value>,
1306    ) -> Result<()> {
1307        let full_peer_id = PeerId::new(sender_pubkey.to_string(), Some(their_uuid.to_string()));
1308        let peer_key = full_peer_id.to_string();
1309
1310        debug!(
1311            "Received {} candidates from {}",
1312            candidates.len(),
1313            full_peer_id.short()
1314        );
1315
1316        let mut peers = self.state.peers.write().await;
1317        if let Some(entry) = peers.get_mut(&peer_key) {
1318            if let Some(ref mut peer) = entry.peer {
1319                for candidate in candidates {
1320                    if let Err(e) = peer.handle_candidate(candidate).await {
1321                        debug!("Failed to add candidate: {}", e);
1322                    }
1323                }
1324            }
1325        }
1326
1327        Ok(())
1328    }
1329
1330    /// Handle peer state change events from peer connections
1331    async fn handle_peer_state_event(&self, event: PeerStateEvent) {
1332        match event {
1333            PeerStateEvent::Connected(peer_id) => {
1334                let peer_key = peer_id.to_string();
1335                let mut peers = self.state.peers.write().await;
1336                if let Some(entry) = peers.get_mut(&peer_key) {
1337                    if entry.state != ConnectionState::Connected {
1338                        info!("Peer {} connected (via state event)", peer_id.short());
1339                        entry.state = ConnectionState::Connected;
1340                        // Update connected count
1341                        self.state
1342                            .connected_count
1343                            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1344                    }
1345                }
1346            }
1347            PeerStateEvent::Failed(peer_id) => {
1348                let peer_key = peer_id.to_string();
1349                info!(
1350                    "Peer {} connection failed - removing from pool",
1351                    peer_id.short()
1352                );
1353                let mut peers = self.state.peers.write().await;
1354                if let Some(entry) = peers.remove(&peer_key) {
1355                    // Decrement connected count if was connected
1356                    if entry.state == ConnectionState::Connected {
1357                        self.state
1358                            .connected_count
1359                            .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
1360                    }
1361                    // Close the peer connection if it exists
1362                    if let Some(peer) = entry.peer {
1363                        let _ = peer.close().await;
1364                    }
1365                }
1366            }
1367            PeerStateEvent::Disconnected(peer_id) => {
1368                let peer_key = peer_id.to_string();
1369                info!("Peer {} disconnected - removing from pool", peer_id.short());
1370                let mut peers = self.state.peers.write().await;
1371                if let Some(entry) = peers.remove(&peer_key) {
1372                    // Decrement connected count if was connected
1373                    if entry.state == ConnectionState::Connected {
1374                        self.state
1375                            .connected_count
1376                            .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
1377                    }
1378                    // Close the peer connection if it exists
1379                    if let Some(peer) = entry.peer {
1380                        let _ = peer.close().await;
1381                    }
1382                }
1383            }
1384        }
1385    }
1386
1387    /// Cleanup stale peers and sync connection states (fallback, runs every 30s)
1388    async fn cleanup_stale_peers(&self) {
1389        let mut peers = self.state.peers.write().await;
1390        let mut connected_count = 0;
1391        let mut to_remove = Vec::new();
1392        let stale_timeout = Duration::from_secs(60); // Remove peers stuck in Discovered/Connecting for 60s
1393
1394        for (key, entry) in peers.iter_mut() {
1395            if let Some(ref peer) = entry.peer {
1396                // Sync connected state as fallback (in case event was missed)
1397                if peer.is_connected() {
1398                    if entry.state != ConnectionState::Connected {
1399                        info!(
1400                            "Peer {} is now connected (sync fallback)",
1401                            entry.peer_id.short()
1402                        );
1403                        entry.state = ConnectionState::Connected;
1404                    }
1405                    connected_count += 1;
1406                } else if entry.state == ConnectionState::Connecting
1407                    && entry.last_seen.elapsed() > stale_timeout
1408                {
1409                    // Peer stuck in Connecting for too long - mark for removal
1410                    info!(
1411                        "Removing stale peer {} (stuck in Connecting for {:?})",
1412                        entry.peer_id.short(),
1413                        entry.last_seen.elapsed()
1414                    );
1415                    to_remove.push(key.clone());
1416                }
1417            } else if entry.state == ConnectionState::Discovered
1418                && entry.last_seen.elapsed() > stale_timeout
1419            {
1420                // Discovered peer with no actual connection - remove
1421                debug!("Removing stale discovered peer {}", entry.peer_id.short());
1422                to_remove.push(key.clone());
1423            }
1424        }
1425
1426        // Remove stale peers
1427        for key in to_remove {
1428            if let Some(entry) = peers.remove(&key) {
1429                if let Some(peer) = entry.peer {
1430                    let _ = peer.close().await;
1431                }
1432            }
1433        }
1434
1435        self.state
1436            .connected_count
1437            .store(connected_count, std::sync::atomic::Ordering::Relaxed);
1438    }
1439}
1440
// Keep the old PeerState for backward compatibility with tests
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub struct PeerState {
    // Full identity of the peer (pubkey plus session UUID).
    pub peer_id: PeerId,
    // Outbound if we initiated the connection, Inbound otherwise.
    pub direction: PeerDirection,
    // Connection state as a free-form string (legacy form; the current
    // code uses the ConnectionState enum instead).
    pub state: String,
    // When this peer was last heard from.
    pub last_seen: Instant,
}