Skip to main content

hashtree_cli/webrtc/
signaling.rs

1//! WebRTC signaling over Nostr relays
2//!
3//! Protocol (compatible with hashtree-ts):
4//! - All signaling uses ephemeral kind 25050
5//! - Hello messages: #l: "hello" tag, broadcast for peer discovery (unencrypted)
6//! - Directed signaling (offer, answer, candidate, candidates): NIP-17 style
7//!   gift wrap for privacy - wrapped with ephemeral key, #p tag with recipient
8//!
9//! Security: Directed messages use gift wrapping with ephemeral keys so that
10//! relays cannot see the actual sender or correlate messages.
11
12use anyhow::Result;
13use async_trait::async_trait;
14use futures::{SinkExt, StreamExt};
15use hashtree_network::{
16    decode_signaling_event, encode_signaling_event, run_hedged_waves, sync_selector_peers,
17    ClassifyRequest as SharedClassifyRequest, HedgedWaveAction, IceCandidate as SharedIceCandidate,
18    MeshRouter, PeerLink as SharedPeerLink, PeerLinkFactory as SharedPeerLinkFactory, PeerSelector,
19    SignalingTransport as SharedSignalingTransport, TransportError as SharedTransportError,
20};
21use nostr::{ClientMessage, Filter, JsonUtil, Keys, Kind, RelayMessage};
22use std::collections::{BTreeSet, HashMap};
23use std::sync::Arc;
24use std::time::{Duration, Instant};
25use tokio::sync::{mpsc, Mutex, RwLock};
26use tokio_tungstenite::{connect_async, tungstenite::Message};
27use tracing::{debug, error, info, warn};
28
29use super::bluetooth::{BluetoothMesh, BluetoothPeerRegistrar, BluetoothRuntimeContext};
30use super::cashu::{CashuMintMetadataStore, CashuQuoteState, CashuRoutingConfig, NegotiatedQuote};
31use super::local_bus::SharedLocalNostrBus;
32use super::multicast::MulticastNostrBus;
33use super::peer::{ContentStore, Peer, PendingRequest};
34use super::root_events::{
35    build_root_filter, hashtree_event_identifier, is_hashtree_labeled_event, pick_latest_event,
36    root_event_from_peer, PeerRootEvent,
37};
38use super::session::MeshPeer;
39use super::types::{
40    decrement_htl_with_policy, encode_quote_request, encode_request, should_forward_htl,
41    validate_mesh_frame, DataQuoteRequest, DataRequest, MeshNostrFrame, MeshNostrPayload,
42    PeerDirection, PeerId, PeerPool, PeerStateEvent, PeerStatus, RequestDispatchConfig,
43    SignalingMessage, TimedSeenSet, WebRTCConfig, HELLO_TAG, MESH_DEFAULT_HTL, MESH_EVENT_POLICY,
44    WEBRTC_KIND,
45};
46use super::wifi_aware::{mobile_wifi_aware_bridge, WifiAwareNostrBus, WIFI_AWARE_SOURCE};
47use crate::cashu_helper::CashuPaymentClient;
48use crate::nostr_relay::NostrRelay;
49
/// Callback type for classifying peers into pools.
///
/// The peer factory in this file calls it with the remote peer's pubkey
/// string and stores the returned [`PeerPool`] on the peer's entry.
pub type PeerClassifier = Arc<dyn Fn(&str) -> PeerPool + Send + Sync>;
52
/// Data transport that currently carries a peer session.
///
/// Variant order is significant: the derived `Ord` sorts `WebRtc` before
/// `Bluetooth`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub enum PeerTransport {
    WebRtc,
    Bluetooth,
}

impl PeerTransport {
    /// Returns the lowercase label for this transport.
    pub const fn as_str(self) -> &'static str {
        match self {
            Self::WebRtc => "webrtc",
            Self::Bluetooth => "bluetooth",
        }
    }
}

impl std::fmt::Display for PeerTransport {
    /// Writes the same label as [`PeerTransport::as_str`], without padding.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = self.as_str();
        f.write_str(label)
    }
}
74
/// Reports whether the `HTREE_BLUETOOTH_NOSTR_ONLY` environment variable is
/// set to a truthy value.
///
/// Accepted values are `1`, `true` and `yes`. The comparison ignores ASCII
/// case and surrounding whitespace, so `True`, `Yes` or ` true ` behave the
/// same as `true`/`YES`. An unset variable, a non-UTF-8 value, or any other
/// value yields `false`.
fn bluetooth_nostr_only_mode() -> bool {
    std::env::var("HTREE_BLUETOOTH_NOSTR_ONLY")
        .map(|raw| {
            let value = raw.trim();
            value == "1"
                || value.eq_ignore_ascii_case("true")
                || value.eq_ignore_ascii_case("yes")
        })
        .unwrap_or(false)
}
81
82/// Signaling/discovery path through which a peer was seen.
83#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
84pub enum PeerSignalPath {
85    Relay,
86    Multicast,
87    WifiAware,
88    Bluetooth,
89}
90
91impl PeerSignalPath {
92    pub const fn as_str(self) -> &'static str {
93        match self {
94            PeerSignalPath::Relay => "relay",
95            PeerSignalPath::Multicast => "multicast",
96            PeerSignalPath::WifiAware => WIFI_AWARE_SOURCE,
97            PeerSignalPath::Bluetooth => "bluetooth",
98        }
99    }
100
101    pub fn from_source_name(source: &str) -> Self {
102        match source {
103            "multicast" => PeerSignalPath::Multicast,
104            WIFI_AWARE_SOURCE => PeerSignalPath::WifiAware,
105            "bluetooth" => PeerSignalPath::Bluetooth,
106            _ => PeerSignalPath::Relay,
107        }
108    }
109}
110
111impl std::fmt::Display for PeerSignalPath {
112    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
113        f.write_str((*self).as_str())
114    }
115}
116
/// Lifecycle stage of a peer connection.
#[derive(Debug, Clone, PartialEq)]
pub enum ConnectionState {
    Discovered,
    Connecting,
    Connected,
    Failed,
}

impl std::fmt::Display for ConnectionState {
    /// Writes the lowercase name of the state, without padding.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Discovered => "discovered",
            Self::Connecting => "connecting",
            Self::Connected => "connected",
            Self::Failed => "failed",
        };
        f.write_str(label)
    }
}
136
/// Peer entry in the manager
///
/// One record per peer, stored in `WebRTCState::peers`.
pub struct PeerEntry {
    /// Identity of the remote peer.
    pub peer_id: PeerId,
    /// Direction of the session: the peer factory records `Outbound` for
    /// offers it creates and `Inbound` for offers it accepts.
    pub direction: PeerDirection,
    /// Current connection state; entries registered by the peer factory
    /// start as `Connecting`.
    pub state: ConnectionState,
    /// Set to `Instant::now()` when the peer factory registers the entry.
    // NOTE(review): presumably refreshed elsewhere on later activity — confirm.
    pub last_seen: Instant,
    /// Session handle, if one exists; `WebRTCState::reset_runtime_state`
    /// closes it when draining the peer table.
    pub peer: Option<MeshPeer>,
    /// Pool assigned by the `PeerClassifier` callback at registration.
    pub pool: PeerPool,
    /// Data transport carrying this session.
    pub transport: PeerTransport,
    /// Signaling/discovery paths on which this peer has been seen.
    pub signal_paths: BTreeSet<PeerSignalPath>,
    /// Per-peer byte total incremented by `WebRTCState::record_sent`.
    pub bytes_sent: u64,
    /// Per-peer byte total incremented by `WebRTCState::record_received`.
    pub bytes_received: u64,
}
150
/// Shared state for the native mesh router.
pub struct WebRTCState {
    /// Peer table. The peer factory in this file keys entries by the string
    /// form of the peer id.
    pub peers: RwLock<HashMap<String, PeerEntry>>,
    /// Reset to zero by `reset_runtime_state`.
    // NOTE(review): presumably the number of currently connected peers,
    // maintained outside the visible part of this file — confirm.
    pub connected_count: std::sync::atomic::AtomicUsize,
    /// Total bytes sent across all peers (cumulative)
    pub bytes_sent: std::sync::atomic::AtomicU64,
    /// Total bytes received across all peers (cumulative)
    pub bytes_received: std::sync::atomic::AtomicU64,
    /// Relayless mesh frames received and accepted.
    pub mesh_received: std::sync::atomic::AtomicU64,
    /// Relayless mesh frames forwarded to peers.
    pub mesh_forwarded: std::sync::atomic::AtomicU64,
    /// Relayless mesh frames/events dropped due to dedupe.
    pub mesh_dropped_duplicate: std::sync::atomic::AtomicU64,
    /// Shared peer selector used by live retrieval; aligned with simulation strategies.
    peer_selector: Arc<RwLock<PeerSelector>>,
    /// Hedged dispatch policy for retrieval requests.
    request_dispatch: RequestDispatchConfig,
    /// Retrieval timeout for quote negotiation and single-peer fetches.
    request_timeout: Duration,
    /// Shared Cashu quote negotiation policy/state.
    cashu_quotes: Arc<CashuQuoteState>,
    /// Optional local buses such as multicast or BLE that carry signed Nostr
    /// envelopes for nearby/offline peers.
    local_buses: RwLock<Vec<SharedLocalNostrBus>>,
}
// NOTE(review): the SEEN_* limits are not referenced in the visible part of
// this file. Presumably they bound the capacity and entry lifetime of the
// dedupe sets for mesh frames and events — confirm at the use sites.
const SEEN_FRAME_CAP: usize = 4096;
const SEEN_FRAME_TTL: Duration = Duration::from_secs(120);
const SEEN_EVENT_CAP: usize = 8192;
const SEEN_EVENT_TTL: Duration = Duration::from_secs(600);

