hashtree_cli/webrtc/
signaling.rs

1//! WebRTC signaling over Nostr relays
2//!
3//! Protocol (compatible with hashtree-ts):
4//! - All signaling uses ephemeral kind 25050
5//! - Hello messages: #l: "hello" tag, broadcast for peer discovery (unencrypted)
6//! - Directed signaling (offer, answer, candidate, candidates): NIP-17 style
7//!   gift wrap for privacy - wrapped with ephemeral key, #p tag with recipient
8//!
9//! Security: Directed messages use gift wrapping with ephemeral keys so that
10//! relays cannot see the actual sender or correlate messages.
11
12use anyhow::Result;
13use futures::{SinkExt, StreamExt};
14use nostr::{nips::nip44, ClientMessage, EventBuilder, Filter, JsonUtil, Keys, Kind, PublicKey, RelayMessage, Tag};
15use std::collections::HashMap;
16use std::sync::Arc;
17use std::time::{Duration, Instant};
18use tokio::sync::{mpsc, Mutex, RwLock};
19use tokio_tungstenite::{connect_async, tungstenite::Message};
20use tracing::{debug, error, info, warn};
21
22use super::peer::{ContentStore, Peer, PendingRequest};
23use super::types::{
24    PeerDirection, PeerId, PeerPool, PeerStateEvent, PeerStatus, SignalingMessage, WebRTCConfig, WEBRTC_KIND, HELLO_TAG,
25};
26
/// Callback type for classifying peers into pools.
///
/// Invoked with the peer's hex-encoded Nostr pubkey (see `handle_hello`);
/// returns the [`PeerPool`] the peer should be assigned to.
pub type PeerClassifier = Arc<dyn Fn(&str) -> PeerPool + Send + Sync>;
29
/// Connection state for a peer
#[derive(Debug, Clone, PartialEq)]
pub enum ConnectionState {
    /// We have seen a hello from this peer but made no connection attempt yet.
    Discovered,
    /// Connection attempt in progress (counts toward pool "active" limits).
    Connecting,
    /// Connection established (counts as both "active" and "connected").
    Connected,
    /// Connection attempt failed.
    Failed,
}
38
39impl std::fmt::Display for ConnectionState {
40    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
41        match self {
42            ConnectionState::Discovered => write!(f, "discovered"),
43            ConnectionState::Connecting => write!(f, "connecting"),
44            ConnectionState::Connected => write!(f, "connected"),
45            ConnectionState::Failed => write!(f, "failed"),
46        }
47    }
48}
49
/// Peer entry in the manager
pub struct PeerEntry {
    /// Full peer identity (pubkey plus session uuid).
    pub peer_id: PeerId,
    /// Whether we initiated the connection (outbound) or the peer did (inbound).
    pub direction: PeerDirection,
    /// Current lifecycle state of the connection.
    pub state: ConnectionState,
    /// Timestamp of the most recent activity recorded for this peer.
    pub last_seen: Instant,
    /// Live peer handle once a connection exists; `None` while only discovered.
    pub peer: Option<Peer>,
    /// Pool this peer was classified into (drives connection limits).
    pub pool: PeerPool,
}
59
/// Shared state for WebRTC manager
pub struct WebRTCState {
    /// All known peers, keyed by the full `PeerId` string form.
    pub peers: RwLock<HashMap<String, PeerEntry>>,
    /// Number of currently connected peers, maintained atomically so it can
    /// be read without taking the `peers` lock.
    pub connected_count: std::sync::atomic::AtomicUsize,
}
65
66impl WebRTCState {
67    pub fn new() -> Self {
68        Self {
69            peers: RwLock::new(HashMap::new()),
70            connected_count: std::sync::atomic::AtomicUsize::new(0),
71        }
72    }
73
74    /// Request content by hash from connected peers
75    /// Queries peers sequentially with 500ms intervals until one responds
76    /// Returns the first successful response, or None if no peer has it
77    pub async fn request_from_peers(&self, hash_hex: &str) -> Option<Vec<u8>> {
78        use super::types::{DataRequest, MAX_HTL, encode_request};
79
80        let peers = self.peers.read().await;
81
82        // Collect connected peers with data channels
83        // We need to collect the Arc references first, then acquire locks outside the iterator
84        let peer_refs: Vec<_> = peers
85            .values()
86            .filter(|p| p.state == ConnectionState::Connected && p.peer.is_some())
87            .filter_map(|p| {
88                p.peer.as_ref().map(|peer| {
89                    (p.peer_id.short(), peer.data_channel.clone(), peer.pending_requests.clone())
90                })
91            })
92            .collect();
93
94        drop(peers); // Release the read lock
95
96        // Now acquire locks and filter to peers with active data channels
97        let mut connected_peers: Vec<(String, Arc<Mutex<HashMap<String, PendingRequest>>>, Arc<webrtc::data_channel::RTCDataChannel>)> = Vec::new();
98        for (peer_id, dc_mutex, pending) in peer_refs {
99            let dc_guard = dc_mutex.lock().await;
100            if let Some(dc) = dc_guard.as_ref() {
101                connected_peers.push((peer_id, pending, dc.clone()));
102            }
103        }
104
105        if connected_peers.is_empty() {
106            debug!("No connected peers to query for {}", &hash_hex[..8.min(hash_hex.len())]);
107            return None;
108        }
109
110        debug!(
111            "Querying {} connected peers for {} (sequential with 500ms delay)",
112            connected_peers.len(),
113            &hash_hex[..8.min(hash_hex.len())]
114        );
115
116        // Convert hex to binary hash once
117        let hash_bytes = match hex::decode(hash_hex) {
118            Ok(b) => b,
119            Err(_) => return None,
120        };
121
122        // Query peers sequentially with 500ms delay between each
123        for (_i, (peer_id, pending_requests, dc)) in connected_peers.into_iter().enumerate() {
124            debug!("Querying peer {} for {}", peer_id, &hash_hex[..8.min(hash_hex.len())]);
125
126            // Create response channel
127            let (tx, rx) = tokio::sync::oneshot::channel();
128
129            // Store pending request
130            {
131                let mut pending = pending_requests.lock().await;
132                pending.insert(
133                    hash_hex.to_string(),
134                    super::PendingRequest {
135                        hash: hash_bytes.clone(),
136                        response_tx: tx,
137                    },
138                );
139            }
140
141            // Send request
142            let req = DataRequest {
143                h: hash_bytes.clone(),
144                htl: MAX_HTL,
145            };
146            if let Ok(wire) = encode_request(&req) {
147                if dc.send(&bytes::Bytes::from(wire)).await.is_ok() {
148                    // Wait 500ms for response from this peer
149                    match tokio::time::timeout(std::time::Duration::from_millis(500), rx).await {
150                        Ok(Ok(Some(data))) => {
151                            debug!("Got response from peer {} for {}", peer_id, &hash_hex[..8.min(hash_hex.len())]);
152                            return Some(data);
153                        }
154                        _ => {
155                            // Timeout or no data - clean up and try next peer
156                            debug!("No response from peer {} for {}", peer_id, &hash_hex[..8.min(hash_hex.len())]);
157                        }
158                    }
159                }
160            }
161
162            // Clean up pending request
163            let mut pending = pending_requests.lock().await;
164            pending.remove(hash_hex);
165        }
166
167        debug!("No peer had data for {}", &hash_hex[..8.min(hash_hex.len())]);
168        None
169    }
170}
171
/// WebRTC manager handles peer discovery and connection management
pub struct WebRTCManager {
    /// Relay list, pool limits, and timing configuration.
    config: WebRTCConfig,
    /// Our own identity (pubkey plus session uuid).
    my_peer_id: PeerId,
    /// Nostr keys used to sign hellos and decrypt gift-wrapped messages.
    keys: Keys,
    /// State shared with external callers (peer map, connected counter).
    state: Arc<WebRTCState>,
    /// Sender side of the shutdown flag watched by all tasks.
    shutdown: Arc<tokio::sync::watch::Sender<bool>>,
    /// Local shutdown receiver, cloned into each spawned relay task.
    shutdown_rx: tokio::sync::watch::Receiver<bool>,
    /// Channel to send signaling messages to relays
    signaling_tx: mpsc::Sender<SignalingMessage>,
    /// Receiver half of the signaling channel; taken exactly once by `run`.
    signaling_rx: Option<mpsc::Receiver<SignalingMessage>>,
    /// Optional content store for serving hash requests
    store: Option<Arc<dyn ContentStore>>,
    /// Peer classifier for pool assignment
    peer_classifier: PeerClassifier,
    /// Channel for peer state events (connection success/failure)
    state_event_tx: mpsc::Sender<PeerStateEvent>,
    /// Receiver half of the state-event channel; taken exactly once by `run`.
    state_event_rx: Option<mpsc::Receiver<PeerStateEvent>>,
}
191
192impl WebRTCManager {
193    /// Create a new WebRTC manager
194    pub fn new(keys: Keys, config: WebRTCConfig) -> Self {
195        let pubkey = keys.public_key().to_hex();
196        let my_peer_id = PeerId::new(pubkey, None);
197        let (shutdown, shutdown_rx) = tokio::sync::watch::channel(false);
198        let (signaling_tx, signaling_rx) = mpsc::channel(100);
199        let (state_event_tx, state_event_rx) = mpsc::channel(100);
200
201        // Default classifier: all peers go to 'other' pool
202        let peer_classifier: PeerClassifier = Arc::new(|_| PeerPool::Other);
203
204        Self {
205            config,
206            my_peer_id,
207            keys,
208            state: Arc::new(WebRTCState::new()),
209            shutdown: Arc::new(shutdown),
210            shutdown_rx,
211            signaling_tx,
212            signaling_rx: Some(signaling_rx),
213            store: None,
214            peer_classifier,
215            state_event_tx,
216            state_event_rx: Some(state_event_rx),
217        }
218    }
219
220    /// Create a new WebRTC manager with a peer classifier
221    pub fn new_with_classifier(keys: Keys, config: WebRTCConfig, classifier: PeerClassifier) -> Self {
222        let mut manager = Self::new(keys, config);
223        manager.peer_classifier = classifier;
224        manager
225    }
226
227    /// Create a new WebRTC manager with a content store for serving hash requests
228    pub fn new_with_store(keys: Keys, config: WebRTCConfig, store: Arc<dyn ContentStore>) -> Self {
229        let mut manager = Self::new(keys, config);
230        manager.store = Some(store);
231        manager
232    }
233
234    /// Create a new WebRTC manager with store and classifier
235    pub fn new_with_store_and_classifier(
236        keys: Keys,
237        config: WebRTCConfig,
238        store: Arc<dyn ContentStore>,
239        classifier: PeerClassifier,
240    ) -> Self {
241        let mut manager = Self::new(keys, config);
242        manager.store = Some(store);
243        manager.peer_classifier = classifier;
244        manager
245    }
246
    /// Set the content store for serving hash requests.
    ///
    /// Replaces any previously configured store.
    pub fn set_store(&mut self, store: Arc<dyn ContentStore>) {
        self.store = Some(store);
    }
251
    /// Set the peer classifier used for pool assignment.
    ///
    /// Replaces the default classifier (which puts every peer in `Other`).
    pub fn set_peer_classifier(&mut self, classifier: PeerClassifier) {
        self.peer_classifier = classifier;
    }
256
    /// Get my peer ID (pubkey plus session uuid).
    pub fn my_peer_id(&self) -> &PeerId {
        &self.my_peer_id
    }
261
262    /// Get shared state for external access
263    pub fn state(&self) -> Arc<WebRTCState> {
264        self.state.clone()
265    }
266
    /// Signal shutdown to all tasks watching the shutdown channel.
    ///
    /// The send error is ignored: it only occurs when every receiver is
    /// already gone, i.e. shutdown has effectively completed.
    pub fn shutdown(&self) {
        let _ = self.shutdown.send(true);
    }
271
    /// Get connected peer count.
    ///
    /// NOTE(review): this is `async` for API symmetry only — the atomic load
    /// never awaits.
    pub async fn connected_count(&self) -> usize {
        self.state
            .connected_count
            .load(std::sync::atomic::Ordering::Relaxed)
    }
278
279    /// Get all peer statuses
280    pub async fn peer_statuses(&self) -> Vec<PeerStatus> {
281        self.state
282            .peers
283            .read()
284            .await
285            .values()
286            .map(|p| PeerStatus {
287                peer_id: p.peer_id.to_string(),
288                pubkey: p.peer_id.pubkey.clone(),
289                state: p.state.to_string(),
290                direction: p.direction,
291                connected_at: Some(p.last_seen),
292                pool: p.pool,
293            })
294            .collect()
295    }
296
297    /// Get pool counts
298    /// Returns (follows_connected, follows_active, other_connected, other_active)
299    /// "active" = Connected or Connecting (excludes Discovered and Failed)
300    pub async fn get_pool_counts(&self) -> (usize, usize, usize, usize) {
301        let peers = self.state.peers.read().await;
302        let mut follows_connected = 0;
303        let mut follows_active = 0;
304        let mut other_connected = 0;
305        let mut other_active = 0;
306
307        for entry in peers.values() {
308            // Only count Connected or Connecting as "active" connections
309            // Discovered peers are just seen hellos, not real connections
310            let is_active = entry.state == ConnectionState::Connected
311                || entry.state == ConnectionState::Connecting;
312
313            match entry.pool {
314                PeerPool::Follows => {
315                    if is_active {
316                        follows_active += 1;
317                    }
318                    if entry.state == ConnectionState::Connected {
319                        follows_connected += 1;
320                    }
321                }
322                PeerPool::Other => {
323                    if is_active {
324                        other_active += 1;
325                    }
326                    if entry.state == ConnectionState::Connected {
327                        other_connected += 1;
328                    }
329                }
330            }
331        }
332
333        (follows_connected, follows_active, other_connected, other_active)
334    }
335
336    /// Check if we can accept a peer in a given pool
337    fn can_accept_peer(&self, pool: PeerPool, pool_counts: &(usize, usize, usize, usize)) -> bool {
338        let (_, follows_active, _, other_active) = *pool_counts;
339        match pool {
340            PeerPool::Follows => follows_active < self.config.pools.follows.max_connections,
341            PeerPool::Other => other_active < self.config.pools.other.max_connections,
342        }
343    }
344
345    /// Check if a pool is satisfied
346    #[allow(dead_code)]
347    fn is_pool_satisfied(&self, pool: PeerPool, pool_counts: &(usize, usize, usize, usize)) -> bool {
348        let (follows_connected, _, other_connected, _) = *pool_counts;
349        match pool {
350            PeerPool::Follows => follows_connected >= self.config.pools.follows.satisfied_connections,
351            PeerPool::Other => other_connected >= self.config.pools.other.satisfied_connections,
352        }
353    }
354
355    /// Check if both pools are satisfied
356    #[allow(dead_code)]
357    fn is_satisfied(&self, pool_counts: &(usize, usize, usize, usize)) -> bool {
358        self.is_pool_satisfied(PeerPool::Follows, pool_counts)
359            && self.is_pool_satisfied(PeerPool::Other, pool_counts)
360    }
361
362    /// Check if we should initiate connection (tie-breaking)
363    /// Lower UUID initiates - same as iris-client/hashtree-ts
364    fn should_initiate(&self, their_uuid: &str) -> bool {
365        self.my_peer_id.uuid < their_uuid.to_string()
366    }
367
    /// Start the WebRTC manager - connects to relays and handles signaling
    ///
    /// Spawns one task per configured relay, then loops over: incoming relay
    /// events, outgoing signaling messages, peer state events, and a 30s
    /// stale-peer cleanup tick — until shutdown is signalled.
    ///
    /// Must be called at most once: it takes `signaling_rx` and
    /// `state_event_rx` and panics if they were already taken.
    pub async fn run(&mut self) -> Result<()> {
        info!(
            "Starting WebRTC manager with peer ID: {}",
            self.my_peer_id.short()
        );

        // Relay tasks feed (relay_url, event) pairs into this channel.
        let (event_tx, mut event_rx) = mpsc::channel::<(String, nostr::Event)>(100);

        // Take the signaling receiver
        let mut signaling_rx = self.signaling_rx.take().expect("signaling_rx already taken");

        // Take the state event receiver
        let mut state_event_rx = self.state_event_rx.take().expect("state_event_rx already taken");

        // Create a shared write channel for all relay tasks
        let (relay_write_tx, _) = tokio::sync::broadcast::channel::<SignalingMessage>(100);

        // Spawn relay connections
        // NOTE(review): failed relay tasks are not respawned — a relay that
        // errors out is gone for the lifetime of the manager; confirm intended.
        for relay_url in &self.config.relays {
            let url = relay_url.clone();
            let event_tx = event_tx.clone();
            let shutdown_rx = self.shutdown_rx.clone();
            let keys = self.keys.clone();
            let my_peer_id = self.my_peer_id.clone();
            let hello_interval = Duration::from_millis(self.config.hello_interval_ms);
            let relay_write_rx = relay_write_tx.subscribe();

            tokio::spawn(async move {
                if let Err(e) = Self::relay_task(
                    url.clone(),
                    event_tx,
                    shutdown_rx,
                    keys,
                    my_peer_id,
                    hello_interval,
                    relay_write_rx,
                )
                .await
                {
                    error!("Relay {} error: {}", url, e);
                }
            });
        }

        // Process incoming events and outgoing signaling messages
        let mut shutdown_rx = self.shutdown_rx.clone();
        // Cleanup interval - run every 30 seconds as a fallback (not for real-time sync)
        let mut cleanup_interval = tokio::time::interval(Duration::from_secs(30));
        loop {
            tokio::select! {
                _ = shutdown_rx.changed() => {
                    if *shutdown_rx.borrow() {
                        info!("WebRTC manager shutting down");
                        break;
                    }
                }
                Some((relay, event)) = event_rx.recv() => {
                    if let Err(e) = self.handle_event(&relay, &event, &relay_write_tx).await {
                        debug!("Error handling event from {}: {}", relay, e);
                    }
                }
                Some(msg) = signaling_rx.recv() => {
                    // Forward signaling messages to relay broadcast
                    let _ = relay_write_tx.send(msg);
                }
                Some(event) = state_event_rx.recv() => {
                    // Handle peer state events (connected, failed, disconnected)
                    self.handle_peer_state_event(event).await;
                }
                _ = cleanup_interval.tick() => {
                    // Periodic cleanup of stale peers and state sync (fallback)
                    self.cleanup_stale_peers().await;
                }
            }
        }

        Ok(())
    }
447
    /// Connect to a single relay and handle messages
    ///
    /// Opens a websocket to `url`, subscribes to WebRTC signaling events
    /// (hellos plus messages addressed to our pubkey), then loops: sending
    /// a hello every `hello_interval`, publishing outgoing signaling
    /// messages from the broadcast channel, and forwarding received events
    /// to `event_tx`.
    ///
    /// Returns when shutdown is signalled or the websocket closes/errors.
    /// NOTE(review): there is no reconnect here — once the socket drops this
    /// relay is unused for the remainder of the run; confirm intended.
    async fn relay_task(
        url: String,
        event_tx: mpsc::Sender<(String, nostr::Event)>,
        mut shutdown_rx: tokio::sync::watch::Receiver<bool>,
        keys: Keys,
        my_peer_id: PeerId,
        hello_interval: Duration,
        mut signaling_rx: tokio::sync::broadcast::Receiver<SignalingMessage>,
    ) -> Result<()> {
        info!("Connecting to relay: {}", url);

        let (ws_stream, _) = connect_async(&url).await?;
        let (mut write, mut read) = ws_stream.split();

        // Subscribe to webrtc events - two filters:
        // 1. Hello messages: kind 25050 with #l: "hello" tag
        // 2. Directed messages: kind 25050 with #p tag (our pubkey)
        // Both are limited to the last 60s so stale signaling is skipped.
        let hello_filter = Filter::new()
            .kind(Kind::Ephemeral(WEBRTC_KIND as u16))
            .custom_tag(
                nostr::SingleLetterTag::lowercase(nostr::Alphabet::L),
                vec![HELLO_TAG],
            )
            .since(nostr::Timestamp::now() - Duration::from_secs(60));

        let directed_filter = Filter::new()
            .kind(Kind::Ephemeral(WEBRTC_KIND as u16))
            .custom_tag(
                nostr::SingleLetterTag::lowercase(nostr::Alphabet::P),
                vec![keys.public_key().to_hex()],
            )
            .since(nostr::Timestamp::now() - Duration::from_secs(60));

        let sub_id = nostr::SubscriptionId::generate();
        let sub_msg = ClientMessage::req(sub_id.clone(), vec![hello_filter, directed_filter]);
        write.send(Message::Text(sub_msg.as_json().into())).await?;

        info!("Subscribed to {} for WebRTC events (kind {})", url, WEBRTC_KIND);

        // Back-date last_hello so the first ticker fire sends a hello at once.
        let mut last_hello = Instant::now() - hello_interval; // Send immediately
        // 1s ticker; the actual hello cadence is gated by `hello_interval`.
        let mut hello_ticker = tokio::time::interval(Duration::from_secs(1));

        loop {
            tokio::select! {
                _ = shutdown_rx.changed() => {
                    if *shutdown_rx.borrow() {
                        break;
                    }
                }
                _ = hello_ticker.tick() => {
                    // Send hello periodically
                    if last_hello.elapsed() >= hello_interval {
                        let hello = SignalingMessage::hello(&my_peer_id.uuid);
                        if let Ok(event) = Self::create_signaling_event(&keys, &hello).await {
                            let msg = ClientMessage::event(event);
                            if write.send(Message::Text(msg.as_json().into())).await.is_ok() {
                                debug!("Sent hello to {}", url);
                            }
                        }
                        last_hello = Instant::now();
                    }
                }
                // Handle outgoing signaling messages
                Ok(signaling_msg) = signaling_rx.recv() => {
                    info!("Sending {} via {}", signaling_msg.msg_type(), url);
                    if let Ok(event) = Self::create_signaling_event(&keys, &signaling_msg).await {
                        let event_id = event.id.to_string();
                        let msg = ClientMessage::event(event);
                        if write.send(Message::Text(msg.as_json().into())).await.is_ok() {
                            // NOTE(review): `[..16]` assumes a 64-char hex
                            // event id string — true for nostr EventId; confirm.
                            info!("Sent {} to {} (event id: {})", signaling_msg.msg_type(), url, &event_id[..16]);
                        }
                    }
                }
                msg = read.next() => {
                    match msg {
                        Some(Ok(Message::Text(text))) => {
                            // Forward EVENT frames to the manager loop; other
                            // relay messages (EOSE, OK, NOTICE) are ignored.
                            if let Ok(relay_msg) = RelayMessage::from_json(&text) {
                                if let RelayMessage::Event { event, .. } = relay_msg {
                                    let _ = event_tx.send((url.clone(), *event)).await;
                                }
                            }
                        }
                        Some(Err(e)) => {
                            error!("WebSocket error from {}: {}", url, e);
                            break;
                        }
                        None => {
                            warn!("WebSocket closed: {}", url);
                            break;
                        }
                        _ => {}
                    }
                }
            }
        }

        Ok(())
    }
547
    /// Create a signaling event
    ///
    /// For directed messages (offer, answer, candidate, candidates), use NIP-17 style
    /// gift wrapping with ephemeral keys for privacy.
    /// Hello messages use kind 25050 with #l: "hello" tag and peerId.
    ///
    /// NOTE(review): this is "NIP-17 style" but not a full NIP-59 gift wrap —
    /// the inner seal is a plain JSON object, not a signed event. The interop
    /// contract here is with hashtree-ts; confirm both sides agree.
    async fn create_signaling_event(keys: &Keys, msg: &SignalingMessage) -> Result<nostr::Event> {
        // Check if message has a recipient (needs gift wrapping)
        if let Some(recipient_str) = msg.recipient() {
            // Parse recipient to get their pubkey
            if let Some(peer_id) = PeerId::from_string(recipient_str) {
                let recipient_pubkey = PublicKey::from_hex(&peer_id.pubkey)?;

                // Create seal with sender's actual pubkey (the "rumor").
                // The real sender identity lives only inside this encrypted
                // payload, never on the outer event.
                let seal = serde_json::json!({
                    "pubkey": keys.public_key().to_hex(),
                    "kind": WEBRTC_KIND,
                    "content": serde_json::to_string(msg)?,
                    "tags": []
                });

                // Generate ephemeral keypair for the wrapper
                let ephemeral_keys = Keys::generate();

                // Encrypt the seal for the recipient using ephemeral key (NIP-44)
                let encrypted_content = nip44::encrypt(
                    ephemeral_keys.secret_key(),
                    &recipient_pubkey,
                    &seal.to_string(),
                    nip44::Version::V2
                )?;

                // Create wrapper event with ephemeral key
                let created_at = nostr::Timestamp::now();
                let expiration = created_at + Duration::from_secs(5 * 60); // 5 minutes

                // Only the recipient (#p) and an expiration are visible to relays.
                let tags = vec![
                    Tag::parse(&["p", &recipient_pubkey.to_hex()])?,
                    Tag::parse(&["expiration", &expiration.as_u64().to_string()])?,
                ];

                let event = EventBuilder::new(Kind::Ephemeral(WEBRTC_KIND as u16), encrypted_content, tags)
                    .to_event(&ephemeral_keys)?;

                return Ok(event);
            }
        }

        // Hello messages - kind 25050 with #l: "hello" tag and peerId
        // (unencrypted broadcast, signed with our real key for discovery)
        let tags = vec![
            Tag::parse(&["l", HELLO_TAG])?,
            Tag::parse(&["peerId", msg.peer_id()])?,
        ];

        let event = EventBuilder::new(Kind::Ephemeral(WEBRTC_KIND as u16), "", tags)
            .to_event(keys)?;

        Ok(event)
    }
606
607    /// Handle an incoming event
608    ///
609    /// Messages may be:
610    /// 1. Hello messages: kind 25050 with #l: "hello" tag and peerId
611    /// 2. Gift-wrapped directed messages: kind 25050 with #p tag, encrypted with ephemeral key
612    async fn handle_event(
613        &self,
614        relay: &str,
615        event: &nostr::Event,
616        relay_write_tx: &tokio::sync::broadcast::Sender<SignalingMessage>,
617    ) -> Result<()> {
618        // Must be kind 25050
619        if event.kind != Kind::Ephemeral(WEBRTC_KIND as u16) {
620            return Ok(());
621        }
622
623        // Helper to get tag value
624        let get_tag = |name: &str| -> Option<String> {
625            event.tags.iter().find_map(|tag| {
626                let v: Vec<String> = tag.clone().to_vec();
627                if v.len() >= 2 && v[0] == name {
628                    Some(v[1].clone())
629                } else {
630                    None
631                }
632            })
633        };
634
635        // Check if this is a hello message (#l: "hello" tag)
636        let l_tag = get_tag("l");
637        if l_tag.as_deref() == Some(HELLO_TAG) {
638            let sender_pubkey = event.pubkey.to_hex();
639
640            // Skip our own hello messages
641            if sender_pubkey == self.my_peer_id.pubkey {
642                return Ok(());
643            }
644
645            if let Some(their_uuid) = get_tag("peerId") {
646                debug!("Received hello from {} via {}", &sender_pubkey[..8], relay);
647                self.handle_hello(&sender_pubkey, &their_uuid, relay_write_tx)
648                    .await?;
649            }
650            return Ok(());
651        }
652
653        // Check if this is a directed message for us (#p tag with our pubkey)
654        let p_tag = get_tag("p");
655        if p_tag.as_deref() != Some(&self.keys.public_key().to_hex()) {
656            // Not for us - ignore silently
657            return Ok(());
658        }
659
660        // Gift-wrapped directed message - decrypt using our key and ephemeral sender's pubkey
661        if event.content.is_empty() {
662            return Ok(());
663        }
664
665        // Try to unwrap the gift - decrypt with our key and the ephemeral sender's pubkey
666        let seal: serde_json::Value = match nip44::decrypt(self.keys.secret_key(), &event.pubkey, &event.content) {
667            Ok(plaintext) => {
668                match serde_json::from_str(&plaintext) {
669                    Ok(v) => v,
670                    Err(_) => return Ok(()),
671                }
672            }
673            Err(_) => {
674                // Can't decrypt - not for us or invalid
675                return Ok(());
676            }
677        };
678
679        // Extract the actual sender's pubkey and content from the seal
680        let sender_pubkey = seal.get("pubkey")
681            .and_then(|v| v.as_str())
682            .ok_or_else(|| anyhow::anyhow!("Missing pubkey in seal"))?;
683
684        // Skip our own messages
685        if sender_pubkey == self.my_peer_id.pubkey {
686            return Ok(());
687        }
688
689        let content = seal.get("content")
690            .and_then(|v| v.as_str())
691            .ok_or_else(|| anyhow::anyhow!("Missing content in seal"))?;
692
693        let msg: SignalingMessage = serde_json::from_str(content)?;
694
695        debug!(
696            "Received {} from {} via {} (gift-wrapped)",
697            msg.msg_type(),
698            &sender_pubkey[..8],
699            relay
700        );
701
702        match msg {
703            SignalingMessage::Hello { .. } => {
704                // Hello messages should come via tags, not gift wrap
705                return Ok(());
706            }
707            SignalingMessage::Offer {
708                recipient,
709                peer_id: their_uuid,
710                offer,
711            } => {
712                if recipient != self.my_peer_id.to_string() {
713                    return Ok(()); // Not for us
714                }
715                self.handle_offer(&sender_pubkey, &their_uuid, offer, relay_write_tx)
716                    .await?;
717            }
718            SignalingMessage::Answer {
719                recipient,
720                peer_id: their_uuid,
721                answer,
722            } => {
723                if recipient != self.my_peer_id.to_string() {
724                    return Ok(());
725                }
726                self.handle_answer(&sender_pubkey, &their_uuid, answer)
727                    .await?;
728            }
729            SignalingMessage::Candidate {
730                recipient,
731                peer_id: their_uuid,
732                candidate,
733            } => {
734                if recipient != self.my_peer_id.to_string() {
735                    return Ok(());
736                }
737                self.handle_candidate(&sender_pubkey, &their_uuid, candidate)
738                    .await?;
739            }
740            SignalingMessage::Candidates {
741                recipient,
742                peer_id: their_uuid,
743                candidates,
744            } => {
745                if recipient != self.my_peer_id.to_string() {
746                    return Ok(());
747                }
748                self.handle_candidates(&sender_pubkey, &their_uuid, candidates)
749                    .await?;
750            }
751        }
752
753        Ok(())
754    }
755
756    /// Handle incoming hello message
757    async fn handle_hello(
758        &self,
759        sender_pubkey: &str,
760        their_uuid: &str,
761        relay_write_tx: &tokio::sync::broadcast::Sender<SignalingMessage>,
762    ) -> Result<()> {
763        let full_peer_id = PeerId::new(sender_pubkey.to_string(), Some(their_uuid.to_string()));
764        let peer_key = full_peer_id.to_string();
765
766        // Check if we already have this peer
767        {
768            let peers = self.state.peers.read().await;
769            if let Some(entry) = peers.get(&peer_key) {
770                // Already connected or connecting, just update last_seen
771                if entry.state == ConnectionState::Connected
772                    || entry.state == ConnectionState::Connecting
773                {
774                    return Ok(());
775                }
776            }
777        }
778
779        // Classify the peer into a pool
780        let pool = (self.peer_classifier)(sender_pubkey);
781
782        // Check pool limits
783        let pool_counts = self.get_pool_counts().await;
784        if !self.can_accept_peer(pool, &pool_counts) {
785            debug!("Ignoring hello from {} - pool {:?} is full", full_peer_id.short(), pool);
786            return Ok(());
787        }
788
789        // Decide if we should initiate based on tie-breaking
790        let should_initiate = self.should_initiate(their_uuid);
791
792        // If pool is already satisfied, don't initiate new outbound connections
793        // This reserves space for inbound connections
794        let pool_satisfied = self.is_pool_satisfied(pool, &pool_counts);
795        let will_initiate = should_initiate && !pool_satisfied;
796
797        info!(
798            "Discovered peer: {} (pool: {:?}, initiate: {}, pool_satisfied: {})",
799            full_peer_id.short(),
800            pool,
801            will_initiate,
802            pool_satisfied
803        );
804
805        // If we're not initiating and pool is satisfied, don't even add to discovered
806        // (we won't accept their offer either since pool check happens in handle_offer)
807        if !will_initiate && pool_satisfied {
808            debug!("Pool {:?} is satisfied, not tracking peer {}", pool, full_peer_id.short());
809            return Ok(());
810        }
811
812        // Create peer entry with pool assignment
813        {
814            let mut peers = self.state.peers.write().await;
815            peers.insert(
816                peer_key.clone(),
817                PeerEntry {
818                    peer_id: full_peer_id.clone(),
819                    direction: if will_initiate {
820                        PeerDirection::Outbound
821                    } else {
822                        PeerDirection::Inbound
823                    },
824                    state: ConnectionState::Discovered,
825                    last_seen: Instant::now(),
826                    peer: None,
827                    pool,
828                },
829            );
830        }
831
832        // If we should initiate, create offer
833        if will_initiate {
834            self.initiate_connection(&full_peer_id, pool, relay_write_tx)
835                .await?;
836        }
837
838        Ok(())
839    }
840
841    /// Initiate a connection to a peer (create and send offer)
842    async fn initiate_connection(
843        &self,
844        peer_id: &PeerId,
845        pool: PeerPool,
846        relay_write_tx: &tokio::sync::broadcast::Sender<SignalingMessage>,
847    ) -> Result<()> {
848        let peer_key = peer_id.to_string();
849
850        info!("Initiating connection to {} (pool: {:?})", peer_id.short(), pool);
851
852        // Create peer connection with content store and state events
853        let mut peer = Peer::new_with_store_and_events(
854            peer_id.clone(),
855            PeerDirection::Outbound,
856            self.my_peer_id.clone(),
857            self.signaling_tx.clone(),
858            self.config.stun_servers.clone(),
859            self.store.clone(),
860            Some(self.state_event_tx.clone()),
861        )
862        .await?;
863
864        peer.setup_handlers().await?;
865
866        // Create offer
867        let offer = peer.connect().await?;
868
869        // Update state
870        {
871            let mut peers = self.state.peers.write().await;
872            if let Some(entry) = peers.get_mut(&peer_key) {
873                entry.state = ConnectionState::Connecting;
874                entry.peer = Some(peer);
875                entry.pool = pool;
876            }
877        }
878
879        // Send offer
880        let offer_msg = SignalingMessage::Offer {
881            offer,
882            recipient: peer_id.to_string(),
883            peer_id: self.my_peer_id.uuid.clone(),
884        };
885        if relay_write_tx.send(offer_msg).is_err() {
886            warn!("Failed to broadcast offer to {}", peer_id.short());
887        }
888
889        info!("Sent offer to {}", peer_id.short());
890
891        Ok(())
892    }
893
894    /// Handle incoming offer
895    async fn handle_offer(
896        &self,
897        sender_pubkey: &str,
898        their_uuid: &str,
899        offer: serde_json::Value,
900        relay_write_tx: &tokio::sync::broadcast::Sender<SignalingMessage>,
901    ) -> Result<()> {
902        let full_peer_id = PeerId::new(sender_pubkey.to_string(), Some(their_uuid.to_string()));
903        let peer_key = full_peer_id.to_string();
904
905        // Classify the peer into a pool
906        let pool = (self.peer_classifier)(sender_pubkey);
907
908        info!("Received offer from {} (pool: {:?})", full_peer_id.short(), pool);
909
910        // Check if we already have this peer with an actual connection
911        {
912            let peers = self.state.peers.read().await;
913            if let Some(entry) = peers.get(&peer_key) {
914                // Only skip if we have an actual peer connection (not just discovered)
915                if entry.peer.is_some() {
916                    debug!("Already have peer {} with connection, skipping offer", full_peer_id.short());
917                    return Ok(());
918                }
919            }
920        }
921
922        // Check pool limits
923        let pool_counts = self.get_pool_counts().await;
924        if !self.can_accept_peer(pool, &pool_counts) {
925            warn!("Rejecting offer from {} - pool {:?} is full", full_peer_id.short(), pool);
926            return Ok(());
927        }
928        // Create peer connection with content store and state events
929        let mut peer = Peer::new_with_store_and_events(
930            full_peer_id.clone(),
931            PeerDirection::Inbound,
932            self.my_peer_id.clone(),
933            self.signaling_tx.clone(),
934            self.config.stun_servers.clone(),
935            self.store.clone(),
936            Some(self.state_event_tx.clone()),
937        )
938        .await?;
939
940        peer.setup_handlers().await?;
941
942        // Handle offer and create answer
943        let answer = peer.handle_offer(offer).await?;
944
945        // Update state
946        {
947            let mut peers = self.state.peers.write().await;
948            peers.insert(
949                peer_key,
950                PeerEntry {
951                    peer_id: full_peer_id.clone(),
952                    direction: PeerDirection::Inbound,
953                    state: ConnectionState::Connecting,
954                    last_seen: Instant::now(),
955                    peer: Some(peer),
956                    pool,
957                },
958            );
959        }
960
961        // Send answer
962        let answer_msg = SignalingMessage::Answer {
963            answer,
964            recipient: full_peer_id.to_string(),
965            peer_id: self.my_peer_id.to_string(),
966        };
967        if relay_write_tx.send(answer_msg).is_err() {
968            warn!("Failed to send answer to {}", full_peer_id.short());
969        }
970        info!("Sent answer to {}", full_peer_id.short());
971
972        Ok(())
973    }
974
975    /// Handle incoming answer
976    async fn handle_answer(
977        &self,
978        sender_pubkey: &str,
979        their_uuid: &str,
980        answer: serde_json::Value,
981    ) -> Result<()> {
982        let full_peer_id = PeerId::new(sender_pubkey.to_string(), Some(their_uuid.to_string()));
983        let peer_key = full_peer_id.to_string();
984
985        info!("Received answer from {}", full_peer_id.short());
986
987        let mut peers = self.state.peers.write().await;
988        if let Some(entry) = peers.get_mut(&peer_key) {
989            // Skip if already connected - duplicate answers from multiple relays
990            if entry.state == ConnectionState::Connected {
991                debug!("Ignoring duplicate answer from {} - already connected", full_peer_id.short());
992                return Ok(());
993            }
994            if let Some(ref mut peer) = entry.peer {
995                // Check WebRTC signaling state before applying answer
996                use webrtc::peer_connection::signaling_state::RTCSignalingState;
997                let signaling_state = peer.signaling_state();
998                if signaling_state != RTCSignalingState::HaveLocalOffer {
999                    debug!(
1000                        "Ignoring answer from {} - signaling state is {:?}, not HaveLocalOffer",
1001                        full_peer_id.short(),
1002                        signaling_state
1003                    );
1004                    return Ok(());
1005                }
1006                peer.handle_answer(answer).await?;
1007                info!("Applied answer from {}", full_peer_id.short());
1008            }
1009        }
1010
1011        Ok(())
1012    }
1013
1014    /// Handle incoming ICE candidate
1015    async fn handle_candidate(
1016        &self,
1017        sender_pubkey: &str,
1018        their_uuid: &str,
1019        candidate: serde_json::Value,
1020    ) -> Result<()> {
1021        let full_peer_id = PeerId::new(sender_pubkey.to_string(), Some(their_uuid.to_string()));
1022        let peer_key = full_peer_id.to_string();
1023
1024        info!("Received ICE candidate from {}", full_peer_id.short());
1025
1026        let mut peers = self.state.peers.write().await;
1027        if let Some(entry) = peers.get_mut(&peer_key) {
1028            if let Some(ref mut peer) = entry.peer {
1029                peer.handle_candidate(candidate).await?;
1030            }
1031        }
1032
1033        Ok(())
1034    }
1035
1036    /// Handle batched ICE candidates
1037    async fn handle_candidates(
1038        &self,
1039        sender_pubkey: &str,
1040        their_uuid: &str,
1041        candidates: Vec<serde_json::Value>,
1042    ) -> Result<()> {
1043        let full_peer_id = PeerId::new(sender_pubkey.to_string(), Some(their_uuid.to_string()));
1044        let peer_key = full_peer_id.to_string();
1045
1046        debug!(
1047            "Received {} candidates from {}",
1048            candidates.len(),
1049            full_peer_id.short()
1050        );
1051
1052        let mut peers = self.state.peers.write().await;
1053        if let Some(entry) = peers.get_mut(&peer_key) {
1054            if let Some(ref mut peer) = entry.peer {
1055                for candidate in candidates {
1056                    if let Err(e) = peer.handle_candidate(candidate).await {
1057                        debug!("Failed to add candidate: {}", e);
1058                    }
1059                }
1060            }
1061        }
1062
1063        Ok(())
1064    }
1065
1066    /// Handle peer state change events from peer connections
1067    async fn handle_peer_state_event(&self, event: PeerStateEvent) {
1068        match event {
1069            PeerStateEvent::Connected(peer_id) => {
1070                let peer_key = peer_id.to_string();
1071                let mut peers = self.state.peers.write().await;
1072                if let Some(entry) = peers.get_mut(&peer_key) {
1073                    if entry.state != ConnectionState::Connected {
1074                        info!("Peer {} connected (via state event)", peer_id.short());
1075                        entry.state = ConnectionState::Connected;
1076                        // Update connected count
1077                        self.state.connected_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1078                    }
1079                }
1080            }
1081            PeerStateEvent::Failed(peer_id) => {
1082                let peer_key = peer_id.to_string();
1083                info!("Peer {} connection failed - removing from pool", peer_id.short());
1084                let mut peers = self.state.peers.write().await;
1085                if let Some(entry) = peers.remove(&peer_key) {
1086                    // Decrement connected count if was connected
1087                    if entry.state == ConnectionState::Connected {
1088                        self.state.connected_count.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
1089                    }
1090                    // Close the peer connection if it exists
1091                    if let Some(peer) = entry.peer {
1092                        let _ = peer.close().await;
1093                    }
1094                }
1095            }
1096            PeerStateEvent::Disconnected(peer_id) => {
1097                let peer_key = peer_id.to_string();
1098                info!("Peer {} disconnected - removing from pool", peer_id.short());
1099                let mut peers = self.state.peers.write().await;
1100                if let Some(entry) = peers.remove(&peer_key) {
1101                    // Decrement connected count if was connected
1102                    if entry.state == ConnectionState::Connected {
1103                        self.state.connected_count.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
1104                    }
1105                    // Close the peer connection if it exists
1106                    if let Some(peer) = entry.peer {
1107                        let _ = peer.close().await;
1108                    }
1109                }
1110            }
1111        }
1112    }
1113
1114    /// Cleanup stale peers and sync connection states (fallback, runs every 30s)
1115    async fn cleanup_stale_peers(&self) {
1116        let mut peers = self.state.peers.write().await;
1117        let mut connected_count = 0;
1118        let mut to_remove = Vec::new();
1119        let stale_timeout = Duration::from_secs(60); // Remove peers stuck in Discovered/Connecting for 60s
1120
1121        for (key, entry) in peers.iter_mut() {
1122            if let Some(ref peer) = entry.peer {
1123                // Sync connected state as fallback (in case event was missed)
1124                if peer.is_connected() {
1125                    if entry.state != ConnectionState::Connected {
1126                        info!("Peer {} is now connected (sync fallback)", entry.peer_id.short());
1127                        entry.state = ConnectionState::Connected;
1128                    }
1129                    connected_count += 1;
1130                } else if entry.state == ConnectionState::Connecting && entry.last_seen.elapsed() > stale_timeout {
1131                    // Peer stuck in Connecting for too long - mark for removal
1132                    info!("Removing stale peer {} (stuck in Connecting for {:?})", entry.peer_id.short(), entry.last_seen.elapsed());
1133                    to_remove.push(key.clone());
1134                }
1135            } else if entry.state == ConnectionState::Discovered && entry.last_seen.elapsed() > stale_timeout {
1136                // Discovered peer with no actual connection - remove
1137                debug!("Removing stale discovered peer {}", entry.peer_id.short());
1138                to_remove.push(key.clone());
1139            }
1140        }
1141
1142        // Remove stale peers
1143        for key in to_remove {
1144            if let Some(entry) = peers.remove(&key) {
1145                if let Some(peer) = entry.peer {
1146                    let _ = peer.close().await;
1147                }
1148            }
1149        }
1150
1151        self.state
1152            .connected_count
1153            .store(connected_count, std::sync::atomic::Ordering::Relaxed);
1154    }
1155}
1156
// Keep the old PeerState for backward compatibility with tests
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub struct PeerState {
    /// Identity of the remote peer (pubkey plus optional session uuid).
    pub peer_id: PeerId,
    /// Whether we initiated (outbound) or accepted (inbound) the connection.
    pub direction: PeerDirection,
    /// Free-form state label (legacy; the manager itself uses `ConnectionState`).
    pub state: String,
    /// When this peer was last observed.
    pub last_seen: Instant,
}