Skip to main content

hashtree_cli/webrtc/
signaling.rs

1//! WebRTC signaling over Nostr relays
2//!
3//! Protocol (compatible with hashtree-ts):
4//! - All signaling uses ephemeral kind 25050
5//! - Hello messages: #l: "hello" tag, broadcast for peer discovery (unencrypted)
6//! - Directed signaling (offer, answer, candidate, candidates): NIP-17 style
7//!   gift wrap for privacy - wrapped with ephemeral key, #p tag with recipient
8//!
9//! Security: Directed messages use gift wrapping with ephemeral keys so that
10//! relays cannot see the actual sender or correlate messages.
11
12use anyhow::Result;
13use futures::{SinkExt, StreamExt};
14use hashtree_webrtc::{
15    build_hedged_wave_plan, normalize_dispatch_config, sync_selector_peers, PeerSelector,
16};
17use nostr::{
18    nips::nip44, Alphabet, ClientMessage, EventBuilder, Filter, JsonUtil, Keys, Kind, PublicKey,
19    RelayMessage, SingleLetterTag, Tag,
20};
21use std::collections::HashMap;
22use std::sync::Arc;
23use std::time::{Duration, Instant};
24use tokio::sync::{mpsc, Mutex, RwLock};
25use tokio_tungstenite::{connect_async, tungstenite::Message};
26use tracing::{debug, error, info, warn};
27
28use super::cashu::{CashuMintMetadataStore, CashuQuoteState, CashuRoutingConfig, NegotiatedQuote};
29use super::peer::{ContentStore, Peer, PendingRequest};
30use super::types::{
31    decrement_htl_with_policy, encode_quote_request, encode_request, should_forward_htl,
32    validate_mesh_frame, DataQuoteRequest, DataRequest, MeshNostrFrame, MeshNostrPayload,
33    PeerDirection, PeerId, PeerPool, PeerStateEvent, PeerStatus, RequestDispatchConfig,
34    SignalingMessage, TimedSeenSet, WebRTCConfig, HELLO_TAG, MESH_DEFAULT_HTL, MESH_EVENT_POLICY,
35    WEBRTC_KIND,
36};
37use crate::cashu_helper::CashuPaymentClient;
38use crate::nostr_relay::NostrRelay;
39
40/// Callback type for classifying peers into pools
41pub type PeerClassifier = Arc<dyn Fn(&str) -> PeerPool + Send + Sync>;
42
43/// Connection state for a peer
44#[derive(Debug, Clone, PartialEq)]
45pub enum ConnectionState {
46    Discovered,
47    Connecting,
48    Connected,
49    Failed,
50}
51
52impl std::fmt::Display for ConnectionState {
53    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
54        match self {
55            ConnectionState::Discovered => write!(f, "discovered"),
56            ConnectionState::Connecting => write!(f, "connecting"),
57            ConnectionState::Connected => write!(f, "connected"),
58            ConnectionState::Failed => write!(f, "failed"),
59        }
60    }
61}
62
63/// Peer entry in the manager
64pub struct PeerEntry {
65    pub peer_id: PeerId,
66    pub direction: PeerDirection,
67    pub state: ConnectionState,
68    pub last_seen: Instant,
69    pub peer: Option<Peer>,
70    pub pool: PeerPool,
71    pub bytes_sent: u64,
72    pub bytes_received: u64,
73}
74
75/// Shared state for WebRTC manager
76pub struct WebRTCState {
77    pub peers: RwLock<HashMap<String, PeerEntry>>,
78    pub connected_count: std::sync::atomic::AtomicUsize,
79    /// Total bytes sent across all peers (cumulative)
80    pub bytes_sent: std::sync::atomic::AtomicU64,
81    /// Total bytes received across all peers (cumulative)
82    pub bytes_received: std::sync::atomic::AtomicU64,
83    /// Relayless mesh frames received and accepted.
84    pub mesh_received: std::sync::atomic::AtomicU64,
85    /// Relayless mesh frames forwarded to peers.
86    pub mesh_forwarded: std::sync::atomic::AtomicU64,
87    /// Relayless mesh frames/events dropped due to dedupe.
88    pub mesh_dropped_duplicate: std::sync::atomic::AtomicU64,
89    /// Shared peer selector used by live retrieval; aligned with simulation strategies.
90    peer_selector: Arc<RwLock<PeerSelector>>,
91    /// Hedged dispatch policy for retrieval requests.
92    request_dispatch: RequestDispatchConfig,
93    /// Retrieval timeout for quote negotiation and single-peer fetches.
94    request_timeout: Duration,
95    /// Shared Cashu quote negotiation policy/state.
96    cashu_quotes: Arc<CashuQuoteState>,
97}
98
99#[derive(Debug, Clone)]
100pub struct PeerRootEvent {
101    pub hash: String,
102    pub key: Option<String>,
103    pub encrypted_key: Option<String>,
104    pub self_encrypted_key: Option<String>,
105    pub event_id: String,
106    pub created_at: u64,
107    pub peer_id: String,
108}
109
110const HASHTREE_KIND: u16 = 30078;
111const HASHTREE_LABEL: &str = "hashtree";
112const SEEN_FRAME_CAP: usize = 4096;
113const SEEN_FRAME_TTL: Duration = Duration::from_secs(120);
114const SEEN_EVENT_CAP: usize = 8192;
115const SEEN_EVENT_TTL: Duration = Duration::from_secs(600);
116
117type PendingRequestsMap = Arc<Mutex<HashMap<String, PendingRequest>>>;
118type ConnectedPeer = (
119    String,
120    PendingRequestsMap,
121    Arc<webrtc::data_channel::RTCDataChannel>,
122);
123
124fn hashtree_event_identifier(event: &nostr::Event) -> Option<String> {
125    event.tags.iter().find_map(|tag| {
126        let slice = tag.as_slice();
127        if slice.len() >= 2 && slice[0].as_str() == "d" {
128            Some(slice[1].to_string())
129        } else {
130            None
131        }
132    })
133}
134
135fn is_hashtree_labeled_event(event: &nostr::Event) -> bool {
136    event.tags.iter().any(|tag| {
137        let slice = tag.as_slice();
138        slice.len() >= 2 && slice[0].as_str() == "l" && slice[1].as_str() == HASHTREE_LABEL
139    })
140}
141
142fn pick_latest_event<'a, I>(events: I) -> Option<&'a nostr::Event>
143where
144    I: IntoIterator<Item = &'a nostr::Event>,
145{
146    events.into_iter().max_by(|a, b| {
147        let ordering = a.created_at.cmp(&b.created_at);
148        if ordering == std::cmp::Ordering::Equal {
149            a.id.cmp(&b.id)
150        } else {
151            ordering
152        }
153    })
154}
155
156fn root_event_from_peer(
157    event: &nostr::Event,
158    peer_id: &str,
159    tree_name: &str,
160) -> Option<PeerRootEvent> {
161    if hashtree_event_identifier(event).as_deref() != Some(tree_name)
162        || !is_hashtree_labeled_event(event)
163    {
164        return None;
165    }
166
167    let mut key = None;
168    let mut encrypted_key = None;
169    let mut self_encrypted_key = None;
170    let mut hash_tag = None;
171
172    for tag in &event.tags {
173        let slice = tag.as_slice();
174        if slice.len() < 2 {
175            continue;
176        }
177        match slice[0].as_str() {
178            "hash" => hash_tag = Some(slice[1].to_string()),
179            "key" => key = Some(slice[1].to_string()),
180            "encryptedKey" => encrypted_key = Some(slice[1].to_string()),
181            "selfEncryptedKey" => self_encrypted_key = Some(slice[1].to_string()),
182            _ => {}
183        }
184    }
185
186    let hash = hash_tag.or_else(|| {
187        if event.content.is_empty() {
188            None
189        } else {
190            Some(event.content.clone())
191        }
192    })?;
193
194    Some(PeerRootEvent {
195        hash,
196        key,
197        encrypted_key,
198        self_encrypted_key,
199        event_id: event.id.to_hex(),
200        created_at: event.created_at.as_u64(),
201        peer_id: peer_id.to_string(),
202    })
203}
204
205impl WebRTCState {
206    pub fn new() -> Self {
207        let cfg = WebRTCConfig::default();
208        Self::new_with_routing_and_cashu(
209            cfg.request_selection_strategy,
210            cfg.request_fairness_enabled,
211            cfg.request_dispatch,
212            Duration::from_millis(cfg.message_timeout_ms),
213            CashuRoutingConfig::default(),
214            None,
215            None,
216        )
217    }
218
219    pub fn new_with_routing(
220        selection_strategy: super::types::SelectionStrategy,
221        fairness_enabled: bool,
222        request_dispatch: RequestDispatchConfig,
223    ) -> Self {
224        let cfg = WebRTCConfig::default();
225        Self::new_with_routing_and_cashu(
226            selection_strategy,
227            fairness_enabled,
228            request_dispatch,
229            Duration::from_millis(cfg.message_timeout_ms),
230            CashuRoutingConfig::default(),
231            None,
232            None,
233        )
234    }
235
236    pub fn new_with_routing_and_cashu(
237        selection_strategy: super::types::SelectionStrategy,
238        fairness_enabled: bool,
239        request_dispatch: RequestDispatchConfig,
240        request_timeout: Duration,
241        cashu_routing: CashuRoutingConfig,
242        payment_client: Option<Arc<dyn CashuPaymentClient>>,
243        mint_metadata: Option<Arc<CashuMintMetadataStore>>,
244    ) -> Self {
245        let mut selector = PeerSelector::with_strategy(selection_strategy);
246        selector.set_fairness(fairness_enabled);
247        let peer_selector = Arc::new(RwLock::new(selector));
248        let cashu_quotes = Arc::new(if let Some(mint_metadata) = mint_metadata {
249            CashuQuoteState::new_with_mint_metadata(
250                cashu_routing,
251                peer_selector.clone(),
252                payment_client,
253                mint_metadata,
254            )
255        } else {
256            CashuQuoteState::new(cashu_routing, peer_selector.clone(), payment_client)
257        });
258        Self {
259            peers: RwLock::new(HashMap::new()),
260            connected_count: std::sync::atomic::AtomicUsize::new(0),
261            bytes_sent: std::sync::atomic::AtomicU64::new(0),
262            bytes_received: std::sync::atomic::AtomicU64::new(0),
263            mesh_received: std::sync::atomic::AtomicU64::new(0),
264            mesh_forwarded: std::sync::atomic::AtomicU64::new(0),
265            mesh_dropped_duplicate: std::sync::atomic::AtomicU64::new(0),
266            peer_selector,
267            request_dispatch,
268            request_timeout,
269            cashu_quotes,
270        }
271    }
272
273    /// Get current bandwidth stats (bytes sent/received)
274    pub fn get_bandwidth(&self) -> (u64, u64) {
275        (
276            self.bytes_sent.load(std::sync::atomic::Ordering::Relaxed),
277            self.bytes_received
278                .load(std::sync::atomic::Ordering::Relaxed),
279        )
280    }
281
282    pub fn get_mesh_stats(&self) -> (u64, u64, u64) {
283        (
284            self.mesh_received
285                .load(std::sync::atomic::Ordering::Relaxed),
286            self.mesh_forwarded
287                .load(std::sync::atomic::Ordering::Relaxed),
288            self.mesh_dropped_duplicate
289                .load(std::sync::atomic::Ordering::Relaxed),
290        )
291    }
292
293    pub fn record_mesh_received(&self) {
294        self.mesh_received
295            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
296    }
297
298    pub fn record_mesh_forwarded(&self, count: u64) {
299        self.mesh_forwarded
300            .fetch_add(count, std::sync::atomic::Ordering::Relaxed);
301    }
302
303    pub fn record_mesh_duplicate_drop(&self) {
304        self.mesh_dropped_duplicate
305            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
306    }
307
308    /// Record bytes sent (global + per-peer)
309    pub async fn record_sent(&self, peer_id: &str, bytes: u64) {
310        self.bytes_sent
311            .fetch_add(bytes, std::sync::atomic::Ordering::Relaxed);
312        if let Some(entry) = self.peers.write().await.get_mut(peer_id) {
313            entry.bytes_sent += bytes;
314        }
315    }
316
317    /// Record bytes received (global + per-peer)
318    pub async fn record_received(&self, peer_id: &str, bytes: u64) {
319        self.bytes_received
320            .fetch_add(bytes, std::sync::atomic::Ordering::Relaxed);
321        if let Some(entry) = self.peers.write().await.get_mut(peer_id) {
322            entry.bytes_received += bytes;
323        }
324    }
325
326    /// Request content by hash from connected peers
327    /// Queries peers in adaptive selector order with hedged fanout waves.
328    /// Returns the first successful response, or None if no peer has it
329    pub async fn request_from_peers(&self, hash_hex: &str) -> Option<Vec<u8>> {
330        self.request_from_peers_with_source(hash_hex)
331            .await
332            .map(|(data, _peer_id)| data)
333    }
334
335    /// Request content by hash from connected peers, returning data and source peer.
336    pub async fn request_from_peers_with_source(
337        &self,
338        hash_hex: &str,
339    ) -> Option<(Vec<u8>, String)> {
340        use super::types::BLOB_REQUEST_POLICY;
341        use tokio::sync::oneshot::error::TryRecvError;
342
343        let peers = self.peers.read().await;
344
345        // Collect connected peers with data channels
346        // We need to collect the Arc references first, then acquire locks outside the iterator
347        let peer_refs: Vec<_> = peers
348            .values()
349            .filter(|p| p.state == ConnectionState::Connected && p.peer.is_some())
350            .filter_map(|p| {
351                p.peer.as_ref().map(|peer| {
352                    (
353                        p.peer_id.to_string(),
354                        peer.data_channel.clone(),
355                        peer.pending_requests.clone(),
356                    )
357                })
358            })
359            .collect();
360
361        drop(peers); // Release the read lock
362
363        // Now acquire locks and filter to peers with active data channels
364        let mut connected_peers: Vec<ConnectedPeer> = Vec::new();
365        for (peer_id, dc_mutex, pending) in peer_refs {
366            let dc_guard = dc_mutex.lock().await;
367            if let Some(dc) = dc_guard.as_ref() {
368                connected_peers.push((peer_id, pending, dc.clone()));
369            }
370        }
371
372        if connected_peers.is_empty() {
373            debug!(
374                "No connected peers to query for {}",
375                &hash_hex[..8.min(hash_hex.len())]
376            );
377            return None;
378        }
379
380        // Convert hex to binary hash once
381        let hash_bytes = match hex::decode(hash_hex) {
382            Ok(b) => b,
383            Err(_) => return None,
384        };
385
386        let expected_hash: [u8; 32] = match hash_bytes.as_slice().try_into() {
387            Ok(h) => h,
388            Err(_) => {
389                debug!(
390                    "Invalid hash length {}, expected 32 bytes",
391                    hash_bytes.len()
392                );
393                return None;
394            }
395        };
396
397        let connected_peer_ids: Vec<String> = connected_peers
398            .iter()
399            .map(|(peer_id, _, _)| peer_id.clone())
400            .collect();
401        sync_selector_peers(self.peer_selector.as_ref(), &connected_peer_ids).await;
402
403        let ordered_peer_ids = self.peer_selector.write().await.select_peers();
404        let mut by_peer: HashMap<
405            String,
406            (
407                PendingRequestsMap,
408                Arc<webrtc::data_channel::RTCDataChannel>,
409            ),
410        > = connected_peers
411            .into_iter()
412            .map(|(peer_id, pending, dc)| (peer_id, (pending, dc)))
413            .collect();
414
415        let mut ordered_peers: Vec<ConnectedPeer> = Vec::new();
416        for peer_id in ordered_peer_ids {
417            if let Some((pending, dc)) = by_peer.remove(&peer_id) {
418                ordered_peers.push((peer_id, pending, dc));
419            }
420        }
421        for (peer_id, (pending, dc)) in by_peer {
422            ordered_peers.push((peer_id, pending, dc));
423        }
424
425        let dispatch = normalize_dispatch_config(self.request_dispatch, ordered_peers.len());
426        let wave_plan = build_hedged_wave_plan(ordered_peers.len(), dispatch);
427        if wave_plan.is_empty() {
428            return None;
429        }
430
431        debug!(
432            "Querying {} peers for {} (strategy order + hedged waves {:?})",
433            ordered_peers.len(),
434            &hash_hex[..8.min(hash_hex.len())],
435            wave_plan
436        );
437
438        if let Some((requested_mint, payment_sat, quote_ttl_ms)) =
439            self.cashu_quotes.requester_quote_terms().await
440        {
441            if let Some(quote) = self
442                .request_quote_from_peers(
443                    &hash_bytes,
444                    requested_mint,
445                    payment_sat,
446                    quote_ttl_ms,
447                    &ordered_peers,
448                )
449                .await
450            {
451                if let Some(data) = self
452                    .request_from_single_peer(
453                        hash_hex,
454                        &hash_bytes,
455                        expected_hash,
456                        &quote.peer_id,
457                        Some(&quote),
458                        &ordered_peers,
459                    )
460                    .await
461                {
462                    debug!(
463                        "Got quoted response from peer {} for {}",
464                        quote.peer_id,
465                        &hash_hex[..8.min(hash_hex.len())]
466                    );
467                    return Some((data, quote.peer_id));
468                }
469            }
470        }
471
472        let request = DataRequest {
473            h: hash_bytes.clone(),
474            htl: BLOB_REQUEST_POLICY.max_htl,
475            q: None,
476        };
477        let wire = match encode_request(&request) {
478            Ok(w) => w,
479            Err(_) => return None,
480        };
481        let wire_len = wire.len() as u64;
482        let wait_window = Duration::from_millis(dispatch.hedge_interval_ms.max(1));
483
484        let mut next_peer_idx = 0usize;
485        for wave_size in wave_plan {
486            let from = next_peer_idx;
487            let to = (next_peer_idx + wave_size).min(ordered_peers.len());
488            next_peer_idx = to;
489
490            #[allow(clippy::type_complexity)]
491            let mut outstanding: Vec<(
492                String,
493                Arc<Mutex<HashMap<String, PendingRequest>>>,
494                Instant,
495                tokio::sync::oneshot::Receiver<Option<Vec<u8>>>,
496            )> = Vec::new();
497
498            for (peer_id, pending_requests, dc) in &ordered_peers[from..to] {
499                let (tx, rx) = tokio::sync::oneshot::channel();
500                {
501                    let mut pending = pending_requests.lock().await;
502                    pending.insert(
503                        hash_hex.to_string(),
504                        PendingRequest::standard(hash_bytes.clone(), tx),
505                    );
506                }
507
508                if dc.send(&bytes::Bytes::copy_from_slice(&wire)).await.is_ok() {
509                    self.record_sent(peer_id, wire_len).await;
510                    self.peer_selector
511                        .write()
512                        .await
513                        .record_request(peer_id, wire_len);
514                    outstanding.push((
515                        peer_id.clone(),
516                        pending_requests.clone(),
517                        Instant::now(),
518                        rx,
519                    ));
520                } else {
521                    let mut pending = pending_requests.lock().await;
522                    pending.remove(hash_hex);
523                    self.peer_selector.write().await.record_failure(peer_id);
524                }
525            }
526
527            if outstanding.is_empty() {
528                continue;
529            }
530
531            let deadline = Instant::now() + wait_window;
532            let mut success: Option<(String, Vec<u8>, u64)> = None;
533            while !outstanding.is_empty() && Instant::now() < deadline {
534                let mut i = 0usize;
535                while i < outstanding.len() {
536                    let mut drop_entry = false;
537                    let (peer_id, pending_requests, sent_at, rx) = &mut outstanding[i];
538                    match rx.try_recv() {
539                        Ok(Some(data)) => {
540                            let rtt_ms = sent_at.elapsed().as_millis() as u64;
541                            if hashtree_core::sha256(&data) == expected_hash {
542                                success = Some((peer_id.clone(), data, rtt_ms));
543                                break;
544                            }
545                            self.peer_selector.write().await.record_failure(peer_id);
546                            let mut pending = pending_requests.lock().await;
547                            pending.remove(hash_hex);
548                            drop_entry = true;
549                        }
550                        Ok(None) => {
551                            let mut pending = pending_requests.lock().await;
552                            pending.remove(hash_hex);
553                            drop_entry = true;
554                        }
555                        Err(TryRecvError::Closed) => {
556                            let mut pending = pending_requests.lock().await;
557                            pending.remove(hash_hex);
558                            drop_entry = true;
559                        }
560                        Err(TryRecvError::Empty) => {}
561                    }
562
563                    if drop_entry {
564                        outstanding.swap_remove(i);
565                    } else {
566                        i += 1;
567                    }
568                }
569
570                if success.is_some() {
571                    break;
572                }
573
574                let now = Instant::now();
575                if now >= deadline {
576                    break;
577                }
578                tokio::time::sleep(Duration::from_millis(10).min(deadline - now)).await;
579            }
580
581            if let Some((peer_id, data, rtt_ms)) = success {
582                self.record_received(&peer_id, data.len() as u64).await;
583                self.peer_selector.write().await.record_success(
584                    &peer_id,
585                    rtt_ms,
586                    data.len() as u64,
587                );
588
589                for (other_peer_id, pending_requests, _, _) in outstanding {
590                    if other_peer_id != peer_id {
591                        let mut pending = pending_requests.lock().await;
592                        pending.remove(hash_hex);
593                    }
594                }
595
596                debug!(
597                    "Got response from peer {} for {}",
598                    peer_id,
599                    &hash_hex[..8.min(hash_hex.len())]
600                );
601                return Some((data, peer_id));
602            }
603
604            for (peer_id, pending_requests, _, _) in outstanding {
605                let mut pending = pending_requests.lock().await;
606                pending.remove(hash_hex);
607                self.peer_selector.write().await.record_timeout(&peer_id);
608            }
609        }
610
611        debug!(
612            "No peer had data for {}",
613            &hash_hex[..8.min(hash_hex.len())]
614        );
615        None
616    }
617
618    async fn request_quote_from_peers(
619        &self,
620        hash_bytes: &[u8],
621        requested_mint: String,
622        payment_sat: u64,
623        quote_ttl_ms: u32,
624        ordered_peers: &[ConnectedPeer],
625    ) -> Option<NegotiatedQuote> {
626        if ordered_peers.is_empty() || quote_ttl_ms == 0 {
627            return None;
628        }
629
630        let dispatch = normalize_dispatch_config(self.request_dispatch, ordered_peers.len());
631        let wave_plan = build_hedged_wave_plan(ordered_peers.len(), dispatch);
632        if wave_plan.is_empty() {
633            return None;
634        }
635
636        let hash_hex = hex::encode(hash_bytes);
637        let mut rx = self
638            .cashu_quotes
639            .register_pending_quote(hash_hex.clone(), Some(requested_mint.clone()), payment_sat)
640            .await;
641        let quote_request = DataQuoteRequest {
642            h: hash_bytes.to_vec(),
643            p: payment_sat,
644            t: quote_ttl_ms,
645            m: Some(requested_mint),
646        };
647        let wire = match encode_quote_request(&quote_request) {
648            Ok(wire) => wire,
649            Err(_) => {
650                self.cashu_quotes.clear_pending_quote(&hash_hex).await;
651                return None;
652            }
653        };
654        let deadline = Instant::now() + self.request_timeout;
655        let mut sent_total = 0usize;
656        let mut next_peer_idx = 0usize;
657
658        for (wave_idx, wave_size) in wave_plan.iter().copied().enumerate() {
659            let from = next_peer_idx;
660            let to = (next_peer_idx + wave_size).min(ordered_peers.len());
661            for (_, _, dc) in &ordered_peers[from..to] {
662                if dc.send(&bytes::Bytes::copy_from_slice(&wire)).await.is_ok() {
663                    sent_total += 1;
664                }
665            }
666            next_peer_idx = to;
667
668            if sent_total == 0 {
669                if next_peer_idx >= ordered_peers.len() {
670                    break;
671                }
672                continue;
673            }
674
675            let now = Instant::now();
676            if now >= deadline {
677                break;
678            }
679            let remaining = deadline.saturating_duration_since(now);
680            let is_last_wave =
681                wave_idx + 1 == wave_plan.len() || next_peer_idx >= ordered_peers.len();
682            let wait = if is_last_wave {
683                remaining
684            } else if dispatch.hedge_interval_ms == 0 {
685                Duration::ZERO
686            } else {
687                Duration::from_millis(dispatch.hedge_interval_ms).min(remaining)
688            };
689
690            if wait.is_zero() {
691                continue;
692            }
693
694            match tokio::time::timeout(wait, &mut rx).await {
695                Ok(Ok(Some(quote))) => {
696                    self.cashu_quotes.clear_pending_quote(&hash_hex).await;
697                    return Some(quote);
698                }
699                Ok(Ok(None)) | Ok(Err(_)) => break,
700                Err(_) => {}
701            }
702        }
703
704        self.cashu_quotes.clear_pending_quote(&hash_hex).await;
705        None
706    }
707
708    async fn request_from_single_peer(
709        &self,
710        hash_hex: &str,
711        hash_bytes: &[u8],
712        expected_hash: [u8; 32],
713        target_peer_id: &str,
714        quote: Option<&NegotiatedQuote>,
715        ordered_peers: &[ConnectedPeer],
716    ) -> Option<Vec<u8>> {
717        use super::types::BLOB_REQUEST_POLICY;
718
719        let (pending_requests, dc) = ordered_peers
720            .iter()
721            .find(|(peer_id, _, _)| peer_id == target_peer_id)
722            .map(|(_, pending_requests, dc)| (pending_requests.clone(), dc.clone()))?;
723
724        let request = DataRequest {
725            h: hash_bytes.to_vec(),
726            htl: BLOB_REQUEST_POLICY.max_htl,
727            q: quote.map(|quote| quote.quote_id),
728        };
729        let wire = encode_request(&request).ok()?;
730        let wire_len = wire.len() as u64;
731        let sent_at = Instant::now();
732        let (tx, mut rx) = tokio::sync::oneshot::channel();
733
734        {
735            let mut pending = pending_requests.lock().await;
736            pending.insert(
737                hash_hex.to_string(),
738                if let Some(quote) = quote {
739                    PendingRequest::quoted(
740                        hash_bytes.to_vec(),
741                        tx,
742                        quote.quote_id,
743                        quote.mint_url.clone().unwrap_or_default(),
744                        quote.payment_sat,
745                    )
746                } else {
747                    PendingRequest::standard(hash_bytes.to_vec(), tx)
748                },
749            );
750        }
751
752        if dc
753            .send(&bytes::Bytes::copy_from_slice(&wire))
754            .await
755            .is_err()
756        {
757            let mut pending = pending_requests.lock().await;
758            pending.remove(hash_hex);
759            self.peer_selector
760                .write()
761                .await
762                .record_failure(target_peer_id);
763            return None;
764        }
765
766        self.record_sent(target_peer_id, wire_len).await;
767        self.peer_selector
768            .write()
769            .await
770            .record_request(target_peer_id, wire_len);
771
772        let wait_timeout = if let Some(quote) = quote {
773            let multiplier = quote.payment_sat.clamp(1, 32) as u128;
774            let extra_ms = self
775                .cashu_quotes
776                .settlement_timeout()
777                .as_millis()
778                .saturating_mul(multiplier);
779            self.request_timeout + Duration::from_millis(extra_ms.min(u64::MAX as u128) as u64)
780        } else {
781            self.request_timeout
782        };
783
784        match tokio::time::timeout(wait_timeout, &mut rx).await {
785            Ok(Ok(Some(data))) if hashtree_core::sha256(&data) == expected_hash => {
786                let rtt_ms = sent_at.elapsed().as_millis() as u64;
787                self.record_received(target_peer_id, data.len() as u64)
788                    .await;
789                self.peer_selector.write().await.record_success(
790                    target_peer_id,
791                    rtt_ms,
792                    data.len() as u64,
793                );
794                Some(data)
795            }
796            Ok(Ok(Some(_))) => {
797                self.peer_selector
798                    .write()
799                    .await
800                    .record_failure(target_peer_id);
801                let pending = pending_requests.lock().await.remove(hash_hex);
802                if let Some(pending) = pending {
803                    if let Some(quoted) = pending.quoted {
804                        if let Some(in_flight) = quoted.in_flight_payment {
805                            let _ = self
806                                .cashu_quotes
807                                .revoke_payment_token(&in_flight.mint_url, &in_flight.operation_id)
808                                .await;
809                        }
810                    }
811                }
812                None
813            }
814            Ok(Ok(None)) | Ok(Err(_)) | Err(_) => {
815                let pending = pending_requests.lock().await.remove(hash_hex);
816                if let Some(pending) = pending {
817                    if let Some(quoted) = pending.quoted {
818                        if let Some(in_flight) = quoted.in_flight_payment {
819                            let _ = self
820                                .cashu_quotes
821                                .revoke_payment_token(&in_flight.mint_url, &in_flight.operation_id)
822                                .await;
823                        }
824                    }
825                }
826                self.peer_selector
827                    .write()
828                    .await
829                    .record_timeout(target_peer_id);
830                None
831            }
832        }
833    }
834
835    /// Resolve a hashtree root event through connected peers using Nostr REQ/EOSE over WebRTC.
836    pub async fn resolve_root_from_peers(
837        &self,
838        owner_pubkey: &str,
839        tree_name: &str,
840        per_peer_timeout: Duration,
841    ) -> Option<PeerRootEvent> {
842        let author = PublicKey::from_hex(owner_pubkey).ok()?;
843        let filter = Filter::new()
844            .kind(Kind::Custom(HASHTREE_KIND))
845            .author(author)
846            .custom_tag(
847                SingleLetterTag::lowercase(Alphabet::D),
848                vec![tree_name.to_string()],
849            )
850            .custom_tag(
851                SingleLetterTag::lowercase(Alphabet::L),
852                vec![HASHTREE_LABEL.to_string()],
853            )
854            .limit(50);
855
856        let peers = self.peers.read().await;
857        for entry in peers.values() {
858            if entry.state != ConnectionState::Connected {
859                continue;
860            }
861            let Some(peer) = entry.peer.as_ref() else {
862                continue;
863            };
864            if !peer.has_data_channel() {
865                continue;
866            }
867
868            debug!(
869                "Querying peer {} for root event {}/{}",
870                entry.peer_id.short(),
871                owner_pubkey,
872                tree_name
873            );
874            let events = match peer
875                .query_nostr_events(vec![filter.clone()], per_peer_timeout)
876                .await
877            {
878                Ok(events) => events,
879                Err(e) => {
880                    debug!(
881                        "Peer {} Nostr query failed for {}/{}: {}",
882                        entry.peer_id.short(),
883                        owner_pubkey,
884                        tree_name,
885                        e
886                    );
887                    continue;
888                }
889            };
890            debug!(
891                "Peer {} returned {} Nostr event(s) for {}/{}",
892                entry.peer_id.short(),
893                events.len(),
894                owner_pubkey,
895                tree_name
896            );
897
898            let latest = pick_latest_event(events.iter().filter(|event| {
899                hashtree_event_identifier(event).as_deref() == Some(tree_name)
900                    && is_hashtree_labeled_event(event)
901            }));
902            if let Some(event) = latest {
903                if let Some(root) = root_event_from_peer(event, &entry.peer_id.short(), tree_name) {
904                    debug!(
905                        "Resolved {}/{} via peer {} event {}",
906                        owner_pubkey,
907                        tree_name,
908                        entry.peer_id.short(),
909                        event.id.to_hex()
910                    );
911                    return Some(root);
912                }
913            }
914        }
915
916        None
917    }
918}
919
920impl Default for WebRTCState {
921    fn default() -> Self {
922        Self::new()
923    }
924}
925
926/// WebRTC manager handles peer discovery and connection management
927pub struct WebRTCManager {
928    config: WebRTCConfig,
929    my_peer_id: PeerId,
930    keys: Keys,
931    state: Arc<WebRTCState>,
932    shutdown: Arc<tokio::sync::watch::Sender<bool>>,
933    shutdown_rx: tokio::sync::watch::Receiver<bool>,
934    /// Channel to send signaling messages to relays
935    signaling_tx: mpsc::Sender<SignalingMessage>,
936    signaling_rx: Option<mpsc::Receiver<SignalingMessage>>,
937    /// Optional content store for serving hash requests
938    store: Option<Arc<dyn ContentStore>>,
939    /// Peer classifier for pool assignment
940    peer_classifier: PeerClassifier,
941    /// Optional Nostr relay for data-channel relay messages
942    nostr_relay: Option<Arc<NostrRelay>>,
943    /// Channel for peer state events (connection success/failure)
944    state_event_tx: mpsc::Sender<PeerStateEvent>,
945    state_event_rx: Option<mpsc::Receiver<PeerStateEvent>>,
946    /// Channel for relayless mesh signaling frames received from peers.
947    mesh_frame_tx: mpsc::Sender<(PeerId, MeshNostrFrame)>,
948    mesh_frame_rx: Option<mpsc::Receiver<(PeerId, MeshNostrFrame)>>,
949    seen_frame_ids: Arc<Mutex<TimedSeenSet>>,
950    seen_event_ids: Arc<Mutex<TimedSeenSet>>,
951}
952
953impl WebRTCManager {
954    /// Create a new WebRTC manager
955    pub fn new(keys: Keys, config: WebRTCConfig) -> Self {
956        let pubkey = keys.public_key().to_hex();
957        let my_peer_id = PeerId::new(pubkey, None);
958        let (shutdown, shutdown_rx) = tokio::sync::watch::channel(false);
959        let (signaling_tx, signaling_rx) = mpsc::channel(100);
960        let (state_event_tx, state_event_rx) = mpsc::channel(100);
961        let (mesh_frame_tx, mesh_frame_rx) = mpsc::channel(256);
962        let state = Arc::new(WebRTCState::new_with_routing_and_cashu(
963            config.request_selection_strategy,
964            config.request_fairness_enabled,
965            config.request_dispatch,
966            Duration::from_millis(config.message_timeout_ms),
967            CashuRoutingConfig::default(),
968            None,
969            None,
970        ));
971
972        // Default classifier: all peers go to 'other' pool
973        let peer_classifier: PeerClassifier = Arc::new(|_| PeerPool::Other);
974
975        Self {
976            config,
977            my_peer_id,
978            keys,
979            state,
980            shutdown: Arc::new(shutdown),
981            shutdown_rx,
982            signaling_tx,
983            signaling_rx: Some(signaling_rx),
984            store: None,
985            peer_classifier,
986            nostr_relay: None,
987            state_event_tx,
988            state_event_rx: Some(state_event_rx),
989            mesh_frame_tx,
990            mesh_frame_rx: Some(mesh_frame_rx),
991            seen_frame_ids: Arc::new(Mutex::new(TimedSeenSet::new(
992                SEEN_FRAME_CAP,
993                SEEN_FRAME_TTL,
994            ))),
995            seen_event_ids: Arc::new(Mutex::new(TimedSeenSet::new(
996                SEEN_EVENT_CAP,
997                SEEN_EVENT_TTL,
998            ))),
999        }
1000    }
1001
1002    /// Create a new WebRTC manager with a peer classifier
1003    pub fn new_with_classifier(
1004        keys: Keys,
1005        config: WebRTCConfig,
1006        classifier: PeerClassifier,
1007    ) -> Self {
1008        let mut manager = Self::new(keys, config);
1009        manager.peer_classifier = classifier;
1010        manager
1011    }
1012
1013    /// Create a new WebRTC manager with a content store for serving hash requests
1014    pub fn new_with_store(keys: Keys, config: WebRTCConfig, store: Arc<dyn ContentStore>) -> Self {
1015        let mut manager = Self::new(keys, config);
1016        manager.store = Some(store);
1017        manager
1018    }
1019
1020    /// Create a new WebRTC manager with store and classifier
1021    pub fn new_with_store_and_classifier(
1022        keys: Keys,
1023        config: WebRTCConfig,
1024        store: Arc<dyn ContentStore>,
1025        classifier: PeerClassifier,
1026    ) -> Self {
1027        Self::new_with_store_and_classifier_and_cashu(
1028            keys,
1029            config,
1030            store,
1031            classifier,
1032            CashuRoutingConfig::default(),
1033            None,
1034            None,
1035        )
1036    }
1037
1038    pub fn new_with_store_and_classifier_and_cashu(
1039        keys: Keys,
1040        config: WebRTCConfig,
1041        store: Arc<dyn ContentStore>,
1042        classifier: PeerClassifier,
1043        cashu_routing: CashuRoutingConfig,
1044        payment_client: Option<Arc<dyn CashuPaymentClient>>,
1045        mint_metadata: Option<Arc<CashuMintMetadataStore>>,
1046    ) -> Self {
1047        let mut manager = Self::new(keys, config);
1048        manager.state = Arc::new(WebRTCState::new_with_routing_and_cashu(
1049            manager.config.request_selection_strategy,
1050            manager.config.request_fairness_enabled,
1051            manager.config.request_dispatch,
1052            Duration::from_millis(manager.config.message_timeout_ms),
1053            cashu_routing,
1054            payment_client,
1055            mint_metadata,
1056        ));
1057        manager.store = Some(store);
1058        manager.peer_classifier = classifier;
1059        manager
1060    }
1061
1062    /// Set the content store for serving hash requests
1063    pub fn set_store(&mut self, store: Arc<dyn ContentStore>) {
1064        self.store = Some(store);
1065    }
1066
1067    /// Set the peer classifier
1068    pub fn set_peer_classifier(&mut self, classifier: PeerClassifier) {
1069        self.peer_classifier = classifier;
1070    }
1071
1072    /// Set the Nostr relay for data-channel relay messages
1073    pub fn set_nostr_relay(&mut self, relay: Arc<NostrRelay>) {
1074        self.nostr_relay = Some(relay);
1075    }
1076
1077    /// Get my peer ID
1078    pub fn my_peer_id(&self) -> &PeerId {
1079        &self.my_peer_id
1080    }
1081
1082    /// Get shared state for external access
1083    pub fn state(&self) -> Arc<WebRTCState> {
1084        self.state.clone()
1085    }
1086
1087    /// Signal shutdown
1088    pub fn shutdown(&self) {
1089        let _ = self.shutdown.send(true);
1090    }
1091
1092    /// Get connected peer count
1093    pub async fn connected_count(&self) -> usize {
1094        self.state
1095            .connected_count
1096            .load(std::sync::atomic::Ordering::Relaxed)
1097    }
1098
1099    /// Get all peer statuses
1100    pub async fn peer_statuses(&self) -> Vec<PeerStatus> {
1101        self.state
1102            .peers
1103            .read()
1104            .await
1105            .values()
1106            .map(|p| PeerStatus {
1107                peer_id: p.peer_id.to_string(),
1108                pubkey: p.peer_id.pubkey.clone(),
1109                state: p.state.to_string(),
1110                direction: p.direction,
1111                connected_at: Some(p.last_seen),
1112                pool: p.pool,
1113            })
1114            .collect()
1115    }
1116
1117    /// Get pool counts
1118    /// Returns (follows_connected, follows_active, other_connected, other_active)
1119    /// "active" = Connected or Connecting (excludes Discovered and Failed)
1120    pub async fn get_pool_counts(&self) -> (usize, usize, usize, usize) {
1121        let peers = self.state.peers.read().await;
1122        let mut follows_connected = 0;
1123        let mut follows_active = 0;
1124        let mut other_connected = 0;
1125        let mut other_active = 0;
1126
1127        for entry in peers.values() {
1128            // Only count Connected or Connecting as "active" connections
1129            // Discovered peers are just seen hellos, not real connections
1130            let is_active = entry.state == ConnectionState::Connected
1131                || entry.state == ConnectionState::Connecting;
1132
1133            match entry.pool {
1134                PeerPool::Follows => {
1135                    if is_active {
1136                        follows_active += 1;
1137                    }
1138                    if entry.state == ConnectionState::Connected {
1139                        follows_connected += 1;
1140                    }
1141                }
1142                PeerPool::Other => {
1143                    if is_active {
1144                        other_active += 1;
1145                    }
1146                    if entry.state == ConnectionState::Connected {
1147                        other_connected += 1;
1148                    }
1149                }
1150            }
1151        }
1152
1153        (
1154            follows_connected,
1155            follows_active,
1156            other_connected,
1157            other_active,
1158        )
1159    }
1160
1161    /// Check if we can accept a peer in a given pool
1162    fn can_accept_peer(&self, pool: PeerPool, pool_counts: &(usize, usize, usize, usize)) -> bool {
1163        let (_, follows_active, _, other_active) = *pool_counts;
1164        match pool {
1165            PeerPool::Follows => follows_active < self.config.pools.follows.max_connections,
1166            PeerPool::Other => other_active < self.config.pools.other.max_connections,
1167        }
1168    }
1169
1170    /// Check if a pool is satisfied
1171    #[allow(dead_code)]
1172    fn is_pool_satisfied(
1173        &self,
1174        pool: PeerPool,
1175        pool_counts: &(usize, usize, usize, usize),
1176    ) -> bool {
1177        let (follows_connected, _, other_connected, _) = *pool_counts;
1178        match pool {
1179            PeerPool::Follows => {
1180                follows_connected >= self.config.pools.follows.satisfied_connections
1181            }
1182            PeerPool::Other => other_connected >= self.config.pools.other.satisfied_connections,
1183        }
1184    }
1185
1186    /// Check if both pools are satisfied
1187    #[allow(dead_code)]
1188    fn is_satisfied(&self, pool_counts: &(usize, usize, usize, usize)) -> bool {
1189        self.is_pool_satisfied(PeerPool::Follows, pool_counts)
1190            && self.is_pool_satisfied(PeerPool::Other, pool_counts)
1191    }
1192
1193    /// Check if we should initiate connection (tie-breaking)
1194    /// Lower UUID initiates - same as iris-client/hashtree-ts
1195    fn should_initiate(&self, their_uuid: &str) -> bool {
1196        self.my_peer_id.uuid.as_str() < their_uuid
1197    }
1198
1199    /// Start the WebRTC manager - connects to relays and handles signaling
1200    pub async fn run(&mut self) -> Result<()> {
1201        info!(
1202            "Starting WebRTC manager with peer ID: {}",
1203            self.my_peer_id.short()
1204        );
1205
1206        let (event_tx, mut event_rx) = mpsc::channel::<(String, nostr::Event)>(100);
1207
1208        // Take the signaling receiver
1209        let mut signaling_rx = self
1210            .signaling_rx
1211            .take()
1212            .expect("signaling_rx already taken");
1213
1214        // Take the state event receiver
1215        let mut state_event_rx = self
1216            .state_event_rx
1217            .take()
1218            .expect("state_event_rx already taken");
1219        let mut mesh_frame_rx = self
1220            .mesh_frame_rx
1221            .take()
1222            .expect("mesh_frame_rx already taken");
1223
1224        // Create a shared write channel for all relay tasks
1225        let (relay_write_tx, _) = tokio::sync::broadcast::channel::<SignalingMessage>(100);
1226
1227        // Spawn relay connections
1228        for relay_url in &self.config.relays {
1229            let url = relay_url.clone();
1230            let event_tx = event_tx.clone();
1231            let shutdown_rx = self.shutdown_rx.clone();
1232            let keys = self.keys.clone();
1233            let relay_write_rx = relay_write_tx.subscribe();
1234
1235            tokio::spawn(async move {
1236                if let Err(e) =
1237                    Self::relay_task(url.clone(), event_tx, shutdown_rx, keys, relay_write_rx).await
1238                {
1239                    error!("Relay {} error: {}", url, e);
1240                }
1241            });
1242        }
1243
1244        // Process incoming events and outgoing signaling messages
1245        let mut shutdown_rx = self.shutdown_rx.clone();
1246        // Cleanup interval - run every 30 seconds as a fallback (not for real-time sync)
1247        let mut cleanup_interval = tokio::time::interval(Duration::from_secs(30));
1248        let mut hello_ticker =
1249            tokio::time::interval(Duration::from_millis(self.config.hello_interval_ms));
1250        self.dispatch_signaling_message(
1251            SignalingMessage::hello(&self.my_peer_id.uuid),
1252            &relay_write_tx,
1253        )
1254        .await;
1255        loop {
1256            tokio::select! {
1257                _ = shutdown_rx.changed() => {
1258                    if *shutdown_rx.borrow() {
1259                        info!("WebRTC manager shutting down");
1260                        break;
1261                    }
1262                }
1263                Some((relay, event)) = event_rx.recv() => {
1264                    if let Err(e) = self.handle_event(&relay, &event, &relay_write_tx).await {
1265                        debug!("Error handling event from {}: {}", relay, e);
1266                    }
1267                }
1268                Some(msg) = signaling_rx.recv() => {
1269                    self.dispatch_signaling_message(msg, &relay_write_tx).await;
1270                }
1271                Some(event) = state_event_rx.recv() => {
1272                    // Handle peer state events (connected, failed, disconnected)
1273                    self.handle_peer_state_event(event, &relay_write_tx).await;
1274                }
1275                Some((from_peer_id, frame)) = mesh_frame_rx.recv() => {
1276                    self.handle_mesh_frame(from_peer_id, frame, &relay_write_tx).await;
1277                }
1278                _ = hello_ticker.tick() => {
1279                    self.dispatch_signaling_message(
1280                        SignalingMessage::hello(&self.my_peer_id.uuid),
1281                        &relay_write_tx,
1282                    ).await;
1283                }
1284                _ = cleanup_interval.tick() => {
1285                    // Periodic cleanup of stale peers and state sync (fallback)
1286                    self.cleanup_stale_peers().await;
1287                }
1288            }
1289        }
1290
1291        Ok(())
1292    }
1293
1294    /// Connect to a single relay and handle messages
1295    async fn relay_task(
1296        url: String,
1297        event_tx: mpsc::Sender<(String, nostr::Event)>,
1298        mut shutdown_rx: tokio::sync::watch::Receiver<bool>,
1299        keys: Keys,
1300        mut signaling_rx: tokio::sync::broadcast::Receiver<SignalingMessage>,
1301    ) -> Result<()> {
1302        info!("Connecting to relay: {}", url);
1303
1304        let (ws_stream, _) = connect_async(&url).await?;
1305        let (mut write, mut read) = ws_stream.split();
1306
1307        // Subscribe to webrtc events - two filters:
1308        // 1. Hello messages: kind 25050 with #l: "hello" tag
1309        // 2. Directed messages: kind 25050 with #p tag (our pubkey)
1310        let hello_filter = Filter::new()
1311            .kind(Kind::Ephemeral(WEBRTC_KIND as u16))
1312            .custom_tag(
1313                nostr::SingleLetterTag::lowercase(nostr::Alphabet::L),
1314                vec![HELLO_TAG],
1315            )
1316            .since(nostr::Timestamp::now() - Duration::from_secs(60));
1317
1318        let directed_filter = Filter::new()
1319            .kind(Kind::Ephemeral(WEBRTC_KIND as u16))
1320            .custom_tag(
1321                nostr::SingleLetterTag::lowercase(nostr::Alphabet::P),
1322                vec![keys.public_key().to_hex()],
1323            )
1324            .since(nostr::Timestamp::now() - Duration::from_secs(60));
1325
1326        let sub_id = nostr::SubscriptionId::generate();
1327        let sub_msg = ClientMessage::req(sub_id.clone(), vec![hello_filter, directed_filter]);
1328        write.send(Message::Text(sub_msg.as_json())).await?;
1329
1330        info!(
1331            "Subscribed to {} for WebRTC events (kind {})",
1332            url, WEBRTC_KIND
1333        );
1334
1335        loop {
1336            tokio::select! {
1337                _ = shutdown_rx.changed() => {
1338                    if *shutdown_rx.borrow() {
1339                        break;
1340                    }
1341                }
1342                // Handle outgoing signaling messages
1343                Ok(signaling_msg) = signaling_rx.recv() => {
1344                    info!("Sending {} via {}", signaling_msg.msg_type(), url);
1345                    if let Ok(event) = Self::create_signaling_event(&keys, &signaling_msg).await {
1346                        let event_id = event.id.to_string();
1347                        let msg = ClientMessage::event(event);
1348                        if write.send(Message::Text(msg.as_json())).await.is_ok() {
1349                            info!("Sent {} to {} (event id: {})", signaling_msg.msg_type(), url, &event_id[..16]);
1350                        }
1351                    }
1352                }
1353                msg = read.next() => {
1354                    match msg {
1355                        Some(Ok(Message::Text(text))) => {
1356                            if let Ok(RelayMessage::Event { event, .. }) =
1357                                RelayMessage::from_json(&text)
1358                            {
1359                                let _ = event_tx.send((url.clone(), *event)).await;
1360                            }
1361                        }
1362                        Some(Err(e)) => {
1363                            error!("WebSocket error from {}: {}", url, e);
1364                            break;
1365                        }
1366                        None => {
1367                            warn!("WebSocket closed: {}", url);
1368                            break;
1369                        }
1370                        _ => {}
1371                    }
1372                }
1373            }
1374        }
1375
1376        Ok(())
1377    }
1378
1379    async fn mark_seen_frame_id(&self, frame_id: String) -> bool {
1380        let mut seen = self.seen_frame_ids.lock().await;
1381        seen.insert_if_new(frame_id)
1382    }
1383
1384    async fn mark_seen_event_id(&self, event_id: String) -> bool {
1385        let mut seen = self.seen_event_ids.lock().await;
1386        seen.insert_if_new(event_id)
1387    }
1388
1389    async fn dispatch_signaling_message(
1390        &self,
1391        msg: SignalingMessage,
1392        relay_write_tx: &tokio::sync::broadcast::Sender<SignalingMessage>,
1393    ) {
1394        if relay_write_tx.send(msg.clone()).is_err() {
1395            debug!(
1396                "No relay subscribers for signaling message {}",
1397                msg.msg_type()
1398            );
1399        }
1400
1401        let event = match Self::create_signaling_event(&self.keys, &msg).await {
1402            Ok(event) => event,
1403            Err(e) => {
1404                debug!("Failed to create signaling event for mesh dispatch: {}", e);
1405                return;
1406            }
1407        };
1408
1409        let mut frame =
1410            MeshNostrFrame::new_event(event, &self.my_peer_id.to_string(), MESH_DEFAULT_HTL);
1411        if !self.mark_seen_frame_id(frame.frame_id.clone()).await {
1412            self.state.record_mesh_duplicate_drop();
1413            return;
1414        }
1415        if !self.mark_seen_event_id(frame.event().id.to_hex()).await {
1416            self.state.record_mesh_duplicate_drop();
1417            return;
1418        }
1419
1420        // Keep the sender peer id stable even if this is forwarded later.
1421        frame.sender_peer_id = self.my_peer_id.to_string();
1422        let forwarded = self.forward_mesh_frame(&frame, None).await;
1423        if forwarded > 0 {
1424            self.state.record_mesh_forwarded(forwarded as u64);
1425        }
1426    }
1427
1428    async fn forward_mesh_frame(
1429        &self,
1430        frame: &MeshNostrFrame,
1431        exclude_peer_id: Option<&str>,
1432    ) -> usize {
1433        let peers = self.state.peers.read().await;
1434        let peer_refs: Vec<_> = peers
1435            .values()
1436            .filter(|entry| entry.state == ConnectionState::Connected)
1437            .filter(|entry| {
1438                entry
1439                    .peer
1440                    .as_ref()
1441                    .map(|peer| peer.has_data_channel())
1442                    .unwrap_or(false)
1443            })
1444            .filter(|entry| {
1445                exclude_peer_id
1446                    .map(|exclude| exclude != entry.peer_id.to_string())
1447                    .unwrap_or(true)
1448            })
1449            .filter_map(|entry| {
1450                entry.peer.as_ref().map(|peer| {
1451                    (
1452                        entry.peer_id.to_string(),
1453                        entry.peer_id.short(),
1454                        peer.data_channel.clone(),
1455                        *peer.htl_config(),
1456                    )
1457                })
1458            })
1459            .collect();
1460        drop(peers);
1461
1462        let mut forwarded = 0usize;
1463        for (_peer_key, peer_short, dc_mutex, htl_cfg) in peer_refs {
1464            let next_htl = decrement_htl_with_policy(frame.htl, &MESH_EVENT_POLICY, &htl_cfg);
1465            if !should_forward_htl(next_htl) {
1466                continue;
1467            }
1468
1469            let mut outbound = frame.clone();
1470            outbound.htl = next_htl;
1471            let text = match serde_json::to_string(&outbound) {
1472                Ok(text) => text,
1473                Err(e) => {
1474                    debug!("Failed to serialize mesh frame for {}: {}", peer_short, e);
1475                    continue;
1476                }
1477            };
1478
1479            let dc_guard = dc_mutex.lock().await;
1480            let Some(dc) = dc_guard.as_ref() else {
1481                continue;
1482            };
1483            if dc.ready_state()
1484                != webrtc::data_channel::data_channel_state::RTCDataChannelState::Open
1485            {
1486                continue;
1487            }
1488            if dc.send_text(text).await.is_ok() {
1489                forwarded += 1;
1490            }
1491        }
1492
1493        forwarded
1494    }
1495
1496    async fn handle_mesh_frame(
1497        &self,
1498        from_peer_id: PeerId,
1499        frame: MeshNostrFrame,
1500        relay_write_tx: &tokio::sync::broadcast::Sender<SignalingMessage>,
1501    ) {
1502        if let Err(reason) = validate_mesh_frame(&frame) {
1503            debug!(
1504                "Ignoring mesh frame from {} (invalid: {})",
1505                from_peer_id.short(),
1506                reason
1507            );
1508            return;
1509        }
1510
1511        if !self.mark_seen_frame_id(frame.frame_id.clone()).await {
1512            self.state.record_mesh_duplicate_drop();
1513            return;
1514        }
1515
1516        let event = match &frame.payload {
1517            MeshNostrPayload::Event { event } => event.clone(),
1518        };
1519
1520        if !self.mark_seen_event_id(event.id.to_hex()).await {
1521            self.state.record_mesh_duplicate_drop();
1522            return;
1523        }
1524
1525        if event.verify().is_err() {
1526            debug!(
1527                "Ignoring mesh event from {} due to invalid signature",
1528                from_peer_id.short()
1529            );
1530            return;
1531        }
1532
1533        self.state.record_mesh_received();
1534
1535        if let Err(e) = self.handle_event("mesh", &event, relay_write_tx).await {
1536            debug!(
1537                "Error handling mesh event from {}: {}",
1538                from_peer_id.short(),
1539                e
1540            );
1541        }
1542
1543        let forwarded = self
1544            .forward_mesh_frame(&frame, Some(&from_peer_id.to_string()))
1545            .await;
1546        if forwarded > 0 {
1547            self.state.record_mesh_forwarded(forwarded as u64);
1548        }
1549    }
1550
1551    /// Create a signaling event
1552    ///
1553    /// For directed messages (offer, answer, candidate, candidates), use NIP-17 style
1554    /// gift wrapping with ephemeral keys for privacy.
1555    /// Hello messages use kind 25050 with #l: "hello" tag and peerId.
1556    async fn create_signaling_event(keys: &Keys, msg: &SignalingMessage) -> Result<nostr::Event> {
1557        // Check if message has a recipient (needs gift wrapping)
1558        if let Some(recipient_str) = msg.recipient() {
1559            // Parse recipient to get their pubkey
1560            if let Some(peer_id) = PeerId::from_string(recipient_str) {
1561                let recipient_pubkey = PublicKey::from_hex(&peer_id.pubkey)?;
1562
1563                // Create seal with sender's actual pubkey (the "rumor")
1564                let seal = serde_json::json!({
1565                    "pubkey": keys.public_key().to_hex(),
1566                    "kind": WEBRTC_KIND,
1567                    "content": serde_json::to_string(msg)?,
1568                    "tags": []
1569                });
1570
1571                // Generate ephemeral keypair for the wrapper
1572                let ephemeral_keys = Keys::generate();
1573
1574                // Encrypt the seal for the recipient using ephemeral key (NIP-44)
1575                let encrypted_content = nip44::encrypt(
1576                    ephemeral_keys.secret_key(),
1577                    &recipient_pubkey,
1578                    seal.to_string(),
1579                    nip44::Version::V2,
1580                )?;
1581
1582                // Create wrapper event with ephemeral key
1583                let created_at = nostr::Timestamp::now();
1584                let expiration = created_at + Duration::from_secs(5 * 60); // 5 minutes
1585
1586                let tags = vec![
1587                    Tag::parse(&["p", &recipient_pubkey.to_hex()])?,
1588                    Tag::parse(&["expiration", &expiration.as_u64().to_string()])?,
1589                ];
1590
1591                let event =
1592                    EventBuilder::new(Kind::Ephemeral(WEBRTC_KIND as u16), encrypted_content, tags)
1593                        .to_event(&ephemeral_keys)?;
1594
1595                return Ok(event);
1596            }
1597        }
1598
1599        // Hello messages - kind 25050 with #l: "hello" tag and peerId
1600        let tags = vec![
1601            Tag::parse(&["l", HELLO_TAG])?,
1602            Tag::parse(&["peerId", msg.peer_id()])?,
1603        ];
1604
1605        let event =
1606            EventBuilder::new(Kind::Ephemeral(WEBRTC_KIND as u16), "", tags).to_event(keys)?;
1607
1608        Ok(event)
1609    }
1610
1611    /// Handle an incoming event
1612    ///
1613    /// Messages may be:
1614    /// 1. Hello messages: kind 25050 with #l: "hello" tag and peerId
1615    /// 2. Gift-wrapped directed messages: kind 25050 with #p tag, encrypted with ephemeral key
1616    async fn handle_event(
1617        &self,
1618        relay: &str,
1619        event: &nostr::Event,
1620        relay_write_tx: &tokio::sync::broadcast::Sender<SignalingMessage>,
1621    ) -> Result<()> {
1622        // Must be kind 25050
1623        if event.kind != Kind::Ephemeral(WEBRTC_KIND as u16) {
1624            return Ok(());
1625        }
1626
1627        // Helper to get tag value
1628        let get_tag = |name: &str| -> Option<String> {
1629            event.tags.iter().find_map(|tag| {
1630                let v: Vec<String> = tag.clone().to_vec();
1631                if v.len() >= 2 && v[0] == name {
1632                    Some(v[1].clone())
1633                } else {
1634                    None
1635                }
1636            })
1637        };
1638
1639        // Check if this is a hello message (#l: "hello" tag)
1640        let l_tag = get_tag("l");
1641        if l_tag.as_deref() == Some(HELLO_TAG) {
1642            let sender_pubkey = event.pubkey.to_hex();
1643
1644            // Skip our own hello messages
1645            if sender_pubkey == self.my_peer_id.pubkey {
1646                return Ok(());
1647            }
1648
1649            if let Some(their_uuid) = get_tag("peerId") {
1650                debug!("Received hello from {} via {}", &sender_pubkey[..8], relay);
1651                self.handle_hello(&sender_pubkey, &their_uuid, relay_write_tx)
1652                    .await?;
1653            }
1654            return Ok(());
1655        }
1656
1657        // Check if this is a directed message for us (#p tag with our pubkey)
1658        let p_tag = get_tag("p");
1659        if p_tag.as_deref() != Some(&self.keys.public_key().to_hex()) {
1660            // Not for us - ignore silently
1661            return Ok(());
1662        }
1663
1664        // Gift-wrapped directed message - decrypt using our key and ephemeral sender's pubkey
1665        if event.content.is_empty() {
1666            return Ok(());
1667        }
1668
1669        // Try to unwrap the gift - decrypt with our key and the ephemeral sender's pubkey
1670        let seal: serde_json::Value =
1671            match nip44::decrypt(self.keys.secret_key(), &event.pubkey, &event.content) {
1672                Ok(plaintext) => match serde_json::from_str(&plaintext) {
1673                    Ok(v) => v,
1674                    Err(_) => return Ok(()),
1675                },
1676                Err(_) => {
1677                    // Can't decrypt - not for us or invalid
1678                    return Ok(());
1679                }
1680            };
1681
1682        // Extract the actual sender's pubkey and content from the seal
1683        let sender_pubkey = seal
1684            .get("pubkey")
1685            .and_then(|v| v.as_str())
1686            .ok_or_else(|| anyhow::anyhow!("Missing pubkey in seal"))?;
1687
1688        // Skip our own messages
1689        if sender_pubkey == self.my_peer_id.pubkey {
1690            return Ok(());
1691        }
1692
1693        let content = seal
1694            .get("content")
1695            .and_then(|v| v.as_str())
1696            .ok_or_else(|| anyhow::anyhow!("Missing content in seal"))?;
1697
1698        let raw_msg: serde_json::Value = serde_json::from_str(content)?;
1699        let msg_type = raw_msg.get("type").and_then(|v| v.as_str()).unwrap_or("");
1700
1701        // Support hashtree-ts format: { type, peerId, targetPeerId, sdp/candidate/candidates }
1702        if raw_msg.get("targetPeerId").is_some() {
1703            let target_peer = raw_msg
1704                .get("targetPeerId")
1705                .and_then(|v| v.as_str())
1706                .unwrap_or("");
1707            if target_peer != self.my_peer_id.to_string() {
1708                return Ok(());
1709            }
1710
1711            let peer_id = raw_msg.get("peerId").and_then(|v| v.as_str()).unwrap_or("");
1712            let their_uuid = peer_id.split(':').nth(1).unwrap_or(peer_id);
1713
1714            match msg_type {
1715                "offer" => {
1716                    let sdp = raw_msg
1717                        .get("sdp")
1718                        .and_then(|v| v.as_str())
1719                        .ok_or_else(|| anyhow::anyhow!("Missing SDP in offer"))?;
1720                    let offer = serde_json::json!({ "type": "offer", "sdp": sdp });
1721                    self.handle_offer(sender_pubkey, their_uuid, offer, relay_write_tx)
1722                        .await?;
1723                }
1724                "answer" => {
1725                    let sdp = raw_msg
1726                        .get("sdp")
1727                        .and_then(|v| v.as_str())
1728                        .ok_or_else(|| anyhow::anyhow!("Missing SDP in answer"))?;
1729                    let answer = serde_json::json!({ "type": "answer", "sdp": sdp });
1730                    self.handle_answer(sender_pubkey, their_uuid, answer)
1731                        .await?;
1732                }
1733                "candidate" => {
1734                    let candidate = raw_msg
1735                        .get("candidate")
1736                        .and_then(|v| v.as_str())
1737                        .unwrap_or("");
1738                    if !candidate.is_empty() {
1739                        let candidate_json = serde_json::json!({
1740                            "candidate": candidate,
1741                            "sdpMid": raw_msg.get("sdpMid"),
1742                            "sdpMLineIndex": raw_msg.get("sdpMLineIndex"),
1743                        });
1744                        self.handle_candidate(sender_pubkey, their_uuid, candidate_json)
1745                            .await?;
1746                    }
1747                }
1748                "candidates" => {
1749                    let candidates = raw_msg
1750                        .get("candidates")
1751                        .and_then(|v| v.as_array())
1752                        .map(|entries| {
1753                            entries
1754                                .iter()
1755                                .filter_map(|entry| {
1756                                    entry
1757                                        .get("candidate")
1758                                        .and_then(|v| v.as_str())
1759                                        .or_else(|| entry.as_str())
1760                                        .map(|candidate_str| {
1761                                            serde_json::json!({
1762                                                "candidate": candidate_str,
1763                                                "sdpMid": entry.get("sdpMid"),
1764                                                "sdpMLineIndex": entry.get("sdpMLineIndex"),
1765                                            })
1766                                        })
1767                                })
1768                                .collect::<Vec<_>>()
1769                        })
1770                        .unwrap_or_default();
1771                    self.handle_candidates(sender_pubkey, their_uuid, candidates)
1772                        .await?;
1773                }
1774                _ => {}
1775            }
1776
1777            return Ok(());
1778        }
1779
1780        let msg: SignalingMessage = serde_json::from_value(raw_msg)?;
1781
1782        debug!(
1783            "Received {} from {} via {} (gift-wrapped)",
1784            msg.msg_type(),
1785            &sender_pubkey[..8],
1786            relay
1787        );
1788
1789        match msg {
1790            SignalingMessage::Hello { .. } => {
1791                // Hello messages should come via tags, not gift wrap
1792                return Ok(());
1793            }
1794            SignalingMessage::Offer {
1795                recipient,
1796                peer_id: their_uuid,
1797                offer,
1798            } => {
1799                if recipient != self.my_peer_id.to_string() {
1800                    return Ok(()); // Not for us
1801                }
1802                if let Err(e) = self
1803                    .handle_offer(sender_pubkey, &their_uuid, offer, relay_write_tx)
1804                    .await
1805                {
1806                    error!(
1807                        "handle_offer FAILED: sender={}, uuid={}, error={:?}",
1808                        &sender_pubkey[..8.min(sender_pubkey.len())],
1809                        their_uuid,
1810                        e
1811                    );
1812                    return Err(e);
1813                }
1814            }
1815            SignalingMessage::Answer {
1816                recipient,
1817                peer_id: their_uuid,
1818                answer,
1819            } => {
1820                if recipient != self.my_peer_id.to_string() {
1821                    return Ok(());
1822                }
1823                self.handle_answer(sender_pubkey, &their_uuid, answer)
1824                    .await?;
1825            }
1826            SignalingMessage::Candidate {
1827                recipient,
1828                peer_id: their_uuid,
1829                candidate,
1830            } => {
1831                if recipient != self.my_peer_id.to_string() {
1832                    return Ok(());
1833                }
1834                self.handle_candidate(sender_pubkey, &their_uuid, candidate)
1835                    .await?;
1836            }
1837            SignalingMessage::Candidates {
1838                recipient,
1839                peer_id: their_uuid,
1840                candidates,
1841            } => {
1842                if recipient != self.my_peer_id.to_string() {
1843                    return Ok(());
1844                }
1845                self.handle_candidates(sender_pubkey, &their_uuid, candidates)
1846                    .await?;
1847            }
1848        }
1849
1850        Ok(())
1851    }
1852
1853    /// Handle incoming hello message
1854    async fn handle_hello(
1855        &self,
1856        sender_pubkey: &str,
1857        their_uuid: &str,
1858        relay_write_tx: &tokio::sync::broadcast::Sender<SignalingMessage>,
1859    ) -> Result<()> {
1860        let full_peer_id = PeerId::new(sender_pubkey.to_string(), Some(their_uuid.to_string()));
1861        let peer_key = full_peer_id.to_string();
1862        let mut already_discovered = false;
1863
1864        // Check if we already have this peer
1865        {
1866            let peers = self.state.peers.read().await;
1867            if let Some(entry) = peers.get(&peer_key) {
1868                // Already connected or connecting, just update last_seen
1869                if entry.state == ConnectionState::Connected
1870                    || entry.state == ConnectionState::Connecting
1871                {
1872                    return Ok(());
1873                }
1874                already_discovered = true;
1875            }
1876        }
1877
1878        // Classify the peer into a pool
1879        let pool = (self.peer_classifier)(sender_pubkey);
1880
1881        // Check pool limits
1882        let pool_counts = self.get_pool_counts().await;
1883        if !self.can_accept_peer(pool, &pool_counts) {
1884            debug!(
1885                "Ignoring hello from {} - pool {:?} is full",
1886                full_peer_id.short(),
1887                pool
1888            );
1889            return Ok(());
1890        }
1891
1892        // Decide if we should initiate based on tie-breaking
1893        let should_initiate = self.should_initiate(their_uuid);
1894
1895        // If pool is already satisfied, don't initiate new outbound connections
1896        // This reserves space for inbound connections
1897        let pool_satisfied = self.is_pool_satisfied(pool, &pool_counts);
1898        let will_initiate = should_initiate && !pool_satisfied;
1899
1900        info!(
1901            "Discovered peer: {} (pool: {:?}, initiate: {}, pool_satisfied: {})",
1902            full_peer_id.short(),
1903            pool,
1904            will_initiate,
1905            pool_satisfied
1906        );
1907
1908        // If we're not initiating and pool is satisfied, don't even add to discovered
1909        // (we won't accept their offer either since pool check happens in handle_offer)
1910        if !will_initiate && pool_satisfied {
1911            debug!(
1912                "Pool {:?} is satisfied, not tracking peer {}",
1913                pool,
1914                full_peer_id.short()
1915            );
1916            return Ok(());
1917        }
1918
1919        // Create peer entry with pool assignment
1920        {
1921            let mut peers = self.state.peers.write().await;
1922            peers.insert(
1923                peer_key.clone(),
1924                PeerEntry {
1925                    peer_id: full_peer_id.clone(),
1926                    direction: if will_initiate {
1927                        PeerDirection::Outbound
1928                    } else {
1929                        PeerDirection::Inbound
1930                    },
1931                    state: ConnectionState::Discovered,
1932                    last_seen: Instant::now(),
1933                    peer: None,
1934                    pool,
1935                    bytes_sent: 0,
1936                    bytes_received: 0,
1937                },
1938            );
1939        }
1940
1941        // If we discovered a peer but are not the initiator, send one immediate hello
1942        // to accelerate reciprocal discovery over relayless mesh paths.
1943        if !will_initiate && !already_discovered {
1944            self.dispatch_signaling_message(
1945                SignalingMessage::hello(&self.my_peer_id.uuid),
1946                relay_write_tx,
1947            )
1948            .await;
1949        }
1950
1951        // If we should initiate, create offer
1952        if will_initiate {
1953            self.initiate_connection(&full_peer_id, pool, relay_write_tx)
1954                .await?;
1955        }
1956
1957        Ok(())
1958    }
1959
1960    /// Initiate a connection to a peer (create and send offer)
1961    async fn initiate_connection(
1962        &self,
1963        peer_id: &PeerId,
1964        pool: PeerPool,
1965        relay_write_tx: &tokio::sync::broadcast::Sender<SignalingMessage>,
1966    ) -> Result<()> {
1967        let peer_key = peer_id.to_string();
1968
1969        info!(
1970            "Initiating connection to {} (pool: {:?})",
1971            peer_id.short(),
1972            pool
1973        );
1974
1975        // Create peer connection with content store and state events
1976        let mut peer = Peer::new_with_store_and_events(
1977            peer_id.clone(),
1978            PeerDirection::Outbound,
1979            self.my_peer_id.clone(),
1980            self.signaling_tx.clone(),
1981            self.config.stun_servers.clone(),
1982            self.store.clone(),
1983            Some(self.state_event_tx.clone()),
1984            self.nostr_relay.clone(),
1985            Some(self.mesh_frame_tx.clone()),
1986            Some(self.state.cashu_quotes.clone()),
1987        )
1988        .await?;
1989
1990        peer.setup_handlers().await?;
1991
1992        // Create offer
1993        let offer = peer.connect().await?;
1994
1995        // Update state
1996        {
1997            let mut peers = self.state.peers.write().await;
1998            if let Some(entry) = peers.get_mut(&peer_key) {
1999                entry.state = ConnectionState::Connecting;
2000                entry.peer = Some(peer);
2001                entry.pool = pool;
2002            }
2003        }
2004
2005        // Send offer
2006        let offer_msg = SignalingMessage::Offer {
2007            offer,
2008            recipient: peer_id.to_string(),
2009            peer_id: self.my_peer_id.uuid.clone(),
2010        };
2011        self.dispatch_signaling_message(offer_msg, relay_write_tx)
2012            .await;
2013
2014        info!("Sent offer to {}", peer_id.short());
2015
2016        Ok(())
2017    }
2018
2019    /// Handle incoming offer
2020    async fn handle_offer(
2021        &self,
2022        sender_pubkey: &str,
2023        their_uuid: &str,
2024        offer: serde_json::Value,
2025        relay_write_tx: &tokio::sync::broadcast::Sender<SignalingMessage>,
2026    ) -> Result<()> {
2027        debug!(
2028            "handle_offer ENTRY: sender={}, uuid={}",
2029            &sender_pubkey[..8.min(sender_pubkey.len())],
2030            their_uuid
2031        );
2032        let full_peer_id = PeerId::new(sender_pubkey.to_string(), Some(their_uuid.to_string()));
2033        let peer_key = full_peer_id.to_string();
2034
2035        // Classify the peer into a pool
2036        let pool = (self.peer_classifier)(sender_pubkey);
2037
2038        info!(
2039            "Received offer from {} (pool: {:?})",
2040            full_peer_id.short(),
2041            pool
2042        );
2043
2044        // Check if we already have this peer with an actual connection
2045        {
2046            let peers = self.state.peers.read().await;
2047            debug!(
2048                "Checking for existing peer, peer_key: {}, known_peers: {}",
2049                peer_key,
2050                peers.len()
2051            );
2052            if let Some(entry) = peers.get(&peer_key) {
2053                // Only skip if we have an actual peer connection (not just discovered)
2054                if entry.peer.is_some() {
2055                    debug!(
2056                        "Already have peer {} with connection, skipping offer",
2057                        full_peer_id.short()
2058                    );
2059                    return Ok(());
2060                }
2061                debug!(
2062                    "Peer {} exists but has no connection, proceeding",
2063                    full_peer_id.short()
2064                );
2065            } else {
2066                debug!(
2067                    "Peer {} not found in peers map, will create new entry",
2068                    full_peer_id.short()
2069                );
2070            }
2071        }
2072
2073        // Check pool limits
2074        let pool_counts = self.get_pool_counts().await;
2075        debug!(
2076            "Pool counts: {:?}, checking can_accept_peer for {:?}",
2077            pool_counts, pool
2078        );
2079        if !self.can_accept_peer(pool, &pool_counts) {
2080            warn!(
2081                "Rejecting offer from {} - pool {:?} is full",
2082                full_peer_id.short(),
2083                pool
2084            );
2085            return Ok(());
2086        }
2087        debug!("Pool check passed for {}", full_peer_id.short());
2088
2089        // Create peer connection with content store and state events
2090        debug!("Creating peer connection for {}", full_peer_id.short());
2091        let mut peer = Peer::new_with_store_and_events(
2092            full_peer_id.clone(),
2093            PeerDirection::Inbound,
2094            self.my_peer_id.clone(),
2095            self.signaling_tx.clone(),
2096            self.config.stun_servers.clone(),
2097            self.store.clone(),
2098            Some(self.state_event_tx.clone()),
2099            self.nostr_relay.clone(),
2100            Some(self.mesh_frame_tx.clone()),
2101            Some(self.state.cashu_quotes.clone()),
2102        )
2103        .await?;
2104        debug!("Peer connection created for {}", full_peer_id.short());
2105
2106        peer.setup_handlers().await?;
2107        debug!("Handlers set up for {}", full_peer_id.short());
2108
2109        // Handle offer and create answer
2110        let answer = peer.handle_offer(offer).await?;
2111        debug!("Answer created for {}", full_peer_id.short());
2112
2113        // Update state
2114        {
2115            let mut peers = self.state.peers.write().await;
2116            peers.insert(
2117                peer_key,
2118                PeerEntry {
2119                    peer_id: full_peer_id.clone(),
2120                    direction: PeerDirection::Inbound,
2121                    state: ConnectionState::Connecting,
2122                    last_seen: Instant::now(),
2123                    peer: Some(peer),
2124                    pool,
2125                    bytes_sent: 0,
2126                    bytes_received: 0,
2127                },
2128            );
2129        }
2130
2131        // Send answer
2132        // Note: peer_id is just the UUID, not full pubkey:uuid
2133        // The recipient will construct full peer_id from sender pubkey + this UUID
2134        let answer_msg = SignalingMessage::Answer {
2135            answer,
2136            recipient: full_peer_id.to_string(),
2137            peer_id: self.my_peer_id.uuid.clone(),
2138        };
2139        self.dispatch_signaling_message(answer_msg, relay_write_tx)
2140            .await;
2141        info!("Sent answer to {}", full_peer_id.short());
2142
2143        Ok(())
2144    }
2145
2146    /// Handle incoming answer
2147    async fn handle_answer(
2148        &self,
2149        sender_pubkey: &str,
2150        their_uuid: &str,
2151        answer: serde_json::Value,
2152    ) -> Result<()> {
2153        let full_peer_id = PeerId::new(sender_pubkey.to_string(), Some(their_uuid.to_string()));
2154        let peer_key = full_peer_id.to_string();
2155
2156        info!("Received answer from {}", full_peer_id.short());
2157
2158        let mut peers = self.state.peers.write().await;
2159        if let Some(entry) = peers.get_mut(&peer_key) {
2160            // Skip if already connected - duplicate answers from multiple relays
2161            if entry.state == ConnectionState::Connected {
2162                debug!(
2163                    "Ignoring duplicate answer from {} - already connected",
2164                    full_peer_id.short()
2165                );
2166                return Ok(());
2167            }
2168            if let Some(ref mut peer) = entry.peer {
2169                // Check WebRTC signaling state before applying answer
2170                use webrtc::peer_connection::signaling_state::RTCSignalingState;
2171                let signaling_state = peer.signaling_state();
2172                if signaling_state != RTCSignalingState::HaveLocalOffer {
2173                    debug!(
2174                        "Ignoring answer from {} - signaling state is {:?}, not HaveLocalOffer",
2175                        full_peer_id.short(),
2176                        signaling_state
2177                    );
2178                    return Ok(());
2179                }
2180                peer.handle_answer(answer).await?;
2181                info!("Applied answer from {}", full_peer_id.short());
2182            } else {
2183                debug!("Peer {} has no connection object", full_peer_id.short());
2184            }
2185        } else {
2186            debug!("No peer found for key: {}", peer_key);
2187        }
2188
2189        Ok(())
2190    }
2191
2192    /// Handle incoming ICE candidate
2193    async fn handle_candidate(
2194        &self,
2195        sender_pubkey: &str,
2196        their_uuid: &str,
2197        candidate: serde_json::Value,
2198    ) -> Result<()> {
2199        let full_peer_id = PeerId::new(sender_pubkey.to_string(), Some(their_uuid.to_string()));
2200        let peer_key = full_peer_id.to_string();
2201
2202        info!("Received ICE candidate from {}", full_peer_id.short());
2203
2204        let mut peers = self.state.peers.write().await;
2205        if let Some(entry) = peers.get_mut(&peer_key) {
2206            if let Some(ref mut peer) = entry.peer {
2207                peer.handle_candidate(candidate).await?;
2208            }
2209        }
2210
2211        Ok(())
2212    }
2213
2214    /// Handle batched ICE candidates
2215    async fn handle_candidates(
2216        &self,
2217        sender_pubkey: &str,
2218        their_uuid: &str,
2219        candidates: Vec<serde_json::Value>,
2220    ) -> Result<()> {
2221        let full_peer_id = PeerId::new(sender_pubkey.to_string(), Some(their_uuid.to_string()));
2222        let peer_key = full_peer_id.to_string();
2223
2224        debug!(
2225            "Received {} candidates from {}",
2226            candidates.len(),
2227            full_peer_id.short()
2228        );
2229
2230        let mut peers = self.state.peers.write().await;
2231        if let Some(entry) = peers.get_mut(&peer_key) {
2232            if let Some(ref mut peer) = entry.peer {
2233                for candidate in candidates {
2234                    if let Err(e) = peer.handle_candidate(candidate).await {
2235                        debug!("Failed to add candidate: {}", e);
2236                    }
2237                }
2238            }
2239        }
2240
2241        Ok(())
2242    }
2243
2244    /// Handle peer state change events from peer connections
2245    async fn handle_peer_state_event(
2246        &self,
2247        event: PeerStateEvent,
2248        relay_write_tx: &tokio::sync::broadcast::Sender<SignalingMessage>,
2249    ) {
2250        match event {
2251            PeerStateEvent::Connected(peer_id) => {
2252                let peer_key = peer_id.to_string();
2253                let mut emit_hello = false;
2254                let mut peers = self.state.peers.write().await;
2255                if let Some(entry) = peers.get_mut(&peer_key) {
2256                    if entry.state != ConnectionState::Connected {
2257                        info!("Peer {} connected (via state event)", peer_id.short());
2258                        entry.state = ConnectionState::Connected;
2259                        emit_hello = true;
2260                        // Update connected count
2261                        self.state
2262                            .connected_count
2263                            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
2264                    }
2265                }
2266                drop(peers);
2267                if emit_hello {
2268                    self.dispatch_signaling_message(
2269                        SignalingMessage::hello(&self.my_peer_id.uuid),
2270                        relay_write_tx,
2271                    )
2272                    .await;
2273                }
2274            }
2275            PeerStateEvent::Failed(peer_id) => {
2276                let peer_key = peer_id.to_string();
2277                info!(
2278                    "Peer {} connection failed - removing from pool",
2279                    peer_id.short()
2280                );
2281                let mut peers = self.state.peers.write().await;
2282                if let Some(entry) = peers.remove(&peer_key) {
2283                    // Decrement connected count if was connected
2284                    if entry.state == ConnectionState::Connected {
2285                        self.state
2286                            .connected_count
2287                            .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
2288                    }
2289                    // Close the peer connection if it exists
2290                    if let Some(peer) = entry.peer {
2291                        let _ = peer.close().await;
2292                    }
2293                }
2294            }
2295            PeerStateEvent::Disconnected(peer_id) => {
2296                let peer_key = peer_id.to_string();
2297                info!("Peer {} disconnected - removing from pool", peer_id.short());
2298                let mut peers = self.state.peers.write().await;
2299                if let Some(entry) = peers.remove(&peer_key) {
2300                    // Decrement connected count if was connected
2301                    if entry.state == ConnectionState::Connected {
2302                        self.state
2303                            .connected_count
2304                            .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
2305                    }
2306                    // Close the peer connection if it exists
2307                    if let Some(peer) = entry.peer {
2308                        let _ = peer.close().await;
2309                    }
2310                }
2311            }
2312        }
2313    }
2314
2315    /// Cleanup stale peers and sync connection states (fallback, runs every 30s)
2316    async fn cleanup_stale_peers(&self) {
2317        let mut peers = self.state.peers.write().await;
2318        let mut connected_count = 0;
2319        let mut to_remove = Vec::new();
2320        let stale_timeout = Duration::from_secs(60); // Remove peers stuck in Discovered/Connecting for 60s
2321
2322        for (key, entry) in peers.iter_mut() {
2323            if let Some(ref peer) = entry.peer {
2324                // Sync connected state as fallback (in case event was missed)
2325                if peer.is_connected() {
2326                    if entry.state != ConnectionState::Connected {
2327                        info!(
2328                            "Peer {} is now connected (sync fallback)",
2329                            entry.peer_id.short()
2330                        );
2331                        entry.state = ConnectionState::Connected;
2332                    }
2333                    connected_count += 1;
2334                } else if entry.state == ConnectionState::Connecting
2335                    && entry.last_seen.elapsed() > stale_timeout
2336                {
2337                    // Peer stuck in Connecting for too long - mark for removal
2338                    info!(
2339                        "Removing stale peer {} (stuck in Connecting for {:?})",
2340                        entry.peer_id.short(),
2341                        entry.last_seen.elapsed()
2342                    );
2343                    to_remove.push(key.clone());
2344                }
2345            } else if entry.state == ConnectionState::Discovered
2346                && entry.last_seen.elapsed() > stale_timeout
2347            {
2348                // Discovered peer with no actual connection - remove
2349                debug!("Removing stale discovered peer {}", entry.peer_id.short());
2350                to_remove.push(key.clone());
2351            }
2352        }
2353
2354        // Remove stale peers
2355        for key in to_remove {
2356            if let Some(entry) = peers.remove(&key) {
2357                if let Some(peer) = entry.peer {
2358                    let _ = peer.close().await;
2359                }
2360            }
2361        }
2362
2363        self.state
2364            .connected_count
2365            .store(connected_count, std::sync::atomic::Ordering::Relaxed);
2366    }
2367}
2368
2369// Keep the old PeerState for backward compatibility with tests
2370#[allow(dead_code)]
2371#[derive(Debug, Clone)]
2372pub struct PeerState {
2373    pub peer_id: PeerId,
2374    pub direction: PeerDirection,
2375    pub state: String,
2376    pub last_seen: Instant,
2377}
2378
2379#[cfg(test)]
2380mod tests {
2381    use super::*;
2382    use nostr::{EventBuilder, Keys, Tag};
2383
2384    #[test]
2385    fn root_event_from_peer_extracts_tags() {
2386        let keys = Keys::generate();
2387        let hash = "ab".repeat(32);
2388        let event = EventBuilder::new(
2389            Kind::Custom(HASHTREE_KIND),
2390            "",
2391            [
2392                Tag::parse(&["d", "repo"]).unwrap(),
2393                Tag::parse(&["l", HASHTREE_LABEL]).unwrap(),
2394                Tag::parse(&["hash", &hash]).unwrap(),
2395                Tag::parse(&["encryptedKey", &"11".repeat(32)]).unwrap(),
2396            ],
2397        )
2398        .to_event(&keys)
2399        .unwrap();
2400
2401        let parsed = root_event_from_peer(&event, "peer-a", "repo").unwrap();
2402        let expected_encrypted = "11".repeat(32);
2403        assert_eq!(parsed.hash, hash);
2404        assert_eq!(parsed.peer_id, "peer-a");
2405        assert_eq!(
2406            parsed.encrypted_key.as_deref(),
2407            Some(expected_encrypted.as_str())
2408        );
2409        assert!(parsed.key.is_none());
2410    }
2411
2412    #[test]
2413    fn pick_latest_event_prefers_higher_event_id_on_timestamp_tie() {
2414        let keys = Keys::generate();
2415        let created_at = nostr::Timestamp::from_secs(1_700_000_000);
2416        let event_a = EventBuilder::new(Kind::Custom(HASHTREE_KIND), "", [])
2417            .custom_created_at(created_at)
2418            .to_event(&keys)
2419            .unwrap();
2420        let event_b = EventBuilder::new(Kind::Custom(HASHTREE_KIND), "", [])
2421            .custom_created_at(created_at)
2422            .to_event(&keys)
2423            .unwrap();
2424
2425        let expected = if event_a.id > event_b.id {
2426            event_a.id
2427        } else {
2428            event_b.id
2429        };
2430        let picked = pick_latest_event([&event_a, &event_b]).unwrap();
2431        assert_eq!(picked.id, expected);
2432    }
2433
2434    #[test]
2435    fn test_formal_timed_seen_set_rejects_duplicates() {
2436        let mut seen = TimedSeenSet::new(4, Duration::from_secs(60));
2437        assert!(seen.insert_if_new("frame-1".to_string()));
2438        assert!(!seen.insert_if_new("frame-1".to_string()));
2439        assert!(seen.insert_if_new("frame-2".to_string()));
2440    }
2441
2442    #[test]
2443    fn test_formal_timed_seen_set_evicts_oldest_when_capacity_exceeded() {
2444        let mut seen = TimedSeenSet::new(2, Duration::from_secs(60));
2445        assert!(seen.insert_if_new("a".to_string()));
2446        assert!(seen.insert_if_new("b".to_string()));
2447        assert!(seen.insert_if_new("c".to_string()));
2448
2449        // "a" should be evicted due to cap=2, so re-insert becomes new again.
2450        assert!(seen.insert_if_new("a".to_string()));
2451        assert!(!seen.insert_if_new("a".to_string()));
2452    }
2453
2454    #[test]
2455    fn test_request_dispatch_normalization_caps_to_available_peers() {
2456        let normalized = normalize_dispatch_config(
2457            RequestDispatchConfig {
2458                initial_fanout: 8,
2459                hedge_fanout: 6,
2460                max_fanout: 5,
2461                hedge_interval_ms: 120,
2462            },
2463            3,
2464        );
2465        assert_eq!(normalized.max_fanout, 3);
2466        assert_eq!(normalized.initial_fanout, 3);
2467        assert_eq!(normalized.hedge_fanout, 3);
2468    }
2469
2470    #[test]
2471    fn test_hedged_wave_plan_matches_dispatch_policy() {
2472        let plan = build_hedged_wave_plan(
2473            7,
2474            RequestDispatchConfig {
2475                initial_fanout: 2,
2476                hedge_fanout: 3,
2477                max_fanout: 6,
2478                hedge_interval_ms: 120,
2479            },
2480        );
2481        assert_eq!(plan, vec![2, 3, 1]);
2482    }
2483}