/// Shared, mutex-guarded map of pending requests keyed by string
/// (key semantics are not shown in this part of the file).
type PendingRequestsMap = Arc<Mutex<HashMap<String, PendingRequest>>>;
/// `(peer id, pending-request map, data channel)` for a WebRTC peer whose
/// data channel is present.
type ConnectedPeer = (
    String,
    PendingRequestsMap,
    Arc<webrtc::data_channel::RTCDataChannel>,
);
/// `(peer id, session handle, transport)` for a peer considered for requests.
type ConnectedSession = (String, MeshPeer, PeerTransport);
/// Mesh router specialised to this file's signaling bridge and peer factory.
type SharedProductionRouter = MeshRouter<RouterSignalingBridge, SharedRouterPeerFactory>;
190
191async fn remember_peer_signal_path(state: &WebRTCState, peer_id: &str, source: &str) {
192    if let Some(entry) = state.peers.write().await.get_mut(peer_id) {
193        entry
194            .signal_paths
195            .insert(PeerSignalPath::from_source_name(source));
196    }
197}
198
199#[derive(Clone)]
200struct RouterSignalingBridge {
201    peer_id: String,
202    pubkey: String,
203    signaling_tx: mpsc::Sender<SignalingMessage>,
204}
205
206impl RouterSignalingBridge {
207    fn new(peer_id: String, pubkey: String, signaling_tx: mpsc::Sender<SignalingMessage>) -> Self {
208        Self {
209            peer_id,
210            pubkey,
211            signaling_tx,
212        }
213    }
214}
215
216#[async_trait]
217impl SharedSignalingTransport for RouterSignalingBridge {
218    async fn connect(&self, _relays: &[String]) -> Result<(), SharedTransportError> {
219        Ok(())
220    }
221
222    async fn disconnect(&self) {}
223
224    async fn publish(&self, msg: SignalingMessage) -> Result<(), SharedTransportError> {
225        self.signaling_tx
226            .send(msg)
227            .await
228            .map_err(|e| SharedTransportError::SendFailed(e.to_string()))
229    }
230
231    async fn recv(&self) -> Option<SignalingMessage> {
232        None
233    }
234
235    fn try_recv(&self) -> Option<SignalingMessage> {
236        None
237    }
238
239    fn peer_id(&self) -> &str {
240        &self.peer_id
241    }
242
243    fn pubkey(&self) -> &str {
244        &self.pubkey
245    }
246}
247
248struct SharedRouterPeerLink {
249    peer: Arc<Peer>,
250}
251
252#[async_trait]
253impl SharedPeerLink for SharedRouterPeerLink {
254    async fn send(&self, data: Vec<u8>) -> Result<(), SharedTransportError> {
255        let dc = self
256            .peer
257            .data_channel
258            .lock()
259            .await
260            .as_ref()
261            .cloned()
262            .ok_or(SharedTransportError::NotConnected)?;
263        dc.send(&bytes::Bytes::from(data))
264            .await
265            .map(|_| ())
266            .map_err(|e| SharedTransportError::SendFailed(e.to_string()))
267    }
268
269    async fn recv(&self) -> Option<Vec<u8>> {
270        None
271    }
272
273    fn try_recv(&self) -> Option<Vec<u8>> {
274        None
275    }
276
277    fn is_open(&self) -> bool {
278        self.peer.has_data_channel()
279    }
280
281    async fn close(&self) {
282        let _ = self.peer.close().await;
283    }
284}
285
286struct SharedRouterPeerFactory {
287    my_peer_id: PeerId,
288    signaling_tx: mpsc::Sender<SignalingMessage>,
289    stun_servers: Vec<String>,
290    store: Option<Arc<dyn ContentStore>>,
291    state: Arc<WebRTCState>,
292    state_event_tx: mpsc::Sender<PeerStateEvent>,
293    nostr_relay: Option<Arc<NostrRelay>>,
294    mesh_frame_tx: mpsc::Sender<(PeerId, MeshNostrFrame)>,
295    peer_classifier: PeerClassifier,
296    peers: RwLock<HashMap<String, Arc<Peer>>>,
297}
298
299impl SharedRouterPeerFactory {
300    fn new(
301        my_peer_id: PeerId,
302        signaling_tx: mpsc::Sender<SignalingMessage>,
303        stun_servers: Vec<String>,
304        store: Option<Arc<dyn ContentStore>>,
305        state: Arc<WebRTCState>,
306        state_event_tx: mpsc::Sender<PeerStateEvent>,
307        nostr_relay: Option<Arc<NostrRelay>>,
308        mesh_frame_tx: mpsc::Sender<(PeerId, MeshNostrFrame)>,
309        peer_classifier: PeerClassifier,
310    ) -> Self {
311        Self {
312            my_peer_id,
313            signaling_tx,
314            stun_servers,
315            store,
316            state,
317            state_event_tx,
318            nostr_relay,
319            mesh_frame_tx,
320            peer_classifier,
321            peers: RwLock::new(HashMap::new()),
322        }
323    }
324
325    async fn register_peer(&self, peer_id: PeerId, direction: PeerDirection, peer: Arc<Peer>) {
326        let peer_key = peer_id.to_string();
327        let pool = (self.peer_classifier)(&peer_id.pubkey);
328        self.peers
329            .write()
330            .await
331            .insert(peer_key.clone(), peer.clone());
332
333        let mut peers = self.state.peers.write().await;
334        peers.insert(
335            peer_key,
336            PeerEntry {
337                peer_id,
338                direction,
339                state: ConnectionState::Connecting,
340                last_seen: Instant::now(),
341                peer: Some(MeshPeer::WebRtc(peer)),
342                pool,
343                transport: PeerTransport::WebRtc,
344                signal_paths: BTreeSet::from([PeerSignalPath::Relay]),
345                bytes_sent: 0,
346                bytes_received: 0,
347            },
348        );
349    }
350
351    async fn create_peer(
352        &self,
353        peer_id: PeerId,
354        direction: PeerDirection,
355    ) -> Result<Peer, SharedTransportError> {
356        Peer::new_with_store_and_events(
357            peer_id,
358            direction,
359            self.my_peer_id.clone(),
360            self.signaling_tx.clone(),
361            self.stun_servers.clone(),
362            self.store.clone(),
363            Some(self.state_event_tx.clone()),
364            self.nostr_relay.clone(),
365            Some(self.mesh_frame_tx.clone()),
366            Some(self.state.cashu_quotes.clone()),
367        )
368        .await
369        .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))
370    }
371}
372
373#[async_trait]
374impl SharedPeerLinkFactory for SharedRouterPeerFactory {
375    async fn create_offer(
376        &self,
377        target_peer_id: &str,
378    ) -> Result<(Arc<dyn SharedPeerLink>, String), SharedTransportError> {
379        let target_peer = PeerId::from_string(target_peer_id).ok_or_else(|| {
380            SharedTransportError::ConnectionFailed(format!("invalid peer id {target_peer_id}"))
381        })?;
382        let peer = Arc::new(
383            self.create_peer(target_peer.clone(), PeerDirection::Outbound)
384                .await?,
385        );
386        peer.setup_handlers()
387            .await
388            .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))?;
389        let offer = peer
390            .connect()
391            .await
392            .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))?;
393        let sdp = offer
394            .get("sdp")
395            .and_then(|value| value.as_str())
396            .ok_or_else(|| {
397                SharedTransportError::ConnectionFailed("missing SDP in CLI peer offer".to_string())
398            })?
399            .to_string();
400        self.register_peer(target_peer, PeerDirection::Outbound, peer.clone())
401            .await;
402        Ok((Arc::new(SharedRouterPeerLink { peer }), sdp))
403    }
404
405    async fn accept_offer(
406        &self,
407        from_peer_id: &str,
408        offer_sdp: &str,
409    ) -> Result<(Arc<dyn SharedPeerLink>, String), SharedTransportError> {
410        let from_peer = PeerId::from_string(from_peer_id).ok_or_else(|| {
411            SharedTransportError::ConnectionFailed(format!("invalid peer id {from_peer_id}"))
412        })?;
413        let peer = Arc::new(
414            self.create_peer(from_peer.clone(), PeerDirection::Inbound)
415                .await?,
416        );
417        peer.setup_handlers()
418            .await
419            .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))?;
420        let answer = peer
421            .handle_offer(serde_json::json!({ "type": "offer", "sdp": offer_sdp }))
422            .await
423            .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))?;
424        let sdp = answer
425            .get("sdp")
426            .and_then(|value| value.as_str())
427            .ok_or_else(|| {
428                SharedTransportError::ConnectionFailed("missing SDP in CLI peer answer".to_string())
429            })?
430            .to_string();
431        self.register_peer(from_peer, PeerDirection::Inbound, peer.clone())
432            .await;
433        Ok((Arc::new(SharedRouterPeerLink { peer }), sdp))
434    }
435
436    async fn handle_answer(
437        &self,
438        target_peer_id: &str,
439        answer_sdp: &str,
440    ) -> Result<Arc<dyn SharedPeerLink>, SharedTransportError> {
441        let peer = self
442            .peers
443            .read()
444            .await
445            .get(target_peer_id)
446            .cloned()
447            .ok_or_else(|| {
448                SharedTransportError::ConnectionFailed(format!(
449                    "missing outbound peer for {target_peer_id}"
450                ))
451            })?;
452        peer.handle_answer(serde_json::json!({ "type": "answer", "sdp": answer_sdp }))
453            .await
454            .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))?;
455        Ok(Arc::new(SharedRouterPeerLink { peer }))
456    }
457
458    async fn handle_candidate(
459        &self,
460        peer_id: &str,
461        candidate: SharedIceCandidate,
462    ) -> Result<(), SharedTransportError> {
463        let peer = self.peers.read().await.get(peer_id).cloned();
464        if let Some(peer) = peer {
465            peer.handle_candidate(serde_json::json!({
466                "candidate": candidate.candidate,
467                "sdpMLineIndex": candidate.sdp_m_line_index,
468                "sdpMid": candidate.sdp_mid,
469            }))
470            .await
471            .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))?;
472        }
473        Ok(())
474    }
475
476    async fn remove_peer(&self, peer_id: &str) -> Result<(), SharedTransportError> {
477        self.peers.write().await.remove(peer_id);
478        Ok(())
479    }
480}
481
482impl WebRTCState {
483    pub fn new() -> Self {
484        let cfg = WebRTCConfig::default();
485        Self::new_with_routing_and_cashu(
486            cfg.request_selection_strategy,
487            cfg.request_fairness_enabled,
488            cfg.request_dispatch,
489            Duration::from_millis(cfg.message_timeout_ms),
490            CashuRoutingConfig::default(),
491            None,
492            None,
493        )
494    }
495
496    pub fn new_with_routing(
497        selection_strategy: super::types::SelectionStrategy,
498        fairness_enabled: bool,
499        request_dispatch: RequestDispatchConfig,
500    ) -> Self {
501        let cfg = WebRTCConfig::default();
502        Self::new_with_routing_and_cashu(
503            selection_strategy,
504            fairness_enabled,
505            request_dispatch,
506            Duration::from_millis(cfg.message_timeout_ms),
507            CashuRoutingConfig::default(),
508            None,
509            None,
510        )
511    }
512
513    pub fn new_with_routing_and_cashu(
514        selection_strategy: super::types::SelectionStrategy,
515        fairness_enabled: bool,
516        request_dispatch: RequestDispatchConfig,
517        request_timeout: Duration,
518        cashu_routing: CashuRoutingConfig,
519        payment_client: Option<Arc<dyn CashuPaymentClient>>,
520        mint_metadata: Option<Arc<CashuMintMetadataStore>>,
521    ) -> Self {
522        let mut selector = PeerSelector::with_strategy(selection_strategy);
523        selector.set_fairness(fairness_enabled);
524        let peer_selector = Arc::new(RwLock::new(selector));
525        let cashu_quotes = Arc::new(if let Some(mint_metadata) = mint_metadata {
526            CashuQuoteState::new_with_mint_metadata(
527                cashu_routing,
528                peer_selector.clone(),
529                payment_client,
530                mint_metadata,
531            )
532        } else {
533            CashuQuoteState::new(cashu_routing, peer_selector.clone(), payment_client)
534        });
535        Self {
536            peers: RwLock::new(HashMap::new()),
537            connected_count: std::sync::atomic::AtomicUsize::new(0),
538            bytes_sent: std::sync::atomic::AtomicU64::new(0),
539            bytes_received: std::sync::atomic::AtomicU64::new(0),
540            mesh_received: std::sync::atomic::AtomicU64::new(0),
541            mesh_forwarded: std::sync::atomic::AtomicU64::new(0),
542            mesh_dropped_duplicate: std::sync::atomic::AtomicU64::new(0),
543            peer_selector,
544            request_dispatch,
545            request_timeout,
546            cashu_quotes,
547            local_buses: RwLock::new(Vec::new()),
548        }
549    }
550
551    pub async fn set_local_buses(&self, buses: Vec<SharedLocalNostrBus>) {
552        *self.local_buses.write().await = buses;
553    }
554
555    pub async fn add_local_bus(&self, bus: SharedLocalNostrBus) {
556        self.local_buses.write().await.push(bus);
557    }
558
559    pub async fn set_multicast_bus(&self, bus: Option<Arc<MulticastNostrBus>>) {
560        let buses = bus
561            .into_iter()
562            .map(|bus| bus as SharedLocalNostrBus)
563            .collect();
564        self.set_local_buses(buses).await;
565    }
566
567    /// Drop all live peer sessions and clear topology-specific state while
568    /// keeping cumulative bandwidth counters intact.
569    pub async fn reset_runtime_state(&self) {
570        self.set_local_buses(Vec::new()).await;
571        let peers = {
572            let mut peers = self.peers.write().await;
573            std::mem::take(&mut *peers)
574        };
575        self.connected_count
576            .store(0, std::sync::atomic::Ordering::Relaxed);
577        for entry in peers.into_values() {
578            if let Some(peer) = entry.peer {
579                let _ = peer.close().await;
580            }
581        }
582    }
583
584    /// Get current bandwidth stats (bytes sent/received)
585    pub fn get_bandwidth(&self) -> (u64, u64) {
586        (
587            self.bytes_sent.load(std::sync::atomic::Ordering::Relaxed),
588            self.bytes_received
589                .load(std::sync::atomic::Ordering::Relaxed),
590        )
591    }
592
593    pub fn get_mesh_stats(&self) -> (u64, u64, u64) {
594        (
595            self.mesh_received
596                .load(std::sync::atomic::Ordering::Relaxed),
597            self.mesh_forwarded
598                .load(std::sync::atomic::Ordering::Relaxed),
599            self.mesh_dropped_duplicate
600                .load(std::sync::atomic::Ordering::Relaxed),
601        )
602    }
603
604    pub fn record_mesh_received(&self) {
605        self.mesh_received
606            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
607    }
608
609    pub fn record_mesh_forwarded(&self, count: u64) {
610        self.mesh_forwarded
611            .fetch_add(count, std::sync::atomic::Ordering::Relaxed);
612    }
613
614    pub fn record_mesh_duplicate_drop(&self) {
615        self.mesh_dropped_duplicate
616            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
617    }
618
619    /// Record bytes sent (global + per-peer)
620    pub async fn record_sent(&self, peer_id: &str, bytes: u64) {
621        self.bytes_sent
622            .fetch_add(bytes, std::sync::atomic::Ordering::Relaxed);
623        if let Some(entry) = self.peers.write().await.get_mut(peer_id) {
624            entry.bytes_sent += bytes;
625        }
626    }
627
628    /// Record bytes received (global + per-peer)
629    pub async fn record_received(&self, peer_id: &str, bytes: u64) {
630        self.bytes_received
631            .fetch_add(bytes, std::sync::atomic::Ordering::Relaxed);
632        if let Some(entry) = self.peers.write().await.get_mut(peer_id) {
633            entry.bytes_received += bytes;
634        }
635    }
636
637    /// Request content by hash from connected peers
638    /// Queries peers in adaptive selector order with hedged fanout waves.
639    /// Returns the first successful response, or None if no peer has it
640    pub async fn request_from_peers(&self, hash_hex: &str) -> Option<Vec<u8>> {
641        self.request_from_peers_with_source(hash_hex)
642            .await
643            .map(|(data, _peer_id)| data)
644    }
645
646    /// Request content by hash from connected peers, returning data and source peer.
647    pub async fn request_from_peers_with_source(
648        &self,
649        hash_hex: &str,
650    ) -> Option<(Vec<u8>, String)> {
651        use super::types::BLOB_REQUEST_POLICY;
652
653        let peers = self.peers.read().await;
654
655        let peer_refs: Vec<_> = peers
656            .values()
657            .filter(|p| p.state == ConnectionState::Connected && p.peer.is_some())
658            .filter_map(|p| {
659                p.peer
660                    .clone()
661                    .map(|peer| (p.peer_id.to_string(), peer, p.transport))
662            })
663            .collect();
664
665        drop(peers); // Release the read lock
666
667        let mut connected_peers: Vec<ConnectedPeer> = Vec::new();
668        let mut connected_sessions: Vec<ConnectedSession> = Vec::new();
669        for (peer_id, peer, transport) in peer_refs {
670            if !peer.is_ready() {
671                continue;
672            }
673            if bluetooth_nostr_only_mode() && transport == PeerTransport::Bluetooth {
674                continue;
675            }
676            if let Some(webrtc_peer) = peer.as_webrtc() {
677                let dc_guard = webrtc_peer.data_channel.lock().await;
678                if let Some(dc) = dc_guard.as_ref() {
679                    connected_peers.push((
680                        peer_id.clone(),
681                        webrtc_peer.pending_requests.clone(),
682                        dc.clone(),
683                    ));
684                }
685            }
686            connected_sessions.push((peer_id, peer, transport));
687        }
688
689        if connected_sessions.is_empty() {
690            debug!(
691                "No connected peers to query for {}",
692                &hash_hex[..8.min(hash_hex.len())]
693            );
694            return None;
695        }
696
697        // Convert hex to binary hash once
698        let hash_bytes = match hex::decode(hash_hex) {
699            Ok(b) => b,
700            Err(_) => return None,
701        };
702
703        let expected_hash: [u8; 32] = match hash_bytes.as_slice().try_into() {
704            Ok(h) => h,
705            Err(_) => {
706                debug!(
707                    "Invalid hash length {}, expected 32 bytes",
708                    hash_bytes.len()
709                );
710                return None;
711            }
712        };
713
714        let connected_peer_ids: Vec<String> = connected_sessions
715            .iter()
716            .map(|(peer_id, _, _)| peer_id.clone())
717            .collect();
718        sync_selector_peers(self.peer_selector.as_ref(), &connected_peer_ids).await;
719
720        let ordered_peer_ids = self.peer_selector.write().await.select_peers();
721        let mut quote_by_peer: HashMap<
722            String,
723            (
724                PendingRequestsMap,
725                Arc<webrtc::data_channel::RTCDataChannel>,
726            ),
727        > = connected_peers
728            .iter()
729            .cloned()
730            .map(|(peer_id, pending, dc)| (peer_id, (pending, dc)))
731            .collect();
732        let mut ordered_quote_peers: Vec<ConnectedPeer> = Vec::new();
733        for peer_id in &ordered_peer_ids {
734            if let Some((pending, dc)) = quote_by_peer.remove(peer_id) {
735                ordered_quote_peers.push((peer_id.clone(), pending, dc));
736            }
737        }
738        for (peer_id, (pending, dc)) in quote_by_peer {
739            ordered_quote_peers.push((peer_id, pending, dc));
740        }
741
742        let mut by_peer: HashMap<String, (MeshPeer, PeerTransport)> = connected_sessions
743            .into_iter()
744            .map(|(peer_id, peer, transport)| (peer_id, (peer, transport)))
745            .collect();
746
747        let mut ordered_peers: Vec<ConnectedSession> = Vec::new();
748        for peer_id in ordered_peer_ids {
749            if let Some((peer, transport)) = by_peer.remove(&peer_id) {
750                ordered_peers.push((peer_id, peer, transport));
751            }
752        }
753        for (peer_id, (peer, transport)) in by_peer {
754            ordered_peers.push((peer_id, peer, transport));
755        }
756
757        debug!(
758            "Querying {} peers for {} with shared hedged scheduler",
759            ordered_peers.len(),
760            &hash_hex[..8.min(hash_hex.len())],
761        );
762
763        if let Some((requested_mint, payment_sat, quote_ttl_ms)) =
764            self.cashu_quotes.requester_quote_terms().await
765        {
766            if let Some(quote) = self
767                .request_quote_from_peers(
768                    &hash_bytes,
769                    requested_mint,
770                    payment_sat,
771                    quote_ttl_ms,
772                    &ordered_quote_peers,
773                )
774                .await
775            {
776                if let Some(data) = self
777                    .request_from_single_peer(
778                        hash_hex,
779                        &hash_bytes,
780                        expected_hash,
781                        &quote.peer_id,
782                        Some(&quote),
783                        &ordered_quote_peers,
784                    )
785                    .await
786                {
787                    debug!(
788                        "Got quoted response from peer {} for {}",
789                        quote.peer_id,
790                        &hash_hex[..8.min(hash_hex.len())]
791                    );
792                    return Some((data, quote.peer_id));
793                }
794            }
795        }
796
797        let request = DataRequest {
798            h: hash_bytes.clone(),
799            htl: BLOB_REQUEST_POLICY.max_htl,
800            q: None,
801        };
802        let wire = match encode_request(&request) {
803            Ok(w) => w,
804            Err(_) => return None,
805        };
806        let wire_len = wire.len() as u64;
807        let current_result_rx = Arc::new(Mutex::new(None));
808        if let Some((data, peer_id)) = run_hedged_waves(
809            ordered_peers.len(),
810            self.request_dispatch,
811            self.request_timeout,
812            |range| {
813                let wave_peers = ordered_peers[range].to_vec();
814                let (result_tx, result_rx) =
815                    mpsc::channel::<(String, Instant, Result<Option<Vec<u8>>>)>(wave_peers.len());
816                let current_result_rx = current_result_rx.clone();
817                let hash_hex = hash_hex.to_string();
818                async move {
819                    *current_result_rx.lock().await = Some(result_rx);
820                    let sent = wave_peers.len();
821                    for (peer_id, peer, transport) in wave_peers {
822                        if transport != PeerTransport::Bluetooth {
823                            self.record_sent(&peer_id, wire_len).await;
824                        }
825                        self.peer_selector
826                            .write()
827                            .await
828                            .record_request(&peer_id, wire_len);
829
830                        let result_tx = result_tx.clone();
831                        let peer_id_for_task = peer_id.clone();
832                        let peer = peer.clone();
833                        let hash_hex = hash_hex.clone();
834                        let per_request_timeout = self.request_timeout;
835                        tokio::spawn(async move {
836                            let started = Instant::now();
837                            let result = peer.request(&hash_hex, per_request_timeout).await;
838                            let _ = result_tx.send((peer_id_for_task, started, result)).await;
839                        });
840                    }
841                    drop(result_tx);
842                    sent
843                }
844            },
845            |wait| {
846                let current_result_rx = current_result_rx.clone();
847                async move {
848                    let mut current_result_rx = current_result_rx.lock().await;
849                    let Some(result_rx) = current_result_rx.as_mut() else {
850                        return HedgedWaveAction::Abort;
851                    };
852                    let deadline = Instant::now() + wait;
853                    loop {
854                        let now = Instant::now();
855                        if now >= deadline {
856                            return HedgedWaveAction::Continue;
857                        }
858                        let remaining = deadline.saturating_duration_since(now);
859                        match tokio::time::timeout(remaining, result_rx.recv()).await {
860                            Ok(Some((peer_id, started, Ok(Some(data))))) => {
861                                let rtt_ms = started.elapsed().as_millis() as u64;
862                                if hashtree_core::sha256(&data) == expected_hash {
863                                    let should_record = {
864                                        let peers = self.peers.read().await;
865                                        peers
866                                            .get(&peer_id)
867                                            .map(|entry| {
868                                                entry.transport != PeerTransport::Bluetooth
869                                            })
870                                            .unwrap_or(true)
871                                    };
872                                    if should_record {
873                                        self.record_received(&peer_id, data.len() as u64).await;
874                                    }
875                                    self.peer_selector.write().await.record_success(
876                                        &peer_id,
877                                        rtt_ms,
878                                        data.len() as u64,
879                                    );
880                                    return HedgedWaveAction::Success((data, peer_id));
881                                }
882                                self.peer_selector.write().await.record_failure(&peer_id);
883                            }
884                            Ok(Some((peer_id, _, Ok(None)))) | Ok(Some((peer_id, _, Err(_)))) => {
885                                self.peer_selector.write().await.record_timeout(&peer_id);
886                            }
887                            Ok(None) | Err(_) => return HedgedWaveAction::Continue,
888                        }
889                    }
890                }
891            },
892        )
893        .await
894        {
895            debug!(
896                "Got response from peer {} for {}",
897                peer_id,
898                &hash_hex[..8.min(hash_hex.len())]
899            );
900            return Some((data, peer_id));
901        }
902
903        debug!(
904            "No peer had data for {}",
905            &hash_hex[..8.min(hash_hex.len())]
906        );
907        None
908    }
909
910    async fn request_quote_from_peers(
911        &self,
912        hash_bytes: &[u8],
913        requested_mint: String,
914        payment_sat: u64,
915        quote_ttl_ms: u32,
916        ordered_peers: &[ConnectedPeer],
917    ) -> Option<NegotiatedQuote> {
918        if ordered_peers.is_empty() || quote_ttl_ms == 0 {
919            return None;
920        }
921
922        let hash_hex = hex::encode(hash_bytes);
923        let rx = self
924            .cashu_quotes
925            .register_pending_quote(hash_hex.clone(), Some(requested_mint.clone()), payment_sat)
926            .await;
927        let quote_request = DataQuoteRequest {
928            h: hash_bytes.to_vec(),
929            p: payment_sat,
930            t: quote_ttl_ms,
931            m: Some(requested_mint),
932        };
933        let wire = match encode_quote_request(&quote_request) {
934            Ok(wire) => wire,
935            Err(_) => {
936                self.cashu_quotes.clear_pending_quote(&hash_hex).await;
937                return None;
938            }
939        };
940        let rx = Arc::new(Mutex::new(rx));
941        let result = run_hedged_waves(
942            ordered_peers.len(),
943            self.request_dispatch,
944            self.request_timeout,
945            |range| {
946                let wave_peers = ordered_peers[range].to_vec();
947                let wire = wire.clone();
948                async move {
949                    let mut sent = 0usize;
950                    for (_, _, dc) in wave_peers {
951                        if dc.send(&bytes::Bytes::copy_from_slice(&wire)).await.is_ok() {
952                            sent += 1;
953                        }
954                    }
955                    sent
956                }
957            },
958            |wait| {
959                let rx = rx.clone();
960                async move {
961                    let mut rx = rx.lock().await;
962                    match tokio::time::timeout(wait, &mut *rx).await {
963                        Ok(Ok(Some(quote))) => HedgedWaveAction::Success(quote),
964                        Ok(Ok(None)) | Ok(Err(_)) => HedgedWaveAction::Abort,
965                        Err(_) => HedgedWaveAction::Continue,
966                    }
967                }
968            },
969        )
970        .await;
971
972        self.cashu_quotes.clear_pending_quote(&hash_hex).await;
973        result
974    }
975
976    async fn request_from_single_peer(
977        &self,
978        hash_hex: &str,
979        hash_bytes: &[u8],
980        expected_hash: [u8; 32],
981        target_peer_id: &str,
982        quote: Option<&NegotiatedQuote>,
983        ordered_peers: &[ConnectedPeer],
984    ) -> Option<Vec<u8>> {
985        use super::types::BLOB_REQUEST_POLICY;
986
987        let (pending_requests, dc) = ordered_peers
988            .iter()
989            .find(|(peer_id, _, _)| peer_id == target_peer_id)
990            .map(|(_, pending_requests, dc)| (pending_requests.clone(), dc.clone()))?;
991
992        let request = DataRequest {
993            h: hash_bytes.to_vec(),
994            htl: BLOB_REQUEST_POLICY.max_htl,
995            q: quote.map(|quote| quote.quote_id),
996        };
997        let wire = encode_request(&request).ok()?;
998        let wire_len = wire.len() as u64;
999        let sent_at = Instant::now();
1000        let (tx, mut rx) = tokio::sync::oneshot::channel();
1001
1002        {
1003            let mut pending = pending_requests.lock().await;
1004            pending.insert(
1005                hash_hex.to_string(),
1006                if let Some(quote) = quote {
1007                    PendingRequest::quoted(
1008                        hash_bytes.to_vec(),
1009                        tx,
1010                        quote.quote_id,
1011                        quote.mint_url.clone().unwrap_or_default(),
1012                        quote.payment_sat,
1013                    )
1014                } else {
1015                    PendingRequest::standard(hash_bytes.to_vec(), tx)
1016                },
1017            );
1018        }
1019
1020        if dc
1021            .send(&bytes::Bytes::copy_from_slice(&wire))
1022            .await
1023            .is_err()
1024        {
1025            let mut pending = pending_requests.lock().await;
1026            pending.remove(hash_hex);
1027            self.peer_selector
1028                .write()
1029                .await
1030                .record_failure(target_peer_id);
1031            return None;
1032        }
1033
1034        self.record_sent(target_peer_id, wire_len).await;
1035        self.peer_selector
1036            .write()
1037            .await
1038            .record_request(target_peer_id, wire_len);
1039
1040        let wait_timeout = if let Some(quote) = quote {
1041            let multiplier = quote.payment_sat.clamp(1, 32) as u128;
1042            let extra_ms = self
1043                .cashu_quotes
1044                .settlement_timeout()
1045                .as_millis()
1046                .saturating_mul(multiplier);
1047            self.request_timeout + Duration::from_millis(extra_ms.min(u64::MAX as u128) as u64)
1048        } else {
1049            self.request_timeout
1050        };
1051
1052        match tokio::time::timeout(wait_timeout, &mut rx).await {
1053            Ok(Ok(Some(data))) if hashtree_core::sha256(&data) == expected_hash => {
1054                let rtt_ms = sent_at.elapsed().as_millis() as u64;
1055                self.record_received(target_peer_id, data.len() as u64)
1056                    .await;
1057                self.peer_selector.write().await.record_success(
1058                    target_peer_id,
1059                    rtt_ms,
1060                    data.len() as u64,
1061                );
1062                Some(data)
1063            }
1064            Ok(Ok(Some(_))) => {
1065                self.peer_selector
1066                    .write()
1067                    .await
1068                    .record_failure(target_peer_id);
1069                let pending = pending_requests.lock().await.remove(hash_hex);
1070                if let Some(pending) = pending {
1071                    if let Some(quoted) = pending.quoted {
1072                        if let Some(in_flight) = quoted.in_flight_payment {
1073                            let _ = self
1074                                .cashu_quotes
1075                                .revoke_payment_token(&in_flight.mint_url, &in_flight.operation_id)
1076                                .await;
1077                        }
1078                    }
1079                }
1080                None
1081            }
1082            Ok(Ok(None)) | Ok(Err(_)) | Err(_) => {
1083                let pending = pending_requests.lock().await.remove(hash_hex);
1084                if let Some(pending) = pending {
1085                    if let Some(quoted) = pending.quoted {
1086                        if let Some(in_flight) = quoted.in_flight_payment {
1087                            let _ = self
1088                                .cashu_quotes
1089                                .revoke_payment_token(&in_flight.mint_url, &in_flight.operation_id)
1090                                .await;
1091                        }
1092                    }
1093                }
1094                self.peer_selector
1095                    .write()
1096                    .await
1097                    .record_timeout(target_peer_id);
1098                None
1099            }
1100        }
1101    }
1102
1103    /// Resolve a hashtree root event through connected peers using Nostr REQ/EOSE over WebRTC.
1104    pub async fn resolve_root_from_peers(
1105        &self,
1106        owner_pubkey: &str,
1107        tree_name: &str,
1108        per_peer_timeout: Duration,
1109    ) -> Option<PeerRootEvent> {
1110        let filter = build_root_filter(owner_pubkey, tree_name)?;
1111
1112        let peer_refs: Vec<_> = {
1113            let peers = self.peers.read().await;
1114            peers
1115                .values()
1116                .filter(|entry| entry.state == ConnectionState::Connected)
1117                .filter(|entry| {
1118                    !bluetooth_nostr_only_mode() || entry.transport != PeerTransport::Bluetooth
1119                })
1120                .filter_map(|entry| {
1121                    let peer = entry.peer.as_ref()?;
1122                    if !peer.is_ready() {
1123                        return None;
1124                    }
1125                    Some((entry.peer_id.short(), peer.clone()))
1126                })
1127                .collect()
1128        };
1129
1130        for (peer_short, peer) in peer_refs {
1131            debug!(
1132                "Querying peer {} for root event {}/{}",
1133                peer_short, owner_pubkey, tree_name
1134            );
1135            let events = match peer
1136                .query_nostr_events(vec![filter.clone()], per_peer_timeout)
1137                .await
1138            {
1139                Ok(events) => events,
1140                Err(e) => {
1141                    debug!(
1142                        "Peer {} Nostr query failed for {}/{}: {}",
1143                        peer_short, owner_pubkey, tree_name, e
1144                    );
1145                    continue;
1146                }
1147            };
1148            debug!(
1149                "Peer {} returned {} Nostr event(s) for {}/{}",
1150                peer_short,
1151                events.len(),
1152                owner_pubkey,
1153                tree_name
1154            );
1155
1156            let latest = pick_latest_event(events.iter().filter(|event| {
1157                hashtree_event_identifier(event).as_deref() == Some(tree_name)
1158                    && is_hashtree_labeled_event(event)
1159            }));
1160            if let Some(event) = latest {
1161                if let Some(root) = root_event_from_peer(event, &peer_short, tree_name) {
1162                    debug!(
1163                        "Resolved {}/{} via peer {} event {}",
1164                        owner_pubkey,
1165                        tree_name,
1166                        peer_short,
1167                        event.id.to_hex()
1168                    );
1169                    return Some(root);
1170                }
1171            }
1172        }
1173
1174        None
1175    }
1176
1177    pub async fn resolve_root_from_local_buses_with_source(
1178        &self,
1179        owner_pubkey: &str,
1180        tree_name: &str,
1181        timeout: Duration,
1182    ) -> Option<(&'static str, PeerRootEvent)> {
1183        let buses = self.local_buses.read().await.clone();
1184        for bus in buses {
1185            if let Some(root) = bus.query_root(owner_pubkey, tree_name, timeout).await {
1186                return Some((bus.source_name(), root));
1187            }
1188        }
1189        None
1190    }
1191
1192    pub async fn resolve_root_from_local_buses(
1193        &self,
1194        owner_pubkey: &str,
1195        tree_name: &str,
1196        timeout: Duration,
1197    ) -> Option<PeerRootEvent> {
1198        self.resolve_root_from_local_buses_with_source(owner_pubkey, tree_name, timeout)
1199            .await
1200            .map(|(_, root)| root)
1201    }
1202
1203    pub async fn resolve_root_from_multicast(
1204        &self,
1205        owner_pubkey: &str,
1206        tree_name: &str,
1207        timeout: Duration,
1208    ) -> Option<PeerRootEvent> {
1209        self.resolve_root_from_local_buses(owner_pubkey, tree_name, timeout)
1210            .await
1211    }
1212}
1213
1214impl Default for WebRTCState {
1215    fn default() -> Self {
1216        Self::new()
1217    }
1218}
1219
/// Alias for `WebRTCState` under the "peer router" name.
pub type PeerRouterState = WebRTCState;
1221
/// Native peer router handles peer discovery and transport fan-out.
///
/// The receiver halves of the internal channels are stored as `Option`s; `run`
/// takes them out.
pub struct WebRTCManager {
    /// Router configuration supplied at construction.
    config: WebRTCConfig,
    /// Local peer identity, built in `new` from the hex public key of `keys`.
    my_peer_id: PeerId,
    /// Nostr keys of this node.
    keys: Keys,
    /// Shared router state, also handed out through `state()`.
    state: Arc<WebRTCState>,
    /// Sender half of the shutdown watch channel; `shutdown()` sends `true` on it.
    shutdown: Arc<tokio::sync::watch::Sender<bool>>,
    /// Receiver half of the shutdown watch channel, cloned for spawned tasks.
    shutdown_rx: tokio::sync::watch::Receiver<bool>,
    /// Channel to send signaling messages to relays
    signaling_tx: mpsc::Sender<SignalingMessage>,
    /// Receiving end of `signaling_tx`; taken by `run`.
    signaling_rx: Option<mpsc::Receiver<SignalingMessage>>,
    /// Optional content store for serving hash requests
    store: Option<Arc<dyn ContentStore>>,
    /// Peer classifier for pool assignment
    peer_classifier: PeerClassifier,
    /// Optional Nostr relay for data-channel relay messages
    nostr_relay: Option<Arc<NostrRelay>>,
    /// Local Nostr buses that `run` has started (multicast, Wi-Fi Aware).
    local_buses: Vec<SharedLocalNostrBus>,
    /// Channel for peer state events (connection success/failure)
    state_event_tx: mpsc::Sender<PeerStateEvent>,
    /// Receiving end of `state_event_tx`; taken by `run`.
    state_event_rx: Option<mpsc::Receiver<PeerStateEvent>>,
    /// Channel for relayless mesh signaling frames received from peers.
    mesh_frame_tx: mpsc::Sender<(PeerId, MeshNostrFrame)>,
    /// Receiving end of `mesh_frame_tx`; taken by `run`.
    mesh_frame_rx: Option<mpsc::Receiver<(PeerId, MeshNostrFrame)>>,
    /// Mesh router, created by `run` when signaling is enabled; `None` until then.
    shared_router: Option<Arc<SharedProductionRouter>>,
    /// Timed set sized by `SEEN_FRAME_CAP`/`SEEN_FRAME_TTL`.
    /// NOTE(review): presumably de-duplicates mesh frames — confirm at use sites.
    seen_frame_ids: Arc<Mutex<TimedSeenSet>>,
    /// Timed set sized by `SEEN_EVENT_CAP`/`SEEN_EVENT_TTL`.
    /// NOTE(review): presumably de-duplicates Nostr events — confirm at use sites.
    seen_event_ids: Arc<Mutex<TimedSeenSet>>,
}
1250
1251impl WebRTCManager {
1252    /// Create a new WebRTC manager
1253    pub fn new(keys: Keys, config: WebRTCConfig) -> Self {
1254        let pubkey = keys.public_key().to_hex();
1255        let my_peer_id = PeerId::new(pubkey, None);
1256        let (shutdown, shutdown_rx) = tokio::sync::watch::channel(false);
1257        let (signaling_tx, signaling_rx) = mpsc::channel(100);
1258        let (state_event_tx, state_event_rx) = mpsc::channel(100);
1259        let (mesh_frame_tx, mesh_frame_rx) = mpsc::channel(256);
1260        let state = Arc::new(WebRTCState::new_with_routing_and_cashu(
1261            config.request_selection_strategy,
1262            config.request_fairness_enabled,
1263            config.request_dispatch,
1264            Duration::from_millis(config.message_timeout_ms),
1265            CashuRoutingConfig::default(),
1266            None,
1267            None,
1268        ));
1269
1270        // Default classifier: all peers go to 'other' pool
1271        let peer_classifier: PeerClassifier = Arc::new(|_| PeerPool::Other);
1272
1273        Self {
1274            config,
1275            my_peer_id,
1276            keys,
1277            state,
1278            shutdown: Arc::new(shutdown),
1279            shutdown_rx,
1280            signaling_tx,
1281            signaling_rx: Some(signaling_rx),
1282            store: None,
1283            peer_classifier,
1284            nostr_relay: None,
1285            local_buses: Vec::new(),
1286            state_event_tx,
1287            state_event_rx: Some(state_event_rx),
1288            mesh_frame_tx,
1289            mesh_frame_rx: Some(mesh_frame_rx),
1290            shared_router: None,
1291            seen_frame_ids: Arc::new(Mutex::new(TimedSeenSet::new(
1292                SEEN_FRAME_CAP,
1293                SEEN_FRAME_TTL,
1294            ))),
1295            seen_event_ids: Arc::new(Mutex::new(TimedSeenSet::new(
1296                SEEN_EVENT_CAP,
1297                SEEN_EVENT_TTL,
1298            ))),
1299        }
1300    }
1301
1302    /// Create a new WebRTC manager reusing an existing shared state object.
1303    pub fn new_with_state(keys: Keys, config: WebRTCConfig, state: Arc<WebRTCState>) -> Self {
1304        let mut manager = Self::new(keys, config);
1305        manager.state = state;
1306        manager
1307    }
1308
1309    /// Create a new WebRTC manager with a peer classifier
1310    pub fn new_with_classifier(
1311        keys: Keys,
1312        config: WebRTCConfig,
1313        classifier: PeerClassifier,
1314    ) -> Self {
1315        let mut manager = Self::new(keys, config);
1316        manager.peer_classifier = classifier;
1317        manager
1318    }
1319
1320    /// Create a new WebRTC manager with a content store for serving hash requests
1321    pub fn new_with_store(keys: Keys, config: WebRTCConfig, store: Arc<dyn ContentStore>) -> Self {
1322        let mut manager = Self::new(keys, config);
1323        manager.store = Some(store);
1324        manager
1325    }
1326
1327    /// Create a new WebRTC manager with store and classifier
1328    pub fn new_with_store_and_classifier(
1329        keys: Keys,
1330        config: WebRTCConfig,
1331        store: Arc<dyn ContentStore>,
1332        classifier: PeerClassifier,
1333    ) -> Self {
1334        Self::new_with_store_and_classifier_and_cashu(
1335            keys,
1336            config,
1337            store,
1338            classifier,
1339            CashuRoutingConfig::default(),
1340            None,
1341            None,
1342        )
1343    }
1344
1345    pub fn new_with_state_and_store_and_classifier(
1346        keys: Keys,
1347        config: WebRTCConfig,
1348        state: Arc<WebRTCState>,
1349        store: Arc<dyn ContentStore>,
1350        classifier: PeerClassifier,
1351    ) -> Self {
1352        let mut manager = Self::new_with_state(keys, config, state);
1353        manager.store = Some(store);
1354        manager.peer_classifier = classifier;
1355        manager
1356    }
1357
1358    pub fn new_with_store_and_classifier_and_cashu(
1359        keys: Keys,
1360        config: WebRTCConfig,
1361        store: Arc<dyn ContentStore>,
1362        classifier: PeerClassifier,
1363        cashu_routing: CashuRoutingConfig,
1364        payment_client: Option<Arc<dyn CashuPaymentClient>>,
1365        mint_metadata: Option<Arc<CashuMintMetadataStore>>,
1366    ) -> Self {
1367        let mut manager = Self::new(keys, config);
1368        manager.state = Arc::new(WebRTCState::new_with_routing_and_cashu(
1369            manager.config.request_selection_strategy,
1370            manager.config.request_fairness_enabled,
1371            manager.config.request_dispatch,
1372            Duration::from_millis(manager.config.message_timeout_ms),
1373            cashu_routing,
1374            payment_client,
1375            mint_metadata,
1376        ));
1377        manager.store = Some(store);
1378        manager.peer_classifier = classifier;
1379        manager
1380    }
1381
1382    /// Set the content store for serving hash requests
1383    pub fn set_store(&mut self, store: Arc<dyn ContentStore>) {
1384        self.store = Some(store);
1385    }
1386
1387    /// Set the peer classifier
1388    pub fn set_peer_classifier(&mut self, classifier: PeerClassifier) {
1389        self.peer_classifier = classifier;
1390    }
1391
1392    /// Set the Nostr relay for data-channel relay messages
1393    pub fn set_nostr_relay(&mut self, relay: Arc<NostrRelay>) {
1394        self.nostr_relay = Some(relay);
1395    }
1396
1397    /// Get my peer ID
1398    pub fn my_peer_id(&self) -> &PeerId {
1399        &self.my_peer_id
1400    }
1401
1402    /// Get shared state for external access
1403    pub fn state(&self) -> Arc<WebRTCState> {
1404        self.state.clone()
1405    }
1406
1407    /// Cloneable shutdown handle for external lifecycle control.
1408    pub fn shutdown_signal(&self) -> Arc<tokio::sync::watch::Sender<bool>> {
1409        self.shutdown.clone()
1410    }
1411
1412    /// Signal shutdown
1413    pub fn shutdown(&self) {
1414        let _ = self.shutdown.send(true);
1415    }
1416
1417    /// Get connected peer count
1418    pub async fn connected_count(&self) -> usize {
1419        self.state
1420            .connected_count
1421            .load(std::sync::atomic::Ordering::Relaxed)
1422    }
1423
1424    /// Get all peer statuses
1425    pub async fn peer_statuses(&self) -> Vec<PeerStatus> {
1426        self.state
1427            .peers
1428            .read()
1429            .await
1430            .values()
1431            .map(|p| PeerStatus {
1432                peer_id: p.peer_id.to_string(),
1433                pubkey: p.peer_id.pubkey.clone(),
1434                state: p.state.to_string(),
1435                direction: p.direction,
1436                connected_at: Some(p.last_seen),
1437                pool: p.pool,
1438            })
1439            .collect()
1440    }
1441
1442    /// Get pool counts
1443    /// Returns (follows_connected, follows_active, other_connected, other_active)
1444    /// "active" = Connected or Connecting (excludes Discovered and Failed)
1445    pub async fn get_pool_counts(&self) -> (usize, usize, usize, usize) {
1446        let peers = self.state.peers.read().await;
1447        let mut follows_connected = 0;
1448        let mut follows_active = 0;
1449        let mut other_connected = 0;
1450        let mut other_active = 0;
1451
1452        for entry in peers.values() {
1453            // Only count Connected or Connecting as "active" connections
1454            // Discovered peers are just seen hellos, not real connections
1455            let is_active = entry.state == ConnectionState::Connected
1456                || entry.state == ConnectionState::Connecting;
1457
1458            match entry.pool {
1459                PeerPool::Follows => {
1460                    if is_active {
1461                        follows_active += 1;
1462                    }
1463                    if entry.state == ConnectionState::Connected {
1464                        follows_connected += 1;
1465                    }
1466                }
1467                PeerPool::Other => {
1468                    if is_active {
1469                        other_active += 1;
1470                    }
1471                    if entry.state == ConnectionState::Connected {
1472                        other_connected += 1;
1473                    }
1474                }
1475            }
1476        }
1477
1478        (
1479            follows_connected,
1480            follows_active,
1481            other_connected,
1482            other_active,
1483        )
1484    }
1485
1486    fn local_hello_message(&self) -> SignalingMessage {
1487        SignalingMessage::Hello {
1488            peer_id: self.my_peer_id.to_string(),
1489            roots: Vec::new(),
1490        }
1491    }
1492
1493    fn local_bus_max_peers(&self, source: &str) -> Option<usize> {
1494        match source {
1495            "multicast" => Some(self.config.multicast.max_peers),
1496            WIFI_AWARE_SOURCE => Some(self.config.wifi_aware.max_peers),
1497            _ => None,
1498        }
1499    }
1500
1501    fn can_track_local_bus_peer(
1502        &self,
1503        source: &str,
1504        peer_key: &str,
1505        peers: &HashMap<String, PeerEntry>,
1506    ) -> bool {
1507        let Some(max_peers) = self.local_bus_max_peers(source) else {
1508            return true;
1509        };
1510        if peers.contains_key(peer_key) {
1511            return true;
1512        }
1513        if max_peers == 0 {
1514            return false;
1515        }
1516        let signal_path = PeerSignalPath::from_source_name(source);
1517        peers
1518            .values()
1519            .filter(|entry| {
1520                entry.signal_paths.contains(&signal_path) && entry.state != ConnectionState::Failed
1521            })
1522            .count()
1523            < max_peers
1524    }
1525
1526    /// Start the native peer router - connects transports and handles signaling.
1527    pub async fn run(&mut self) -> Result<()> {
1528        info!(
1529            "Starting peer router with peer ID: {}",
1530            self.my_peer_id.short()
1531        );
1532
1533        let (event_tx, mut event_rx) = mpsc::channel::<(String, nostr::Event)>(100);
1534
1535        // Take the signaling receiver
1536        let mut signaling_rx = self
1537            .signaling_rx
1538            .take()
1539            .expect("signaling_rx already taken");
1540
1541        // Take the state event receiver
1542        let mut state_event_rx = self
1543            .state_event_rx
1544            .take()
1545            .expect("state_event_rx already taken");
1546        let mut mesh_frame_rx = self
1547            .mesh_frame_rx
1548            .take()
1549            .expect("mesh_frame_rx already taken");
1550
1551        if self.config.bluetooth.is_enabled() {
1552            let bluetooth = BluetoothMesh::new(self.config.bluetooth.clone());
1553            let context = BluetoothRuntimeContext {
1554                my_peer_id: self.my_peer_id.clone(),
1555                store: if bluetooth_nostr_only_mode() {
1556                    None
1557                } else {
1558                    self.store.clone()
1559                },
1560                nostr_relay: self.nostr_relay.clone(),
1561                mesh_frame_tx: self.mesh_frame_tx.clone(),
1562                registrar: BluetoothPeerRegistrar::new(
1563                    self.state.clone(),
1564                    self.peer_classifier.clone(),
1565                    self.config.pools.clone(),
1566                    self.config.bluetooth.max_peers,
1567                ),
1568            };
1569            let _ = bluetooth.start(context).await;
1570        }
1571
1572        // Create a shared write channel for all relay tasks
1573        let (relay_write_tx, _) = tokio::sync::broadcast::channel::<SignalingMessage>(100);
1574
1575        // Spawn relay connections
1576        for relay_url in &self.config.relays {
1577            let url = relay_url.clone();
1578            let event_tx = event_tx.clone();
1579            let shutdown_rx = self.shutdown_rx.clone();
1580            let keys = self.keys.clone();
1581            let relay_write_rx = relay_write_tx.subscribe();
1582
1583            tokio::spawn(async move {
1584                if let Err(e) =
1585                    Self::relay_task(url.clone(), event_tx, shutdown_rx, keys, relay_write_rx).await
1586                {
1587                    error!("Relay {} error: {}", url, e);
1588                }
1589            });
1590        }
1591
1592        if self.config.multicast.is_enabled() {
1593            if let Some(relay) = self.nostr_relay.clone() {
1594                match MulticastNostrBus::bind(
1595                    self.config.multicast.clone(),
1596                    self.keys.clone(),
1597                    relay,
1598                )
1599                .await
1600                {
1601                    Ok(bus) => {
1602                        let local_bus: SharedLocalNostrBus = bus.clone();
1603                        self.state.add_local_bus(local_bus.clone()).await;
1604                        self.local_buses.push(local_bus);
1605                        let shutdown_rx = self.shutdown_rx.clone();
1606                        let signaling_tx = event_tx.clone();
1607                        tokio::spawn(async move {
1608                            if let Err(err) = bus.run(shutdown_rx, signaling_tx).await {
1609                                error!("Multicast bus error: {}", err);
1610                            }
1611                        });
1612                    }
1613                    Err(err) => {
1614                        warn!("Failed to start multicast bus: {}", err);
1615                    }
1616                }
1617            } else {
1618                warn!("Multicast enabled but Nostr relay is unavailable");
1619            }
1620        }
1621
1622        if self.config.wifi_aware.is_enabled() {
1623            if let Some(relay) = self.nostr_relay.clone() {
1624                if let Some(bridge) = mobile_wifi_aware_bridge() {
1625                    let bus = WifiAwareNostrBus::new(
1626                        self.config.wifi_aware.clone(),
1627                        self.keys.clone(),
1628                        relay,
1629                        bridge,
1630                    );
1631                    let local_bus: SharedLocalNostrBus = bus.clone();
1632                    self.state.add_local_bus(local_bus.clone()).await;
1633                    self.local_buses.push(local_bus);
1634                    let shutdown_rx = self.shutdown_rx.clone();
1635                    let signaling_tx = event_tx.clone();
1636                    let local_peer_id = self.my_peer_id.to_string();
1637                    tokio::spawn(async move {
1638                        if let Err(err) = bus.run(local_peer_id, shutdown_rx, signaling_tx).await {
1639                            error!("Wi-Fi Aware bus error: {}", err);
1640                        }
1641                    });
1642                } else {
1643                    warn!("Wi-Fi Aware enabled but no mobile bridge is installed");
1644                }
1645            } else {
1646                warn!("Wi-Fi Aware enabled but Nostr relay is unavailable");
1647            }
1648        }
1649
1650        if self.config.signaling_enabled {
1651            let transport = Arc::new(RouterSignalingBridge::new(
1652                self.my_peer_id.to_string(),
1653                self.my_peer_id.pubkey.clone(),
1654                self.signaling_tx.clone(),
1655            ));
1656            let factory = Arc::new(SharedRouterPeerFactory::new(
1657                self.my_peer_id.clone(),
1658                self.signaling_tx.clone(),
1659                self.config.stun_servers.clone(),
1660                self.store.clone(),
1661                self.state.clone(),
1662                self.state_event_tx.clone(),
1663                self.nostr_relay.clone(),
1664                self.mesh_frame_tx.clone(),
1665                self.peer_classifier.clone(),
1666            ));
1667            let (classifier_tx, mut classifier_rx) = mpsc::channel::<SharedClassifyRequest>(32);
1668            let classifier = self.peer_classifier.clone();
1669            tokio::spawn(async move {
1670                while let Some(request) = classifier_rx.recv().await {
1671                    let _ = request.response.send(classifier(&request.pubkey));
1672                }
1673            });
1674
1675            let mut router = MeshRouter::new(
1676                self.my_peer_id.to_string(),
1677                self.my_peer_id.pubkey.clone(),
1678                transport,
1679                factory.clone(),
1680                self.config.pools.clone(),
1681                self.config.debug,
1682            );
1683            router.set_classifier(classifier_tx);
1684            self.shared_router = Some(Arc::new(router));
1685        }
1686
1687        // Process incoming events and outgoing signaling messages
1688        let mut shutdown_rx = self.shutdown_rx.clone();
1689        // Cleanup interval - run every 30 seconds as a fallback (not for real-time sync)
1690        let mut cleanup_interval = tokio::time::interval(Duration::from_secs(30));
1691        let mut hello_ticker =
1692            tokio::time::interval(Duration::from_millis(self.config.hello_interval_ms));
1693        if self.config.signaling_enabled {
1694            if let Some(shared_router) = self.shared_router.as_ref() {
1695                let _ = shared_router.send_hello(Vec::new()).await;
1696            } else {
1697                self.dispatch_signaling_message(self.local_hello_message(), &relay_write_tx)
1698                    .await;
1699            }
1700        }
1701        loop {
1702            tokio::select! {
1703                _ = shutdown_rx.changed() => {
1704                    if *shutdown_rx.borrow() {
1705                        info!("WebRTC manager shutting down");
1706                        break;
1707                    }
1708                }
1709                Some((relay, event)) = event_rx.recv() => {
1710                    if let Err(e) = self
1711                        .handle_event(&relay, &event, self.shared_router.as_ref())
1712                        .await
1713                    {
1714                        debug!("Error handling event from {}: {}", relay, e);
1715                    }
1716                }
1717                Some(msg) = signaling_rx.recv() => {
1718                    self.dispatch_signaling_message(msg, &relay_write_tx).await;
1719                }
1720                Some(event) = state_event_rx.recv() => {
1721                    // Handle peer state events (connected, failed, disconnected)
1722                    self.handle_peer_state_event(event, &relay_write_tx).await;
1723                }
1724                Some((from_peer_id, frame)) = mesh_frame_rx.recv() => {
1725                    self.handle_mesh_frame(from_peer_id, frame).await;
1726                }
1727                _ = hello_ticker.tick(), if self.config.signaling_enabled => {
1728                    if let Some(shared_router) = self.shared_router.as_ref() {
1729                        let _ = shared_router.send_hello(Vec::new()).await;
1730                    } else {
1731                        self.dispatch_signaling_message(self.local_hello_message(), &relay_write_tx)
1732                            .await;
1733                    }
1734                }
1735                _ = cleanup_interval.tick() => {
1736                    // Periodic cleanup of stale peers and state sync (fallback)
1737                    self.cleanup_stale_peers().await;
1738                }
1739            }
1740        }
1741
1742        Ok(())
1743    }
1744
1745    /// Connect to a single relay and handle messages
1746    async fn relay_task(
1747        url: String,
1748        event_tx: mpsc::Sender<(String, nostr::Event)>,
1749        mut shutdown_rx: tokio::sync::watch::Receiver<bool>,
1750        keys: Keys,
1751        mut signaling_rx: tokio::sync::broadcast::Receiver<SignalingMessage>,
1752    ) -> Result<()> {
1753        info!("Connecting to relay: {}", url);
1754
1755        let (ws_stream, _) = connect_async(&url).await?;
1756        let (mut write, mut read) = ws_stream.split();
1757
1758        // Subscribe to webrtc events - two filters:
1759        // 1. Hello messages: kind 25050 with #l: "hello" tag
1760        // 2. Directed messages: kind 25050 with #p tag (our pubkey)
1761        let hello_filter = Filter::new()
1762            .kind(Kind::Ephemeral(WEBRTC_KIND as u16))
1763            .custom_tag(
1764                nostr::SingleLetterTag::lowercase(nostr::Alphabet::L),
1765                vec![HELLO_TAG],
1766            )
1767            .since(nostr::Timestamp::now() - Duration::from_secs(60));
1768
1769        let directed_filter = Filter::new()
1770            .kind(Kind::Ephemeral(WEBRTC_KIND as u16))
1771            .custom_tag(
1772                nostr::SingleLetterTag::lowercase(nostr::Alphabet::P),
1773                vec![keys.public_key().to_hex()],
1774            )
1775            .since(nostr::Timestamp::now() - Duration::from_secs(60));
1776
1777        let sub_id = nostr::SubscriptionId::generate();
1778        let sub_msg = ClientMessage::req(sub_id.clone(), vec![hello_filter, directed_filter]);
1779        write.send(Message::Text(sub_msg.as_json())).await?;
1780
1781        info!(
1782            "Subscribed to {} for WebRTC events (kind {})",
1783            url, WEBRTC_KIND
1784        );
1785
1786        loop {
1787            tokio::select! {
1788                _ = shutdown_rx.changed() => {
1789                    if *shutdown_rx.borrow() {
1790                        break;
1791                    }
1792                }
1793                // Handle outgoing signaling messages
1794                Ok(signaling_msg) = signaling_rx.recv() => {
1795                    info!("Sending {} via {}", signaling_msg.msg_type(), url);
1796                    if let Ok(event) = Self::create_signaling_event(&keys, &signaling_msg).await {
1797                        let event_id = event.id.to_string();
1798                        let msg = ClientMessage::event(event);
1799                        if write.send(Message::Text(msg.as_json())).await.is_ok() {
1800                            info!("Sent {} to {} (event id: {})", signaling_msg.msg_type(), url, &event_id[..16]);
1801                        }
1802                    }
1803                }
1804                msg = read.next() => {
1805                    match msg {
1806                        Some(Ok(Message::Text(text))) => {
1807                            if let Ok(RelayMessage::Event { event, .. }) =
1808                                RelayMessage::from_json(&text)
1809                            {
1810                                let _ = event_tx.send((url.clone(), *event)).await;
1811                            }
1812                        }
1813                        Some(Err(e)) => {
1814                            error!("WebSocket error from {}: {}", url, e);
1815                            break;
1816                        }
1817                        None => {
1818                            warn!("WebSocket closed: {}", url);
1819                            break;
1820                        }
1821                        _ => {}
1822                    }
1823                }
1824            }
1825        }
1826
1827        Ok(())
1828    }
1829
1830    async fn mark_seen_frame_id(&self, frame_id: String) -> bool {
1831        let mut seen = self.seen_frame_ids.lock().await;
1832        seen.insert_if_new(frame_id)
1833    }
1834
1835    async fn mark_seen_event_id(&self, event_id: String) -> bool {
1836        let mut seen = self.seen_event_ids.lock().await;
1837        seen.insert_if_new(event_id)
1838    }
1839
1840    async fn dispatch_signaling_message(
1841        &self,
1842        msg: SignalingMessage,
1843        relay_write_tx: &tokio::sync::broadcast::Sender<SignalingMessage>,
1844    ) {
1845        if !self.config.signaling_enabled {
1846            debug!(
1847                "Skipping signaling message {} because WebRTC signaling is disabled",
1848                msg.msg_type()
1849            );
1850            return;
1851        }
1852
1853        if relay_write_tx.send(msg.clone()).is_err() {
1854            debug!(
1855                "No relay subscribers for signaling message {}",
1856                msg.msg_type()
1857            );
1858        }
1859
1860        let event = match Self::create_signaling_event(&self.keys, &msg).await {
1861            Ok(event) => event,
1862            Err(e) => {
1863                debug!("Failed to create signaling event for mesh dispatch: {}", e);
1864                return;
1865            }
1866        };
1867
1868        for bus in &self.local_buses {
1869            if let Err(err) = bus.broadcast_event(&event).await {
1870                debug!(
1871                    "Failed to broadcast signaling event over {} ({}): {}",
1872                    bus.source_name(),
1873                    msg.msg_type(),
1874                    err
1875                );
1876            }
1877        }
1878
1879        let mut frame =
1880            MeshNostrFrame::new_event(event, &self.my_peer_id.to_string(), MESH_DEFAULT_HTL);
1881        if !self.mark_seen_frame_id(frame.frame_id.clone()).await {
1882            self.state.record_mesh_duplicate_drop();
1883            return;
1884        }
1885        if !self.mark_seen_event_id(frame.event().id.to_hex()).await {
1886            self.state.record_mesh_duplicate_drop();
1887            return;
1888        }
1889
1890        // Keep the sender peer id stable even if this is forwarded later.
1891        frame.sender_peer_id = self.my_peer_id.to_string();
1892        let forwarded = self.forward_mesh_frame(&frame, None).await;
1893        if forwarded > 0 {
1894            self.state.record_mesh_forwarded(forwarded as u64);
1895        }
1896    }
1897
1898    async fn forward_mesh_frame(
1899        &self,
1900        frame: &MeshNostrFrame,
1901        exclude_peer_id: Option<&str>,
1902    ) -> usize {
1903        let peers = self.state.peers.read().await;
1904        let peer_refs: Vec<_> = peers
1905            .values()
1906            .filter(|entry| entry.state == ConnectionState::Connected)
1907            .filter(|entry| {
1908                entry
1909                    .peer
1910                    .as_ref()
1911                    .map(|peer| peer.is_ready())
1912                    .unwrap_or(false)
1913            })
1914            .filter(|entry| {
1915                exclude_peer_id
1916                    .map(|exclude| exclude != entry.peer_id.to_string())
1917                    .unwrap_or(true)
1918            })
1919            .filter_map(|entry| {
1920                entry.peer.as_ref().map(|peer| {
1921                    (
1922                        entry.peer_id.to_string(),
1923                        entry.peer_id.short(),
1924                        peer.clone(),
1925                        peer.htl_config(),
1926                    )
1927                })
1928            })
1929            .collect();
1930        drop(peers);
1931
1932        let mut forwarded = 0usize;
1933        for (_peer_key, peer_short, peer, htl_cfg) in peer_refs {
1934            let next_htl = decrement_htl_with_policy(frame.htl, &MESH_EVENT_POLICY, &htl_cfg);
1935            if !should_forward_htl(next_htl) {
1936                continue;
1937            }
1938
1939            let mut outbound = frame.clone();
1940            outbound.htl = next_htl;
1941            if peer.send_mesh_frame_text(&outbound).await.is_ok() {
1942                forwarded += 1;
1943            } else {
1944                debug!("Failed to forward mesh frame to {}", peer_short);
1945            }
1946        }
1947
1948        forwarded
1949    }
1950
1951    async fn handle_mesh_frame(&self, from_peer_id: PeerId, frame: MeshNostrFrame) {
1952        if let Err(reason) = validate_mesh_frame(&frame) {
1953            debug!(
1954                "Ignoring mesh frame from {} (invalid: {})",
1955                from_peer_id.short(),
1956                reason
1957            );
1958            return;
1959        }
1960
1961        if !self.mark_seen_frame_id(frame.frame_id.clone()).await {
1962            self.state.record_mesh_duplicate_drop();
1963            return;
1964        }
1965
1966        let event = match &frame.payload {
1967            MeshNostrPayload::Event { event } => event.clone(),
1968        };
1969
1970        if !self.mark_seen_event_id(event.id.to_hex()).await {
1971            self.state.record_mesh_duplicate_drop();
1972            return;
1973        }
1974
1975        if event.verify().is_err() {
1976            debug!(
1977                "Ignoring mesh event from {} due to invalid signature",
1978                from_peer_id.short()
1979            );
1980            return;
1981        }
1982
1983        self.state.record_mesh_received();
1984
1985        if let Err(e) = self
1986            .handle_event("mesh", &event, self.shared_router.as_ref())
1987            .await
1988        {
1989            debug!(
1990                "Error handling mesh event from {}: {}",
1991                from_peer_id.short(),
1992                e
1993            );
1994        }
1995
1996        let forwarded = self
1997            .forward_mesh_frame(&frame, Some(&from_peer_id.to_string()))
1998            .await;
1999        if forwarded > 0 {
2000            self.state.record_mesh_forwarded(forwarded as u64);
2001        }
2002    }
2003
2004    /// Create a signaling event
2005    ///
2006    /// For directed messages (offer, answer, candidate, candidates), use NIP-17 style
2007    /// gift wrapping with ephemeral keys for privacy.
2008    /// Hello messages use kind 25050 with #l: "hello" tag and peerId.
2009    async fn create_signaling_event(keys: &Keys, msg: &SignalingMessage) -> Result<nostr::Event> {
2010        encode_signaling_event(
2011            keys,
2012            msg.peer_id(),
2013            msg,
2014            Kind::Ephemeral(WEBRTC_KIND as u16),
2015        )
2016        .map_err(|e| anyhow::anyhow!(e.to_string()))
2017    }
2018
2019    /// Handle an incoming event
2020    ///
2021    /// Messages may be:
2022    /// 1. Hello messages: kind 25050 with #l: "hello" tag and peerId
2023    /// 2. Gift-wrapped directed messages: kind 25050 with #p tag, encrypted with ephemeral key
2024    async fn handle_event(
2025        &self,
2026        relay: &str,
2027        event: &nostr::Event,
2028        shared_router: Option<&Arc<SharedProductionRouter>>,
2029    ) -> Result<()> {
2030        if !self.config.signaling_enabled {
2031            return Ok(());
2032        }
2033
2034        let Some(shared_router) = shared_router else {
2035            return Ok(());
2036        };
2037
2038        let Some(msg) = decode_signaling_event(
2039            event,
2040            &self.my_peer_id.to_string(),
2041            &self.keys.public_key().to_hex(),
2042            &self.keys,
2043        ) else {
2044            return Ok(());
2045        };
2046
2047        if matches!(
2048            msg,
2049            SignalingMessage::Hello { .. } | SignalingMessage::Offer { .. }
2050        ) {
2051            let peers = self.state.peers.read().await;
2052            if !self.can_track_local_bus_peer(relay, msg.peer_id(), &peers) {
2053                return Ok(());
2054            }
2055        }
2056
2057        debug!(
2058            "Received {} from {} via {}",
2059            msg.msg_type(),
2060            msg.peer_id(),
2061            relay
2062        );
2063        let peer_id = msg.peer_id().to_string();
2064        shared_router
2065            .handle_message(msg)
2066            .await
2067            .map_err(|e| anyhow::anyhow!(e.to_string()))?;
2068        remember_peer_signal_path(self.state.as_ref(), &peer_id, relay).await;
2069
2070        Ok(())
2071    }
2072
2073    /// Handle peer state change events from peer connections
2074    async fn handle_peer_state_event(
2075        &self,
2076        event: PeerStateEvent,
2077        relay_write_tx: &tokio::sync::broadcast::Sender<SignalingMessage>,
2078    ) {
2079        match event {
2080            PeerStateEvent::Connected(peer_id) => {
2081                let peer_key = peer_id.to_string();
2082                let mut emit_hello = false;
2083                let mut peers = self.state.peers.write().await;
2084                if let Some(entry) = peers.get_mut(&peer_key) {
2085                    if entry.state != ConnectionState::Connected {
2086                        info!("Peer {} connected (via state event)", peer_id.short());
2087                        entry.state = ConnectionState::Connected;
2088                        emit_hello = true;
2089                        // Update connected count
2090                        self.state
2091                            .connected_count
2092                            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
2093                    }
2094                }
2095                drop(peers);
2096                if emit_hello {
2097                    if let Some(shared_router) = self.shared_router.as_ref() {
2098                        let _ = shared_router.send_hello(Vec::new()).await;
2099                    } else {
2100                        self.dispatch_signaling_message(self.local_hello_message(), relay_write_tx)
2101                            .await;
2102                    }
2103                }
2104            }
2105            PeerStateEvent::Failed(peer_id) => {
2106                let peer_key = peer_id.to_string();
2107                info!(
2108                    "Peer {} connection failed - removing from pool",
2109                    peer_id.short()
2110                );
2111                let removed = {
2112                    let mut peers = self.state.peers.write().await;
2113                    peers.remove(&peer_key)
2114                };
2115                if let Some(entry) = removed {
2116                    // Decrement connected count if was connected
2117                    if entry.state == ConnectionState::Connected {
2118                        self.state
2119                            .connected_count
2120                            .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
2121                    }
2122                    // Close the peer connection if it exists
2123                    if let Some(peer) = entry.peer {
2124                        let _ = peer.close().await;
2125                    }
2126                }
2127                if let Some(shared_router) = self.shared_router.as_ref() {
2128                    if let Some(channel) = shared_router.remove_peer(&peer_key).await {
2129                        channel.close().await;
2130                    }
2131                }
2132            }
2133            PeerStateEvent::Disconnected(peer_id) => {
2134                let peer_key = peer_id.to_string();
2135                info!("Peer {} disconnected - removing from pool", peer_id.short());
2136                let removed = {
2137                    let mut peers = self.state.peers.write().await;
2138                    peers.remove(&peer_key)
2139                };
2140                if let Some(entry) = removed {
2141                    // Decrement connected count if was connected
2142                    if entry.state == ConnectionState::Connected {
2143                        self.state
2144                            .connected_count
2145                            .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
2146                    }
2147                    // Close the peer connection if it exists
2148                    if let Some(peer) = entry.peer {
2149                        let _ = peer.close().await;
2150                    }
2151                }
2152                if let Some(shared_router) = self.shared_router.as_ref() {
2153                    if let Some(channel) = shared_router.remove_peer(&peer_key).await {
2154                        channel.close().await;
2155                    }
2156                }
2157            }
2158        }
2159    }
2160
2161    /// Cleanup stale peers and sync connection states (fallback, runs every 30s)
2162    async fn cleanup_stale_peers(&self) {
2163        let mut peers = self.state.peers.write().await;
2164        let mut connected_count = 0;
2165        let mut to_remove = Vec::new();
2166        let stale_timeout = Duration::from_secs(60); // Remove peers stuck in Discovered/Connecting for 60s
2167
2168        for (key, entry) in peers.iter_mut() {
2169            if let Some(ref peer) = entry.peer {
2170                // Sync connected state as fallback (in case event was missed)
2171                if peer.is_connected() {
2172                    if entry.state != ConnectionState::Connected {
2173                        info!(
2174                            "Peer {} is now connected (sync fallback)",
2175                            entry.peer_id.short()
2176                        );
2177                        entry.state = ConnectionState::Connected;
2178                    }
2179                    connected_count += 1;
2180                } else if entry.state == ConnectionState::Connected {
2181                    info!(
2182                        "Removing disconnected peer {} after transport closed",
2183                        entry.peer_id.short()
2184                    );
2185                    to_remove.push(key.clone());
2186                } else if entry.state == ConnectionState::Connecting
2187                    && entry.last_seen.elapsed() > stale_timeout
2188                {
2189                    // Peer stuck in Connecting for too long - mark for removal
2190                    info!(
2191                        "Removing stale peer {} (stuck in Connecting for {:?})",
2192                        entry.peer_id.short(),
2193                        entry.last_seen.elapsed()
2194                    );
2195                    to_remove.push(key.clone());
2196                }
2197            } else if entry.state == ConnectionState::Discovered
2198                && entry.last_seen.elapsed() > stale_timeout
2199            {
2200                // Discovered peer with no actual connection - remove
2201                debug!("Removing stale discovered peer {}", entry.peer_id.short());
2202                to_remove.push(key.clone());
2203            }
2204        }
2205
2206        // Remove stale peers
2207        let mut removed_peers = Vec::new();
2208        for key in to_remove {
2209            if let Some(entry) = peers.remove(&key) {
2210                removed_peers.push(entry);
2211            }
2212        }
2213        drop(peers);
2214
2215        for entry in removed_peers {
2216            if let Some(peer) = entry.peer {
2217                let _ = peer.close().await;
2218            }
2219        }
2220
2221        self.state
2222            .connected_count
2223            .store(connected_count, std::sync::atomic::Ordering::Relaxed);
2224    }
2225}
2226
/// Alias that exposes [`WebRTCManager`] under the name `PeerRouter`.
pub type PeerRouter = WebRTCManager;
2228
// Keep the old PeerState for backward compatibility with tests
/// Plain record describing a peer: who it is, the connection direction, a
/// textual state and when it was last seen.
///
/// Marked `allow(dead_code)`, so it may have no remaining users in this crate.
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub struct PeerState {
    /// Identifier of the peer this record describes.
    pub peer_id: PeerId,
    /// Direction of the connection to the peer.
    pub direction: PeerDirection,
    /// Connection state as free-form text.
    /// NOTE(review): the set of expected values is not visible here — confirm with callers.
    pub state: String,
    /// Monotonic timestamp of the last time the peer was seen.
    pub last_seen: Instant,
}
2238
2239#[cfg(test)]
2240mod tests {
2241    use super::*;
2242    use crate::webrtc::root_events::PeerRootEvent;
2243    use crate::webrtc::session::TestMeshPeer;
2244    use crate::webrtc::SelectionStrategy;
2245    use anyhow::Result as AnyResult;
2246    use async_trait::async_trait;
2247    use hashtree_network::{build_hedged_wave_plan, normalize_dispatch_config};
2248    use nostr::{EventBuilder, Keys, Tag};
2249    use std::time::Duration;
2250
    /// Minimal `LocalNostrBus` stub for tests: reports a fixed source name and
    /// answers every root query with a canned result.
    struct TestLocalBus {
        /// Name returned from `source_name`.
        source: &'static str,
        /// Value returned (cloned) from every `query_root` call.
        root: Option<PeerRootEvent>,
    }

    #[async_trait]
    impl super::super::LocalNostrBus for TestLocalBus {
        fn source_name(&self) -> &'static str {
            self.source
        }

        // Broadcasting is a no-op that always succeeds.
        async fn broadcast_event(&self, _event: &nostr::Event) -> AnyResult<()> {
            Ok(())
        }

        // All arguments are ignored; the configured `root` is returned as-is.
        async fn query_root(
            &self,
            _owner_pubkey: &str,
            _tree_name: &str,
            _timeout: Duration,
        ) -> Option<PeerRootEvent> {
            self.root.clone()
        }
    }
