hashtree_cli/webrtc/
signaling.rs

1//! WebRTC signaling over Nostr relays
2//!
3//! Protocol (compatible with hashtree-ts):
4//! - All signaling uses ephemeral kind 25050
5//! - Hello messages: #l: "hello" tag, broadcast for peer discovery (unencrypted)
6//! - Directed signaling (offer, answer, candidate, candidates): NIP-17 style
7//!   gift wrap for privacy - wrapped with ephemeral key, #p tag with recipient
8//!
9//! Security: Directed messages use gift wrapping with ephemeral keys so that
10//! relays cannot see the actual sender or correlate messages.
11
12use anyhow::Result;
13use futures::{SinkExt, StreamExt};
14use nostr::{nips::nip44, ClientMessage, EventBuilder, Filter, JsonUtil, Keys, Kind, PublicKey, RelayMessage, Tag};
15use std::collections::HashMap;
16use std::sync::Arc;
17use std::time::{Duration, Instant};
18use tokio::sync::{mpsc, Mutex, RwLock};
19use tokio_tungstenite::{connect_async, tungstenite::Message};
20use tracing::{debug, error, info, warn};
21
22use super::peer::{ContentStore, Peer, PendingRequest};
23use super::types::{
24    PeerDirection, PeerId, PeerPool, PeerStateEvent, PeerStatus, SignalingMessage, WebRTCConfig, WEBRTC_KIND, HELLO_TAG,
25};
26
/// Callback type for classifying peers into pools.
///
/// Takes a peer's pubkey (hex string, as passed from `handle_hello`) and
/// returns the pool that peer belongs to. Wrapped in an `Arc` so it can be
/// cloned into spawned tasks.
pub type PeerClassifier = Arc<dyn Fn(&str) -> PeerPool + Send + Sync>;
29
/// Connection state for a peer
#[derive(Debug, Clone, PartialEq)]
pub enum ConnectionState {
    /// Peer was seen via a hello message; no connection attempt yet.
    Discovered,
    /// A connection attempt (offer/answer exchange) is in progress.
    Connecting,
    /// Peer connection is established.
    Connected,
    /// Connection attempt failed.
    Failed,
}
38
39impl std::fmt::Display for ConnectionState {
40    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
41        match self {
42            ConnectionState::Discovered => write!(f, "discovered"),
43            ConnectionState::Connecting => write!(f, "connecting"),
44            ConnectionState::Connected => write!(f, "connected"),
45            ConnectionState::Failed => write!(f, "failed"),
46        }
47    }
48}
49
/// Peer entry in the manager
pub struct PeerEntry {
    /// Full peer identity (pubkey plus session uuid).
    pub peer_id: PeerId,
    /// Whether we initiated the connection (outbound) or the peer did (inbound).
    pub direction: PeerDirection,
    /// Current lifecycle state (discovered / connecting / connected / failed).
    pub state: ConnectionState,
    /// Last time we observed activity from this peer.
    pub last_seen: Instant,
    /// Live connection handle; `None` until a connection is set up.
    pub peer: Option<Peer>,
    /// Pool assignment produced by the peer classifier.
    pub pool: PeerPool,
}
59
/// Shared state for WebRTC manager
///
/// Shared between the manager's event loop and external callers via `Arc`
/// (see `WebRTCManager::state()`).
pub struct WebRTCState {
    /// All known peers, keyed by the full peer-id string.
    pub peers: RwLock<HashMap<String, PeerEntry>>,
    /// Count of currently connected peers (lock-free counter).
    pub connected_count: std::sync::atomic::AtomicUsize,
}
65
66impl WebRTCState {
67    pub fn new() -> Self {
68        Self {
69            peers: RwLock::new(HashMap::new()),
70            connected_count: std::sync::atomic::AtomicUsize::new(0),
71        }
72    }
73
74    /// Request content by hash from connected peers
75    /// Queries peers sequentially with 500ms intervals until one responds
76    /// Returns the first successful response, or None if no peer has it
77    pub async fn request_from_peers(&self, hash_hex: &str) -> Option<Vec<u8>> {
78        use super::types::{DataRequest, MAX_HTL, encode_request};
79
80        let peers = self.peers.read().await;
81
82        // Collect connected peers with data channels
83        // We need to collect the Arc references first, then acquire locks outside the iterator
84        let peer_refs: Vec<_> = peers
85            .values()
86            .filter(|p| p.state == ConnectionState::Connected && p.peer.is_some())
87            .filter_map(|p| {
88                p.peer.as_ref().map(|peer| {
89                    (p.peer_id.short(), peer.data_channel.clone(), peer.pending_requests.clone())
90                })
91            })
92            .collect();
93
94        drop(peers); // Release the read lock
95
96        // Now acquire locks and filter to peers with active data channels
97        let mut connected_peers: Vec<(String, Arc<Mutex<HashMap<String, PendingRequest>>>, Arc<webrtc::data_channel::RTCDataChannel>)> = Vec::new();
98        for (peer_id, dc_mutex, pending) in peer_refs {
99            let dc_guard = dc_mutex.lock().await;
100            if let Some(dc) = dc_guard.as_ref() {
101                connected_peers.push((peer_id, pending, dc.clone()));
102            }
103        }
104
105        if connected_peers.is_empty() {
106            debug!("No connected peers to query for {}", &hash_hex[..8.min(hash_hex.len())]);
107            return None;
108        }
109
110        debug!(
111            "Querying {} connected peers for {} (sequential with 500ms delay)",
112            connected_peers.len(),
113            &hash_hex[..8.min(hash_hex.len())]
114        );
115
116        // Convert hex to binary hash once
117        let hash_bytes = match hex::decode(hash_hex) {
118            Ok(b) => b,
119            Err(_) => return None,
120        };
121
122        // Query peers sequentially with 500ms delay between each
123        for (_i, (peer_id, pending_requests, dc)) in connected_peers.into_iter().enumerate() {
124            debug!("Querying peer {} for {}", peer_id, &hash_hex[..8.min(hash_hex.len())]);
125
126            // Create response channel
127            let (tx, rx) = tokio::sync::oneshot::channel();
128
129            // Store pending request
130            {
131                let mut pending = pending_requests.lock().await;
132                pending.insert(
133                    hash_hex.to_string(),
134                    super::PendingRequest {
135                        hash: hash_bytes.clone(),
136                        response_tx: tx,
137                    },
138                );
139            }
140
141            // Send request
142            let req = DataRequest {
143                h: hash_bytes.clone(),
144                htl: MAX_HTL,
145            };
146            if let Ok(wire) = encode_request(&req) {
147                if dc.send(&bytes::Bytes::from(wire)).await.is_ok() {
148                    // Wait 500ms for response from this peer
149                    match tokio::time::timeout(std::time::Duration::from_millis(500), rx).await {
150                        Ok(Ok(Some(data))) => {
151                            debug!("Got response from peer {} for {}", peer_id, &hash_hex[..8.min(hash_hex.len())]);
152                            return Some(data);
153                        }
154                        _ => {
155                            // Timeout or no data - clean up and try next peer
156                            debug!("No response from peer {} for {}", peer_id, &hash_hex[..8.min(hash_hex.len())]);
157                        }
158                    }
159                }
160            }
161
162            // Clean up pending request
163            let mut pending = pending_requests.lock().await;
164            pending.remove(hash_hex);
165        }
166
167        debug!("No peer had data for {}", &hash_hex[..8.min(hash_hex.len())]);
168        None
169    }
170}
171
/// WebRTC manager handles peer discovery and connection management
pub struct WebRTCManager {
    /// Relay list, pool limits, hello interval, etc.
    config: WebRTCConfig,
    /// Our own identity (derived from `keys` pubkey plus a session uuid).
    my_peer_id: PeerId,
    /// Nostr keys used to sign hellos and decrypt gift-wrapped messages.
    keys: Keys,
    /// Shared peer state, exposed to callers via `state()`.
    state: Arc<WebRTCState>,
    /// Broadcast side of the shutdown flag; flipped by `shutdown()`.
    shutdown: Arc<tokio::sync::watch::Sender<bool>>,
    /// Receiver side of the shutdown flag, cloned into each relay task.
    shutdown_rx: tokio::sync::watch::Receiver<bool>,
    /// Channel to send signaling messages to relays
    signaling_tx: mpsc::Sender<SignalingMessage>,
    /// Receiver half; taken (consumed) once by `run()`.
    signaling_rx: Option<mpsc::Receiver<SignalingMessage>>,
    /// Optional content store for serving hash requests
    store: Option<Arc<dyn ContentStore>>,
    /// Peer classifier for pool assignment
    peer_classifier: PeerClassifier,
    /// Channel for peer state events (connection success/failure)
    state_event_tx: mpsc::Sender<PeerStateEvent>,
    /// Receiver half; taken (consumed) once by `run()`.
    state_event_rx: Option<mpsc::Receiver<PeerStateEvent>>,
}
191
192impl WebRTCManager {
193    /// Create a new WebRTC manager
194    pub fn new(keys: Keys, config: WebRTCConfig) -> Self {
195        let pubkey = keys.public_key().to_hex();
196        let my_peer_id = PeerId::new(pubkey, None);
197        let (shutdown, shutdown_rx) = tokio::sync::watch::channel(false);
198        let (signaling_tx, signaling_rx) = mpsc::channel(100);
199        let (state_event_tx, state_event_rx) = mpsc::channel(100);
200
201        // Default classifier: all peers go to 'other' pool
202        let peer_classifier: PeerClassifier = Arc::new(|_| PeerPool::Other);
203
204        Self {
205            config,
206            my_peer_id,
207            keys,
208            state: Arc::new(WebRTCState::new()),
209            shutdown: Arc::new(shutdown),
210            shutdown_rx,
211            signaling_tx,
212            signaling_rx: Some(signaling_rx),
213            store: None,
214            peer_classifier,
215            state_event_tx,
216            state_event_rx: Some(state_event_rx),
217        }
218    }
219
220    /// Create a new WebRTC manager with a peer classifier
221    pub fn new_with_classifier(keys: Keys, config: WebRTCConfig, classifier: PeerClassifier) -> Self {
222        let mut manager = Self::new(keys, config);
223        manager.peer_classifier = classifier;
224        manager
225    }
226
227    /// Create a new WebRTC manager with a content store for serving hash requests
228    pub fn new_with_store(keys: Keys, config: WebRTCConfig, store: Arc<dyn ContentStore>) -> Self {
229        let mut manager = Self::new(keys, config);
230        manager.store = Some(store);
231        manager
232    }
233
234    /// Create a new WebRTC manager with store and classifier
235    pub fn new_with_store_and_classifier(
236        keys: Keys,
237        config: WebRTCConfig,
238        store: Arc<dyn ContentStore>,
239        classifier: PeerClassifier,
240    ) -> Self {
241        let mut manager = Self::new(keys, config);
242        manager.store = Some(store);
243        manager.peer_classifier = classifier;
244        manager
245    }
246
    /// Set the content store for serving hash requests
    ///
    /// Replaces any previously configured store.
    pub fn set_store(&mut self, store: Arc<dyn ContentStore>) {
        self.store = Some(store);
    }

    /// Set the peer classifier
    ///
    /// Replaces the default classifier (which assigns every peer to `Other`).
    pub fn set_peer_classifier(&mut self, classifier: PeerClassifier) {
        self.peer_classifier = classifier;
    }
256
    /// Get my peer ID
    ///
    /// Borrowed reference to this node's own identity (pubkey + session uuid).
    pub fn my_peer_id(&self) -> &PeerId {
        &self.my_peer_id
    }
261
262    /// Get shared state for external access
263    pub fn state(&self) -> Arc<WebRTCState> {
264        self.state.clone()
265    }
266
    /// Signal shutdown
    ///
    /// Flips the shared watch flag so `run()` and all relay tasks exit their
    /// loops. A send error (no receivers left) is deliberately ignored.
    pub fn shutdown(&self) {
        let _ = self.shutdown.send(true);
    }
271
272    /// Get connected peer count
273    pub async fn connected_count(&self) -> usize {
274        self.state
275            .connected_count
276            .load(std::sync::atomic::Ordering::Relaxed)
277    }
278
279    /// Get all peer statuses
280    pub async fn peer_statuses(&self) -> Vec<PeerStatus> {
281        self.state
282            .peers
283            .read()
284            .await
285            .values()
286            .map(|p| PeerStatus {
287                peer_id: p.peer_id.to_string(),
288                pubkey: p.peer_id.pubkey.clone(),
289                state: p.state.to_string(),
290                direction: p.direction,
291                connected_at: Some(p.last_seen),
292                pool: p.pool,
293            })
294            .collect()
295    }
296
297    /// Get pool counts
298    /// Returns (follows_connected, follows_active, other_connected, other_active)
299    /// "active" = Connected or Connecting (excludes Discovered and Failed)
300    pub async fn get_pool_counts(&self) -> (usize, usize, usize, usize) {
301        let peers = self.state.peers.read().await;
302        let mut follows_connected = 0;
303        let mut follows_active = 0;
304        let mut other_connected = 0;
305        let mut other_active = 0;
306
307        for entry in peers.values() {
308            // Only count Connected or Connecting as "active" connections
309            // Discovered peers are just seen hellos, not real connections
310            let is_active = entry.state == ConnectionState::Connected
311                || entry.state == ConnectionState::Connecting;
312
313            match entry.pool {
314                PeerPool::Follows => {
315                    if is_active {
316                        follows_active += 1;
317                    }
318                    if entry.state == ConnectionState::Connected {
319                        follows_connected += 1;
320                    }
321                }
322                PeerPool::Other => {
323                    if is_active {
324                        other_active += 1;
325                    }
326                    if entry.state == ConnectionState::Connected {
327                        other_connected += 1;
328                    }
329                }
330            }
331        }
332
333        (follows_connected, follows_active, other_connected, other_active)
334    }
335
336    /// Check if we can accept a peer in a given pool
337    fn can_accept_peer(&self, pool: PeerPool, pool_counts: &(usize, usize, usize, usize)) -> bool {
338        let (_, follows_active, _, other_active) = *pool_counts;
339        match pool {
340            PeerPool::Follows => follows_active < self.config.pools.follows.max_connections,
341            PeerPool::Other => other_active < self.config.pools.other.max_connections,
342        }
343    }
344
345    /// Check if a pool is satisfied
346    #[allow(dead_code)]
347    fn is_pool_satisfied(&self, pool: PeerPool, pool_counts: &(usize, usize, usize, usize)) -> bool {
348        let (follows_connected, _, other_connected, _) = *pool_counts;
349        match pool {
350            PeerPool::Follows => follows_connected >= self.config.pools.follows.satisfied_connections,
351            PeerPool::Other => other_connected >= self.config.pools.other.satisfied_connections,
352        }
353    }
354
355    /// Check if both pools are satisfied
356    #[allow(dead_code)]
357    fn is_satisfied(&self, pool_counts: &(usize, usize, usize, usize)) -> bool {
358        self.is_pool_satisfied(PeerPool::Follows, pool_counts)
359            && self.is_pool_satisfied(PeerPool::Other, pool_counts)
360    }
361
362    /// Check if we should initiate connection (tie-breaking)
363    /// Lower UUID initiates - same as iris-client/hashtree-ts
364    fn should_initiate(&self, their_uuid: &str) -> bool {
365        self.my_peer_id.uuid < their_uuid.to_string()
366    }
367
    /// Start the WebRTC manager - connects to relays and handles signaling
    ///
    /// Spawns one `relay_task` per configured relay, then runs the main event
    /// loop multiplexing:
    /// - incoming nostr events forwarded by relay tasks
    /// - outgoing signaling messages (re-broadcast to every relay task)
    /// - peer state events (connected / failed / disconnected)
    /// - periodic stale-peer cleanup (30s fallback)
    ///
    /// Runs until `shutdown()` flips the watch flag. Panics if called twice,
    /// because the internal receivers can only be taken once.
    pub async fn run(&mut self) -> Result<()> {
        info!(
            "Starting WebRTC manager with peer ID: {}",
            self.my_peer_id.short()
        );

        // Relay tasks push (relay_url, event) pairs into this channel.
        let (event_tx, mut event_rx) = mpsc::channel::<(String, nostr::Event)>(100);

        // Take the signaling receiver
        let mut signaling_rx = self.signaling_rx.take().expect("signaling_rx already taken");

        // Take the state event receiver
        let mut state_event_rx = self.state_event_rx.take().expect("state_event_rx already taken");

        // Create a shared write channel for all relay tasks
        // (broadcast so every relay publishes each outgoing message).
        let (relay_write_tx, _) = tokio::sync::broadcast::channel::<SignalingMessage>(100);

        // Spawn relay connections
        for relay_url in &self.config.relays {
            let url = relay_url.clone();
            let event_tx = event_tx.clone();
            let shutdown_rx = self.shutdown_rx.clone();
            let keys = self.keys.clone();
            let my_peer_id = self.my_peer_id.clone();
            let hello_interval = Duration::from_millis(self.config.hello_interval_ms);
            let relay_write_rx = relay_write_tx.subscribe();

            tokio::spawn(async move {
                if let Err(e) = Self::relay_task(
                    url.clone(),
                    event_tx,
                    shutdown_rx,
                    keys,
                    my_peer_id,
                    hello_interval,
                    relay_write_rx,
                )
                .await
                {
                    error!("Relay {} error: {}", url, e);
                }
            });
        }

        // Process incoming events and outgoing signaling messages
        let mut shutdown_rx = self.shutdown_rx.clone();
        // Cleanup interval - run every 30 seconds as a fallback (not for real-time sync)
        let mut cleanup_interval = tokio::time::interval(Duration::from_secs(30));
        loop {
            tokio::select! {
                _ = shutdown_rx.changed() => {
                    if *shutdown_rx.borrow() {
                        info!("WebRTC manager shutting down");
                        break;
                    }
                }
                Some((relay, event)) = event_rx.recv() => {
                    if let Err(e) = self.handle_event(&relay, &event, &relay_write_tx).await {
                        debug!("Error handling event from {}: {}", relay, e);
                    }
                }
                Some(msg) = signaling_rx.recv() => {
                    // Forward signaling messages to relay broadcast
                    let _ = relay_write_tx.send(msg);
                }
                Some(event) = state_event_rx.recv() => {
                    // Handle peer state events (connected, failed, disconnected)
                    self.handle_peer_state_event(event).await;
                }
                _ = cleanup_interval.tick() => {
                    // Periodic cleanup of stale peers and state sync (fallback)
                    self.cleanup_stale_peers().await;
                }
            }
        }

        Ok(())
    }
447
    /// Connect to a single relay and handle messages
    ///
    /// One instance runs per configured relay. It:
    /// - subscribes to kind-25050 hello events (#l "hello") and directed
    ///   events (#p = our pubkey), with a 60s lookback window
    /// - sends our own hello every `hello_interval`
    /// - publishes outgoing signaling messages received on `signaling_rx`
    /// - forwards incoming relay events to `event_tx` for the manager loop
    ///
    /// Returns when the shutdown flag flips or the websocket errors/closes;
    /// no reconnection is attempted here.
    async fn relay_task(
        url: String,
        event_tx: mpsc::Sender<(String, nostr::Event)>,
        mut shutdown_rx: tokio::sync::watch::Receiver<bool>,
        keys: Keys,
        my_peer_id: PeerId,
        hello_interval: Duration,
        mut signaling_rx: tokio::sync::broadcast::Receiver<SignalingMessage>,
    ) -> Result<()> {
        info!("Connecting to relay: {}", url);

        let (ws_stream, _) = connect_async(&url).await?;
        let (mut write, mut read) = ws_stream.split();

        // Subscribe to webrtc events - two filters:
        // 1. Hello messages: kind 25050 with #l: "hello" tag
        // 2. Directed messages: kind 25050 with #p tag (our pubkey)
        let hello_filter = Filter::new()
            .kind(Kind::Ephemeral(WEBRTC_KIND as u16))
            .custom_tag(
                nostr::SingleLetterTag::lowercase(nostr::Alphabet::L),
                vec![HELLO_TAG],
            )
            .since(nostr::Timestamp::now() - Duration::from_secs(60));

        let directed_filter = Filter::new()
            .kind(Kind::Ephemeral(WEBRTC_KIND as u16))
            .custom_tag(
                nostr::SingleLetterTag::lowercase(nostr::Alphabet::P),
                vec![keys.public_key().to_hex()],
            )
            .since(nostr::Timestamp::now() - Duration::from_secs(60));

        let sub_id = nostr::SubscriptionId::generate();
        let sub_msg = ClientMessage::req(sub_id.clone(), vec![hello_filter, directed_filter]);
        write.send(Message::Text(sub_msg.as_json().into())).await?;

        info!("Subscribed to {} for WebRTC events (kind {})", url, WEBRTC_KIND);

        // Backdate so the first ticker tick sends a hello right away.
        let mut last_hello = Instant::now() - hello_interval; // Send immediately
        // 1s ticker; actual hello cadence is gated by `hello_interval` below.
        let mut hello_ticker = tokio::time::interval(Duration::from_secs(1));

        loop {
            tokio::select! {
                _ = shutdown_rx.changed() => {
                    if *shutdown_rx.borrow() {
                        break;
                    }
                }
                _ = hello_ticker.tick() => {
                    // Send hello periodically
                    if last_hello.elapsed() >= hello_interval {
                        let hello = SignalingMessage::hello(&my_peer_id.uuid);
                        if let Ok(event) = Self::create_signaling_event(&keys, &hello).await {
                            let msg = ClientMessage::event(event);
                            if write.send(Message::Text(msg.as_json().into())).await.is_ok() {
                                debug!("Sent hello to {}", url);
                            }
                        }
                        last_hello = Instant::now();
                    }
                }
                // Handle outgoing signaling messages
                Ok(signaling_msg) = signaling_rx.recv() => {
                    info!("Sending {} via {}", signaling_msg.msg_type(), url);
                    if let Ok(event) = Self::create_signaling_event(&keys, &signaling_msg).await {
                        let event_id = event.id.to_string();
                        let msg = ClientMessage::event(event);
                        if write.send(Message::Text(msg.as_json().into())).await.is_ok() {
                            info!("Sent {} to {} (event id: {})", signaling_msg.msg_type(), url, &event_id[..16]);
                        }
                    }
                }
                msg = read.next() => {
                    match msg {
                        Some(Ok(Message::Text(text))) => {
                            // Forward relay EVENT messages to the manager loop.
                            if let Ok(relay_msg) = RelayMessage::from_json(&text) {
                                if let RelayMessage::Event { event, .. } = relay_msg {
                                    let _ = event_tx.send((url.clone(), *event)).await;
                                }
                            }
                        }
                        Some(Err(e)) => {
                            error!("WebSocket error from {}: {}", url, e);
                            break;
                        }
                        None => {
                            warn!("WebSocket closed: {}", url);
                            break;
                        }
                        // Ignore binary/ping/pong/close frames.
                        _ => {}
                    }
                }
            }
        }

        Ok(())
    }
547
    /// Create a signaling event
    ///
    /// For directed messages (offer, answer, candidate, candidates), use NIP-17 style
    /// gift wrapping with ephemeral keys for privacy.
    /// Hello messages use kind 25050 with #l: "hello" tag and peerId.
    ///
    /// Returns the signed event, ready to publish; errors bubble up from key
    /// parsing, NIP-44 encryption, tag parsing, or signing.
    async fn create_signaling_event(keys: &Keys, msg: &SignalingMessage) -> Result<nostr::Event> {
        // Check if message has a recipient (needs gift wrapping)
        if let Some(recipient_str) = msg.recipient() {
            // Parse recipient to get their pubkey
            if let Some(peer_id) = PeerId::from_string(recipient_str) {
                let recipient_pubkey = PublicKey::from_hex(&peer_id.pubkey)?;

                // Create seal with sender's actual pubkey (the "rumor")
                // NOTE(review): this is a simplified unsigned rumor (no id/sig/
                // created_at) - presumably what hashtree-ts expects; confirm
                // compatibility against the TS implementation.
                let seal = serde_json::json!({
                    "pubkey": keys.public_key().to_hex(),
                    "kind": WEBRTC_KIND,
                    "content": serde_json::to_string(msg)?,
                    "tags": []
                });

                // Generate ephemeral keypair for the wrapper so relays cannot
                // link the wrapper event to our real key.
                let ephemeral_keys = Keys::generate();

                // Encrypt the seal for the recipient using ephemeral key (NIP-44)
                let encrypted_content = nip44::encrypt(
                    ephemeral_keys.secret_key(),
                    &recipient_pubkey,
                    &seal.to_string(),
                    nip44::Version::V2
                )?;

                // Create wrapper event with ephemeral key
                let created_at = nostr::Timestamp::now();
                let expiration = created_at + Duration::from_secs(5 * 60); // 5 minutes

                // #p routes the wrapper to the recipient; expiration lets
                // relays drop stale signaling.
                let tags = vec![
                    Tag::parse(["p", &recipient_pubkey.to_hex()])?,
                    Tag::parse(["expiration", &expiration.as_u64().to_string()])?,
                ];

                let event = EventBuilder::new(Kind::Ephemeral(WEBRTC_KIND as u16), encrypted_content)
                    .tags(tags)
                    .sign(&ephemeral_keys)
                    .await?;

                return Ok(event);
            }
        }

        // Hello messages - kind 25050 with #l: "hello" tag and peerId
        // (unencrypted broadcast, signed with our real key for discovery)
        let tags = vec![
            Tag::parse(["l", HELLO_TAG])?,
            Tag::parse(["peerId", msg.peer_id()])?,
        ];

        let event = EventBuilder::new(Kind::Ephemeral(WEBRTC_KIND as u16), "")
            .tags(tags)
            .sign(keys)
            .await?;

        Ok(event)
    }
610
    /// Handle an incoming event
    ///
    /// Messages may be:
    /// 1. Hello messages: kind 25050 with #l: "hello" tag and peerId
    /// 2. Gift-wrapped directed messages: kind 25050 with #p tag, encrypted with ephemeral key
    ///
    /// Events that are malformed, not for us, or undecryptable are silently
    /// ignored (returns `Ok(())`), so a hostile relay can't error the loop.
    async fn handle_event(
        &self,
        relay: &str,
        event: &nostr::Event,
        relay_write_tx: &tokio::sync::broadcast::Sender<SignalingMessage>,
    ) -> Result<()> {
        // Must be kind 25050
        if event.kind != Kind::Ephemeral(WEBRTC_KIND as u16) {
            return Ok(());
        }

        // Helper to get the first value of a tag by name.
        let get_tag = |name: &str| -> Option<String> {
            event.tags.iter().find_map(|tag| {
                let v: Vec<String> = tag.clone().to_vec();
                if v.len() >= 2 && v[0] == name {
                    Some(v[1].clone())
                } else {
                    None
                }
            })
        };

        // Check if this is a hello message (#l: "hello" tag)
        let l_tag = get_tag("l");
        if l_tag.as_deref() == Some(HELLO_TAG) {
            let sender_pubkey = event.pubkey.to_hex();

            // Skip our own hello messages
            if sender_pubkey == self.my_peer_id.pubkey {
                return Ok(());
            }

            if let Some(their_uuid) = get_tag("peerId") {
                debug!("Received hello from {} via {}", &sender_pubkey[..8], relay);
                self.handle_hello(&sender_pubkey, &their_uuid, relay_write_tx)
                    .await?;
            }
            return Ok(());
        }

        // Check if this is a directed message for us (#p tag with our pubkey)
        let p_tag = get_tag("p");
        if p_tag.as_deref() != Some(&self.keys.public_key().to_hex()) {
            // Not for us - ignore silently
            return Ok(());
        }

        // Gift-wrapped directed message - decrypt using our key and ephemeral sender's pubkey
        if event.content.is_empty() {
            return Ok(());
        }

        // Try to unwrap the gift - decrypt with our key and the ephemeral sender's pubkey.
        // event.pubkey here is the wrapper's EPHEMERAL key, not the real sender.
        let seal: serde_json::Value = match nip44::decrypt(self.keys.secret_key(), &event.pubkey, &event.content) {
            Ok(plaintext) => {
                match serde_json::from_str(&plaintext) {
                    Ok(v) => v,
                    Err(_) => return Ok(()),
                }
            }
            Err(_) => {
                // Can't decrypt - not for us or invalid
                return Ok(());
            }
        };

        // Extract the actual sender's pubkey and content from the seal
        let sender_pubkey = seal.get("pubkey")
            .and_then(|v| v.as_str())
            .ok_or_else(|| anyhow::anyhow!("Missing pubkey in seal"))?;

        // Skip our own messages
        if sender_pubkey == self.my_peer_id.pubkey {
            return Ok(());
        }

        let content = seal.get("content")
            .and_then(|v| v.as_str())
            .ok_or_else(|| anyhow::anyhow!("Missing content in seal"))?;

        let msg: SignalingMessage = serde_json::from_str(content)?;

        debug!(
            "Received {} from {} via {} (gift-wrapped)",
            msg.msg_type(),
            &sender_pubkey[..8],
            relay
        );

        // Dispatch by message type; each directed variant double-checks the
        // inner `recipient` field against our full peer id.
        match msg {
            SignalingMessage::Hello { .. } => {
                // Hello messages should come via tags, not gift wrap
                return Ok(());
            }
            SignalingMessage::Offer {
                recipient,
                peer_id: their_uuid,
                offer,
            } => {
                if recipient != self.my_peer_id.to_string() {
                    return Ok(()); // Not for us
                }
                self.handle_offer(&sender_pubkey, &their_uuid, offer, relay_write_tx)
                    .await?;
            }
            SignalingMessage::Answer {
                recipient,
                peer_id: their_uuid,
                answer,
            } => {
                if recipient != self.my_peer_id.to_string() {
                    return Ok(());
                }
                self.handle_answer(&sender_pubkey, &their_uuid, answer)
                    .await?;
            }
            SignalingMessage::Candidate {
                recipient,
                peer_id: their_uuid,
                candidate,
            } => {
                if recipient != self.my_peer_id.to_string() {
                    return Ok(());
                }
                self.handle_candidate(&sender_pubkey, &their_uuid, candidate)
                    .await?;
            }
            SignalingMessage::Candidates {
                recipient,
                peer_id: their_uuid,
                candidates,
            } => {
                if recipient != self.my_peer_id.to_string() {
                    return Ok(());
                }
                self.handle_candidates(&sender_pubkey, &their_uuid, candidates)
                    .await?;
            }
        }

        Ok(())
    }
759
760    /// Handle incoming hello message
761    async fn handle_hello(
762        &self,
763        sender_pubkey: &str,
764        their_uuid: &str,
765        relay_write_tx: &tokio::sync::broadcast::Sender<SignalingMessage>,
766    ) -> Result<()> {
767        let full_peer_id = PeerId::new(sender_pubkey.to_string(), Some(their_uuid.to_string()));
768        let peer_key = full_peer_id.to_string();
769
770        // Check if we already have this peer
771        {
772            let peers = self.state.peers.read().await;
773            if let Some(entry) = peers.get(&peer_key) {
774                // Already connected or connecting, just update last_seen
775                if entry.state == ConnectionState::Connected
776                    || entry.state == ConnectionState::Connecting
777                {
778                    return Ok(());
779                }
780            }
781        }
782
783        // Classify the peer into a pool
784        let pool = (self.peer_classifier)(sender_pubkey);
785
786        // Check pool limits
787        let pool_counts = self.get_pool_counts().await;
788        if !self.can_accept_peer(pool, &pool_counts) {
789            debug!("Ignoring hello from {} - pool {:?} is full", full_peer_id.short(), pool);
790            return Ok(());
791        }
792
793        // Decide if we should initiate based on tie-breaking
794        let should_initiate = self.should_initiate(their_uuid);
795
796        // If pool is already satisfied, don't initiate new outbound connections
797        // This reserves space for inbound connections
798        let pool_satisfied = self.is_pool_satisfied(pool, &pool_counts);
799        let will_initiate = should_initiate && !pool_satisfied;
800
801        info!(
802            "Discovered peer: {} (pool: {:?}, initiate: {}, pool_satisfied: {})",
803            full_peer_id.short(),
804            pool,
805            will_initiate,
806            pool_satisfied
807        );
808
809        // If we're not initiating and pool is satisfied, don't even add to discovered
810        // (we won't accept their offer either since pool check happens in handle_offer)
811        if !will_initiate && pool_satisfied {
812            debug!("Pool {:?} is satisfied, not tracking peer {}", pool, full_peer_id.short());
813            return Ok(());
814        }
815
816        // Create peer entry with pool assignment
817        {
818            let mut peers = self.state.peers.write().await;
819            peers.insert(
820                peer_key.clone(),
821                PeerEntry {
822                    peer_id: full_peer_id.clone(),
823                    direction: if will_initiate {
824                        PeerDirection::Outbound
825                    } else {
826                        PeerDirection::Inbound
827                    },
828                    state: ConnectionState::Discovered,
829                    last_seen: Instant::now(),
830                    peer: None,
831                    pool,
832                },
833            );
834        }
835
836        // If we should initiate, create offer
837        if will_initiate {
838            self.initiate_connection(&full_peer_id, pool, relay_write_tx)
839                .await?;
840        }
841
842        Ok(())
843    }
844
845    /// Initiate a connection to a peer (create and send offer)
846    async fn initiate_connection(
847        &self,
848        peer_id: &PeerId,
849        pool: PeerPool,
850        relay_write_tx: &tokio::sync::broadcast::Sender<SignalingMessage>,
851    ) -> Result<()> {
852        let peer_key = peer_id.to_string();
853
854        info!("Initiating connection to {} (pool: {:?})", peer_id.short(), pool);
855
856        // Create peer connection with content store and state events
857        let mut peer = Peer::new_with_store_and_events(
858            peer_id.clone(),
859            PeerDirection::Outbound,
860            self.my_peer_id.clone(),
861            self.signaling_tx.clone(),
862            self.config.stun_servers.clone(),
863            self.store.clone(),
864            Some(self.state_event_tx.clone()),
865        )
866        .await?;
867
868        peer.setup_handlers().await?;
869
870        // Create offer
871        let offer = peer.connect().await?;
872
873        // Update state
874        {
875            let mut peers = self.state.peers.write().await;
876            if let Some(entry) = peers.get_mut(&peer_key) {
877                entry.state = ConnectionState::Connecting;
878                entry.peer = Some(peer);
879                entry.pool = pool;
880            }
881        }
882
883        // Send offer
884        let offer_msg = SignalingMessage::Offer {
885            offer,
886            recipient: peer_id.to_string(),
887            peer_id: self.my_peer_id.uuid.clone(),
888        };
889        if relay_write_tx.send(offer_msg).is_err() {
890            warn!("Failed to broadcast offer to {}", peer_id.short());
891        }
892
893        info!("Sent offer to {}", peer_id.short());
894
895        Ok(())
896    }
897
898    /// Handle incoming offer
899    async fn handle_offer(
900        &self,
901        sender_pubkey: &str,
902        their_uuid: &str,
903        offer: serde_json::Value,
904        relay_write_tx: &tokio::sync::broadcast::Sender<SignalingMessage>,
905    ) -> Result<()> {
906        let full_peer_id = PeerId::new(sender_pubkey.to_string(), Some(their_uuid.to_string()));
907        let peer_key = full_peer_id.to_string();
908
909        // Classify the peer into a pool
910        let pool = (self.peer_classifier)(sender_pubkey);
911
912        info!("Received offer from {} (pool: {:?})", full_peer_id.short(), pool);
913
914        // Check if we already have this peer with an actual connection
915        {
916            let peers = self.state.peers.read().await;
917            if let Some(entry) = peers.get(&peer_key) {
918                // Only skip if we have an actual peer connection (not just discovered)
919                if entry.peer.is_some() {
920                    debug!("Already have peer {} with connection, skipping offer", full_peer_id.short());
921                    return Ok(());
922                }
923            }
924        }
925
926        // Check pool limits
927        let pool_counts = self.get_pool_counts().await;
928        if !self.can_accept_peer(pool, &pool_counts) {
929            warn!("Rejecting offer from {} - pool {:?} is full", full_peer_id.short(), pool);
930            return Ok(());
931        }
932        // Create peer connection with content store and state events
933        let mut peer = Peer::new_with_store_and_events(
934            full_peer_id.clone(),
935            PeerDirection::Inbound,
936            self.my_peer_id.clone(),
937            self.signaling_tx.clone(),
938            self.config.stun_servers.clone(),
939            self.store.clone(),
940            Some(self.state_event_tx.clone()),
941        )
942        .await?;
943
944        peer.setup_handlers().await?;
945
946        // Handle offer and create answer
947        let answer = peer.handle_offer(offer).await?;
948
949        // Update state
950        {
951            let mut peers = self.state.peers.write().await;
952            peers.insert(
953                peer_key,
954                PeerEntry {
955                    peer_id: full_peer_id.clone(),
956                    direction: PeerDirection::Inbound,
957                    state: ConnectionState::Connecting,
958                    last_seen: Instant::now(),
959                    peer: Some(peer),
960                    pool,
961                },
962            );
963        }
964
965        // Send answer
966        let answer_msg = SignalingMessage::Answer {
967            answer,
968            recipient: full_peer_id.to_string(),
969            peer_id: self.my_peer_id.to_string(),
970        };
971        if relay_write_tx.send(answer_msg).is_err() {
972            warn!("Failed to send answer to {}", full_peer_id.short());
973        }
974        info!("Sent answer to {}", full_peer_id.short());
975
976        Ok(())
977    }
978
979    /// Handle incoming answer
980    async fn handle_answer(
981        &self,
982        sender_pubkey: &str,
983        their_uuid: &str,
984        answer: serde_json::Value,
985    ) -> Result<()> {
986        let full_peer_id = PeerId::new(sender_pubkey.to_string(), Some(their_uuid.to_string()));
987        let peer_key = full_peer_id.to_string();
988
989        info!("Received answer from {}", full_peer_id.short());
990
991        let mut peers = self.state.peers.write().await;
992        if let Some(entry) = peers.get_mut(&peer_key) {
993            // Skip if already connected - duplicate answers from multiple relays
994            if entry.state == ConnectionState::Connected {
995                debug!("Ignoring duplicate answer from {} - already connected", full_peer_id.short());
996                return Ok(());
997            }
998            if let Some(ref mut peer) = entry.peer {
999                // Check WebRTC signaling state before applying answer
1000                use webrtc::peer_connection::signaling_state::RTCSignalingState;
1001                let signaling_state = peer.signaling_state();
1002                if signaling_state != RTCSignalingState::HaveLocalOffer {
1003                    debug!(
1004                        "Ignoring answer from {} - signaling state is {:?}, not HaveLocalOffer",
1005                        full_peer_id.short(),
1006                        signaling_state
1007                    );
1008                    return Ok(());
1009                }
1010                peer.handle_answer(answer).await?;
1011                info!("Applied answer from {}", full_peer_id.short());
1012            }
1013        }
1014
1015        Ok(())
1016    }
1017
1018    /// Handle incoming ICE candidate
1019    async fn handle_candidate(
1020        &self,
1021        sender_pubkey: &str,
1022        their_uuid: &str,
1023        candidate: serde_json::Value,
1024    ) -> Result<()> {
1025        let full_peer_id = PeerId::new(sender_pubkey.to_string(), Some(their_uuid.to_string()));
1026        let peer_key = full_peer_id.to_string();
1027
1028        info!("Received ICE candidate from {}", full_peer_id.short());
1029
1030        let mut peers = self.state.peers.write().await;
1031        if let Some(entry) = peers.get_mut(&peer_key) {
1032            if let Some(ref mut peer) = entry.peer {
1033                peer.handle_candidate(candidate).await?;
1034            }
1035        }
1036
1037        Ok(())
1038    }
1039
1040    /// Handle batched ICE candidates
1041    async fn handle_candidates(
1042        &self,
1043        sender_pubkey: &str,
1044        their_uuid: &str,
1045        candidates: Vec<serde_json::Value>,
1046    ) -> Result<()> {
1047        let full_peer_id = PeerId::new(sender_pubkey.to_string(), Some(their_uuid.to_string()));
1048        let peer_key = full_peer_id.to_string();
1049
1050        debug!(
1051            "Received {} candidates from {}",
1052            candidates.len(),
1053            full_peer_id.short()
1054        );
1055
1056        let mut peers = self.state.peers.write().await;
1057        if let Some(entry) = peers.get_mut(&peer_key) {
1058            if let Some(ref mut peer) = entry.peer {
1059                for candidate in candidates {
1060                    if let Err(e) = peer.handle_candidate(candidate).await {
1061                        debug!("Failed to add candidate: {}", e);
1062                    }
1063                }
1064            }
1065        }
1066
1067        Ok(())
1068    }
1069
1070    /// Handle peer state change events from peer connections
1071    async fn handle_peer_state_event(&self, event: PeerStateEvent) {
1072        match event {
1073            PeerStateEvent::Connected(peer_id) => {
1074                let peer_key = peer_id.to_string();
1075                let mut peers = self.state.peers.write().await;
1076                if let Some(entry) = peers.get_mut(&peer_key) {
1077                    if entry.state != ConnectionState::Connected {
1078                        info!("Peer {} connected (via state event)", peer_id.short());
1079                        entry.state = ConnectionState::Connected;
1080                        // Update connected count
1081                        self.state.connected_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1082                    }
1083                }
1084            }
1085            PeerStateEvent::Failed(peer_id) => {
1086                let peer_key = peer_id.to_string();
1087                info!("Peer {} connection failed - removing from pool", peer_id.short());
1088                let mut peers = self.state.peers.write().await;
1089                if let Some(entry) = peers.remove(&peer_key) {
1090                    // Decrement connected count if was connected
1091                    if entry.state == ConnectionState::Connected {
1092                        self.state.connected_count.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
1093                    }
1094                    // Close the peer connection if it exists
1095                    if let Some(peer) = entry.peer {
1096                        let _ = peer.close().await;
1097                    }
1098                }
1099            }
1100            PeerStateEvent::Disconnected(peer_id) => {
1101                let peer_key = peer_id.to_string();
1102                info!("Peer {} disconnected - removing from pool", peer_id.short());
1103                let mut peers = self.state.peers.write().await;
1104                if let Some(entry) = peers.remove(&peer_key) {
1105                    // Decrement connected count if was connected
1106                    if entry.state == ConnectionState::Connected {
1107                        self.state.connected_count.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
1108                    }
1109                    // Close the peer connection if it exists
1110                    if let Some(peer) = entry.peer {
1111                        let _ = peer.close().await;
1112                    }
1113                }
1114            }
1115        }
1116    }
1117
    /// Cleanup stale peers and sync connection states (fallback, runs every 30s)
    ///
    /// Safety net for missed state events: recounts connected peers directly
    /// from the live connections (and stores the recount as the authoritative
    /// value), and evicts entries stuck in Discovered or Connecting for longer
    /// than the stale timeout.
    async fn cleanup_stale_peers(&self) {
        let mut peers = self.state.peers.write().await;
        // Recomputed from scratch each sweep; overwrites the shared counter below.
        let mut connected_count = 0;
        // Keys are collected first and removed after the loop, since the map
        // cannot be mutated while iter_mut() borrows it.
        let mut to_remove = Vec::new();
        let stale_timeout = Duration::from_secs(60); // Remove peers stuck in Discovered/Connecting for 60s

        for (key, entry) in peers.iter_mut() {
            if let Some(ref peer) = entry.peer {
                // Sync connected state as fallback (in case event was missed)
                if peer.is_connected() {
                    if entry.state != ConnectionState::Connected {
                        info!("Peer {} is now connected (sync fallback)", entry.peer_id.short());
                        entry.state = ConnectionState::Connected;
                    }
                    connected_count += 1;
                } else if entry.state == ConnectionState::Connecting && entry.last_seen.elapsed() > stale_timeout {
                    // Peer stuck in Connecting for too long - mark for removal
                    info!("Removing stale peer {} (stuck in Connecting for {:?})", entry.peer_id.short(), entry.last_seen.elapsed());
                    to_remove.push(key.clone());
                }
            } else if entry.state == ConnectionState::Discovered && entry.last_seen.elapsed() > stale_timeout {
                // Discovered peer with no actual connection - remove
                debug!("Removing stale discovered peer {}", entry.peer_id.short());
                to_remove.push(key.clone());
            }
        }

        // Remove stale peers
        for key in to_remove {
            if let Some(entry) = peers.remove(&key) {
                if let Some(peer) = entry.peer {
                    // Best-effort close; errors are intentionally ignored here.
                    let _ = peer.close().await;
                }
            }
        }

        // Store (not adjust) the counter: the recount is the source of truth.
        self.state
            .connected_count
            .store(connected_count, std::sync::atomic::Ordering::Relaxed);
    }
1159}
1160
// Keep the old PeerState for backward compatibility with tests
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub struct PeerState {
    /// Full peer identity (pubkey plus uuid).
    pub peer_id: PeerId,
    /// Whether the connection was initiated by us (Outbound) or by them (Inbound).
    pub direction: PeerDirection,
    /// Connection state as a free-form string (legacy stringly-typed field;
    /// the current code uses the ConnectionState enum instead).
    pub state: String,
    /// When the peer was last observed.
    pub last_seen: Instant,
}