// hashtree_cli/webrtc/signaling.rs
1//! WebRTC signaling over Nostr relays
2//!
3//! Protocol (compatible with hashtree-ts):
4//! - All signaling uses ephemeral kind 25050
5//! - Hello messages: #l: "hello" tag, broadcast for peer discovery (unencrypted)
6//! - Directed signaling (offer, answer, candidate, candidates): NIP-17 style
7//!   gift wrap for privacy - wrapped with ephemeral key, #p tag with recipient
8//!
9//! Security: Directed messages use gift wrapping with ephemeral keys so that
10//! relays cannot see the actual sender or correlate messages.
11
12use anyhow::Result;
13use futures::{SinkExt, StreamExt};
14use nostr::{nips::nip44, ClientMessage, EventBuilder, Filter, JsonUtil, Keys, Kind, PublicKey, RelayMessage, Tag};
15use std::collections::HashMap;
16use std::sync::Arc;
17use std::time::{Duration, Instant};
18use tokio::sync::{mpsc, Mutex, RwLock};
19use tokio_tungstenite::{connect_async, tungstenite::Message};
20use tracing::{debug, error, info, warn};
21
22use crate::nostr_relay::NostrRelay;
23use super::peer::{ContentStore, Peer, PendingRequest};
24use super::types::{
25    PeerDirection, PeerId, PeerPool, PeerStateEvent, PeerStatus, SignalingMessage, WebRTCConfig, WEBRTC_KIND, HELLO_TAG,
26};
27
/// Callback type for classifying peers into pools
///
/// Given a peer identifier string, returns the pool the peer belongs to.
/// Boxed in an `Arc` and `Send + Sync` so it can be shared across tasks.
pub type PeerClassifier = Arc<dyn Fn(&str) -> PeerPool + Send + Sync>;
30
/// Connection state for a peer
///
/// Lifecycle: `Discovered` (hello seen) -> `Connecting` -> `Connected`,
/// with `Failed` marking an unsuccessful attempt.
// Copy/Eq added: fieldless enum, cheap by-value semantics, backward compatible.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectionState {
    Discovered,
    Connecting,
    Connected,
    Failed,
}