2275
2276    #[test]
2277    fn root_event_from_peer_extracts_tags() {
2278        let keys = Keys::generate();
2279        let hash = "ab".repeat(32);
2280        let event = EventBuilder::new(
2281            Kind::Custom(super::super::root_events::HASHTREE_KIND),
2282            "",
2283            [
2284                Tag::parse(&["d", "repo"]).unwrap(),
2285                Tag::parse(&["l", super::super::root_events::HASHTREE_LABEL]).unwrap(),
2286                Tag::parse(&["hash", &hash]).unwrap(),
2287                Tag::parse(&["encryptedKey", &"11".repeat(32)]).unwrap(),
2288            ],
2289        )
2290        .to_event(&keys)
2291        .unwrap();
2292
2293        let parsed = root_event_from_peer(&event, "peer-a", "repo").unwrap();
2294        let expected_encrypted = "11".repeat(32);
2295        assert_eq!(parsed.hash, hash);
2296        assert_eq!(parsed.peer_id, "peer-a");
2297        assert_eq!(
2298            parsed.encrypted_key.as_deref(),
2299            Some(expected_encrypted.as_str())
2300        );
2301        assert!(parsed.key.is_none());
2302    }
2303
2304    #[test]
2305    fn pick_latest_event_prefers_higher_event_id_on_timestamp_tie() {
2306        let keys = Keys::generate();
2307        let created_at = nostr::Timestamp::from_secs(1_700_000_000);
2308        let event_a = EventBuilder::new(
2309            Kind::Custom(super::super::root_events::HASHTREE_KIND),
2310            "",
2311            [],
2312        )
2313        .custom_created_at(created_at)
2314        .to_event(&keys)
2315        .unwrap();
2316        let event_b = EventBuilder::new(
2317            Kind::Custom(super::super::root_events::HASHTREE_KIND),
2318            "",
2319            [],
2320        )
2321        .custom_created_at(created_at)
2322        .to_event(&keys)
2323        .unwrap();
2324
2325        let expected = if event_a.id > event_b.id {
2326            event_a.id
2327        } else {
2328            event_b.id
2329        };
2330        let picked = pick_latest_event([&event_a, &event_b]).unwrap();
2331        assert_eq!(picked.id, expected);
2332    }
2333
2334    #[tokio::test]
2335    async fn resolve_root_from_local_buses_returns_source_and_first_match() {
2336        let state = WebRTCState::new();
2337        let root = PeerRootEvent {
2338            hash: "ab".repeat(32),
2339            key: None,
2340            encrypted_key: None,
2341            self_encrypted_key: None,
2342            event_id: "event-1".to_string(),
2343            created_at: 1,
2344            peer_id: "bus-peer".to_string(),
2345        };
2346
2347        state
2348            .set_local_buses(vec![
2349                Arc::new(TestLocalBus {
2350                    source: "empty",
2351                    root: None,
2352                }),
2353                Arc::new(TestLocalBus {
2354                    source: "mock-bus",
2355                    root: Some(root.clone()),
2356                }),
2357            ])
2358            .await;
2359
2360        let resolved = state
2361            .resolve_root_from_local_buses_with_source("owner", "tree", Duration::from_millis(10))
2362            .await
2363            .expect("expected root from local bus");
2364
2365        assert_eq!(resolved.0, "mock-bus");
2366        assert_eq!(resolved.1.hash, root.hash);
2367        assert_eq!(resolved.1.peer_id, root.peer_id);
2368    }
2369
2370    #[tokio::test]
2371    async fn can_track_local_bus_peer_enforces_wifi_aware_limit() {
2372        let keys = Keys::generate();
2373        let mut config = WebRTCConfig::default();
2374        config.wifi_aware.enabled = true;
2375        config.wifi_aware.max_peers = 1;
2376        let manager = WebRTCManager::new(keys, config);
2377        let existing_peer = PeerId::new("peer-a".to_string(), Some("sess-a".to_string()));
2378        let existing_key = existing_peer.to_string();
2379        let mut peers = HashMap::new();
2380        peers.insert(
2381            existing_key.clone(),
2382            PeerEntry {
2383                peer_id: existing_peer,
2384                direction: PeerDirection::Outbound,
2385                state: ConnectionState::Discovered,
2386                last_seen: Instant::now(),
2387                peer: None,
2388                pool: PeerPool::Other,
2389                transport: PeerTransport::WebRtc,
2390                signal_paths: BTreeSet::from([PeerSignalPath::WifiAware]),
2391                bytes_sent: 0,
2392                bytes_received: 0,
2393            },
2394        );
2395
2396        assert!(manager.can_track_local_bus_peer(WIFI_AWARE_SOURCE, &existing_key, &peers,));
2397        assert!(!manager.can_track_local_bus_peer(WIFI_AWARE_SOURCE, "peer-b:sess-b", &peers,));
2398        assert!(manager.can_track_local_bus_peer("relay", "peer-c:sess-c", &peers));
2399    }
2400
2401    #[tokio::test]
2402    async fn request_from_peers_with_source_accepts_generic_mesh_peers() {
2403        let state = WebRTCState::new();
2404        let data = b"offline-over-ble".to_vec();
2405        let hash_hex = hex::encode(hashtree_core::sha256(&data));
2406
2407        state.peers.write().await.insert(
2408            "peer-a".to_string(),
2409            PeerEntry {
2410                peer_id: PeerId::new("peer-a-pub".to_string(), Some("session-a".to_string())),
2411                direction: PeerDirection::Outbound,
2412                state: ConnectionState::Connected,
2413                last_seen: Instant::now(),
2414                peer: Some(MeshPeer::mock_for_tests(TestMeshPeer::with_response(Some(
2415                    data.clone(),
2416                )))),
2417                pool: PeerPool::Other,
2418                transport: PeerTransport::Bluetooth,
2419                signal_paths: BTreeSet::from([PeerSignalPath::Bluetooth]),
2420                bytes_sent: 0,
2421                bytes_received: 0,
2422            },
2423        );
2424
2425        let resolved = state
2426            .request_from_peers_with_source(&hash_hex)
2427            .await
2428            .expect("expected mock mesh peer response");
2429
2430        assert_eq!(resolved.0, data);
2431        assert_eq!(resolved.1, "peer-a-pub:session-a");
2432    }
2433
2434    #[tokio::test]
2435    async fn request_from_peers_with_source_waits_full_timeout_for_last_generic_peer() {
2436        let state = WebRTCState::new_with_routing_and_cashu(
2437            SelectionStrategy::TitForTat,
2438            true,
2439            RequestDispatchConfig {
2440                initial_fanout: 1,
2441                hedge_fanout: 1,
2442                max_fanout: 1,
2443                hedge_interval_ms: 50,
2444            },
2445            Duration::from_millis(400),
2446            CashuRoutingConfig::default(),
2447            None,
2448            None,
2449        );
2450        let data = b"slow-offline-over-ble".to_vec();
2451        let hash_hex = hex::encode(hashtree_core::sha256(&data));
2452
2453        state.peers.write().await.insert(
2454            "peer-a".to_string(),
2455            PeerEntry {
2456                peer_id: PeerId::new("peer-a-pub".to_string(), Some("session-a".to_string())),
2457                direction: PeerDirection::Outbound,
2458                state: ConnectionState::Connected,
2459                last_seen: Instant::now(),
2460                peer: Some(MeshPeer::mock_for_tests(
2461                    TestMeshPeer::with_delayed_response(
2462                        Some(data.clone()),
2463                        Duration::from_millis(200),
2464                    ),
2465                )),
2466                pool: PeerPool::Other,
2467                transport: PeerTransport::Bluetooth,
2468                signal_paths: BTreeSet::from([PeerSignalPath::Bluetooth]),
2469                bytes_sent: 0,
2470                bytes_received: 0,
2471            },
2472        );
2473
2474        let resolved = state
2475            .request_from_peers_with_source(&hash_hex)
2476            .await
2477            .expect("expected delayed mock mesh peer response");
2478
2479        assert_eq!(resolved.0, data);
2480        assert_eq!(resolved.1, "peer-a-pub:session-a");
2481    }
2482
2483    #[tokio::test]
2484    async fn dispatch_signaling_message_is_noop_when_signaling_disabled() {
2485        let keys = Keys::generate();
2486        let mut config = WebRTCConfig::default();
2487        config.signaling_enabled = false;
2488        let manager = WebRTCManager::new(keys, config);
2489        let peer_id = PeerId::new("peer-a-pub".to_string(), Some("session-a".to_string()));
2490        let peer_key = peer_id.to_string();
2491        let peer = MeshPeer::mock_for_tests(TestMeshPeer::with_response(None));
2492        let peer_ref = peer.mock_ref().expect("mock peer").clone();
2493
2494        manager.state.peers.write().await.insert(
2495            peer_key,
2496            PeerEntry {
2497                peer_id,
2498                direction: PeerDirection::Outbound,
2499                state: ConnectionState::Connected,
2500                last_seen: Instant::now(),
2501                peer: Some(peer),
2502                pool: PeerPool::Other,
2503                transport: PeerTransport::Bluetooth,
2504                signal_paths: BTreeSet::from([PeerSignalPath::Bluetooth]),
2505                bytes_sent: 0,
2506                bytes_received: 0,
2507            },
2508        );
2509
2510        let (relay_tx, _) = tokio::sync::broadcast::channel(4);
2511        manager
2512            .dispatch_signaling_message(
2513                SignalingMessage::Hello {
2514                    peer_id: manager.my_peer_id.to_string(),
2515                    roots: Vec::new(),
2516                },
2517                &relay_tx,
2518            )
2519            .await;
2520
2521        assert_eq!(peer_ref.sent_frame_count().await, 0);
2522    }
2523
2524    #[tokio::test]
2525    async fn failed_peer_cleanup_does_not_hold_peer_map_lock_while_closing() {
2526        let keys = Keys::generate();
2527        let manager = Arc::new(WebRTCManager::new(keys, WebRTCConfig::default()));
2528        let peer_id = PeerId::new("peer-a-pub".to_string(), Some("session-a".to_string()));
2529        let peer_key = peer_id.to_string();
2530
2531        manager.state.peers.write().await.insert(
2532            peer_key.clone(),
2533            PeerEntry {
2534                peer_id: peer_id.clone(),
2535                direction: PeerDirection::Outbound,
2536                state: ConnectionState::Connected,
2537                last_seen: Instant::now(),
2538                peer: Some(MeshPeer::mock_for_tests(TestMeshPeer::with_delayed_close(
2539                    Duration::from_millis(200),
2540                ))),
2541                pool: PeerPool::Other,
2542                transport: PeerTransport::Bluetooth,
2543                signal_paths: BTreeSet::from([PeerSignalPath::Bluetooth]),
2544                bytes_sent: 0,
2545                bytes_received: 0,
2546            },
2547        );
2548
2549        let (relay_tx, _) = tokio::sync::broadcast::channel(4);
2550        let manager_for_task = manager.clone();
2551        let peer_id_for_task = peer_id.clone();
2552        let cleanup_task = tokio::spawn(async move {
2553            manager_for_task
2554                .handle_peer_state_event(PeerStateEvent::Failed(peer_id_for_task), &relay_tx)
2555                .await;
2556        });
2557
2558        tokio::time::sleep(Duration::from_millis(20)).await;
2559
2560        let remaining = tokio::time::timeout(Duration::from_millis(50), async {
2561            manager.state.peers.read().await.len()
2562        })
2563        .await
2564        .expect("peer map read should not block on close");
2565
2566        assert_eq!(remaining, 0);
2567        cleanup_task.await.expect("cleanup task");
2568    }
2569
2570    #[tokio::test]
2571    async fn resolve_root_from_peers_does_not_hold_peer_map_lock_while_querying() {
2572        let keys = Keys::generate();
2573        let manager = Arc::new(WebRTCManager::new(keys.clone(), WebRTCConfig::default()));
2574        let owner_keys = Keys::generate();
2575        let owner_pubkey = owner_keys.public_key().to_hex();
2576        let tree_name = "video";
2577        let hash = "ab".repeat(32);
2578        let event = EventBuilder::new(
2579            Kind::Custom(super::super::root_events::HASHTREE_KIND),
2580            "",
2581            [
2582                Tag::parse(&["d", tree_name]).unwrap(),
2583                Tag::parse(&["l", super::super::root_events::HASHTREE_LABEL]).unwrap(),
2584                Tag::parse(&["hash", &hash]).unwrap(),
2585            ],
2586        )
2587        .to_event(&owner_keys)
2588        .unwrap();
2589
2590        let peer_id = PeerId::new("peer-a-pub".to_string(), Some("session-a".to_string()));
2591        let peer_key = peer_id.to_string();
2592
2593        manager.state.peers.write().await.insert(
2594            peer_key.clone(),
2595            PeerEntry {
2596                peer_id,
2597                direction: PeerDirection::Outbound,
2598                state: ConnectionState::Connected,
2599                last_seen: Instant::now(),
2600                peer: Some(MeshPeer::mock_for_tests(TestMeshPeer::with_delayed_events(
2601                    vec![event],
2602                    Duration::from_millis(200),
2603                ))),
2604                pool: PeerPool::Other,
2605                transport: PeerTransport::Bluetooth,
2606                signal_paths: BTreeSet::from([PeerSignalPath::Bluetooth]),
2607                bytes_sent: 0,
2608                bytes_received: 0,
2609            },
2610        );
2611
2612        let manager_for_task = manager.clone();
2613        let owner_pubkey_for_task = owner_pubkey.clone();
2614        let resolve_task = tokio::spawn(async move {
2615            manager_for_task
2616                .state
2617                .resolve_root_from_peers(
2618                    &owner_pubkey_for_task,
2619                    tree_name,
2620                    Duration::from_millis(500),
2621                )
2622                .await
2623        });
2624
2625        tokio::time::sleep(Duration::from_millis(20)).await;
2626
2627        let manager_for_writer = manager.clone();
2628        let peer_key_for_writer = peer_key.clone();
2629        let writer_task = tokio::spawn(async move {
2630            let mut peers = manager_for_writer.state.peers.write().await;
2631            if let Some(entry) = peers.get_mut(&peer_key_for_writer) {
2632                entry.bytes_received += 1;
2633            }
2634        });
2635
2636        tokio::time::sleep(Duration::from_millis(20)).await;
2637
2638        let status_count = tokio::time::timeout(Duration::from_millis(50), async {
2639            manager.state.peers.read().await.len()
2640        })
2641        .await
2642        .expect("peer map read should not block on root query");
2643
2644        assert_eq!(status_count, 1);
2645        assert!(resolve_task.await.expect("resolve task").is_some());
2646        writer_task.await.expect("writer task");
2647    }
2648
2649    #[test]
2650    fn test_formal_timed_seen_set_rejects_duplicates() {
2651        let mut seen = TimedSeenSet::new(4, Duration::from_secs(60));
2652        assert!(seen.insert_if_new("frame-1".to_string()));
2653        assert!(!seen.insert_if_new("frame-1".to_string()));
2654        assert!(seen.insert_if_new("frame-2".to_string()));
2655    }
2656
2657    #[test]
2658    fn test_formal_timed_seen_set_evicts_oldest_when_capacity_exceeded() {
2659        let mut seen = TimedSeenSet::new(2, Duration::from_secs(60));
2660        assert!(seen.insert_if_new("a".to_string()));
2661        assert!(seen.insert_if_new("b".to_string()));
2662        assert!(seen.insert_if_new("c".to_string()));
2663
2664        // "a" should be evicted due to cap=2, so re-insert becomes new again.
2665        assert!(seen.insert_if_new("a".to_string()));
2666        assert!(!seen.insert_if_new("a".to_string()));
2667    }
2668
2669    #[test]
2670    fn test_request_dispatch_normalization_caps_to_available_peers() {
2671        let normalized = normalize_dispatch_config(
2672            RequestDispatchConfig {
2673                initial_fanout: 8,
2674                hedge_fanout: 6,
2675                max_fanout: 5,
2676                hedge_interval_ms: 120,
2677            },
2678            3,
2679        );
2680        assert_eq!(normalized.max_fanout, 3);
2681        assert_eq!(normalized.initial_fanout, 3);
2682        assert_eq!(normalized.hedge_fanout, 3);
2683    }
2684
2685    #[test]
2686    fn test_hedged_wave_plan_matches_dispatch_policy() {
2687        let plan = build_hedged_wave_plan(
2688            7,
2689            RequestDispatchConfig {
2690                initial_fanout: 2,
2691                hedge_fanout: 3,
2692                max_fanout: 6,
2693                hedge_interval_ms: 120,
2694            },
2695        );
2696        assert_eq!(plan, vec![2, 3, 1]);
2697    }
2698}