hashtree_cli/webrtc/
signaling.rs

1//! WebRTC signaling over Nostr relays
2//!
3//! Protocol (compatible with hashtree-ts):
4//! - All signaling uses ephemeral kind 25050
5//! - Hello messages: #l: "hello" tag, broadcast for peer discovery (unencrypted)
6//! - Directed signaling (offer, answer, candidate, candidates): NIP-17 style
7//!   gift wrap for privacy - wrapped with ephemeral key, #p tag with recipient
8//!
9//! Security: Directed messages use gift wrapping with ephemeral keys so that
10//! relays cannot see the actual sender or correlate messages.
11
12use anyhow::Result;
13use futures::{SinkExt, StreamExt};
14use nostr::{nips::nip44, ClientMessage, EventBuilder, Filter, JsonUtil, Keys, Kind, PublicKey, RelayMessage, Tag};
15use std::collections::HashMap;
16use std::sync::Arc;
17use std::time::{Duration, Instant};
18use tokio::sync::{mpsc, Mutex, RwLock};
19use tokio_tungstenite::{connect_async, tungstenite::Message};
20use tracing::{debug, error, info, warn};
21
22use super::peer::{ContentStore, Peer, PendingRequest};
23use super::types::{
24    PeerDirection, PeerId, PeerPool, PeerStateEvent, PeerStatus, SignalingMessage, WebRTCConfig, WEBRTC_KIND, HELLO_TAG,
25};
26
/// Callback type for classifying peers into pools
///
/// Maps a peer identifier string to the [`PeerPool`] it belongs to.
/// NOTE(review): callers appear to pass the peer's hex pubkey — confirm
/// against the hello-handling code before relying on the format.
pub type PeerClassifier = Arc<dyn Fn(&str) -> PeerPool + Send + Sync>;
29
/// Connection state for a peer
///
/// `Discovered` means we have only seen a hello from the peer; a real
/// connection attempt moves it to `Connecting`, then `Connected` or `Failed`.
///
/// All variants are unit-like, so the enum also derives `Copy` and `Eq`:
/// it can be passed by value without moves/clones and compared exactly.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectionState {
    Discovered,
    Connecting,
    Connected,
    Failed,
}
38
39impl std::fmt::Display for ConnectionState {
40    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
41        match self {
42            ConnectionState::Discovered => write!(f, "discovered"),
43            ConnectionState::Connecting => write!(f, "connecting"),
44            ConnectionState::Connected => write!(f, "connected"),
45            ConnectionState::Failed => write!(f, "failed"),
46        }
47    }
48}
49
/// Peer entry in the manager
///
/// One record per known peer, tracking its lifecycle state and
/// per-peer bandwidth counters.
pub struct PeerEntry {
    /// Identity of the peer (pubkey plus session uuid).
    pub peer_id: PeerId,
    /// Whether we initiated the connection or the peer did.
    pub direction: PeerDirection,
    /// Current lifecycle state (discovered/connecting/connected/failed).
    pub state: ConnectionState,
    /// Last time this peer was seen.
    pub last_seen: Instant,
    /// Live WebRTC peer object; `None` until a connection attempt exists.
    pub peer: Option<Peer>,
    /// Pool classification (follows vs. other) used for connection limits.
    pub pool: PeerPool,
    /// Bytes sent to this peer.
    pub bytes_sent: u64,
    /// Bytes received from this peer.
    pub bytes_received: u64,
}
61
/// Shared state for WebRTC manager
///
/// Shared behind an `Arc` between the manager's event loop and external
/// callers (see `WebRTCManager::state`).
pub struct WebRTCState {
    /// All known peers, keyed by a peer-id string.
    pub peers: RwLock<HashMap<String, PeerEntry>>,
    /// Count of currently connected peers (maintained elsewhere; read here).
    pub connected_count: std::sync::atomic::AtomicUsize,
    /// Total bytes sent across all peers (cumulative)
    pub bytes_sent: std::sync::atomic::AtomicU64,
    /// Total bytes received across all peers (cumulative)
    pub bytes_received: std::sync::atomic::AtomicU64,
}
71
72impl WebRTCState {
73    pub fn new() -> Self {
74        Self {
75            peers: RwLock::new(HashMap::new()),
76            connected_count: std::sync::atomic::AtomicUsize::new(0),
77            bytes_sent: std::sync::atomic::AtomicU64::new(0),
78            bytes_received: std::sync::atomic::AtomicU64::new(0),
79        }
80    }
81
82    /// Get current bandwidth stats (bytes sent/received)
83    pub fn get_bandwidth(&self) -> (u64, u64) {
84        (
85            self.bytes_sent.load(std::sync::atomic::Ordering::Relaxed),
86            self.bytes_received.load(std::sync::atomic::Ordering::Relaxed),
87        )
88    }
89
90    /// Record bytes sent (global + per-peer)
91    pub async fn record_sent(&self, peer_id: &str, bytes: u64) {
92        self.bytes_sent.fetch_add(bytes, std::sync::atomic::Ordering::Relaxed);
93        if let Some(entry) = self.peers.write().await.get_mut(peer_id) {
94            entry.bytes_sent += bytes;
95        }
96    }
97
98    /// Record bytes received (global + per-peer)
99    pub async fn record_received(&self, peer_id: &str, bytes: u64) {
100        self.bytes_received.fetch_add(bytes, std::sync::atomic::Ordering::Relaxed);
101        if let Some(entry) = self.peers.write().await.get_mut(peer_id) {
102            entry.bytes_received += bytes;
103        }
104    }
105
106    /// Request content by hash from connected peers
107    /// Queries peers sequentially with 500ms intervals until one responds
108    /// Returns the first successful response, or None if no peer has it
109    pub async fn request_from_peers(&self, hash_hex: &str) -> Option<Vec<u8>> {
110        use super::types::{DataRequest, MAX_HTL, encode_request};
111
112        let peers = self.peers.read().await;
113
114        // Collect connected peers with data channels
115        // We need to collect the Arc references first, then acquire locks outside the iterator
116        let peer_refs: Vec<_> = peers
117            .values()
118            .filter(|p| p.state == ConnectionState::Connected && p.peer.is_some())
119            .filter_map(|p| {
120                p.peer.as_ref().map(|peer| {
121                    (p.peer_id.short(), peer.data_channel.clone(), peer.pending_requests.clone())
122                })
123            })
124            .collect();
125
126        drop(peers); // Release the read lock
127
128        // Now acquire locks and filter to peers with active data channels
129        let mut connected_peers: Vec<(String, Arc<Mutex<HashMap<String, PendingRequest>>>, Arc<webrtc::data_channel::RTCDataChannel>)> = Vec::new();
130        for (peer_id, dc_mutex, pending) in peer_refs {
131            let dc_guard = dc_mutex.lock().await;
132            if let Some(dc) = dc_guard.as_ref() {
133                connected_peers.push((peer_id, pending, dc.clone()));
134            }
135        }
136
137        if connected_peers.is_empty() {
138            debug!("No connected peers to query for {}", &hash_hex[..8.min(hash_hex.len())]);
139            return None;
140        }
141
142        debug!(
143            "Querying {} connected peers for {} (sequential with 500ms delay)",
144            connected_peers.len(),
145            &hash_hex[..8.min(hash_hex.len())]
146        );
147
148        // Convert hex to binary hash once
149        let hash_bytes = match hex::decode(hash_hex) {
150            Ok(b) => b,
151            Err(_) => return None,
152        };
153
154        // Query peers sequentially with 500ms delay between each
155        for (_i, (peer_id, pending_requests, dc)) in connected_peers.into_iter().enumerate() {
156            debug!("Querying peer {} for {}", peer_id, &hash_hex[..8.min(hash_hex.len())]);
157
158            // Create response channel
159            let (tx, rx) = tokio::sync::oneshot::channel();
160
161            // Store pending request
162            {
163                let mut pending = pending_requests.lock().await;
164                pending.insert(
165                    hash_hex.to_string(),
166                    super::PendingRequest {
167                        hash: hash_bytes.clone(),
168                        response_tx: tx,
169                    },
170                );
171            }
172
173            // Send request
174            let req = DataRequest {
175                h: hash_bytes.clone(),
176                htl: MAX_HTL,
177            };
178            if let Ok(wire) = encode_request(&req) {
179                let wire_len = wire.len() as u64;
180                if dc.send(&bytes::Bytes::from(wire)).await.is_ok() {
181                    self.record_sent(&peer_id, wire_len).await;
182                    // Wait 500ms for response from this peer
183                    match tokio::time::timeout(std::time::Duration::from_millis(500), rx).await {
184                        Ok(Ok(Some(data))) => {
185                            self.record_received(&peer_id, data.len() as u64).await;
186                            debug!("Got response from peer {} for {}", peer_id, &hash_hex[..8.min(hash_hex.len())]);
187                            return Some(data);
188                        }
189                        _ => {
190                            // Timeout or no data - clean up and try next peer
191                            debug!("No response from peer {} for {}", peer_id, &hash_hex[..8.min(hash_hex.len())]);
192                        }
193                    }
194                }
195            }
196
197            // Clean up pending request
198            let mut pending = pending_requests.lock().await;
199            pending.remove(hash_hex);
200        }
201
202        debug!("No peer had data for {}", &hash_hex[..8.min(hash_hex.len())]);
203        None
204    }
205}
206
/// WebRTC manager handles peer discovery and connection management
pub struct WebRTCManager {
    /// Static configuration (relays, pool limits, hello interval, ...).
    config: WebRTCConfig,
    /// Our own identity (pubkey plus session uuid).
    my_peer_id: PeerId,
    /// Nostr keys: sign hello events, decrypt incoming gift wraps.
    keys: Keys,
    /// Shared runtime state (peers, counters), exposed via `state()`.
    state: Arc<WebRTCState>,
    /// Sender side of the shutdown flag; `shutdown()` flips it to true.
    shutdown: Arc<tokio::sync::watch::Sender<bool>>,
    /// Receiver side of the shutdown flag, cloned into spawned tasks.
    shutdown_rx: tokio::sync::watch::Receiver<bool>,
    /// Channel to send signaling messages to relays
    signaling_tx: mpsc::Sender<SignalingMessage>,
    /// Receiver half; `Some` until `run()` takes it (run is one-shot).
    signaling_rx: Option<mpsc::Receiver<SignalingMessage>>,
    /// Optional content store for serving hash requests
    store: Option<Arc<dyn ContentStore>>,
    /// Peer classifier for pool assignment
    peer_classifier: PeerClassifier,
    /// Channel for peer state events (connection success/failure)
    state_event_tx: mpsc::Sender<PeerStateEvent>,
    /// Receiver half; `Some` until `run()` takes it.
    state_event_rx: Option<mpsc::Receiver<PeerStateEvent>>,
}
226
227impl WebRTCManager {
228    /// Create a new WebRTC manager
229    pub fn new(keys: Keys, config: WebRTCConfig) -> Self {
230        let pubkey = keys.public_key().to_hex();
231        let my_peer_id = PeerId::new(pubkey, None);
232        let (shutdown, shutdown_rx) = tokio::sync::watch::channel(false);
233        let (signaling_tx, signaling_rx) = mpsc::channel(100);
234        let (state_event_tx, state_event_rx) = mpsc::channel(100);
235
236        // Default classifier: all peers go to 'other' pool
237        let peer_classifier: PeerClassifier = Arc::new(|_| PeerPool::Other);
238
239        Self {
240            config,
241            my_peer_id,
242            keys,
243            state: Arc::new(WebRTCState::new()),
244            shutdown: Arc::new(shutdown),
245            shutdown_rx,
246            signaling_tx,
247            signaling_rx: Some(signaling_rx),
248            store: None,
249            peer_classifier,
250            state_event_tx,
251            state_event_rx: Some(state_event_rx),
252        }
253    }
254
255    /// Create a new WebRTC manager with a peer classifier
256    pub fn new_with_classifier(keys: Keys, config: WebRTCConfig, classifier: PeerClassifier) -> Self {
257        let mut manager = Self::new(keys, config);
258        manager.peer_classifier = classifier;
259        manager
260    }
261
262    /// Create a new WebRTC manager with a content store for serving hash requests
263    pub fn new_with_store(keys: Keys, config: WebRTCConfig, store: Arc<dyn ContentStore>) -> Self {
264        let mut manager = Self::new(keys, config);
265        manager.store = Some(store);
266        manager
267    }
268
269    /// Create a new WebRTC manager with store and classifier
270    pub fn new_with_store_and_classifier(
271        keys: Keys,
272        config: WebRTCConfig,
273        store: Arc<dyn ContentStore>,
274        classifier: PeerClassifier,
275    ) -> Self {
276        let mut manager = Self::new(keys, config);
277        manager.store = Some(store);
278        manager.peer_classifier = classifier;
279        manager
280    }
281
    /// Set the content store for serving hash requests
    pub fn set_store(&mut self, store: Arc<dyn ContentStore>) {
        self.store = Some(store);
    }

    /// Set the peer classifier
    pub fn set_peer_classifier(&mut self, classifier: PeerClassifier) {
        self.peer_classifier = classifier;
    }

    /// Get my peer ID
    pub fn my_peer_id(&self) -> &PeerId {
        &self.my_peer_id
    }

    /// Get shared state for external access
    ///
    /// Cheap: clones the `Arc`, not the state itself.
    pub fn state(&self) -> Arc<WebRTCState> {
        self.state.clone()
    }

    /// Signal shutdown
    ///
    /// Best-effort: the send result is ignored because a watch channel only
    /// errors when every receiver has already been dropped.
    pub fn shutdown(&self) {
        let _ = self.shutdown.send(true);
    }
306
307    /// Get connected peer count
308    pub async fn connected_count(&self) -> usize {
309        self.state
310            .connected_count
311            .load(std::sync::atomic::Ordering::Relaxed)
312    }
313
314    /// Get all peer statuses
315    pub async fn peer_statuses(&self) -> Vec<PeerStatus> {
316        self.state
317            .peers
318            .read()
319            .await
320            .values()
321            .map(|p| PeerStatus {
322                peer_id: p.peer_id.to_string(),
323                pubkey: p.peer_id.pubkey.clone(),
324                state: p.state.to_string(),
325                direction: p.direction,
326                connected_at: Some(p.last_seen),
327                pool: p.pool,
328            })
329            .collect()
330    }
331
332    /// Get pool counts
333    /// Returns (follows_connected, follows_active, other_connected, other_active)
334    /// "active" = Connected or Connecting (excludes Discovered and Failed)
335    pub async fn get_pool_counts(&self) -> (usize, usize, usize, usize) {
336        let peers = self.state.peers.read().await;
337        let mut follows_connected = 0;
338        let mut follows_active = 0;
339        let mut other_connected = 0;
340        let mut other_active = 0;
341
342        for entry in peers.values() {
343            // Only count Connected or Connecting as "active" connections
344            // Discovered peers are just seen hellos, not real connections
345            let is_active = entry.state == ConnectionState::Connected
346                || entry.state == ConnectionState::Connecting;
347
348            match entry.pool {
349                PeerPool::Follows => {
350                    if is_active {
351                        follows_active += 1;
352                    }
353                    if entry.state == ConnectionState::Connected {
354                        follows_connected += 1;
355                    }
356                }
357                PeerPool::Other => {
358                    if is_active {
359                        other_active += 1;
360                    }
361                    if entry.state == ConnectionState::Connected {
362                        other_connected += 1;
363                    }
364                }
365            }
366        }
367
368        (follows_connected, follows_active, other_connected, other_active)
369    }
370
371    /// Check if we can accept a peer in a given pool
372    fn can_accept_peer(&self, pool: PeerPool, pool_counts: &(usize, usize, usize, usize)) -> bool {
373        let (_, follows_active, _, other_active) = *pool_counts;
374        match pool {
375            PeerPool::Follows => follows_active < self.config.pools.follows.max_connections,
376            PeerPool::Other => other_active < self.config.pools.other.max_connections,
377        }
378    }
379
380    /// Check if a pool is satisfied
381    #[allow(dead_code)]
382    fn is_pool_satisfied(&self, pool: PeerPool, pool_counts: &(usize, usize, usize, usize)) -> bool {
383        let (follows_connected, _, other_connected, _) = *pool_counts;
384        match pool {
385            PeerPool::Follows => follows_connected >= self.config.pools.follows.satisfied_connections,
386            PeerPool::Other => other_connected >= self.config.pools.other.satisfied_connections,
387        }
388    }
389
390    /// Check if both pools are satisfied
391    #[allow(dead_code)]
392    fn is_satisfied(&self, pool_counts: &(usize, usize, usize, usize)) -> bool {
393        self.is_pool_satisfied(PeerPool::Follows, pool_counts)
394            && self.is_pool_satisfied(PeerPool::Other, pool_counts)
395    }
396
397    /// Check if we should initiate connection (tie-breaking)
398    /// Lower UUID initiates - same as iris-client/hashtree-ts
399    fn should_initiate(&self, their_uuid: &str) -> bool {
400        self.my_peer_id.uuid < their_uuid.to_string()
401    }
402
    /// Start the WebRTC manager - connects to relays and handles signaling
    ///
    /// Spawns one `relay_task` per configured relay, then runs the main event
    /// loop until `shutdown()` is signalled. The loop multiplexes:
    /// - incoming relay events (hello / directed signaling)
    /// - outgoing signaling messages (fanned out to every relay task)
    /// - peer state events (connected / failed / disconnected)
    /// - a 30-second fallback cleanup tick
    ///
    /// NOTE: `run` is one-shot — it `take()`s the signaling and state-event
    /// receivers, so a second call panics on the `expect`s below.
    pub async fn run(&mut self) -> Result<()> {
        info!(
            "Starting WebRTC manager with peer ID: {}",
            self.my_peer_id.short()
        );

        // (relay url, event) pairs flow from relay tasks into the main loop.
        let (event_tx, mut event_rx) = mpsc::channel::<(String, nostr::Event)>(100);

        // Take the signaling receiver
        let mut signaling_rx = self.signaling_rx.take().expect("signaling_rx already taken");

        // Take the state event receiver
        let mut state_event_rx = self.state_event_rx.take().expect("state_event_rx already taken");

        // Create a shared write channel for all relay tasks
        // NOTE(review): broadcast receivers that lag more than 100 messages
        // will drop signaling messages — presumably acceptable for ephemeral
        // signaling; confirm.
        let (relay_write_tx, _) = tokio::sync::broadcast::channel::<SignalingMessage>(100);

        // Spawn relay connections
        for relay_url in &self.config.relays {
            let url = relay_url.clone();
            let event_tx = event_tx.clone();
            let shutdown_rx = self.shutdown_rx.clone();
            let keys = self.keys.clone();
            let my_peer_id = self.my_peer_id.clone();
            let hello_interval = Duration::from_millis(self.config.hello_interval_ms);
            let relay_write_rx = relay_write_tx.subscribe();

            tokio::spawn(async move {
                if let Err(e) = Self::relay_task(
                    url.clone(),
                    event_tx,
                    shutdown_rx,
                    keys,
                    my_peer_id,
                    hello_interval,
                    relay_write_rx,
                )
                .await
                {
                    error!("Relay {} error: {}", url, e);
                }
            });
        }

        // Process incoming events and outgoing signaling messages
        let mut shutdown_rx = self.shutdown_rx.clone();
        // Cleanup interval - run every 30 seconds as a fallback (not for real-time sync)
        let mut cleanup_interval = tokio::time::interval(Duration::from_secs(30));
        loop {
            tokio::select! {
                _ = shutdown_rx.changed() => {
                    if *shutdown_rx.borrow() {
                        info!("WebRTC manager shutting down");
                        break;
                    }
                }
                Some((relay, event)) = event_rx.recv() => {
                    if let Err(e) = self.handle_event(&relay, &event, &relay_write_tx).await {
                        debug!("Error handling event from {}: {}", relay, e);
                    }
                }
                Some(msg) = signaling_rx.recv() => {
                    // Forward signaling messages to relay broadcast
                    let _ = relay_write_tx.send(msg);
                }
                Some(event) = state_event_rx.recv() => {
                    // Handle peer state events (connected, failed, disconnected)
                    self.handle_peer_state_event(event).await;
                }
                _ = cleanup_interval.tick() => {
                    // Periodic cleanup of stale peers and state sync (fallback)
                    self.cleanup_stale_peers().await;
                }
            }
        }

        Ok(())
    }
482
    /// Connect to a single relay and handle messages
    ///
    /// Runs until shutdown is signalled or the WebSocket errors/closes.
    /// Responsibilities:
    /// - subscribes to hello events (#l "hello") and directed events
    ///   (#p = our pubkey), both kind 25050, limited to the last 60 seconds
    /// - broadcasts our hello every `hello_interval`
    /// - sends outgoing `SignalingMessage`s received on `signaling_rx`
    /// - forwards incoming relay EVENT messages into `event_tx`
    async fn relay_task(
        url: String,
        event_tx: mpsc::Sender<(String, nostr::Event)>,
        mut shutdown_rx: tokio::sync::watch::Receiver<bool>,
        keys: Keys,
        my_peer_id: PeerId,
        hello_interval: Duration,
        mut signaling_rx: tokio::sync::broadcast::Receiver<SignalingMessage>,
    ) -> Result<()> {
        info!("Connecting to relay: {}", url);

        let (ws_stream, _) = connect_async(&url).await?;
        let (mut write, mut read) = ws_stream.split();

        // Subscribe to webrtc events - two filters:
        // 1. Hello messages: kind 25050 with #l: "hello" tag
        // 2. Directed messages: kind 25050 with #p tag (our pubkey)
        let hello_filter = Filter::new()
            .kind(Kind::Ephemeral(WEBRTC_KIND as u16))
            .custom_tag(
                nostr::SingleLetterTag::lowercase(nostr::Alphabet::L),
                vec![HELLO_TAG],
            )
            .since(nostr::Timestamp::now() - Duration::from_secs(60));

        let directed_filter = Filter::new()
            .kind(Kind::Ephemeral(WEBRTC_KIND as u16))
            .custom_tag(
                nostr::SingleLetterTag::lowercase(nostr::Alphabet::P),
                vec![keys.public_key().to_hex()],
            )
            .since(nostr::Timestamp::now() - Duration::from_secs(60));

        let sub_id = nostr::SubscriptionId::generate();
        let sub_msg = ClientMessage::req(sub_id.clone(), vec![hello_filter, directed_filter]);
        write.send(Message::Text(sub_msg.as_json().into())).await?;

        info!("Subscribed to {} for WebRTC events (kind {})", url, WEBRTC_KIND);

        // Backdate so the first 1-second tick sends a hello immediately.
        let mut last_hello = Instant::now() - hello_interval; // Send immediately
        let mut hello_ticker = tokio::time::interval(Duration::from_secs(1));

        loop {
            tokio::select! {
                _ = shutdown_rx.changed() => {
                    if *shutdown_rx.borrow() {
                        break;
                    }
                }
                _ = hello_ticker.tick() => {
                    // Send hello periodically
                    // (1s ticker + elapsed check, so hello_interval need not
                    // be a multiple of the tick period.)
                    if last_hello.elapsed() >= hello_interval {
                        let hello = SignalingMessage::hello(&my_peer_id.uuid);
                        if let Ok(event) = Self::create_signaling_event(&keys, &hello).await {
                            let msg = ClientMessage::event(event);
                            if write.send(Message::Text(msg.as_json().into())).await.is_ok() {
                                debug!("Sent hello to {}", url);
                            }
                        }
                        last_hello = Instant::now();
                    }
                }
                // Handle outgoing signaling messages
                // (the `Ok` pattern skips broadcast errors such as Lagged/Closed)
                Ok(signaling_msg) = signaling_rx.recv() => {
                    info!("Sending {} via {}", signaling_msg.msg_type(), url);
                    if let Ok(event) = Self::create_signaling_event(&keys, &signaling_msg).await {
                        let event_id = event.id.to_string();
                        let msg = ClientMessage::event(event);
                        if write.send(Message::Text(msg.as_json().into())).await.is_ok() {
                            info!("Sent {} to {} (event id: {})", signaling_msg.msg_type(), url, &event_id[..16]);
                        }
                    }
                }
                msg = read.next() => {
                    match msg {
                        Some(Ok(Message::Text(text))) => {
                            // Only EVENT relay messages are forwarded; EOSE/OK/etc. ignored.
                            if let Ok(relay_msg) = RelayMessage::from_json(&text) {
                                if let RelayMessage::Event { event, .. } = relay_msg {
                                    let _ = event_tx.send((url.clone(), *event)).await;
                                }
                            }
                        }
                        Some(Err(e)) => {
                            error!("WebSocket error from {}: {}", url, e);
                            break;
                        }
                        None => {
                            warn!("WebSocket closed: {}", url);
                            break;
                        }
                        _ => {}
                    }
                }
            }
        }

        Ok(())
    }
582
    /// Create a signaling event
    ///
    /// For directed messages (offer, answer, candidate, candidates), use NIP-17 style
    /// gift wrapping with ephemeral keys for privacy.
    /// Hello messages use kind 25050 with #l: "hello" tag and peerId.
    ///
    /// Directed path: the message is serialized into a JSON "seal" carrying
    /// the real sender pubkey, encrypted with NIP-44 under a freshly generated
    /// ephemeral key, and published as a kind-25050 event signed by that
    /// ephemeral key with a #p tag naming the recipient and a 5-minute
    /// expiration tag.
    async fn create_signaling_event(keys: &Keys, msg: &SignalingMessage) -> Result<nostr::Event> {
        // Check if message has a recipient (needs gift wrapping)
        if let Some(recipient_str) = msg.recipient() {
            // Parse recipient to get their pubkey
            // NOTE: if parsing fails we silently fall through to the hello
            // path below; the message would then go out unencrypted.
            if let Some(peer_id) = PeerId::from_string(recipient_str) {
                let recipient_pubkey = PublicKey::from_hex(&peer_id.pubkey)?;

                // Create seal with sender's actual pubkey (the "rumor")
                // NOTE(review): this seal is a bare JSON object with no id or
                // signature — authenticity rests on NIP-44 decryption alone.
                // Confirm the hashtree-ts side expects exactly this shape.
                let seal = serde_json::json!({
                    "pubkey": keys.public_key().to_hex(),
                    "kind": WEBRTC_KIND,
                    "content": serde_json::to_string(msg)?,
                    "tags": []
                });

                // Generate ephemeral keypair for the wrapper
                let ephemeral_keys = Keys::generate();

                // Encrypt the seal for the recipient using ephemeral key (NIP-44)
                let encrypted_content = nip44::encrypt(
                    ephemeral_keys.secret_key(),
                    &recipient_pubkey,
                    &seal.to_string(),
                    nip44::Version::V2
                )?;

                // Create wrapper event with ephemeral key
                let created_at = nostr::Timestamp::now();
                let expiration = created_at + Duration::from_secs(5 * 60); // 5 minutes

                let tags = vec![
                    Tag::parse(&["p", &recipient_pubkey.to_hex()])?,
                    Tag::parse(&["expiration", &expiration.as_u64().to_string()])?,
                ];

                let event = EventBuilder::new(Kind::Ephemeral(WEBRTC_KIND as u16), encrypted_content, tags)
                    .to_event(&ephemeral_keys)?;

                return Ok(event);
            }
        }

        // Hello messages - kind 25050 with #l: "hello" tag and peerId
        // (unencrypted and signed with our real key, for peer discovery)
        let tags = vec![
            Tag::parse(&["l", HELLO_TAG])?,
            Tag::parse(&["peerId", msg.peer_id()])?,
        ];

        let event = EventBuilder::new(Kind::Ephemeral(WEBRTC_KIND as u16), "", tags)
            .to_event(keys)?;

        Ok(event)
    }
641
642    /// Handle an incoming event
643    ///
644    /// Messages may be:
645    /// 1. Hello messages: kind 25050 with #l: "hello" tag and peerId
646    /// 2. Gift-wrapped directed messages: kind 25050 with #p tag, encrypted with ephemeral key
647    async fn handle_event(
648        &self,
649        relay: &str,
650        event: &nostr::Event,
651        relay_write_tx: &tokio::sync::broadcast::Sender<SignalingMessage>,
652    ) -> Result<()> {
653        // Must be kind 25050
654        if event.kind != Kind::Ephemeral(WEBRTC_KIND as u16) {
655            return Ok(());
656        }
657
658        // Helper to get tag value
659        let get_tag = |name: &str| -> Option<String> {
660            event.tags.iter().find_map(|tag| {
661                let v: Vec<String> = tag.clone().to_vec();
662                if v.len() >= 2 && v[0] == name {
663                    Some(v[1].clone())
664                } else {
665                    None
666                }
667            })
668        };
669
670        // Check if this is a hello message (#l: "hello" tag)
671        let l_tag = get_tag("l");
672        if l_tag.as_deref() == Some(HELLO_TAG) {
673            let sender_pubkey = event.pubkey.to_hex();
674
675            // Skip our own hello messages
676            if sender_pubkey == self.my_peer_id.pubkey {
677                return Ok(());
678            }
679
680            if let Some(their_uuid) = get_tag("peerId") {
681                debug!("Received hello from {} via {}", &sender_pubkey[..8], relay);
682                self.handle_hello(&sender_pubkey, &their_uuid, relay_write_tx)
683                    .await?;
684            }
685            return Ok(());
686        }
687
688        // Check if this is a directed message for us (#p tag with our pubkey)
689        let p_tag = get_tag("p");
690        if p_tag.as_deref() != Some(&self.keys.public_key().to_hex()) {
691            // Not for us - ignore silently
692            return Ok(());
693        }
694
695        // Gift-wrapped directed message - decrypt using our key and ephemeral sender's pubkey
696        if event.content.is_empty() {
697            return Ok(());
698        }
699
700        // Try to unwrap the gift - decrypt with our key and the ephemeral sender's pubkey
701        let seal: serde_json::Value = match nip44::decrypt(self.keys.secret_key(), &event.pubkey, &event.content) {
702            Ok(plaintext) => {
703                match serde_json::from_str(&plaintext) {
704                    Ok(v) => v,
705                    Err(_) => return Ok(()),
706                }
707            }
708            Err(_) => {
709                // Can't decrypt - not for us or invalid
710                return Ok(());
711            }
712        };
713
714        // Extract the actual sender's pubkey and content from the seal
715        let sender_pubkey = seal.get("pubkey")
716            .and_then(|v| v.as_str())
717            .ok_or_else(|| anyhow::anyhow!("Missing pubkey in seal"))?;
718
719        // Skip our own messages
720        if sender_pubkey == self.my_peer_id.pubkey {
721            return Ok(());
722        }
723
724        let content = seal.get("content")
725            .and_then(|v| v.as_str())
726            .ok_or_else(|| anyhow::anyhow!("Missing content in seal"))?;
727
728        let raw_msg: serde_json::Value = serde_json::from_str(content)?;
729        let msg_type = raw_msg.get("type").and_then(|v| v.as_str()).unwrap_or("");
730
731        // Support hashtree-ts format: { type, peerId, targetPeerId, sdp/candidate/candidates }
732        if raw_msg.get("targetPeerId").is_some() {
733            let target_peer = raw_msg
734                .get("targetPeerId")
735                .and_then(|v| v.as_str())
736                .unwrap_or("");
737            if target_peer != self.my_peer_id.to_string() {
738                return Ok(());
739            }
740
741            let peer_id = raw_msg.get("peerId").and_then(|v| v.as_str()).unwrap_or("");
742            let their_uuid = peer_id.split(':').nth(1).unwrap_or(peer_id);
743
744            match msg_type {
745                "offer" => {
746                    let sdp = raw_msg.get("sdp").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing SDP in offer"))?;
747                    let offer = serde_json::json!({ "type": "offer", "sdp": sdp });
748                    self.handle_offer(&sender_pubkey, their_uuid, offer, relay_write_tx).await?;
749                }
750                "answer" => {
751                    let sdp = raw_msg.get("sdp").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing SDP in answer"))?;
752                    let answer = serde_json::json!({ "type": "answer", "sdp": sdp });
753                    self.handle_answer(&sender_pubkey, their_uuid, answer).await?;
754                }
755                "candidate" => {
756                    let candidate = raw_msg.get("candidate").and_then(|v| v.as_str()).unwrap_or("");
757                    if !candidate.is_empty() {
758                        let candidate_json = serde_json::json!({
759                            "candidate": candidate,
760                            "sdpMid": raw_msg.get("sdpMid"),
761                            "sdpMLineIndex": raw_msg.get("sdpMLineIndex"),
762                        });
763                        self.handle_candidate(&sender_pubkey, their_uuid, candidate_json).await?;
764                    }
765                }
766                "candidates" => {
767                    let candidates = raw_msg
768                        .get("candidates")
769                        .and_then(|v| v.as_array())
770                        .map(|entries| {
771                            entries.iter().filter_map(|entry| {
772                                if let Some(candidate_str) = entry.get("candidate").and_then(|v| v.as_str()).or_else(|| entry.as_str()) {
773                                    Some(serde_json::json!({
774                                        "candidate": candidate_str,
775                                        "sdpMid": entry.get("sdpMid"),
776                                        "sdpMLineIndex": entry.get("sdpMLineIndex"),
777                                    }))
778                                } else {
779                                    None
780                                }
781                            }).collect::<Vec<_>>()
782                        })
783                        .unwrap_or_default();
784                    self.handle_candidates(&sender_pubkey, their_uuid, candidates).await?;
785                }
786                _ => {}
787            }
788
789            return Ok(());
790        }
791
792        let msg: SignalingMessage = serde_json::from_value(raw_msg)?;
793
794        debug!(
795            "Received {} from {} via {} (gift-wrapped)",
796            msg.msg_type(),
797            &sender_pubkey[..8],
798            relay
799        );
800
801        match msg {
802            SignalingMessage::Hello { .. } => {
803                // Hello messages should come via tags, not gift wrap
804                return Ok(());
805            }
806            SignalingMessage::Offer {
807                recipient,
808                peer_id: their_uuid,
809                offer,
810            } => {
811                if recipient != self.my_peer_id.to_string() {
812                    return Ok(()); // Not for us
813                }
814                if let Err(e) = self.handle_offer(&sender_pubkey, &their_uuid, offer, relay_write_tx)
815                    .await {
816                    error!("handle_offer FAILED: sender={}, uuid={}, error={:?}", &sender_pubkey[..8.min(sender_pubkey.len())], their_uuid, e);
817                    return Err(e);
818                }
819            }
820            SignalingMessage::Answer {
821                recipient,
822                peer_id: their_uuid,
823                answer,
824            } => {
825                if recipient != self.my_peer_id.to_string() {
826                    return Ok(());
827                }
828                self.handle_answer(&sender_pubkey, &their_uuid, answer)
829                    .await?;
830            }
831            SignalingMessage::Candidate {
832                recipient,
833                peer_id: their_uuid,
834                candidate,
835            } => {
836                if recipient != self.my_peer_id.to_string() {
837                    return Ok(());
838                }
839                self.handle_candidate(&sender_pubkey, &their_uuid, candidate)
840                    .await?;
841            }
842            SignalingMessage::Candidates {
843                recipient,
844                peer_id: their_uuid,
845                candidates,
846            } => {
847                if recipient != self.my_peer_id.to_string() {
848                    return Ok(());
849                }
850                self.handle_candidates(&sender_pubkey, &their_uuid, candidates)
851                    .await?;
852            }
853        }
854
855        Ok(())
856    }
857
858    /// Handle incoming hello message
859    async fn handle_hello(
860        &self,
861        sender_pubkey: &str,
862        their_uuid: &str,
863        relay_write_tx: &tokio::sync::broadcast::Sender<SignalingMessage>,
864    ) -> Result<()> {
865        let full_peer_id = PeerId::new(sender_pubkey.to_string(), Some(their_uuid.to_string()));
866        let peer_key = full_peer_id.to_string();
867
868        // Check if we already have this peer
869        {
870            let peers = self.state.peers.read().await;
871            if let Some(entry) = peers.get(&peer_key) {
872                // Already connected or connecting, just update last_seen
873                if entry.state == ConnectionState::Connected
874                    || entry.state == ConnectionState::Connecting
875                {
876                    return Ok(());
877                }
878            }
879        }
880
881        // Classify the peer into a pool
882        let pool = (self.peer_classifier)(sender_pubkey);
883
884        // Check pool limits
885        let pool_counts = self.get_pool_counts().await;
886        if !self.can_accept_peer(pool, &pool_counts) {
887            debug!("Ignoring hello from {} - pool {:?} is full", full_peer_id.short(), pool);
888            return Ok(());
889        }
890
891        // Decide if we should initiate based on tie-breaking
892        let should_initiate = self.should_initiate(their_uuid);
893
894        // If pool is already satisfied, don't initiate new outbound connections
895        // This reserves space for inbound connections
896        let pool_satisfied = self.is_pool_satisfied(pool, &pool_counts);
897        let will_initiate = should_initiate && !pool_satisfied;
898
899        info!(
900            "Discovered peer: {} (pool: {:?}, initiate: {}, pool_satisfied: {})",
901            full_peer_id.short(),
902            pool,
903            will_initiate,
904            pool_satisfied
905        );
906
907        // If we're not initiating and pool is satisfied, don't even add to discovered
908        // (we won't accept their offer either since pool check happens in handle_offer)
909        if !will_initiate && pool_satisfied {
910            debug!("Pool {:?} is satisfied, not tracking peer {}", pool, full_peer_id.short());
911            return Ok(());
912        }
913
914        // Create peer entry with pool assignment
915        {
916            let mut peers = self.state.peers.write().await;
917            peers.insert(
918                peer_key.clone(),
919                PeerEntry {
920                    peer_id: full_peer_id.clone(),
921                    direction: if will_initiate {
922                        PeerDirection::Outbound
923                    } else {
924                        PeerDirection::Inbound
925                    },
926                    state: ConnectionState::Discovered,
927                    last_seen: Instant::now(),
928                    peer: None,
929                    pool,
930                    bytes_sent: 0,
931                    bytes_received: 0,
932                },
933            );
934        }
935
936        // If we should initiate, create offer
937        if will_initiate {
938            self.initiate_connection(&full_peer_id, pool, relay_write_tx)
939                .await?;
940        }
941
942        Ok(())
943    }
944
945    /// Initiate a connection to a peer (create and send offer)
946    async fn initiate_connection(
947        &self,
948        peer_id: &PeerId,
949        pool: PeerPool,
950        relay_write_tx: &tokio::sync::broadcast::Sender<SignalingMessage>,
951    ) -> Result<()> {
952        let peer_key = peer_id.to_string();
953
954        info!("Initiating connection to {} (pool: {:?})", peer_id.short(), pool);
955
956        // Create peer connection with content store and state events
957        let mut peer = Peer::new_with_store_and_events(
958            peer_id.clone(),
959            PeerDirection::Outbound,
960            self.my_peer_id.clone(),
961            self.signaling_tx.clone(),
962            self.config.stun_servers.clone(),
963            self.store.clone(),
964            Some(self.state_event_tx.clone()),
965        )
966        .await?;
967
968        peer.setup_handlers().await?;
969
970        // Create offer
971        let offer = peer.connect().await?;
972
973        // Update state
974        {
975            let mut peers = self.state.peers.write().await;
976            if let Some(entry) = peers.get_mut(&peer_key) {
977                entry.state = ConnectionState::Connecting;
978                entry.peer = Some(peer);
979                entry.pool = pool;
980            }
981        }
982
983        // Send offer
984        let offer_msg = SignalingMessage::Offer {
985            offer,
986            recipient: peer_id.to_string(),
987            peer_id: self.my_peer_id.uuid.clone(),
988        };
989        if relay_write_tx.send(offer_msg).is_err() {
990            warn!("Failed to broadcast offer to {}", peer_id.short());
991        }
992
993        info!("Sent offer to {}", peer_id.short());
994
995        Ok(())
996    }
997
998    /// Handle incoming offer
999    async fn handle_offer(
1000        &self,
1001        sender_pubkey: &str,
1002        their_uuid: &str,
1003        offer: serde_json::Value,
1004        relay_write_tx: &tokio::sync::broadcast::Sender<SignalingMessage>,
1005    ) -> Result<()> {
1006        debug!("handle_offer ENTRY: sender={}, uuid={}", &sender_pubkey[..8.min(sender_pubkey.len())], their_uuid);
1007        let full_peer_id = PeerId::new(sender_pubkey.to_string(), Some(their_uuid.to_string()));
1008        let peer_key = full_peer_id.to_string();
1009
1010        // Classify the peer into a pool
1011        let pool = (self.peer_classifier)(sender_pubkey);
1012
1013        info!("Received offer from {} (pool: {:?})", full_peer_id.short(), pool);
1014
1015        // Check if we already have this peer with an actual connection
1016        {
1017            let peers = self.state.peers.read().await;
1018            debug!("Checking for existing peer, peer_key: {}, known_peers: {}", peer_key, peers.len());
1019            if let Some(entry) = peers.get(&peer_key) {
1020                // Only skip if we have an actual peer connection (not just discovered)
1021                if entry.peer.is_some() {
1022                    debug!("Already have peer {} with connection, skipping offer", full_peer_id.short());
1023                    return Ok(());
1024                }
1025                debug!("Peer {} exists but has no connection, proceeding", full_peer_id.short());
1026            } else {
1027                debug!("Peer {} not found in peers map, will create new entry", full_peer_id.short());
1028            }
1029        }
1030
1031        // Check pool limits
1032        let pool_counts = self.get_pool_counts().await;
1033        debug!("Pool counts: {:?}, checking can_accept_peer for {:?}", pool_counts, pool);
1034        if !self.can_accept_peer(pool, &pool_counts) {
1035            warn!("Rejecting offer from {} - pool {:?} is full", full_peer_id.short(), pool);
1036            return Ok(());
1037        }
1038        debug!("Pool check passed for {}", full_peer_id.short());
1039
1040        // Create peer connection with content store and state events
1041        debug!("Creating peer connection for {}", full_peer_id.short());
1042        let mut peer = Peer::new_with_store_and_events(
1043            full_peer_id.clone(),
1044            PeerDirection::Inbound,
1045            self.my_peer_id.clone(),
1046            self.signaling_tx.clone(),
1047            self.config.stun_servers.clone(),
1048            self.store.clone(),
1049            Some(self.state_event_tx.clone()),
1050        )
1051        .await?;
1052        debug!("Peer connection created for {}", full_peer_id.short());
1053
1054        peer.setup_handlers().await?;
1055        debug!("Handlers set up for {}", full_peer_id.short());
1056
1057        // Handle offer and create answer
1058        let answer = peer.handle_offer(offer).await?;
1059        debug!("Answer created for {}", full_peer_id.short());
1060
1061        // Update state
1062        {
1063            let mut peers = self.state.peers.write().await;
1064            peers.insert(
1065                peer_key,
1066                PeerEntry {
1067                    peer_id: full_peer_id.clone(),
1068                    direction: PeerDirection::Inbound,
1069                    state: ConnectionState::Connecting,
1070                    last_seen: Instant::now(),
1071                    peer: Some(peer),
1072                    pool,
1073                    bytes_sent: 0,
1074                    bytes_received: 0,
1075                },
1076            );
1077        }
1078
1079        // Send answer
1080        // Note: peer_id is just the UUID, not full pubkey:uuid
1081        // The recipient will construct full peer_id from sender pubkey + this UUID
1082        let answer_msg = SignalingMessage::Answer {
1083            answer,
1084            recipient: full_peer_id.to_string(),
1085            peer_id: self.my_peer_id.uuid.clone(),
1086        };
1087        if relay_write_tx.send(answer_msg).is_err() {
1088            warn!("Failed to send answer to {}", full_peer_id.short());
1089        }
1090        info!("Sent answer to {}", full_peer_id.short());
1091
1092        Ok(())
1093    }
1094
1095    /// Handle incoming answer
1096    async fn handle_answer(
1097        &self,
1098        sender_pubkey: &str,
1099        their_uuid: &str,
1100        answer: serde_json::Value,
1101    ) -> Result<()> {
1102        let full_peer_id = PeerId::new(sender_pubkey.to_string(), Some(their_uuid.to_string()));
1103        let peer_key = full_peer_id.to_string();
1104
1105        info!("Received answer from {}", full_peer_id.short());
1106
1107        let mut peers = self.state.peers.write().await;
1108        if let Some(entry) = peers.get_mut(&peer_key) {
1109            // Skip if already connected - duplicate answers from multiple relays
1110            if entry.state == ConnectionState::Connected {
1111                debug!("Ignoring duplicate answer from {} - already connected", full_peer_id.short());
1112                return Ok(());
1113            }
1114            if let Some(ref mut peer) = entry.peer {
1115                // Check WebRTC signaling state before applying answer
1116                use webrtc::peer_connection::signaling_state::RTCSignalingState;
1117                let signaling_state = peer.signaling_state();
1118                if signaling_state != RTCSignalingState::HaveLocalOffer {
1119                    debug!(
1120                        "Ignoring answer from {} - signaling state is {:?}, not HaveLocalOffer",
1121                        full_peer_id.short(),
1122                        signaling_state
1123                    );
1124                    return Ok(());
1125                }
1126                peer.handle_answer(answer).await?;
1127                info!("Applied answer from {}", full_peer_id.short());
1128            } else {
1129                debug!("Peer {} has no connection object", full_peer_id.short());
1130            }
1131        } else {
1132            debug!("No peer found for key: {}", peer_key);
1133        }
1134
1135        Ok(())
1136    }
1137
1138    /// Handle incoming ICE candidate
1139    async fn handle_candidate(
1140        &self,
1141        sender_pubkey: &str,
1142        their_uuid: &str,
1143        candidate: serde_json::Value,
1144    ) -> Result<()> {
1145        let full_peer_id = PeerId::new(sender_pubkey.to_string(), Some(their_uuid.to_string()));
1146        let peer_key = full_peer_id.to_string();
1147
1148        info!("Received ICE candidate from {}", full_peer_id.short());
1149
1150        let mut peers = self.state.peers.write().await;
1151        if let Some(entry) = peers.get_mut(&peer_key) {
1152            if let Some(ref mut peer) = entry.peer {
1153                peer.handle_candidate(candidate).await?;
1154            }
1155        }
1156
1157        Ok(())
1158    }
1159
1160    /// Handle batched ICE candidates
1161    async fn handle_candidates(
1162        &self,
1163        sender_pubkey: &str,
1164        their_uuid: &str,
1165        candidates: Vec<serde_json::Value>,
1166    ) -> Result<()> {
1167        let full_peer_id = PeerId::new(sender_pubkey.to_string(), Some(their_uuid.to_string()));
1168        let peer_key = full_peer_id.to_string();
1169
1170        debug!(
1171            "Received {} candidates from {}",
1172            candidates.len(),
1173            full_peer_id.short()
1174        );
1175
1176        let mut peers = self.state.peers.write().await;
1177        if let Some(entry) = peers.get_mut(&peer_key) {
1178            if let Some(ref mut peer) = entry.peer {
1179                for candidate in candidates {
1180                    if let Err(e) = peer.handle_candidate(candidate).await {
1181                        debug!("Failed to add candidate: {}", e);
1182                    }
1183                }
1184            }
1185        }
1186
1187        Ok(())
1188    }
1189
1190    /// Handle peer state change events from peer connections
1191    async fn handle_peer_state_event(&self, event: PeerStateEvent) {
1192        match event {
1193            PeerStateEvent::Connected(peer_id) => {
1194                let peer_key = peer_id.to_string();
1195                let mut peers = self.state.peers.write().await;
1196                if let Some(entry) = peers.get_mut(&peer_key) {
1197                    if entry.state != ConnectionState::Connected {
1198                        info!("Peer {} connected (via state event)", peer_id.short());
1199                        entry.state = ConnectionState::Connected;
1200                        // Update connected count
1201                        self.state.connected_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1202                    }
1203                }
1204            }
1205            PeerStateEvent::Failed(peer_id) => {
1206                let peer_key = peer_id.to_string();
1207                info!("Peer {} connection failed - removing from pool", peer_id.short());
1208                let mut peers = self.state.peers.write().await;
1209                if let Some(entry) = peers.remove(&peer_key) {
1210                    // Decrement connected count if was connected
1211                    if entry.state == ConnectionState::Connected {
1212                        self.state.connected_count.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
1213                    }
1214                    // Close the peer connection if it exists
1215                    if let Some(peer) = entry.peer {
1216                        let _ = peer.close().await;
1217                    }
1218                }
1219            }
1220            PeerStateEvent::Disconnected(peer_id) => {
1221                let peer_key = peer_id.to_string();
1222                info!("Peer {} disconnected - removing from pool", peer_id.short());
1223                let mut peers = self.state.peers.write().await;
1224                if let Some(entry) = peers.remove(&peer_key) {
1225                    // Decrement connected count if was connected
1226                    if entry.state == ConnectionState::Connected {
1227                        self.state.connected_count.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
1228                    }
1229                    // Close the peer connection if it exists
1230                    if let Some(peer) = entry.peer {
1231                        let _ = peer.close().await;
1232                    }
1233                }
1234            }
1235        }
1236    }
1237
1238    /// Cleanup stale peers and sync connection states (fallback, runs every 30s)
1239    async fn cleanup_stale_peers(&self) {
1240        let mut peers = self.state.peers.write().await;
1241        let mut connected_count = 0;
1242        let mut to_remove = Vec::new();
1243        let stale_timeout = Duration::from_secs(60); // Remove peers stuck in Discovered/Connecting for 60s
1244
1245        for (key, entry) in peers.iter_mut() {
1246            if let Some(ref peer) = entry.peer {
1247                // Sync connected state as fallback (in case event was missed)
1248                if peer.is_connected() {
1249                    if entry.state != ConnectionState::Connected {
1250                        info!("Peer {} is now connected (sync fallback)", entry.peer_id.short());
1251                        entry.state = ConnectionState::Connected;
1252                    }
1253                    connected_count += 1;
1254                } else if entry.state == ConnectionState::Connecting && entry.last_seen.elapsed() > stale_timeout {
1255                    // Peer stuck in Connecting for too long - mark for removal
1256                    info!("Removing stale peer {} (stuck in Connecting for {:?})", entry.peer_id.short(), entry.last_seen.elapsed());
1257                    to_remove.push(key.clone());
1258                }
1259            } else if entry.state == ConnectionState::Discovered && entry.last_seen.elapsed() > stale_timeout {
1260                // Discovered peer with no actual connection - remove
1261                debug!("Removing stale discovered peer {}", entry.peer_id.short());
1262                to_remove.push(key.clone());
1263            }
1264        }
1265
1266        // Remove stale peers
1267        for key in to_remove {
1268            if let Some(entry) = peers.remove(&key) {
1269                if let Some(peer) = entry.peer {
1270                    let _ = peer.close().await;
1271                }
1272            }
1273        }
1274
1275        self.state
1276            .connected_count
1277            .store(connected_count, std::sync::atomic::Ordering::Relaxed);
1278    }
1279}
1280
// Keep the old PeerState for backward compatibility with tests
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub struct PeerState {
    // Full peer identity (pubkey plus session UUID).
    pub peer_id: PeerId,
    // Whether we initiated (Outbound) or accepted (Inbound) the connection.
    pub direction: PeerDirection,
    // Legacy free-form state string; presumably mirrors ConnectionState's
    // Display values ("discovered"/"connecting"/...) — TODO confirm in tests.
    pub state: String,
    // When this peer was last observed in signaling traffic.
    pub last_seen: Instant,
}