impl std::fmt::Display for ConnectionState {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Lowercase names match the string form used in PeerStatus.state.
        let name = match self {
            ConnectionState::Discovered => "discovered",
            ConnectionState::Connecting => "connecting",
            ConnectionState::Connected => "connected",
            ConnectionState::Failed => "failed",
        };
        f.write_str(name)
    }
}
50
/// Peer entry in the manager
pub struct PeerEntry {
    /// Identity of the remote peer (pubkey + uuid).
    pub peer_id: PeerId,
    /// Who initiated the connection (see `PeerDirection` in types.rs).
    pub direction: PeerDirection,
    /// Current connection lifecycle state.
    pub state: ConnectionState,
    /// Last time we saw activity from this peer (used for stale cleanup).
    pub last_seen: Instant,
    /// Live connection handle; `None` until a connection attempt starts.
    pub peer: Option<Peer>,
    /// Pool this peer was classified into (follows vs other).
    pub pool: PeerPool,
    /// Bytes sent to this peer (cumulative).
    pub bytes_sent: u64,
    /// Bytes received from this peer (cumulative).
    pub bytes_received: u64,
}
62
/// Shared state for WebRTC manager
///
/// Wrapped in an `Arc` and shared between the manager loop, relay tasks,
/// and external callers (see `WebRTCManager::state()`).
pub struct WebRTCState {
    // Keyed by a peer identifier string; request_from_peers looks entries up
    // via PeerId::short() — NOTE(review): confirm the insert side uses the
    // same key format, otherwise per-peer byte counters never update.
    pub peers: RwLock<HashMap<String, PeerEntry>>,
    /// Count of currently connected peers (maintained by connection handlers).
    pub connected_count: std::sync::atomic::AtomicUsize,
    /// Total bytes sent across all peers (cumulative)
    pub bytes_sent: std::sync::atomic::AtomicU64,
    /// Total bytes received across all peers (cumulative)
    pub bytes_received: std::sync::atomic::AtomicU64,
}
72
73impl WebRTCState {
74    pub fn new() -> Self {
75        Self {
76            peers: RwLock::new(HashMap::new()),
77            connected_count: std::sync::atomic::AtomicUsize::new(0),
78            bytes_sent: std::sync::atomic::AtomicU64::new(0),
79            bytes_received: std::sync::atomic::AtomicU64::new(0),
80        }
81    }
82
83    /// Get current bandwidth stats (bytes sent/received)
84    pub fn get_bandwidth(&self) -> (u64, u64) {
85        (
86            self.bytes_sent.load(std::sync::atomic::Ordering::Relaxed),
87            self.bytes_received.load(std::sync::atomic::Ordering::Relaxed),
88        )
89    }
90
91    /// Record bytes sent (global + per-peer)
92    pub async fn record_sent(&self, peer_id: &str, bytes: u64) {
93        self.bytes_sent.fetch_add(bytes, std::sync::atomic::Ordering::Relaxed);
94        if let Some(entry) = self.peers.write().await.get_mut(peer_id) {
95            entry.bytes_sent += bytes;
96        }
97    }
98
99    /// Record bytes received (global + per-peer)
100    pub async fn record_received(&self, peer_id: &str, bytes: u64) {
101        self.bytes_received.fetch_add(bytes, std::sync::atomic::Ordering::Relaxed);
102        if let Some(entry) = self.peers.write().await.get_mut(peer_id) {
103            entry.bytes_received += bytes;
104        }
105    }
106
107    /// Request content by hash from connected peers
108    /// Queries peers sequentially with 500ms intervals until one responds
109    /// Returns the first successful response, or None if no peer has it
110    pub async fn request_from_peers(&self, hash_hex: &str) -> Option<Vec<u8>> {
111        use super::types::{DataRequest, MAX_HTL, encode_request};
112
113        let peers = self.peers.read().await;
114
115        // Collect connected peers with data channels
116        // We need to collect the Arc references first, then acquire locks outside the iterator
117        let peer_refs: Vec<_> = peers
118            .values()
119            .filter(|p| p.state == ConnectionState::Connected && p.peer.is_some())
120            .filter_map(|p| {
121                p.peer.as_ref().map(|peer| {
122                    (p.peer_id.short(), peer.data_channel.clone(), peer.pending_requests.clone())
123                })
124            })
125            .collect();
126
127        drop(peers); // Release the read lock
128
129        // Now acquire locks and filter to peers with active data channels
130        let mut connected_peers: Vec<(String, Arc<Mutex<HashMap<String, PendingRequest>>>, Arc<webrtc::data_channel::RTCDataChannel>)> = Vec::new();
131        for (peer_id, dc_mutex, pending) in peer_refs {
132            let dc_guard = dc_mutex.lock().await;
133            if let Some(dc) = dc_guard.as_ref() {
134                connected_peers.push((peer_id, pending, dc.clone()));
135            }
136        }
137
138        if connected_peers.is_empty() {
139            debug!("No connected peers to query for {}", &hash_hex[..8.min(hash_hex.len())]);
140            return None;
141        }
142
143        debug!(
144            "Querying {} connected peers for {} (sequential with 500ms delay)",
145            connected_peers.len(),
146            &hash_hex[..8.min(hash_hex.len())]
147        );
148
149        // Convert hex to binary hash once
150        let hash_bytes = match hex::decode(hash_hex) {
151            Ok(b) => b,
152            Err(_) => return None,
153        };
154
155        // Query peers sequentially with 500ms delay between each
156        for (_i, (peer_id, pending_requests, dc)) in connected_peers.into_iter().enumerate() {
157            debug!("Querying peer {} for {}", peer_id, &hash_hex[..8.min(hash_hex.len())]);
158
159            // Create response channel
160            let (tx, rx) = tokio::sync::oneshot::channel();
161
162            // Store pending request
163            {
164                let mut pending = pending_requests.lock().await;
165                pending.insert(
166                    hash_hex.to_string(),
167                    super::PendingRequest {
168                        hash: hash_bytes.clone(),
169                        response_tx: tx,
170                    },
171                );
172            }
173
174            // Send request
175            let req = DataRequest {
176                h: hash_bytes.clone(),
177                htl: MAX_HTL,
178            };
179            if let Ok(wire) = encode_request(&req) {
180                let wire_len = wire.len() as u64;
181                if dc.send(&bytes::Bytes::from(wire)).await.is_ok() {
182                    self.record_sent(&peer_id, wire_len).await;
183                    // Wait 500ms for response from this peer
184                    match tokio::time::timeout(std::time::Duration::from_millis(500), rx).await {
185                        Ok(Ok(Some(data))) => {
186                            self.record_received(&peer_id, data.len() as u64).await;
187                            debug!("Got response from peer {} for {}", peer_id, &hash_hex[..8.min(hash_hex.len())]);
188                            return Some(data);
189                        }
190                        _ => {
191                            // Timeout or no data - clean up and try next peer
192                            debug!("No response from peer {} for {}", peer_id, &hash_hex[..8.min(hash_hex.len())]);
193                        }
194                    }
195                }
196            }
197
198            // Clean up pending request
199            let mut pending = pending_requests.lock().await;
200            pending.remove(hash_hex);
201        }
202
203        debug!("No peer had data for {}", &hash_hex[..8.min(hash_hex.len())]);
204        None
205    }
206}
207
/// WebRTC manager handles peer discovery and connection management
pub struct WebRTCManager {
    /// Static configuration (relays, hello interval, pool limits).
    config: WebRTCConfig,
    /// This node's identity, derived from its Nostr public key.
    my_peer_id: PeerId,
    /// Keys used to sign hellos and decrypt gift-wrapped directed messages.
    keys: Keys,
    /// Shared runtime state (peers, counters); exposed via `state()`.
    state: Arc<WebRTCState>,
    /// Shutdown watch: `send(true)` stops the run loop and relay tasks.
    shutdown: Arc<tokio::sync::watch::Sender<bool>>,
    /// Receiver cloned into each relay task and the run loop.
    shutdown_rx: tokio::sync::watch::Receiver<bool>,
    /// Channel to send signaling messages to relays
    signaling_tx: mpsc::Sender<SignalingMessage>,
    /// Receiver half; taken exactly once by `run()`.
    signaling_rx: Option<mpsc::Receiver<SignalingMessage>>,
    /// Optional content store for serving hash requests
    store: Option<Arc<dyn ContentStore>>,
    /// Peer classifier for pool assignment
    peer_classifier: PeerClassifier,
    /// Optional Nostr relay for data-channel relay messages
    nostr_relay: Option<Arc<NostrRelay>>,
    /// Channel for peer state events (connection success/failure)
    state_event_tx: mpsc::Sender<PeerStateEvent>,
    /// Receiver half; taken exactly once by `run()`.
    state_event_rx: Option<mpsc::Receiver<PeerStateEvent>>,
}
229
230impl WebRTCManager {
231    /// Create a new WebRTC manager
232    pub fn new(keys: Keys, config: WebRTCConfig) -> Self {
233        let pubkey = keys.public_key().to_hex();
234        let my_peer_id = PeerId::new(pubkey, None);
235        let (shutdown, shutdown_rx) = tokio::sync::watch::channel(false);
236        let (signaling_tx, signaling_rx) = mpsc::channel(100);
237        let (state_event_tx, state_event_rx) = mpsc::channel(100);
238
239        // Default classifier: all peers go to 'other' pool
240        let peer_classifier: PeerClassifier = Arc::new(|_| PeerPool::Other);
241
242        Self {
243            config,
244            my_peer_id,
245            keys,
246            state: Arc::new(WebRTCState::new()),
247            shutdown: Arc::new(shutdown),
248            shutdown_rx,
249            signaling_tx,
250            signaling_rx: Some(signaling_rx),
251            store: None,
252            peer_classifier,
253            nostr_relay: None,
254            state_event_tx,
255            state_event_rx: Some(state_event_rx),
256        }
257    }
258
259    /// Create a new WebRTC manager with a peer classifier
260    pub fn new_with_classifier(keys: Keys, config: WebRTCConfig, classifier: PeerClassifier) -> Self {
261        let mut manager = Self::new(keys, config);
262        manager.peer_classifier = classifier;
263        manager
264    }
265
266    /// Create a new WebRTC manager with a content store for serving hash requests
267    pub fn new_with_store(keys: Keys, config: WebRTCConfig, store: Arc<dyn ContentStore>) -> Self {
268        let mut manager = Self::new(keys, config);
269        manager.store = Some(store);
270        manager
271    }
272
273    /// Create a new WebRTC manager with store and classifier
274    pub fn new_with_store_and_classifier(
275        keys: Keys,
276        config: WebRTCConfig,
277        store: Arc<dyn ContentStore>,
278        classifier: PeerClassifier,
279    ) -> Self {
280        let mut manager = Self::new(keys, config);
281        manager.store = Some(store);
282        manager.peer_classifier = classifier;
283        manager
284    }
285
    /// Set the content store for serving hash requests
    ///
    /// Replaces any previously configured store.
    pub fn set_store(&mut self, store: Arc<dyn ContentStore>) {
        self.store = Some(store);
    }
290
    /// Set the peer classifier
    ///
    /// Replaces the default all-to-'other' classifier installed by `new`.
    pub fn set_peer_classifier(&mut self, classifier: PeerClassifier) {
        self.peer_classifier = classifier;
    }
295
    /// Set the Nostr relay for data-channel relay messages
    ///
    /// Replaces any previously configured relay.
    pub fn set_nostr_relay(&mut self, relay: Arc<NostrRelay>) {
        self.nostr_relay = Some(relay);
    }
300
    /// Get my peer ID
    ///
    /// Returns a borrow; clone if ownership is needed.
    pub fn my_peer_id(&self) -> &PeerId {
        &self.my_peer_id
    }
305
306    /// Get shared state for external access
307    pub fn state(&self) -> Arc<WebRTCState> {
308        self.state.clone()
309    }
310
    /// Signal shutdown
    ///
    /// Broadcasts `true` on the shutdown watch channel; the run loop and
    /// relay tasks observe it and exit. A send error (no receivers) is ignored.
    pub fn shutdown(&self) {
        let _ = self.shutdown.send(true);
    }
315
    /// Get connected peer count
    ///
    /// NOTE(review): only reads an atomic, so the `async` is not strictly
    /// needed — presumably kept for API symmetry with the other accessors;
    /// removing it would break callers that `.await` this.
    pub async fn connected_count(&self) -> usize {
        self.state
            .connected_count
            .load(std::sync::atomic::Ordering::Relaxed)
    }
322
323    /// Get all peer statuses
324    pub async fn peer_statuses(&self) -> Vec<PeerStatus> {
325        self.state
326            .peers
327            .read()
328            .await
329            .values()
330            .map(|p| PeerStatus {
331                peer_id: p.peer_id.to_string(),
332                pubkey: p.peer_id.pubkey.clone(),
333                state: p.state.to_string(),
334                direction: p.direction,
335                connected_at: Some(p.last_seen),
336                pool: p.pool,
337            })
338            .collect()
339    }
340
341    /// Get pool counts
342    /// Returns (follows_connected, follows_active, other_connected, other_active)
343    /// "active" = Connected or Connecting (excludes Discovered and Failed)
344    pub async fn get_pool_counts(&self) -> (usize, usize, usize, usize) {
345        let peers = self.state.peers.read().await;
346        let mut follows_connected = 0;
347        let mut follows_active = 0;
348        let mut other_connected = 0;
349        let mut other_active = 0;
350
351        for entry in peers.values() {
352            // Only count Connected or Connecting as "active" connections
353            // Discovered peers are just seen hellos, not real connections
354            let is_active = entry.state == ConnectionState::Connected
355                || entry.state == ConnectionState::Connecting;
356
357            match entry.pool {
358                PeerPool::Follows => {
359                    if is_active {
360                        follows_active += 1;
361                    }
362                    if entry.state == ConnectionState::Connected {
363                        follows_connected += 1;
364                    }
365                }
366                PeerPool::Other => {
367                    if is_active {
368                        other_active += 1;
369                    }
370                    if entry.state == ConnectionState::Connected {
371                        other_connected += 1;
372                    }
373                }
374            }
375        }
376
377        (follows_connected, follows_active, other_connected, other_active)
378    }
379
380    /// Check if we can accept a peer in a given pool
381    fn can_accept_peer(&self, pool: PeerPool, pool_counts: &(usize, usize, usize, usize)) -> bool {
382        let (_, follows_active, _, other_active) = *pool_counts;
383        match pool {
384            PeerPool::Follows => follows_active < self.config.pools.follows.max_connections,
385            PeerPool::Other => other_active < self.config.pools.other.max_connections,
386        }
387    }
388
389    /// Check if a pool is satisfied
390    #[allow(dead_code)]
391    fn is_pool_satisfied(&self, pool: PeerPool, pool_counts: &(usize, usize, usize, usize)) -> bool {
392        let (follows_connected, _, other_connected, _) = *pool_counts;
393        match pool {
394            PeerPool::Follows => follows_connected >= self.config.pools.follows.satisfied_connections,
395            PeerPool::Other => other_connected >= self.config.pools.other.satisfied_connections,
396        }
397    }
398
399    /// Check if both pools are satisfied
400    #[allow(dead_code)]
401    fn is_satisfied(&self, pool_counts: &(usize, usize, usize, usize)) -> bool {
402        self.is_pool_satisfied(PeerPool::Follows, pool_counts)
403            && self.is_pool_satisfied(PeerPool::Other, pool_counts)
404    }
405
406    /// Check if we should initiate connection (tie-breaking)
407    /// Lower UUID initiates - same as iris-client/hashtree-ts
408    fn should_initiate(&self, their_uuid: &str) -> bool {
409        self.my_peer_id.uuid < their_uuid.to_string()
410    }
411
    /// Start the WebRTC manager - connects to relays and handles signaling
    ///
    /// Spawns one task per configured relay, then loops multiplexing:
    /// - incoming relay events -> `handle_event`
    /// - outgoing signaling messages -> broadcast to all relay tasks
    /// - peer state events -> `handle_peer_state_event`
    /// - a 30s fallback tick -> `cleanup_stale_peers`
    /// until shutdown is signalled. May only be called once per manager
    /// (panics if the internal receivers were already taken).
    pub async fn run(&mut self) -> Result<()> {
        info!(
            "Starting WebRTC manager with peer ID: {}",
            self.my_peer_id.short()
        );

        // (relay url, event) pairs funneled from all relay tasks.
        let (event_tx, mut event_rx) = mpsc::channel::<(String, nostr::Event)>(100);

        // Take the signaling receiver
        let mut signaling_rx = self.signaling_rx.take().expect("signaling_rx already taken");

        // Take the state event receiver
        let mut state_event_rx = self.state_event_rx.take().expect("state_event_rx already taken");

        // Create a shared write channel for all relay tasks
        let (relay_write_tx, _) = tokio::sync::broadcast::channel::<SignalingMessage>(100);

        // Spawn relay connections
        // NOTE(review): relay tasks are not restarted if they exit; a dropped
        // relay connection stays down until the manager restarts - confirm intended.
        for relay_url in &self.config.relays {
            let url = relay_url.clone();
            let event_tx = event_tx.clone();
            let shutdown_rx = self.shutdown_rx.clone();
            let keys = self.keys.clone();
            let my_peer_id = self.my_peer_id.clone();
            let hello_interval = Duration::from_millis(self.config.hello_interval_ms);
            let relay_write_rx = relay_write_tx.subscribe();

            tokio::spawn(async move {
                if let Err(e) = Self::relay_task(
                    url.clone(),
                    event_tx,
                    shutdown_rx,
                    keys,
                    my_peer_id,
                    hello_interval,
                    relay_write_rx,
                )
                .await
                {
                    error!("Relay {} error: {}", url, e);
                }
            });
        }

        // Process incoming events and outgoing signaling messages
        let mut shutdown_rx = self.shutdown_rx.clone();
        // Cleanup interval - run every 30 seconds as a fallback (not for real-time sync)
        let mut cleanup_interval = tokio::time::interval(Duration::from_secs(30));
        loop {
            tokio::select! {
                _ = shutdown_rx.changed() => {
                    if *shutdown_rx.borrow() {
                        info!("WebRTC manager shutting down");
                        break;
                    }
                }
                Some((relay, event)) = event_rx.recv() => {
                    if let Err(e) = self.handle_event(&relay, &event, &relay_write_tx).await {
                        debug!("Error handling event from {}: {}", relay, e);
                    }
                }
                Some(msg) = signaling_rx.recv() => {
                    // Forward signaling messages to relay broadcast
                    let _ = relay_write_tx.send(msg);
                }
                Some(event) = state_event_rx.recv() => {
                    // Handle peer state events (connected, failed, disconnected)
                    self.handle_peer_state_event(event).await;
                }
                _ = cleanup_interval.tick() => {
                    // Periodic cleanup of stale peers and state sync (fallback)
                    self.cleanup_stale_peers().await;
                }
            }
        }

        Ok(())
    }
491
    /// Connect to a single relay and handle messages
    ///
    /// Runs until shutdown, a WebSocket error, or the relay closes the
    /// connection (no automatic reconnect here). Responsibilities:
    /// - subscribe to signaling events (hellos + messages directed at us)
    /// - broadcast our own hello every `hello_interval`
    /// - publish outgoing signaling messages received on `signaling_rx`
    /// - forward incoming relay events to `event_tx` for the manager loop
    async fn relay_task(
        url: String,
        event_tx: mpsc::Sender<(String, nostr::Event)>,
        mut shutdown_rx: tokio::sync::watch::Receiver<bool>,
        keys: Keys,
        my_peer_id: PeerId,
        hello_interval: Duration,
        mut signaling_rx: tokio::sync::broadcast::Receiver<SignalingMessage>,
    ) -> Result<()> {
        info!("Connecting to relay: {}", url);

        let (ws_stream, _) = connect_async(&url).await?;
        let (mut write, mut read) = ws_stream.split();

        // Subscribe to webrtc events - two filters:
        // 1. Hello messages: kind 25050 with #l: "hello" tag
        // 2. Directed messages: kind 25050 with #p tag (our pubkey)
        // Both limited to the last 60s to avoid replaying stale signaling.
        let hello_filter = Filter::new()
            .kind(Kind::Ephemeral(WEBRTC_KIND as u16))
            .custom_tag(
                nostr::SingleLetterTag::lowercase(nostr::Alphabet::L),
                vec![HELLO_TAG],
            )
            .since(nostr::Timestamp::now() - Duration::from_secs(60));

        let directed_filter = Filter::new()
            .kind(Kind::Ephemeral(WEBRTC_KIND as u16))
            .custom_tag(
                nostr::SingleLetterTag::lowercase(nostr::Alphabet::P),
                vec![keys.public_key().to_hex()],
            )
            .since(nostr::Timestamp::now() - Duration::from_secs(60));

        let sub_id = nostr::SubscriptionId::generate();
        let sub_msg = ClientMessage::req(sub_id.clone(), vec![hello_filter, directed_filter]);
        write.send(Message::Text(sub_msg.as_json().into())).await?;

        info!("Subscribed to {} for WebRTC events (kind {})", url, WEBRTC_KIND);

        // Backdate last_hello so the first ticker tick sends a hello at once.
        // NOTE(review): `Instant::now() - duration` can panic on platforms
        // where the instant would precede the clock's origin - confirm
        // hello_interval is always small enough in practice.
        let mut last_hello = Instant::now() - hello_interval; // Send immediately
        let mut hello_ticker = tokio::time::interval(Duration::from_secs(1));

        loop {
            tokio::select! {
                _ = shutdown_rx.changed() => {
                    if *shutdown_rx.borrow() {
                        break;
                    }
                }
                _ = hello_ticker.tick() => {
                    // Send hello periodically
                    if last_hello.elapsed() >= hello_interval {
                        let hello = SignalingMessage::hello(&my_peer_id.uuid);
                        if let Ok(event) = Self::create_signaling_event(&keys, &hello).await {
                            let msg = ClientMessage::event(event);
                            if write.send(Message::Text(msg.as_json().into())).await.is_ok() {
                                debug!("Sent hello to {}", url);
                            }
                        }
                        last_hello = Instant::now();
                    }
                }
                // Handle outgoing signaling messages
                // (a Lagged/Closed recv error simply disables this branch
                // for the current select poll)
                Ok(signaling_msg) = signaling_rx.recv() => {
                    info!("Sending {} via {}", signaling_msg.msg_type(), url);
                    if let Ok(event) = Self::create_signaling_event(&keys, &signaling_msg).await {
                        let event_id = event.id.to_string();
                        let msg = ClientMessage::event(event);
                        if write.send(Message::Text(msg.as_json().into())).await.is_ok() {
                            info!("Sent {} to {} (event id: {})", signaling_msg.msg_type(), url, &event_id[..16]);
                        }
                    }
                }
                msg = read.next() => {
                    match msg {
                        Some(Ok(Message::Text(text))) => {
                            // Forward EVENT messages to the manager loop;
                            // other relay messages (EOSE, OK, NOTICE) are ignored.
                            if let Ok(relay_msg) = RelayMessage::from_json(&text) {
                                if let RelayMessage::Event { event, .. } = relay_msg {
                                    let _ = event_tx.send((url.clone(), *event)).await;
                                }
                            }
                        }
                        Some(Err(e)) => {
                            error!("WebSocket error from {}: {}", url, e);
                            break;
                        }
                        None => {
                            warn!("WebSocket closed: {}", url);
                            break;
                        }
                        _ => {}
                    }
                }
            }
        }

        Ok(())
    }
591
    /// Create a signaling event
    ///
    /// For directed messages (offer, answer, candidate, candidates), use NIP-17 style
    /// gift wrapping with ephemeral keys for privacy.
    /// Hello messages use kind 25050 with #l: "hello" tag and peerId.
    async fn create_signaling_event(keys: &Keys, msg: &SignalingMessage) -> Result<nostr::Event> {
        // Check if message has a recipient (needs gift wrapping)
        if let Some(recipient_str) = msg.recipient() {
            // Parse recipient to get their pubkey
            // NOTE(review): if the recipient string fails to parse, control
            // falls through to the hello path below and the message is
            // published as a (content-less) hello - confirm that fallback
            // is intended rather than returning an error.
            if let Some(peer_id) = PeerId::from_string(recipient_str) {
                let recipient_pubkey = PublicKey::from_hex(&peer_id.pubkey)?;

                // Create seal with sender's actual pubkey (the "rumor")
                // This is an unsigned JSON rumor (no id/sig/created_at) -
                // presumably matching what hashtree-ts expects; confirm
                // against the TS implementation.
                let seal = serde_json::json!({
                    "pubkey": keys.public_key().to_hex(),
                    "kind": WEBRTC_KIND,
                    "content": serde_json::to_string(msg)?,
                    "tags": []
                });

                // Generate ephemeral keypair for the wrapper
                // so relays cannot correlate the real sender across messages.
                let ephemeral_keys = Keys::generate();

                // Encrypt the seal for the recipient using ephemeral key (NIP-44)
                let encrypted_content = nip44::encrypt(
                    ephemeral_keys.secret_key(),
                    &recipient_pubkey,
                    &seal.to_string(),
                    nip44::Version::V2
                )?;

                // Create wrapper event with ephemeral key
                // The expiration tag (NIP-40) lets relays drop stale signaling.
                let created_at = nostr::Timestamp::now();
                let expiration = created_at + Duration::from_secs(5 * 60); // 5 minutes

                let tags = vec![
                    Tag::parse(&["p", &recipient_pubkey.to_hex()])?,
                    Tag::parse(&["expiration", &expiration.as_u64().to_string()])?,
                ];

                let event = EventBuilder::new(Kind::Ephemeral(WEBRTC_KIND as u16), encrypted_content, tags)
                    .to_event(&ephemeral_keys)?;

                return Ok(event);
            }
        }

        // Hello messages - kind 25050 with #l: "hello" tag and peerId
        // (unencrypted broadcast, signed with our real key for discovery)
        let tags = vec![
            Tag::parse(&["l", HELLO_TAG])?,
            Tag::parse(&["peerId", msg.peer_id()])?,
        ];

        let event = EventBuilder::new(Kind::Ephemeral(WEBRTC_KIND as u16), "", tags)
            .to_event(keys)?;

        Ok(event)
    }
650
651    /// Handle an incoming event
652    ///
653    /// Messages may be:
654    /// 1. Hello messages: kind 25050 with #l: "hello" tag and peerId
655    /// 2. Gift-wrapped directed messages: kind 25050 with #p tag, encrypted with ephemeral key
656    async fn handle_event(
657        &self,
658        relay: &str,
659        event: &nostr::Event,
660        relay_write_tx: &tokio::sync::broadcast::Sender<SignalingMessage>,
661    ) -> Result<()> {
662        // Must be kind 25050
663        if event.kind != Kind::Ephemeral(WEBRTC_KIND as u16) {
664            return Ok(());
665        }
666
667        // Helper to get tag value
668        let get_tag = |name: &str| -> Option<String> {
669            event.tags.iter().find_map(|tag| {
670                let v: Vec<String> = tag.clone().to_vec();
671                if v.len() >= 2 && v[0] == name {
672                    Some(v[1].clone())
673                } else {
674                    None
675                }
676            })
677        };
678
679        // Check if this is a hello message (#l: "hello" tag)
680        let l_tag = get_tag("l");
681        if l_tag.as_deref() == Some(HELLO_TAG) {
682            let sender_pubkey = event.pubkey.to_hex();
683
684            // Skip our own hello messages
685            if sender_pubkey == self.my_peer_id.pubkey {
686                return Ok(());
687            }
688
689            if let Some(their_uuid) = get_tag("peerId") {
690                debug!("Received hello from {} via {}", &sender_pubkey[..8], relay);
691                self.handle_hello(&sender_pubkey, &their_uuid, relay_write_tx)
692                    .await?;
693            }
694            return Ok(());
695        }
696
697        // Check if this is a directed message for us (#p tag with our pubkey)
698        let p_tag = get_tag("p");
699        if p_tag.as_deref() != Some(&self.keys.public_key().to_hex()) {
700            // Not for us - ignore silently
701            return Ok(());
702        }
703
704        // Gift-wrapped directed message - decrypt using our key and ephemeral sender's pubkey
705        if event.content.is_empty() {
706            return Ok(());
707        }
708
709        // Try to unwrap the gift - decrypt with our key and the ephemeral sender's pubkey
710        let seal: serde_json::Value = match nip44::decrypt(self.keys.secret_key(), &event.pubkey, &event.content) {
711            Ok(plaintext) => {
712                match serde_json::from_str(&plaintext) {
713                    Ok(v) => v,
714                    Err(_) => return Ok(()),
715                }
716            }
717            Err(_) => {
718                // Can't decrypt - not for us or invalid
719                return Ok(());
720            }
721        };
722
723        // Extract the actual sender's pubkey and content from the seal
724        let sender_pubkey = seal.get("pubkey")
725            .and_then(|v| v.as_str())
726            .ok_or_else(|| anyhow::anyhow!("Missing pubkey in seal"))?;
727
728        // Skip our own messages
729        if sender_pubkey == self.my_peer_id.pubkey {
730            return Ok(());
731        }
732
733        let content = seal.get("content")
734            .and_then(|v| v.as_str())
735            .ok_or_else(|| anyhow::anyhow!("Missing content in seal"))?;
736
737        let raw_msg: serde_json::Value = serde_json::from_str(content)?;
738        let msg_type = raw_msg.get("type").and_then(|v| v.as_str()).unwrap_or("");
739
740        // Support hashtree-ts format: { type, peerId, targetPeerId, sdp/candidate/candidates }
741        if raw_msg.get("targetPeerId").is_some() {
742            let target_peer = raw_msg
743                .get("targetPeerId")
744                .and_then(|v| v.as_str())
745                .unwrap_or("");
746            if target_peer != self.my_peer_id.to_string() {
747                return Ok(());
748            }
749
750            let peer_id = raw_msg.get("peerId").and_then(|v| v.as_str()).unwrap_or("");
751            let their_uuid = peer_id.split(':').nth(1).unwrap_or(peer_id);
752
753            match msg_type {
754                "offer" => {
755                    let sdp = raw_msg.get("sdp").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing SDP in offer"))?;
756                    let offer = serde_json::json!({ "type": "offer", "sdp": sdp });
757                    self.handle_offer(&sender_pubkey, their_uuid, offer, relay_write_tx).await?;
758                }
759                "answer" => {
760                    let sdp = raw_msg.get("sdp").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing SDP in answer"))?;
761                    let answer = serde_json::json!({ "type": "answer", "sdp": sdp });
762                    self.handle_answer(&sender_pubkey, their_uuid, answer).await?;
763                }
764                "candidate" => {
765                    let candidate = raw_msg.get("candidate").and_then(|v| v.as_str()).unwrap_or("");
766                    if !candidate.is_empty() {
767                        let candidate_json = serde_json::json!({
768                            "candidate": candidate,
769                            "sdpMid": raw_msg.get("sdpMid"),
770                            "sdpMLineIndex": raw_msg.get("sdpMLineIndex"),
771                        });
772                        self.handle_candidate(&sender_pubkey, their_uuid, candidate_json).await?;
773                    }
774                }
775                "candidates" => {
776                    let candidates = raw_msg
777                        .get("candidates")
778                        .and_then(|v| v.as_array())
779                        .map(|entries| {
780                            entries.iter().filter_map(|entry| {
781                                if let Some(candidate_str) = entry.get("candidate").and_then(|v| v.as_str()).or_else(|| entry.as_str()) {
782                                    Some(serde_json::json!({
783                                        "candidate": candidate_str,
784                                        "sdpMid": entry.get("sdpMid"),
785                                        "sdpMLineIndex": entry.get("sdpMLineIndex"),
786                                    }))
787                                } else {
788                                    None
789                                }
790                            }).collect::<Vec<_>>()
791                        })
792                        .unwrap_or_default();
793                    self.handle_candidates(&sender_pubkey, their_uuid, candidates).await?;
794                }
795                _ => {}
796            }
797
798            return Ok(());
799        }
800
801        let msg: SignalingMessage = serde_json::from_value(raw_msg)?;
802
803        debug!(
804            "Received {} from {} via {} (gift-wrapped)",
805            msg.msg_type(),
806            &sender_pubkey[..8],
807            relay
808        );
809
810        match msg {
811            SignalingMessage::Hello { .. } => {
812                // Hello messages should come via tags, not gift wrap
813                return Ok(());
814            }
815            SignalingMessage::Offer {
816                recipient,
817                peer_id: their_uuid,
818                offer,
819            } => {
820                if recipient != self.my_peer_id.to_string() {
821                    return Ok(()); // Not for us
822                }
823                if let Err(e) = self.handle_offer(&sender_pubkey, &their_uuid, offer, relay_write_tx)
824                    .await {
825                    error!("handle_offer FAILED: sender={}, uuid={}, error={:?}", &sender_pubkey[..8.min(sender_pubkey.len())], their_uuid, e);
826                    return Err(e);
827                }
828            }
829            SignalingMessage::Answer {
830                recipient,
831                peer_id: their_uuid,
832                answer,
833            } => {
834                if recipient != self.my_peer_id.to_string() {
835                    return Ok(());
836                }
837                self.handle_answer(&sender_pubkey, &their_uuid, answer)
838                    .await?;
839            }
840            SignalingMessage::Candidate {
841                recipient,
842                peer_id: their_uuid,
843                candidate,
844            } => {
845                if recipient != self.my_peer_id.to_string() {
846                    return Ok(());
847                }
848                self.handle_candidate(&sender_pubkey, &their_uuid, candidate)
849                    .await?;
850            }
851            SignalingMessage::Candidates {
852                recipient,
853                peer_id: their_uuid,
854                candidates,
855            } => {
856                if recipient != self.my_peer_id.to_string() {
857                    return Ok(());
858                }
859                self.handle_candidates(&sender_pubkey, &their_uuid, candidates)
860                    .await?;
861            }
862        }
863
864        Ok(())
865    }
866
867    /// Handle incoming hello message
868    async fn handle_hello(
869        &self,
870        sender_pubkey: &str,
871        their_uuid: &str,
872        relay_write_tx: &tokio::sync::broadcast::Sender<SignalingMessage>,
873    ) -> Result<()> {
874        let full_peer_id = PeerId::new(sender_pubkey.to_string(), Some(their_uuid.to_string()));
875        let peer_key = full_peer_id.to_string();
876
877        // Check if we already have this peer
878        {
879            let peers = self.state.peers.read().await;
880            if let Some(entry) = peers.get(&peer_key) {
881                // Already connected or connecting, just update last_seen
882                if entry.state == ConnectionState::Connected
883                    || entry.state == ConnectionState::Connecting
884                {
885                    return Ok(());
886                }
887            }
888        }
889
890        // Classify the peer into a pool
891        let pool = (self.peer_classifier)(sender_pubkey);
892
893        // Check pool limits
894        let pool_counts = self.get_pool_counts().await;
895        if !self.can_accept_peer(pool, &pool_counts) {
896            debug!("Ignoring hello from {} - pool {:?} is full", full_peer_id.short(), pool);
897            return Ok(());
898        }
899
900        // Decide if we should initiate based on tie-breaking
901        let should_initiate = self.should_initiate(their_uuid);
902
903        // If pool is already satisfied, don't initiate new outbound connections
904        // This reserves space for inbound connections
905        let pool_satisfied = self.is_pool_satisfied(pool, &pool_counts);
906        let will_initiate = should_initiate && !pool_satisfied;
907
908        info!(
909            "Discovered peer: {} (pool: {:?}, initiate: {}, pool_satisfied: {})",
910            full_peer_id.short(),
911            pool,
912            will_initiate,
913            pool_satisfied
914        );
915
916        // If we're not initiating and pool is satisfied, don't even add to discovered
917        // (we won't accept their offer either since pool check happens in handle_offer)
918        if !will_initiate && pool_satisfied {
919            debug!("Pool {:?} is satisfied, not tracking peer {}", pool, full_peer_id.short());
920            return Ok(());
921        }
922
923        // Create peer entry with pool assignment
924        {
925            let mut peers = self.state.peers.write().await;
926            peers.insert(
927                peer_key.clone(),
928                PeerEntry {
929                    peer_id: full_peer_id.clone(),
930                    direction: if will_initiate {
931                        PeerDirection::Outbound
932                    } else {
933                        PeerDirection::Inbound
934                    },
935                    state: ConnectionState::Discovered,
936                    last_seen: Instant::now(),
937                    peer: None,
938                    pool,
939                    bytes_sent: 0,
940                    bytes_received: 0,
941                },
942            );
943        }
944
945        // If we should initiate, create offer
946        if will_initiate {
947            self.initiate_connection(&full_peer_id, pool, relay_write_tx)
948                .await?;
949        }
950
951        Ok(())
952    }
953
954    /// Initiate a connection to a peer (create and send offer)
955    async fn initiate_connection(
956        &self,
957        peer_id: &PeerId,
958        pool: PeerPool,
959        relay_write_tx: &tokio::sync::broadcast::Sender<SignalingMessage>,
960    ) -> Result<()> {
961        let peer_key = peer_id.to_string();
962
963        info!("Initiating connection to {} (pool: {:?})", peer_id.short(), pool);
964
965        // Create peer connection with content store and state events
966        let mut peer = Peer::new_with_store_and_events(
967            peer_id.clone(),
968            PeerDirection::Outbound,
969            self.my_peer_id.clone(),
970            self.signaling_tx.clone(),
971            self.config.stun_servers.clone(),
972            self.store.clone(),
973            Some(self.state_event_tx.clone()),
974            self.nostr_relay.clone(),
975        )
976        .await?;
977
978        peer.setup_handlers().await?;
979
980        // Create offer
981        let offer = peer.connect().await?;
982
983        // Update state
984        {
985            let mut peers = self.state.peers.write().await;
986            if let Some(entry) = peers.get_mut(&peer_key) {
987                entry.state = ConnectionState::Connecting;
988                entry.peer = Some(peer);
989                entry.pool = pool;
990            }
991        }
992
993        // Send offer
994        let offer_msg = SignalingMessage::Offer {
995            offer,
996            recipient: peer_id.to_string(),
997            peer_id: self.my_peer_id.uuid.clone(),
998        };
999        if relay_write_tx.send(offer_msg).is_err() {
1000            warn!("Failed to broadcast offer to {}", peer_id.short());
1001        }
1002
1003        info!("Sent offer to {}", peer_id.short());
1004
1005        Ok(())
1006    }
1007
    /// Handle incoming offer
    ///
    /// Inbound side of the handshake: classify the sender into a pool,
    /// enforce pool limits, build an inbound `Peer`, apply the remote SDP
    /// offer, store the connection, and broadcast the answer back.
    ///
    /// NOTE(review): the existing-peer check below uses a read lock, and the
    /// insert near the end takes a separate write lock, so two offers from the
    /// same peer (or offer glare with our own outbound attempt) can race in
    /// between; the later insert unconditionally overwrites the entry.
    /// Confirm this window is acceptable for the protocol.
    async fn handle_offer(
        &self,
        sender_pubkey: &str,
        their_uuid: &str,
        offer: serde_json::Value,
        relay_write_tx: &tokio::sync::broadcast::Sender<SignalingMessage>,
    ) -> Result<()> {
        // 8.min(len) guards the slice against short keys; pubkeys are hex so
        // the byte index always falls on a char boundary.
        debug!("handle_offer ENTRY: sender={}, uuid={}", &sender_pubkey[..8.min(sender_pubkey.len())], their_uuid);
        let full_peer_id = PeerId::new(sender_pubkey.to_string(), Some(their_uuid.to_string()));
        let peer_key = full_peer_id.to_string();

        // Classify the peer into a pool
        let pool = (self.peer_classifier)(sender_pubkey);

        info!("Received offer from {} (pool: {:?})", full_peer_id.short(), pool);

        // Check if we already have this peer with an actual connection.
        // A bare Discovered entry (peer == None) does not block the offer.
        {
            let peers = self.state.peers.read().await;
            debug!("Checking for existing peer, peer_key: {}, known_peers: {}", peer_key, peers.len());
            if let Some(entry) = peers.get(&peer_key) {
                // Only skip if we have an actual peer connection (not just discovered)
                if entry.peer.is_some() {
                    debug!("Already have peer {} with connection, skipping offer", full_peer_id.short());
                    return Ok(());
                }
                debug!("Peer {} exists but has no connection, proceeding", full_peer_id.short());
            } else {
                debug!("Peer {} not found in peers map, will create new entry", full_peer_id.short());
            }
        }

        // Check pool limits
        let pool_counts = self.get_pool_counts().await;
        debug!("Pool counts: {:?}, checking can_accept_peer for {:?}", pool_counts, pool);
        if !self.can_accept_peer(pool, &pool_counts) {
            warn!("Rejecting offer from {} - pool {:?} is full", full_peer_id.short(), pool);
            return Ok(());
        }
        debug!("Pool check passed for {}", full_peer_id.short());

        // Create peer connection with content store and state events
        debug!("Creating peer connection for {}", full_peer_id.short());
        let mut peer = Peer::new_with_store_and_events(
            full_peer_id.clone(),
            PeerDirection::Inbound,
            self.my_peer_id.clone(),
            self.signaling_tx.clone(),
            self.config.stun_servers.clone(),
            self.store.clone(),
            Some(self.state_event_tx.clone()),
            self.nostr_relay.clone(),
        )
        .await?;
        debug!("Peer connection created for {}", full_peer_id.short());

        peer.setup_handlers().await?;
        debug!("Handlers set up for {}", full_peer_id.short());

        // Handle offer and create answer (sets remote description, produces local SDP)
        let answer = peer.handle_offer(offer).await?;
        debug!("Answer created for {}", full_peer_id.short());

        // Update state: insert (or overwrite) the tracked entry with the live connection
        {
            let mut peers = self.state.peers.write().await;
            peers.insert(
                peer_key,
                PeerEntry {
                    peer_id: full_peer_id.clone(),
                    direction: PeerDirection::Inbound,
                    state: ConnectionState::Connecting,
                    last_seen: Instant::now(),
                    peer: Some(peer),
                    pool,
                    bytes_sent: 0,
                    bytes_received: 0,
                },
            );
        }

        // Send answer
        // Note: peer_id is just the UUID, not full pubkey:uuid
        // The recipient will construct full peer_id from sender pubkey + this UUID
        let answer_msg = SignalingMessage::Answer {
            answer,
            recipient: full_peer_id.to_string(),
            peer_id: self.my_peer_id.uuid.clone(),
        };
        if relay_write_tx.send(answer_msg).is_err() {
            warn!("Failed to send answer to {}", full_peer_id.short());
        }
        info!("Sent answer to {}", full_peer_id.short());

        Ok(())
    }
1105
1106    /// Handle incoming answer
1107    async fn handle_answer(
1108        &self,
1109        sender_pubkey: &str,
1110        their_uuid: &str,
1111        answer: serde_json::Value,
1112    ) -> Result<()> {
1113        let full_peer_id = PeerId::new(sender_pubkey.to_string(), Some(their_uuid.to_string()));
1114        let peer_key = full_peer_id.to_string();
1115
1116        info!("Received answer from {}", full_peer_id.short());
1117
1118        let mut peers = self.state.peers.write().await;
1119        if let Some(entry) = peers.get_mut(&peer_key) {
1120            // Skip if already connected - duplicate answers from multiple relays
1121            if entry.state == ConnectionState::Connected {
1122                debug!("Ignoring duplicate answer from {} - already connected", full_peer_id.short());
1123                return Ok(());
1124            }
1125            if let Some(ref mut peer) = entry.peer {
1126                // Check WebRTC signaling state before applying answer
1127                use webrtc::peer_connection::signaling_state::RTCSignalingState;
1128                let signaling_state = peer.signaling_state();
1129                if signaling_state != RTCSignalingState::HaveLocalOffer {
1130                    debug!(
1131                        "Ignoring answer from {} - signaling state is {:?}, not HaveLocalOffer",
1132                        full_peer_id.short(),
1133                        signaling_state
1134                    );
1135                    return Ok(());
1136                }
1137                peer.handle_answer(answer).await?;
1138                info!("Applied answer from {}", full_peer_id.short());
1139            } else {
1140                debug!("Peer {} has no connection object", full_peer_id.short());
1141            }
1142        } else {
1143            debug!("No peer found for key: {}", peer_key);
1144        }
1145
1146        Ok(())
1147    }
1148
1149    /// Handle incoming ICE candidate
1150    async fn handle_candidate(
1151        &self,
1152        sender_pubkey: &str,
1153        their_uuid: &str,
1154        candidate: serde_json::Value,
1155    ) -> Result<()> {
1156        let full_peer_id = PeerId::new(sender_pubkey.to_string(), Some(their_uuid.to_string()));
1157        let peer_key = full_peer_id.to_string();
1158
1159        info!("Received ICE candidate from {}", full_peer_id.short());
1160
1161        let mut peers = self.state.peers.write().await;
1162        if let Some(entry) = peers.get_mut(&peer_key) {
1163            if let Some(ref mut peer) = entry.peer {
1164                peer.handle_candidate(candidate).await?;
1165            }
1166        }
1167
1168        Ok(())
1169    }
1170
1171    /// Handle batched ICE candidates
1172    async fn handle_candidates(
1173        &self,
1174        sender_pubkey: &str,
1175        their_uuid: &str,
1176        candidates: Vec<serde_json::Value>,
1177    ) -> Result<()> {
1178        let full_peer_id = PeerId::new(sender_pubkey.to_string(), Some(their_uuid.to_string()));
1179        let peer_key = full_peer_id.to_string();
1180
1181        debug!(
1182            "Received {} candidates from {}",
1183            candidates.len(),
1184            full_peer_id.short()
1185        );
1186
1187        let mut peers = self.state.peers.write().await;
1188        if let Some(entry) = peers.get_mut(&peer_key) {
1189            if let Some(ref mut peer) = entry.peer {
1190                for candidate in candidates {
1191                    if let Err(e) = peer.handle_candidate(candidate).await {
1192                        debug!("Failed to add candidate: {}", e);
1193                    }
1194                }
1195            }
1196        }
1197
1198        Ok(())
1199    }
1200
1201    /// Handle peer state change events from peer connections
1202    async fn handle_peer_state_event(&self, event: PeerStateEvent) {
1203        match event {
1204            PeerStateEvent::Connected(peer_id) => {
1205                let peer_key = peer_id.to_string();
1206                let mut peers = self.state.peers.write().await;
1207                if let Some(entry) = peers.get_mut(&peer_key) {
1208                    if entry.state != ConnectionState::Connected {
1209                        info!("Peer {} connected (via state event)", peer_id.short());
1210                        entry.state = ConnectionState::Connected;
1211                        // Update connected count
1212                        self.state.connected_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1213                    }
1214                }
1215            }
1216            PeerStateEvent::Failed(peer_id) => {
1217                let peer_key = peer_id.to_string();
1218                info!("Peer {} connection failed - removing from pool", peer_id.short());
1219                let mut peers = self.state.peers.write().await;
1220                if let Some(entry) = peers.remove(&peer_key) {
1221                    // Decrement connected count if was connected
1222                    if entry.state == ConnectionState::Connected {
1223                        self.state.connected_count.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
1224                    }
1225                    // Close the peer connection if it exists
1226                    if let Some(peer) = entry.peer {
1227                        let _ = peer.close().await;
1228                    }
1229                }
1230            }
1231            PeerStateEvent::Disconnected(peer_id) => {
1232                let peer_key = peer_id.to_string();
1233                info!("Peer {} disconnected - removing from pool", peer_id.short());
1234                let mut peers = self.state.peers.write().await;
1235                if let Some(entry) = peers.remove(&peer_key) {
1236                    // Decrement connected count if was connected
1237                    if entry.state == ConnectionState::Connected {
1238                        self.state.connected_count.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
1239                    }
1240                    // Close the peer connection if it exists
1241                    if let Some(peer) = entry.peer {
1242                        let _ = peer.close().await;
1243                    }
1244                }
1245            }
1246        }
1247    }
1248
1249    /// Cleanup stale peers and sync connection states (fallback, runs every 30s)
1250    async fn cleanup_stale_peers(&self) {
1251        let mut peers = self.state.peers.write().await;
1252        let mut connected_count = 0;
1253        let mut to_remove = Vec::new();
1254        let stale_timeout = Duration::from_secs(60); // Remove peers stuck in Discovered/Connecting for 60s
1255
1256        for (key, entry) in peers.iter_mut() {
1257            if let Some(ref peer) = entry.peer {
1258                // Sync connected state as fallback (in case event was missed)
1259                if peer.is_connected() {
1260                    if entry.state != ConnectionState::Connected {
1261                        info!("Peer {} is now connected (sync fallback)", entry.peer_id.short());
1262                        entry.state = ConnectionState::Connected;
1263                    }
1264                    connected_count += 1;
1265                } else if entry.state == ConnectionState::Connecting && entry.last_seen.elapsed() > stale_timeout {
1266                    // Peer stuck in Connecting for too long - mark for removal
1267                    info!("Removing stale peer {} (stuck in Connecting for {:?})", entry.peer_id.short(), entry.last_seen.elapsed());
1268                    to_remove.push(key.clone());
1269                }
1270            } else if entry.state == ConnectionState::Discovered && entry.last_seen.elapsed() > stale_timeout {
1271                // Discovered peer with no actual connection - remove
1272                debug!("Removing stale discovered peer {}", entry.peer_id.short());
1273                to_remove.push(key.clone());
1274            }
1275        }
1276
1277        // Remove stale peers
1278        for key in to_remove {
1279            if let Some(entry) = peers.remove(&key) {
1280                if let Some(peer) = entry.peer {
1281                    let _ = peer.close().await;
1282                }
1283            }
1284        }
1285
1286        self.state
1287            .connected_count
1288            .store(connected_count, std::sync::atomic::Ordering::Relaxed);
1289    }
1290}
1291
// Keep the old PeerState for backward compatibility with tests
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub struct PeerState {
    // Full identity of the peer (pubkey plus session uuid)
    pub peer_id: PeerId,
    // Whether we initiated the connection (Outbound) or accepted it (Inbound)
    pub direction: PeerDirection,
    // Free-form state label; the live code uses the ConnectionState enum instead
    pub state: String,
    // When the peer was last observed; used for staleness decisions
    pub last_seen: Instant,
}