Skip to main content

hashtree_cli/webrtc/
signaling.rs

1//! WebRTC signaling over Nostr relays
2//!
3//! Protocol (compatible with hashtree-ts):
4//! - All signaling uses ephemeral kind 25050
5//! - Hello messages: #l: "hello" tag, broadcast for peer discovery (unencrypted)
6//! - Directed signaling (offer, answer, candidate, candidates): NIP-17 style
7//!   gift wrap for privacy - wrapped with ephemeral key, #p tag with recipient
8//!
9//! Security: Directed messages use gift wrapping with ephemeral keys so that
10//! relays cannot see the actual sender or correlate messages.
11
12use anyhow::Result;
13use async_trait::async_trait;
14use futures::{SinkExt, StreamExt};
15use hashtree_network::{
16    decode_signaling_event, encode_signaling_event, run_hedged_waves, sync_selector_peers,
17    ClassifyRequest as SharedClassifyRequest, HedgedWaveAction, IceCandidate as SharedIceCandidate,
18    MeshRouter, PeerLink as SharedPeerLink, PeerLinkFactory as SharedPeerLinkFactory, PeerSelector,
19    SignalingTransport as SharedSignalingTransport, TransportError as SharedTransportError,
20};
21use nostr::{ClientMessage, Filter, JsonUtil, Keys, Kind, RelayMessage};
22use std::collections::{BTreeSet, HashMap};
23use std::sync::Arc;
24use std::time::{Duration, Instant};
25use tokio::sync::{mpsc, Mutex, RwLock};
26use tokio_tungstenite::{connect_async, tungstenite::Message};
27use tracing::{debug, error, info, warn};
28
29use super::bluetooth::{BluetoothMesh, BluetoothPeerRegistrar, BluetoothRuntimeContext};
30use super::cashu::{CashuMintMetadataStore, CashuQuoteState, CashuRoutingConfig, NegotiatedQuote};
31use super::local_bus::SharedLocalNostrBus;
32use super::multicast::MulticastNostrBus;
33use super::peer::{ContentStore, Peer, PendingRequest};
34use super::root_events::{
35    build_root_filter, hashtree_event_identifier, is_hashtree_labeled_event, pick_latest_event,
36    root_event_from_peer, PeerRootEvent,
37};
38use super::session::MeshPeer;
39use super::types::{
40    decrement_htl_with_policy, encode_quote_request, encode_request, should_forward_htl,
41    validate_mesh_frame, DataQuoteRequest, DataRequest, MeshNostrFrame, MeshNostrPayload,
42    PeerDirection, PeerId, PeerPool, PeerStateEvent, PeerStatus, RequestDispatchConfig,
43    SignalingMessage, TimedSeenSet, WebRTCConfig, HELLO_TAG, MESH_DEFAULT_HTL, MESH_EVENT_POLICY,
44    WEBRTC_KIND,
45};
46use super::wifi_aware::{mobile_wifi_aware_bridge, WifiAwareNostrBus, WIFI_AWARE_SOURCE};
47use crate::cashu_helper::CashuPaymentClient;
48use crate::nostr_relay::NostrRelay;
49
50/// Callback type for classifying peers into pools
51pub type PeerClassifier = Arc<dyn Fn(&str) -> PeerPool + Send + Sync>;
52
53/// Active data transport used for a peer session.
54#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
55pub enum PeerTransport {
56    WebRtc,
57    Bluetooth,
58}
59
60impl PeerTransport {
61    pub const fn as_str(self) -> &'static str {
62        match self {
63            PeerTransport::WebRtc => "webrtc",
64            PeerTransport::Bluetooth => "bluetooth",
65        }
66    }
67}
68
69impl std::fmt::Display for PeerTransport {
70    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71        f.write_str((*self).as_str())
72    }
73}
74
75fn bluetooth_nostr_only_mode() -> bool {
76    matches!(
77        std::env::var("HTREE_BLUETOOTH_NOSTR_ONLY").ok().as_deref(),
78        Some("1" | "true" | "TRUE" | "yes" | "YES")
79    )
80}
81
82/// Signaling/discovery path through which a peer was seen.
83#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
84pub enum PeerSignalPath {
85    Relay,
86    Multicast,
87    WifiAware,
88    Bluetooth,
89}
90
91impl PeerSignalPath {
92    pub const fn as_str(self) -> &'static str {
93        match self {
94            PeerSignalPath::Relay => "relay",
95            PeerSignalPath::Multicast => "multicast",
96            PeerSignalPath::WifiAware => WIFI_AWARE_SOURCE,
97            PeerSignalPath::Bluetooth => "bluetooth",
98        }
99    }
100
101    pub fn from_source_name(source: &str) -> Self {
102        match source {
103            "multicast" => PeerSignalPath::Multicast,
104            WIFI_AWARE_SOURCE => PeerSignalPath::WifiAware,
105            "bluetooth" => PeerSignalPath::Bluetooth,
106            _ => PeerSignalPath::Relay,
107        }
108    }
109}
110
111impl std::fmt::Display for PeerSignalPath {
112    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
113        f.write_str((*self).as_str())
114    }
115}
116
117/// Connection state for a peer
118#[derive(Debug, Clone, PartialEq)]
119pub enum ConnectionState {
120    Discovered,
121    Connecting,
122    Connected,
123    Failed,
124}
125
126impl std::fmt::Display for ConnectionState {
127    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
128        match self {
129            ConnectionState::Discovered => write!(f, "discovered"),
130            ConnectionState::Connecting => write!(f, "connecting"),
131            ConnectionState::Connected => write!(f, "connected"),
132            ConnectionState::Failed => write!(f, "failed"),
133        }
134    }
135}
136
137/// Peer entry in the manager
138pub struct PeerEntry {
139    pub peer_id: PeerId,
140    pub direction: PeerDirection,
141    pub state: ConnectionState,
142    pub last_seen: Instant,
143    pub peer: Option<MeshPeer>,
144    pub pool: PeerPool,
145    pub transport: PeerTransport,
146    pub signal_paths: BTreeSet<PeerSignalPath>,
147    pub bytes_sent: u64,
148    pub bytes_received: u64,
149}
150
151/// Shared state for the native mesh router.
152pub struct WebRTCState {
153    pub peers: RwLock<HashMap<String, PeerEntry>>,
154    pub connected_count: std::sync::atomic::AtomicUsize,
155    /// Total bytes sent across all peers (cumulative)
156    pub bytes_sent: std::sync::atomic::AtomicU64,
157    /// Total bytes received across all peers (cumulative)
158    pub bytes_received: std::sync::atomic::AtomicU64,
159    /// Relayless mesh frames received and accepted.
160    pub mesh_received: std::sync::atomic::AtomicU64,
161    /// Relayless mesh frames forwarded to peers.
162    pub mesh_forwarded: std::sync::atomic::AtomicU64,
163    /// Relayless mesh frames/events dropped due to dedupe.
164    pub mesh_dropped_duplicate: std::sync::atomic::AtomicU64,
165    /// Shared peer selector used by live retrieval; aligned with simulation strategies.
166    peer_selector: Arc<RwLock<PeerSelector>>,
167    /// Hedged dispatch policy for retrieval requests.
168    request_dispatch: RequestDispatchConfig,
169    /// Retrieval timeout for quote negotiation and single-peer fetches.
170    request_timeout: Duration,
171    /// Shared Cashu quote negotiation policy/state.
172    cashu_quotes: Arc<CashuQuoteState>,
173    /// Optional local buses such as multicast or BLE that carry signed Nostr
174    /// envelopes for nearby/offline peers.
175    local_buses: RwLock<Vec<SharedLocalNostrBus>>,
176}
177const SEEN_FRAME_CAP: usize = 4096;
178const SEEN_FRAME_TTL: Duration = Duration::from_secs(120);
179const SEEN_EVENT_CAP: usize = 8192;
180const SEEN_EVENT_TTL: Duration = Duration::from_secs(600);
181
182type PendingRequestsMap = Arc<Mutex<HashMap<String, PendingRequest>>>;
183type ConnectedPeer = (
184    String,
185    PendingRequestsMap,
186    Arc<webrtc::data_channel::RTCDataChannel>,
187);
188type ConnectedSession = (String, MeshPeer, PeerTransport);
189type SharedProductionRouter = MeshRouter<RouterSignalingBridge, SharedRouterPeerFactory>;
190
191async fn remember_peer_signal_path(state: &WebRTCState, peer_id: &str, source: &str) {
192    if let Some(entry) = state.peers.write().await.get_mut(peer_id) {
193        entry
194            .signal_paths
195            .insert(PeerSignalPath::from_source_name(source));
196    }
197}
198
199#[derive(Clone)]
200struct RouterSignalingBridge {
201    peer_id: String,
202    signaling_tx: mpsc::Sender<SignalingMessage>,
203}
204
205impl RouterSignalingBridge {
206    fn new(peer_id: String, signaling_tx: mpsc::Sender<SignalingMessage>) -> Self {
207        Self {
208            peer_id,
209            signaling_tx,
210        }
211    }
212}
213
214#[async_trait]
215impl SharedSignalingTransport for RouterSignalingBridge {
216    async fn connect(&self, _relays: &[String]) -> Result<(), SharedTransportError> {
217        Ok(())
218    }
219
220    async fn disconnect(&self) {}
221
222    async fn publish(&self, msg: SignalingMessage) -> Result<(), SharedTransportError> {
223        self.signaling_tx
224            .send(msg)
225            .await
226            .map_err(|e| SharedTransportError::SendFailed(e.to_string()))
227    }
228
229    async fn recv(&self) -> Option<SignalingMessage> {
230        None
231    }
232
233    fn try_recv(&self) -> Option<SignalingMessage> {
234        None
235    }
236
237    fn peer_id(&self) -> &str {
238        &self.peer_id
239    }
240}
241
242struct SharedRouterPeerFactory {
243    my_peer_id: PeerId,
244    signaling_tx: mpsc::Sender<SignalingMessage>,
245    stun_servers: Vec<String>,
246    store: Option<Arc<dyn ContentStore>>,
247    state: Arc<WebRTCState>,
248    state_event_tx: mpsc::Sender<PeerStateEvent>,
249    nostr_relay: Option<Arc<NostrRelay>>,
250    mesh_frame_tx: mpsc::Sender<(PeerId, MeshNostrFrame)>,
251    peer_classifier: PeerClassifier,
252    peers: RwLock<HashMap<String, Arc<Peer>>>,
253}
254
255impl SharedRouterPeerFactory {
256    fn new(
257        my_peer_id: PeerId,
258        signaling_tx: mpsc::Sender<SignalingMessage>,
259        stun_servers: Vec<String>,
260        store: Option<Arc<dyn ContentStore>>,
261        state: Arc<WebRTCState>,
262        state_event_tx: mpsc::Sender<PeerStateEvent>,
263        nostr_relay: Option<Arc<NostrRelay>>,
264        mesh_frame_tx: mpsc::Sender<(PeerId, MeshNostrFrame)>,
265        peer_classifier: PeerClassifier,
266    ) -> Self {
267        Self {
268            my_peer_id,
269            signaling_tx,
270            stun_servers,
271            store,
272            state,
273            state_event_tx,
274            nostr_relay,
275            mesh_frame_tx,
276            peer_classifier,
277            peers: RwLock::new(HashMap::new()),
278        }
279    }
280
281    async fn register_peer(&self, peer_id: PeerId, direction: PeerDirection, peer: Arc<Peer>) {
282        let peer_key = peer_id.to_string();
283        let pool = (self.peer_classifier)(&peer_id.pubkey);
284        self.peers
285            .write()
286            .await
287            .insert(peer_key.clone(), peer.clone());
288
289        let mut peers = self.state.peers.write().await;
290        peers.insert(
291            peer_key,
292            PeerEntry {
293                peer_id,
294                direction,
295                state: ConnectionState::Connecting,
296                last_seen: Instant::now(),
297                peer: Some(MeshPeer::WebRtc(peer)),
298                pool,
299                transport: PeerTransport::WebRtc,
300                signal_paths: BTreeSet::from([PeerSignalPath::Relay]),
301                bytes_sent: 0,
302                bytes_received: 0,
303            },
304        );
305    }
306
307    async fn create_peer(
308        &self,
309        peer_id: PeerId,
310        direction: PeerDirection,
311    ) -> Result<Peer, SharedTransportError> {
312        Peer::new_with_store_and_events(
313            peer_id,
314            direction,
315            self.my_peer_id.clone(),
316            self.signaling_tx.clone(),
317            self.stun_servers.clone(),
318            self.store.clone(),
319            Some(self.state_event_tx.clone()),
320            self.nostr_relay.clone(),
321            Some(self.mesh_frame_tx.clone()),
322            Some(self.state.cashu_quotes.clone()),
323        )
324        .await
325        .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))
326    }
327}
328
329#[async_trait]
330impl SharedPeerLinkFactory for SharedRouterPeerFactory {
331    async fn create_offer(
332        &self,
333        target_peer_id: &str,
334    ) -> Result<(Arc<dyn SharedPeerLink>, String), SharedTransportError> {
335        let target_peer = PeerId::from_string(target_peer_id).ok_or_else(|| {
336            SharedTransportError::ConnectionFailed(format!("invalid peer id {target_peer_id}"))
337        })?;
338        let peer = Arc::new(
339            self.create_peer(target_peer.clone(), PeerDirection::Outbound)
340                .await?,
341        );
342        peer.setup_handlers()
343            .await
344            .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))?;
345        let offer = peer
346            .connect()
347            .await
348            .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))?;
349        let sdp = offer
350            .get("sdp")
351            .and_then(|value| value.as_str())
352            .ok_or_else(|| {
353                SharedTransportError::ConnectionFailed("missing SDP in CLI peer offer".to_string())
354            })?
355            .to_string();
356        self.register_peer(target_peer, PeerDirection::Outbound, peer.clone())
357            .await;
358        Ok((peer as Arc<dyn SharedPeerLink>, sdp))
359    }
360
361    async fn accept_offer(
362        &self,
363        from_peer_id: &str,
364        offer_sdp: &str,
365    ) -> Result<(Arc<dyn SharedPeerLink>, String), SharedTransportError> {
366        let from_peer = PeerId::from_string(from_peer_id).ok_or_else(|| {
367            SharedTransportError::ConnectionFailed(format!("invalid peer id {from_peer_id}"))
368        })?;
369        let peer = Arc::new(
370            self.create_peer(from_peer.clone(), PeerDirection::Inbound)
371                .await?,
372        );
373        peer.setup_handlers()
374            .await
375            .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))?;
376        let answer = peer
377            .handle_offer(serde_json::json!({ "type": "offer", "sdp": offer_sdp }))
378            .await
379            .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))?;
380        let sdp = answer
381            .get("sdp")
382            .and_then(|value| value.as_str())
383            .ok_or_else(|| {
384                SharedTransportError::ConnectionFailed("missing SDP in CLI peer answer".to_string())
385            })?
386            .to_string();
387        self.register_peer(from_peer, PeerDirection::Inbound, peer.clone())
388            .await;
389        Ok((peer as Arc<dyn SharedPeerLink>, sdp))
390    }
391
392    async fn handle_answer(
393        &self,
394        target_peer_id: &str,
395        answer_sdp: &str,
396    ) -> Result<Arc<dyn SharedPeerLink>, SharedTransportError> {
397        let peer = self
398            .peers
399            .read()
400            .await
401            .get(target_peer_id)
402            .cloned()
403            .ok_or_else(|| {
404                SharedTransportError::ConnectionFailed(format!(
405                    "missing outbound peer for {target_peer_id}"
406                ))
407            })?;
408        peer.handle_answer(serde_json::json!({ "type": "answer", "sdp": answer_sdp }))
409            .await
410            .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))?;
411        Ok(peer as Arc<dyn SharedPeerLink>)
412    }
413
414    async fn handle_candidate(
415        &self,
416        peer_id: &str,
417        candidate: SharedIceCandidate,
418    ) -> Result<(), SharedTransportError> {
419        let peer = self.peers.read().await.get(peer_id).cloned();
420        if let Some(peer) = peer {
421            peer.handle_candidate(serde_json::json!({
422                "candidate": candidate.candidate,
423                "sdpMLineIndex": candidate.sdp_m_line_index,
424                "sdpMid": candidate.sdp_mid,
425            }))
426            .await
427            .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))?;
428        }
429        Ok(())
430    }
431
432    async fn remove_peer(&self, peer_id: &str) -> Result<(), SharedTransportError> {
433        self.peers.write().await.remove(peer_id);
434        Ok(())
435    }
436}
437
438impl WebRTCState {
439    pub fn new() -> Self {
440        let cfg = WebRTCConfig::default();
441        Self::new_with_routing_and_cashu(
442            cfg.request_selection_strategy,
443            cfg.request_fairness_enabled,
444            cfg.request_dispatch,
445            Duration::from_millis(cfg.message_timeout_ms),
446            CashuRoutingConfig::default(),
447            None,
448            None,
449        )
450    }
451
452    pub fn new_with_routing(
453        selection_strategy: super::types::SelectionStrategy,
454        fairness_enabled: bool,
455        request_dispatch: RequestDispatchConfig,
456    ) -> Self {
457        let cfg = WebRTCConfig::default();
458        Self::new_with_routing_and_cashu(
459            selection_strategy,
460            fairness_enabled,
461            request_dispatch,
462            Duration::from_millis(cfg.message_timeout_ms),
463            CashuRoutingConfig::default(),
464            None,
465            None,
466        )
467    }
468
469    pub fn new_with_routing_and_cashu(
470        selection_strategy: super::types::SelectionStrategy,
471        fairness_enabled: bool,
472        request_dispatch: RequestDispatchConfig,
473        request_timeout: Duration,
474        cashu_routing: CashuRoutingConfig,
475        payment_client: Option<Arc<dyn CashuPaymentClient>>,
476        mint_metadata: Option<Arc<CashuMintMetadataStore>>,
477    ) -> Self {
478        let mut selector = PeerSelector::with_strategy(selection_strategy);
479        selector.set_fairness(fairness_enabled);
480        let peer_selector = Arc::new(RwLock::new(selector));
481        let cashu_quotes = Arc::new(if let Some(mint_metadata) = mint_metadata {
482            CashuQuoteState::new_with_mint_metadata(
483                cashu_routing,
484                peer_selector.clone(),
485                payment_client,
486                mint_metadata,
487            )
488        } else {
489            CashuQuoteState::new(cashu_routing, peer_selector.clone(), payment_client)
490        });
491        Self {
492            peers: RwLock::new(HashMap::new()),
493            connected_count: std::sync::atomic::AtomicUsize::new(0),
494            bytes_sent: std::sync::atomic::AtomicU64::new(0),
495            bytes_received: std::sync::atomic::AtomicU64::new(0),
496            mesh_received: std::sync::atomic::AtomicU64::new(0),
497            mesh_forwarded: std::sync::atomic::AtomicU64::new(0),
498            mesh_dropped_duplicate: std::sync::atomic::AtomicU64::new(0),
499            peer_selector,
500            request_dispatch,
501            request_timeout,
502            cashu_quotes,
503            local_buses: RwLock::new(Vec::new()),
504        }
505    }
506
507    pub async fn set_local_buses(&self, buses: Vec<SharedLocalNostrBus>) {
508        *self.local_buses.write().await = buses;
509    }
510
511    pub async fn add_local_bus(&self, bus: SharedLocalNostrBus) {
512        self.local_buses.write().await.push(bus);
513    }
514
515    pub async fn set_multicast_bus(&self, bus: Option<Arc<MulticastNostrBus>>) {
516        let buses = bus
517            .into_iter()
518            .map(|bus| bus as SharedLocalNostrBus)
519            .collect();
520        self.set_local_buses(buses).await;
521    }
522
523    /// Drop all live peer sessions and clear topology-specific state while
524    /// keeping cumulative bandwidth counters intact.
525    pub async fn reset_runtime_state(&self) {
526        self.set_local_buses(Vec::new()).await;
527        let peers = {
528            let mut peers = self.peers.write().await;
529            std::mem::take(&mut *peers)
530        };
531        self.connected_count
532            .store(0, std::sync::atomic::Ordering::Relaxed);
533        for entry in peers.into_values() {
534            if let Some(peer) = entry.peer {
535                let _ = peer.close().await;
536            }
537        }
538    }
539
540    /// Get current bandwidth stats (bytes sent/received)
541    pub fn get_bandwidth(&self) -> (u64, u64) {
542        (
543            self.bytes_sent.load(std::sync::atomic::Ordering::Relaxed),
544            self.bytes_received
545                .load(std::sync::atomic::Ordering::Relaxed),
546        )
547    }
548
549    pub fn get_mesh_stats(&self) -> (u64, u64, u64) {
550        (
551            self.mesh_received
552                .load(std::sync::atomic::Ordering::Relaxed),
553            self.mesh_forwarded
554                .load(std::sync::atomic::Ordering::Relaxed),
555            self.mesh_dropped_duplicate
556                .load(std::sync::atomic::Ordering::Relaxed),
557        )
558    }
559
560    pub fn record_mesh_received(&self) {
561        self.mesh_received
562            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
563    }
564
565    pub fn record_mesh_forwarded(&self, count: u64) {
566        self.mesh_forwarded
567            .fetch_add(count, std::sync::atomic::Ordering::Relaxed);
568    }
569
570    pub fn record_mesh_duplicate_drop(&self) {
571        self.mesh_dropped_duplicate
572            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
573    }
574
575    /// Record bytes sent (global + per-peer)
576    pub async fn record_sent(&self, peer_id: &str, bytes: u64) {
577        self.bytes_sent
578            .fetch_add(bytes, std::sync::atomic::Ordering::Relaxed);
579        if let Some(entry) = self.peers.write().await.get_mut(peer_id) {
580            entry.bytes_sent += bytes;
581        }
582    }
583
584    /// Record bytes received (global + per-peer)
585    pub async fn record_received(&self, peer_id: &str, bytes: u64) {
586        self.bytes_received
587            .fetch_add(bytes, std::sync::atomic::Ordering::Relaxed);
588        if let Some(entry) = self.peers.write().await.get_mut(peer_id) {
589            entry.bytes_received += bytes;
590        }
591    }
592
593    /// Request content by hash from connected peers
594    /// Queries peers in adaptive selector order with hedged fanout waves.
595    /// Returns the first successful response, or None if no peer has it
596    pub async fn request_from_peers(&self, hash_hex: &str) -> Option<Vec<u8>> {
597        self.request_from_peers_with_source(hash_hex)
598            .await
599            .map(|(data, _peer_id)| data)
600    }
601
602    /// Request content by hash from connected peers, returning data and source peer.
603    pub async fn request_from_peers_with_source(
604        &self,
605        hash_hex: &str,
606    ) -> Option<(Vec<u8>, String)> {
607        use super::types::BLOB_REQUEST_POLICY;
608
609        let peers = self.peers.read().await;
610
611        let peer_refs: Vec<_> = peers
612            .values()
613            .filter(|p| p.state == ConnectionState::Connected && p.peer.is_some())
614            .filter_map(|p| {
615                p.peer
616                    .clone()
617                    .map(|peer| (p.peer_id.to_string(), peer, p.transport))
618            })
619            .collect();
620
621        drop(peers); // Release the read lock
622
623        let mut connected_peers: Vec<ConnectedPeer> = Vec::new();
624        let mut connected_sessions: Vec<ConnectedSession> = Vec::new();
625        for (peer_id, peer, transport) in peer_refs {
626            if !peer.is_ready() {
627                continue;
628            }
629            if bluetooth_nostr_only_mode() && transport == PeerTransport::Bluetooth {
630                continue;
631            }
632            if let Some(webrtc_peer) = peer.as_webrtc() {
633                let dc_guard = webrtc_peer.data_channel.lock().await;
634                if let Some(dc) = dc_guard.as_ref() {
635                    connected_peers.push((
636                        peer_id.clone(),
637                        webrtc_peer.pending_requests.clone(),
638                        dc.clone(),
639                    ));
640                }
641            }
642            connected_sessions.push((peer_id, peer, transport));
643        }
644
645        if connected_sessions.is_empty() {
646            debug!(
647                "No connected peers to query for {}",
648                &hash_hex[..8.min(hash_hex.len())]
649            );
650            return None;
651        }
652
653        // Convert hex to binary hash once
654        let hash_bytes = match hex::decode(hash_hex) {
655            Ok(b) => b,
656            Err(_) => return None,
657        };
658
659        let expected_hash: [u8; 32] = match hash_bytes.as_slice().try_into() {
660            Ok(h) => h,
661            Err(_) => {
662                debug!(
663                    "Invalid hash length {}, expected 32 bytes",
664                    hash_bytes.len()
665                );
666                return None;
667            }
668        };
669
670        let connected_peer_ids: Vec<String> = connected_sessions
671            .iter()
672            .map(|(peer_id, _, _)| peer_id.clone())
673            .collect();
674        sync_selector_peers(self.peer_selector.as_ref(), &connected_peer_ids).await;
675
676        let ordered_peer_ids = self.peer_selector.write().await.select_peers();
677        let mut quote_by_peer: HashMap<
678            String,
679            (
680                PendingRequestsMap,
681                Arc<webrtc::data_channel::RTCDataChannel>,
682            ),
683        > = connected_peers
684            .iter()
685            .cloned()
686            .map(|(peer_id, pending, dc)| (peer_id, (pending, dc)))
687            .collect();
688        let mut ordered_quote_peers: Vec<ConnectedPeer> = Vec::new();
689        for peer_id in &ordered_peer_ids {
690            if let Some((pending, dc)) = quote_by_peer.remove(peer_id) {
691                ordered_quote_peers.push((peer_id.clone(), pending, dc));
692            }
693        }
694        for (peer_id, (pending, dc)) in quote_by_peer {
695            ordered_quote_peers.push((peer_id, pending, dc));
696        }
697
698        let mut by_peer: HashMap<String, (MeshPeer, PeerTransport)> = connected_sessions
699            .into_iter()
700            .map(|(peer_id, peer, transport)| (peer_id, (peer, transport)))
701            .collect();
702
703        let mut ordered_peers: Vec<ConnectedSession> = Vec::new();
704        for peer_id in ordered_peer_ids {
705            if let Some((peer, transport)) = by_peer.remove(&peer_id) {
706                ordered_peers.push((peer_id, peer, transport));
707            }
708        }
709        for (peer_id, (peer, transport)) in by_peer {
710            ordered_peers.push((peer_id, peer, transport));
711        }
712
713        debug!(
714            "Querying {} peers for {} with shared hedged scheduler",
715            ordered_peers.len(),
716            &hash_hex[..8.min(hash_hex.len())],
717        );
718
719        if let Some((requested_mint, payment_sat, quote_ttl_ms)) =
720            self.cashu_quotes.requester_quote_terms().await
721        {
722            if let Some(quote) = self
723                .request_quote_from_peers(
724                    &hash_bytes,
725                    requested_mint,
726                    payment_sat,
727                    quote_ttl_ms,
728                    &ordered_quote_peers,
729                )
730                .await
731            {
732                if let Some(data) = self
733                    .request_from_single_peer(
734                        hash_hex,
735                        &hash_bytes,
736                        expected_hash,
737                        &quote.peer_id,
738                        Some(&quote),
739                        &ordered_quote_peers,
740                    )
741                    .await
742                {
743                    debug!(
744                        "Got quoted response from peer {} for {}",
745                        quote.peer_id,
746                        &hash_hex[..8.min(hash_hex.len())]
747                    );
748                    return Some((data, quote.peer_id));
749                }
750            }
751        }
752
753        let request = DataRequest {
754            h: hash_bytes.clone(),
755            htl: BLOB_REQUEST_POLICY.max_htl,
756            q: None,
757        };
758        let wire = match encode_request(&request) {
759            Ok(w) => w,
760            Err(_) => return None,
761        };
762        let wire_len = wire.len() as u64;
763        let current_result_rx = Arc::new(Mutex::new(None));
764        if let Some((data, peer_id)) = run_hedged_waves(
765            ordered_peers.len(),
766            self.request_dispatch,
767            self.request_timeout,
768            |range| {
769                let wave_peers = ordered_peers[range].to_vec();
770                let (result_tx, result_rx) =
771                    mpsc::channel::<(String, Instant, Result<Option<Vec<u8>>>)>(wave_peers.len());
772                let current_result_rx = current_result_rx.clone();
773                let hash_hex = hash_hex.to_string();
774                async move {
775                    *current_result_rx.lock().await = Some(result_rx);
776                    let sent = wave_peers.len();
777                    for (peer_id, peer, transport) in wave_peers {
778                        if transport != PeerTransport::Bluetooth {
779                            self.record_sent(&peer_id, wire_len).await;
780                        }
781                        self.peer_selector
782                            .write()
783                            .await
784                            .record_request(&peer_id, wire_len);
785
786                        let result_tx = result_tx.clone();
787                        let peer_id_for_task = peer_id.clone();
788                        let peer = peer.clone();
789                        let hash_hex = hash_hex.clone();
790                        let per_request_timeout = self.request_timeout;
791                        tokio::spawn(async move {
792                            let started = Instant::now();
793                            let result = peer.request(&hash_hex, per_request_timeout).await;
794                            let _ = result_tx.send((peer_id_for_task, started, result)).await;
795                        });
796                    }
797                    drop(result_tx);
798                    sent
799                }
800            },
801            |wait| {
802                let current_result_rx = current_result_rx.clone();
803                async move {
804                    let mut current_result_rx = current_result_rx.lock().await;
805                    let Some(result_rx) = current_result_rx.as_mut() else {
806                        return HedgedWaveAction::Abort;
807                    };
808                    let deadline = Instant::now() + wait;
809                    loop {
810                        let now = Instant::now();
811                        if now >= deadline {
812                            return HedgedWaveAction::Continue;
813                        }
814                        let remaining = deadline.saturating_duration_since(now);
815                        match tokio::time::timeout(remaining, result_rx.recv()).await {
816                            Ok(Some((peer_id, started, Ok(Some(data))))) => {
817                                let rtt_ms = started.elapsed().as_millis() as u64;
818                                if hashtree_core::sha256(&data) == expected_hash {
819                                    let should_record = {
820                                        let peers = self.peers.read().await;
821                                        peers
822                                            .get(&peer_id)
823                                            .map(|entry| {
824                                                entry.transport != PeerTransport::Bluetooth
825                                            })
826                                            .unwrap_or(true)
827                                    };
828                                    if should_record {
829                                        self.record_received(&peer_id, data.len() as u64).await;
830                                    }
831                                    self.peer_selector.write().await.record_success(
832                                        &peer_id,
833                                        rtt_ms,
834                                        data.len() as u64,
835                                    );
836                                    return HedgedWaveAction::Success((data, peer_id));
837                                }
838                                self.peer_selector.write().await.record_failure(&peer_id);
839                            }
840                            Ok(Some((peer_id, _, Ok(None)))) | Ok(Some((peer_id, _, Err(_)))) => {
841                                self.peer_selector.write().await.record_timeout(&peer_id);
842                            }
843                            Ok(None) | Err(_) => return HedgedWaveAction::Continue,
844                        }
845                    }
846                }
847            },
848        )
849        .await
850        {
851            debug!(
852                "Got response from peer {} for {}",
853                peer_id,
854                &hash_hex[..8.min(hash_hex.len())]
855            );
856            return Some((data, peer_id));
857        }
858
859        debug!(
860            "No peer had data for {}",
861            &hash_hex[..8.min(hash_hex.len())]
862        );
863        None
864    }
865
866    async fn request_quote_from_peers(
867        &self,
868        hash_bytes: &[u8],
869        requested_mint: String,
870        payment_sat: u64,
871        quote_ttl_ms: u32,
872        ordered_peers: &[ConnectedPeer],
873    ) -> Option<NegotiatedQuote> {
874        if ordered_peers.is_empty() || quote_ttl_ms == 0 {
875            return None;
876        }
877
878        let hash_hex = hex::encode(hash_bytes);
879        let rx = self
880            .cashu_quotes
881            .register_pending_quote(hash_hex.clone(), Some(requested_mint.clone()), payment_sat)
882            .await;
883        let quote_request = DataQuoteRequest {
884            h: hash_bytes.to_vec(),
885            p: payment_sat,
886            t: quote_ttl_ms,
887            m: Some(requested_mint),
888        };
889        let wire = match encode_quote_request(&quote_request) {
890            Ok(wire) => wire,
891            Err(_) => {
892                self.cashu_quotes.clear_pending_quote(&hash_hex).await;
893                return None;
894            }
895        };
896        let rx = Arc::new(Mutex::new(rx));
897        let result = run_hedged_waves(
898            ordered_peers.len(),
899            self.request_dispatch,
900            self.request_timeout,
901            |range| {
902                let wave_peers = ordered_peers[range].to_vec();
903                let wire = wire.clone();
904                async move {
905                    let mut sent = 0usize;
906                    for (_, _, dc) in wave_peers {
907                        if dc.send(&bytes::Bytes::copy_from_slice(&wire)).await.is_ok() {
908                            sent += 1;
909                        }
910                    }
911                    sent
912                }
913            },
914            |wait| {
915                let rx = rx.clone();
916                async move {
917                    let mut rx = rx.lock().await;
918                    match tokio::time::timeout(wait, &mut *rx).await {
919                        Ok(Ok(Some(quote))) => HedgedWaveAction::Success(quote),
920                        Ok(Ok(None)) | Ok(Err(_)) => HedgedWaveAction::Abort,
921                        Err(_) => HedgedWaveAction::Continue,
922                    }
923                }
924            },
925        )
926        .await;
927
928        self.cashu_quotes.clear_pending_quote(&hash_hex).await;
929        result
930    }
931
932    async fn request_from_single_peer(
933        &self,
934        hash_hex: &str,
935        hash_bytes: &[u8],
936        expected_hash: [u8; 32],
937        target_peer_id: &str,
938        quote: Option<&NegotiatedQuote>,
939        ordered_peers: &[ConnectedPeer],
940    ) -> Option<Vec<u8>> {
941        use super::types::BLOB_REQUEST_POLICY;
942
943        let (pending_requests, dc) = ordered_peers
944            .iter()
945            .find(|(peer_id, _, _)| peer_id == target_peer_id)
946            .map(|(_, pending_requests, dc)| (pending_requests.clone(), dc.clone()))?;
947
948        let request = DataRequest {
949            h: hash_bytes.to_vec(),
950            htl: BLOB_REQUEST_POLICY.max_htl,
951            q: quote.map(|quote| quote.quote_id),
952        };
953        let wire = encode_request(&request).ok()?;
954        let wire_len = wire.len() as u64;
955        let sent_at = Instant::now();
956        let (tx, mut rx) = tokio::sync::oneshot::channel();
957
958        {
959            let mut pending = pending_requests.lock().await;
960            pending.insert(
961                hash_hex.to_string(),
962                if let Some(quote) = quote {
963                    PendingRequest::quoted(
964                        hash_bytes.to_vec(),
965                        tx,
966                        quote.quote_id,
967                        quote.mint_url.clone().unwrap_or_default(),
968                        quote.payment_sat,
969                    )
970                } else {
971                    PendingRequest::standard(hash_bytes.to_vec(), tx)
972                },
973            );
974        }
975
976        if dc
977            .send(&bytes::Bytes::copy_from_slice(&wire))
978            .await
979            .is_err()
980        {
981            let mut pending = pending_requests.lock().await;
982            pending.remove(hash_hex);
983            self.peer_selector
984                .write()
985                .await
986                .record_failure(target_peer_id);
987            return None;
988        }
989
990        self.record_sent(target_peer_id, wire_len).await;
991        self.peer_selector
992            .write()
993            .await
994            .record_request(target_peer_id, wire_len);
995
996        let wait_timeout = if let Some(quote) = quote {
997            let multiplier = quote.payment_sat.clamp(1, 32) as u128;
998            let extra_ms = self
999                .cashu_quotes
1000                .settlement_timeout()
1001                .as_millis()
1002                .saturating_mul(multiplier);
1003            self.request_timeout + Duration::from_millis(extra_ms.min(u64::MAX as u128) as u64)
1004        } else {
1005            self.request_timeout
1006        };
1007
1008        match tokio::time::timeout(wait_timeout, &mut rx).await {
1009            Ok(Ok(Some(data))) if hashtree_core::sha256(&data) == expected_hash => {
1010                let rtt_ms = sent_at.elapsed().as_millis() as u64;
1011                self.record_received(target_peer_id, data.len() as u64)
1012                    .await;
1013                self.peer_selector.write().await.record_success(
1014                    target_peer_id,
1015                    rtt_ms,
1016                    data.len() as u64,
1017                );
1018                Some(data)
1019            }
1020            Ok(Ok(Some(_))) => {
1021                self.peer_selector
1022                    .write()
1023                    .await
1024                    .record_failure(target_peer_id);
1025                let pending = pending_requests.lock().await.remove(hash_hex);
1026                if let Some(pending) = pending {
1027                    if let Some(quoted) = pending.quoted {
1028                        if let Some(in_flight) = quoted.in_flight_payment {
1029                            let _ = self
1030                                .cashu_quotes
1031                                .revoke_payment_token(&in_flight.mint_url, &in_flight.operation_id)
1032                                .await;
1033                        }
1034                    }
1035                }
1036                None
1037            }
1038            Ok(Ok(None)) | Ok(Err(_)) | Err(_) => {
1039                let pending = pending_requests.lock().await.remove(hash_hex);
1040                if let Some(pending) = pending {
1041                    if let Some(quoted) = pending.quoted {
1042                        if let Some(in_flight) = quoted.in_flight_payment {
1043                            let _ = self
1044                                .cashu_quotes
1045                                .revoke_payment_token(&in_flight.mint_url, &in_flight.operation_id)
1046                                .await;
1047                        }
1048                    }
1049                }
1050                self.peer_selector
1051                    .write()
1052                    .await
1053                    .record_timeout(target_peer_id);
1054                None
1055            }
1056        }
1057    }
1058
1059    /// Resolve a hashtree root event through connected peers using Nostr REQ/EOSE over WebRTC.
1060    pub async fn resolve_root_from_peers(
1061        &self,
1062        owner_pubkey: &str,
1063        tree_name: &str,
1064        per_peer_timeout: Duration,
1065    ) -> Option<PeerRootEvent> {
1066        let filter = build_root_filter(owner_pubkey, tree_name)?;
1067
1068        let peer_refs: Vec<_> = {
1069            let peers = self.peers.read().await;
1070            peers
1071                .values()
1072                .filter(|entry| entry.state == ConnectionState::Connected)
1073                .filter(|entry| {
1074                    !bluetooth_nostr_only_mode() || entry.transport != PeerTransport::Bluetooth
1075                })
1076                .filter_map(|entry| {
1077                    let peer = entry.peer.as_ref()?;
1078                    if !peer.is_ready() {
1079                        return None;
1080                    }
1081                    Some((entry.peer_id.short(), peer.clone()))
1082                })
1083                .collect()
1084        };
1085
1086        for (peer_short, peer) in peer_refs {
1087            debug!(
1088                "Querying peer {} for root event {}/{}",
1089                peer_short, owner_pubkey, tree_name
1090            );
1091            let events = match peer
1092                .query_nostr_events(vec![filter.clone()], per_peer_timeout)
1093                .await
1094            {
1095                Ok(events) => events,
1096                Err(e) => {
1097                    debug!(
1098                        "Peer {} Nostr query failed for {}/{}: {}",
1099                        peer_short, owner_pubkey, tree_name, e
1100                    );
1101                    continue;
1102                }
1103            };
1104            debug!(
1105                "Peer {} returned {} Nostr event(s) for {}/{}",
1106                peer_short,
1107                events.len(),
1108                owner_pubkey,
1109                tree_name
1110            );
1111
1112            let latest = pick_latest_event(events.iter().filter(|event| {
1113                hashtree_event_identifier(event).as_deref() == Some(tree_name)
1114                    && is_hashtree_labeled_event(event)
1115            }));
1116            if let Some(event) = latest {
1117                if let Some(root) = root_event_from_peer(event, &peer_short, tree_name) {
1118                    debug!(
1119                        "Resolved {}/{} via peer {} event {}",
1120                        owner_pubkey,
1121                        tree_name,
1122                        peer_short,
1123                        event.id.to_hex()
1124                    );
1125                    return Some(root);
1126                }
1127            }
1128        }
1129
1130        None
1131    }
1132
1133    pub async fn resolve_root_from_local_buses_with_source(
1134        &self,
1135        owner_pubkey: &str,
1136        tree_name: &str,
1137        timeout: Duration,
1138    ) -> Option<(&'static str, PeerRootEvent)> {
1139        let buses = self.local_buses.read().await.clone();
1140        for bus in buses {
1141            if let Some(root) = bus.query_root(owner_pubkey, tree_name, timeout).await {
1142                return Some((bus.source_name(), root));
1143            }
1144        }
1145        None
1146    }
1147
1148    pub async fn resolve_root_from_local_buses(
1149        &self,
1150        owner_pubkey: &str,
1151        tree_name: &str,
1152        timeout: Duration,
1153    ) -> Option<PeerRootEvent> {
1154        self.resolve_root_from_local_buses_with_source(owner_pubkey, tree_name, timeout)
1155            .await
1156            .map(|(_, root)| root)
1157    }
1158
1159    pub async fn resolve_root_from_multicast(
1160        &self,
1161        owner_pubkey: &str,
1162        tree_name: &str,
1163        timeout: Duration,
1164    ) -> Option<PeerRootEvent> {
1165        self.resolve_root_from_local_buses(owner_pubkey, tree_name, timeout)
1166            .await
1167    }
1168}
1169
1170impl Default for WebRTCState {
1171    fn default() -> Self {
1172        Self::new()
1173    }
1174}
1175
1176/// Native mesh manager handles peer discovery and transport fan-out.
1177pub struct WebRTCManager {
1178    config: WebRTCConfig,
1179    my_peer_id: PeerId,
1180    keys: Keys,
1181    state: Arc<WebRTCState>,
1182    shutdown: Arc<tokio::sync::watch::Sender<bool>>,
1183    shutdown_rx: tokio::sync::watch::Receiver<bool>,
1184    /// Channel to send signaling messages to relays
1185    signaling_tx: mpsc::Sender<SignalingMessage>,
1186    signaling_rx: Option<mpsc::Receiver<SignalingMessage>>,
1187    /// Optional content store for serving hash requests
1188    store: Option<Arc<dyn ContentStore>>,
1189    /// Peer classifier for pool assignment
1190    peer_classifier: PeerClassifier,
1191    /// Optional Nostr relay for data-channel relay messages
1192    nostr_relay: Option<Arc<NostrRelay>>,
1193    local_buses: Vec<SharedLocalNostrBus>,
1194    /// Channel for peer state events (connection success/failure)
1195    state_event_tx: mpsc::Sender<PeerStateEvent>,
1196    state_event_rx: Option<mpsc::Receiver<PeerStateEvent>>,
1197    /// Channel for relayless mesh signaling frames received from peers.
1198    mesh_frame_tx: mpsc::Sender<(PeerId, MeshNostrFrame)>,
1199    mesh_frame_rx: Option<mpsc::Receiver<(PeerId, MeshNostrFrame)>>,
1200    shared_router: Option<Arc<SharedProductionRouter>>,
1201    seen_frame_ids: Arc<Mutex<TimedSeenSet>>,
1202    seen_event_ids: Arc<Mutex<TimedSeenSet>>,
1203}
1204
1205impl WebRTCManager {
1206    /// Create a new WebRTC manager
1207    pub fn new(keys: Keys, config: WebRTCConfig) -> Self {
1208        let pubkey = keys.public_key().to_hex();
1209        let my_peer_id = PeerId::new(pubkey);
1210        let (shutdown, shutdown_rx) = tokio::sync::watch::channel(false);
1211        let (signaling_tx, signaling_rx) = mpsc::channel(100);
1212        let (state_event_tx, state_event_rx) = mpsc::channel(100);
1213        let (mesh_frame_tx, mesh_frame_rx) = mpsc::channel(256);
1214        let state = Arc::new(WebRTCState::new_with_routing_and_cashu(
1215            config.request_selection_strategy,
1216            config.request_fairness_enabled,
1217            config.request_dispatch,
1218            Duration::from_millis(config.message_timeout_ms),
1219            CashuRoutingConfig::default(),
1220            None,
1221            None,
1222        ));
1223
1224        // Default classifier: all peers go to 'other' pool
1225        let peer_classifier: PeerClassifier = Arc::new(|_| PeerPool::Other);
1226
1227        Self {
1228            config,
1229            my_peer_id,
1230            keys,
1231            state,
1232            shutdown: Arc::new(shutdown),
1233            shutdown_rx,
1234            signaling_tx,
1235            signaling_rx: Some(signaling_rx),
1236            store: None,
1237            peer_classifier,
1238            nostr_relay: None,
1239            local_buses: Vec::new(),
1240            state_event_tx,
1241            state_event_rx: Some(state_event_rx),
1242            mesh_frame_tx,
1243            mesh_frame_rx: Some(mesh_frame_rx),
1244            shared_router: None,
1245            seen_frame_ids: Arc::new(Mutex::new(TimedSeenSet::new(
1246                SEEN_FRAME_CAP,
1247                SEEN_FRAME_TTL,
1248            ))),
1249            seen_event_ids: Arc::new(Mutex::new(TimedSeenSet::new(
1250                SEEN_EVENT_CAP,
1251                SEEN_EVENT_TTL,
1252            ))),
1253        }
1254    }
1255
1256    /// Create a new WebRTC manager reusing an existing shared state object.
1257    pub fn new_with_state(keys: Keys, config: WebRTCConfig, state: Arc<WebRTCState>) -> Self {
1258        let mut manager = Self::new(keys, config);
1259        manager.state = state;
1260        manager
1261    }
1262
1263    /// Create a new WebRTC manager with a peer classifier
1264    pub fn new_with_classifier(
1265        keys: Keys,
1266        config: WebRTCConfig,
1267        classifier: PeerClassifier,
1268    ) -> Self {
1269        let mut manager = Self::new(keys, config);
1270        manager.peer_classifier = classifier;
1271        manager
1272    }
1273
1274    /// Create a new WebRTC manager with a content store for serving hash requests
1275    pub fn new_with_store(keys: Keys, config: WebRTCConfig, store: Arc<dyn ContentStore>) -> Self {
1276        let mut manager = Self::new(keys, config);
1277        manager.store = Some(store);
1278        manager
1279    }
1280
1281    /// Create a new WebRTC manager with store and classifier
1282    pub fn new_with_store_and_classifier(
1283        keys: Keys,
1284        config: WebRTCConfig,
1285        store: Arc<dyn ContentStore>,
1286        classifier: PeerClassifier,
1287    ) -> Self {
1288        Self::new_with_store_and_classifier_and_cashu(
1289            keys,
1290            config,
1291            store,
1292            classifier,
1293            CashuRoutingConfig::default(),
1294            None,
1295            None,
1296        )
1297    }
1298
1299    pub fn new_with_state_and_store_and_classifier(
1300        keys: Keys,
1301        config: WebRTCConfig,
1302        state: Arc<WebRTCState>,
1303        store: Arc<dyn ContentStore>,
1304        classifier: PeerClassifier,
1305    ) -> Self {
1306        let mut manager = Self::new_with_state(keys, config, state);
1307        manager.store = Some(store);
1308        manager.peer_classifier = classifier;
1309        manager
1310    }
1311
1312    pub fn new_with_store_and_classifier_and_cashu(
1313        keys: Keys,
1314        config: WebRTCConfig,
1315        store: Arc<dyn ContentStore>,
1316        classifier: PeerClassifier,
1317        cashu_routing: CashuRoutingConfig,
1318        payment_client: Option<Arc<dyn CashuPaymentClient>>,
1319        mint_metadata: Option<Arc<CashuMintMetadataStore>>,
1320    ) -> Self {
1321        let mut manager = Self::new(keys, config);
1322        manager.state = Arc::new(WebRTCState::new_with_routing_and_cashu(
1323            manager.config.request_selection_strategy,
1324            manager.config.request_fairness_enabled,
1325            manager.config.request_dispatch,
1326            Duration::from_millis(manager.config.message_timeout_ms),
1327            cashu_routing,
1328            payment_client,
1329            mint_metadata,
1330        ));
1331        manager.store = Some(store);
1332        manager.peer_classifier = classifier;
1333        manager
1334    }
1335
1336    /// Set the content store for serving hash requests
1337    pub fn set_store(&mut self, store: Arc<dyn ContentStore>) {
1338        self.store = Some(store);
1339    }
1340
1341    /// Set the peer classifier
1342    pub fn set_peer_classifier(&mut self, classifier: PeerClassifier) {
1343        self.peer_classifier = classifier;
1344    }
1345
1346    /// Set the Nostr relay for data-channel relay messages
1347    pub fn set_nostr_relay(&mut self, relay: Arc<NostrRelay>) {
1348        self.nostr_relay = Some(relay);
1349    }
1350
1351    /// Get my peer ID
1352    pub fn my_peer_id(&self) -> &PeerId {
1353        &self.my_peer_id
1354    }
1355
1356    /// Get shared state for external access
1357    pub fn state(&self) -> Arc<WebRTCState> {
1358        self.state.clone()
1359    }
1360
1361    /// Cloneable shutdown handle for external lifecycle control.
1362    pub fn shutdown_signal(&self) -> Arc<tokio::sync::watch::Sender<bool>> {
1363        self.shutdown.clone()
1364    }
1365
1366    /// Signal shutdown
1367    pub fn shutdown(&self) {
1368        let _ = self.shutdown.send(true);
1369    }
1370
1371    /// Get connected peer count
1372    pub async fn connected_count(&self) -> usize {
1373        self.state
1374            .connected_count
1375            .load(std::sync::atomic::Ordering::Relaxed)
1376    }
1377
1378    /// Get all peer statuses
1379    pub async fn peer_statuses(&self) -> Vec<PeerStatus> {
1380        self.state
1381            .peers
1382            .read()
1383            .await
1384            .values()
1385            .map(|p| PeerStatus {
1386                peer_id: p.peer_id.to_string(),
1387                pubkey: p.peer_id.pubkey.clone(),
1388                state: p.state.to_string(),
1389                direction: p.direction,
1390                connected_at: Some(p.last_seen),
1391                pool: p.pool,
1392            })
1393            .collect()
1394    }
1395
1396    /// Get pool counts
1397    /// Returns (follows_connected, follows_active, other_connected, other_active)
1398    /// "active" = Connected or Connecting (excludes Discovered and Failed)
1399    pub async fn get_pool_counts(&self) -> (usize, usize, usize, usize) {
1400        let peers = self.state.peers.read().await;
1401        let mut follows_connected = 0;
1402        let mut follows_active = 0;
1403        let mut other_connected = 0;
1404        let mut other_active = 0;
1405
1406        for entry in peers.values() {
1407            // Only count Connected or Connecting as "active" connections
1408            // Discovered peers are just seen hellos, not real connections
1409            let is_active = entry.state == ConnectionState::Connected
1410                || entry.state == ConnectionState::Connecting;
1411
1412            match entry.pool {
1413                PeerPool::Follows => {
1414                    if is_active {
1415                        follows_active += 1;
1416                    }
1417                    if entry.state == ConnectionState::Connected {
1418                        follows_connected += 1;
1419                    }
1420                }
1421                PeerPool::Other => {
1422                    if is_active {
1423                        other_active += 1;
1424                    }
1425                    if entry.state == ConnectionState::Connected {
1426                        other_connected += 1;
1427                    }
1428                }
1429            }
1430        }
1431
1432        (
1433            follows_connected,
1434            follows_active,
1435            other_connected,
1436            other_active,
1437        )
1438    }
1439
1440    fn local_hello_message(&self) -> SignalingMessage {
1441        SignalingMessage::Hello {
1442            peer_id: self.my_peer_id.to_string(),
1443            roots: Vec::new(),
1444        }
1445    }
1446
1447    fn local_bus_max_peers(&self, source: &str) -> Option<usize> {
1448        match source {
1449            "multicast" => Some(self.config.multicast.max_peers),
1450            WIFI_AWARE_SOURCE => Some(self.config.wifi_aware.max_peers),
1451            _ => None,
1452        }
1453    }
1454
1455    fn can_track_local_bus_peer(
1456        &self,
1457        source: &str,
1458        peer_key: &str,
1459        peers: &HashMap<String, PeerEntry>,
1460    ) -> bool {
1461        let Some(max_peers) = self.local_bus_max_peers(source) else {
1462            return true;
1463        };
1464        if peers.contains_key(peer_key) {
1465            return true;
1466        }
1467        if max_peers == 0 {
1468            return false;
1469        }
1470        let signal_path = PeerSignalPath::from_source_name(source);
1471        peers
1472            .values()
1473            .filter(|entry| {
1474                entry.signal_paths.contains(&signal_path) && entry.state != ConnectionState::Failed
1475            })
1476            .count()
1477            < max_peers
1478    }
1479
1480    /// Start the native peer router - connects transports and handles signaling.
1481    pub async fn run(&mut self) -> Result<()> {
1482        info!(
1483            "Starting peer router with peer ID: {}",
1484            self.my_peer_id.short()
1485        );
1486
1487        let (event_tx, mut event_rx) = mpsc::channel::<(String, nostr::Event)>(100);
1488
1489        // Take the signaling receiver
1490        let mut signaling_rx = self
1491            .signaling_rx
1492            .take()
1493            .expect("signaling_rx already taken");
1494
1495        // Take the state event receiver
1496        let mut state_event_rx = self
1497            .state_event_rx
1498            .take()
1499            .expect("state_event_rx already taken");
1500        let mut mesh_frame_rx = self
1501            .mesh_frame_rx
1502            .take()
1503            .expect("mesh_frame_rx already taken");
1504
1505        if self.config.bluetooth.is_enabled() {
1506            let bluetooth = BluetoothMesh::new(self.config.bluetooth.clone());
1507            let context = BluetoothRuntimeContext {
1508                my_peer_id: self.my_peer_id.clone(),
1509                store: if bluetooth_nostr_only_mode() {
1510                    None
1511                } else {
1512                    self.store.clone()
1513                },
1514                nostr_relay: self.nostr_relay.clone(),
1515                mesh_frame_tx: self.mesh_frame_tx.clone(),
1516                registrar: BluetoothPeerRegistrar::new(
1517                    self.state.clone(),
1518                    self.peer_classifier.clone(),
1519                    self.config.pools.clone(),
1520                    self.config.bluetooth.max_peers,
1521                ),
1522            };
1523            let _ = bluetooth.start(context).await;
1524        }
1525
1526        // Create a shared write channel for all relay tasks
1527        let (relay_write_tx, _) = tokio::sync::broadcast::channel::<SignalingMessage>(100);
1528
1529        // Spawn relay connections
1530        for relay_url in &self.config.relays {
1531            let url = relay_url.clone();
1532            let event_tx = event_tx.clone();
1533            let shutdown_rx = self.shutdown_rx.clone();
1534            let keys = self.keys.clone();
1535            let relay_write_rx = relay_write_tx.subscribe();
1536
1537            tokio::spawn(async move {
1538                if let Err(e) =
1539                    Self::relay_task(url.clone(), event_tx, shutdown_rx, keys, relay_write_rx).await
1540                {
1541                    error!("Relay {} error: {}", url, e);
1542                }
1543            });
1544        }
1545
1546        if self.config.multicast.is_enabled() {
1547            if let Some(relay) = self.nostr_relay.clone() {
1548                match MulticastNostrBus::bind(
1549                    self.config.multicast.clone(),
1550                    self.keys.clone(),
1551                    relay,
1552                )
1553                .await
1554                {
1555                    Ok(bus) => {
1556                        let local_bus: SharedLocalNostrBus = bus.clone();
1557                        self.state.add_local_bus(local_bus.clone()).await;
1558                        self.local_buses.push(local_bus);
1559                        let shutdown_rx = self.shutdown_rx.clone();
1560                        let signaling_tx = event_tx.clone();
1561                        tokio::spawn(async move {
1562                            if let Err(err) = bus.run(shutdown_rx, signaling_tx).await {
1563                                error!("Multicast bus error: {}", err);
1564                            }
1565                        });
1566                    }
1567                    Err(err) => {
1568                        warn!("Failed to start multicast bus: {}", err);
1569                    }
1570                }
1571            } else {
1572                warn!("Multicast enabled but Nostr relay is unavailable");
1573            }
1574        }
1575
1576        if self.config.wifi_aware.is_enabled() {
1577            if let Some(relay) = self.nostr_relay.clone() {
1578                if let Some(bridge) = mobile_wifi_aware_bridge() {
1579                    let bus = WifiAwareNostrBus::new(
1580                        self.config.wifi_aware.clone(),
1581                        self.keys.clone(),
1582                        relay,
1583                        bridge,
1584                    );
1585                    let local_bus: SharedLocalNostrBus = bus.clone();
1586                    self.state.add_local_bus(local_bus.clone()).await;
1587                    self.local_buses.push(local_bus);
1588                    let shutdown_rx = self.shutdown_rx.clone();
1589                    let signaling_tx = event_tx.clone();
1590                    let local_peer_id = self.my_peer_id.to_string();
1591                    tokio::spawn(async move {
1592                        if let Err(err) = bus.run(local_peer_id, shutdown_rx, signaling_tx).await {
1593                            error!("Wi-Fi Aware bus error: {}", err);
1594                        }
1595                    });
1596                } else {
1597                    warn!("Wi-Fi Aware enabled but no mobile bridge is installed");
1598                }
1599            } else {
1600                warn!("Wi-Fi Aware enabled but Nostr relay is unavailable");
1601            }
1602        }
1603
1604        if self.config.signaling_enabled {
1605            let transport = Arc::new(RouterSignalingBridge::new(
1606                self.my_peer_id.to_string(),
1607                self.signaling_tx.clone(),
1608            ));
1609            let factory = Arc::new(SharedRouterPeerFactory::new(
1610                self.my_peer_id.clone(),
1611                self.signaling_tx.clone(),
1612                self.config.stun_servers.clone(),
1613                self.store.clone(),
1614                self.state.clone(),
1615                self.state_event_tx.clone(),
1616                self.nostr_relay.clone(),
1617                self.mesh_frame_tx.clone(),
1618                self.peer_classifier.clone(),
1619            ));
1620            let (classifier_tx, mut classifier_rx) = mpsc::channel::<SharedClassifyRequest>(32);
1621            let classifier = self.peer_classifier.clone();
1622            tokio::spawn(async move {
1623                while let Some(request) = classifier_rx.recv().await {
1624                    let _ = request.response.send(classifier(&request.pubkey));
1625                }
1626            });
1627
1628            let mut router = MeshRouter::new(
1629                self.my_peer_id.to_string(),
1630                transport,
1631                factory.clone(),
1632                self.config.pools.clone(),
1633                self.config.debug,
1634            );
1635            router.set_classifier(classifier_tx);
1636            self.shared_router = Some(Arc::new(router));
1637        }
1638
1639        // Process incoming events and outgoing signaling messages
1640        let mut shutdown_rx = self.shutdown_rx.clone();
1641        // Cleanup interval - run every 30 seconds as a fallback (not for real-time sync)
1642        let mut cleanup_interval = tokio::time::interval(Duration::from_secs(30));
1643        let mut hello_ticker =
1644            tokio::time::interval(Duration::from_millis(self.config.hello_interval_ms));
1645        if self.config.signaling_enabled {
1646            if let Some(shared_router) = self.shared_router.as_ref() {
1647                let _ = shared_router.send_hello(Vec::new()).await;
1648            } else {
1649                self.dispatch_signaling_message(self.local_hello_message(), &relay_write_tx)
1650                    .await;
1651            }
1652        }
1653        loop {
1654            tokio::select! {
1655                _ = shutdown_rx.changed() => {
1656                    if *shutdown_rx.borrow() {
1657                        info!("WebRTC manager shutting down");
1658                        break;
1659                    }
1660                }
1661                Some((relay, event)) = event_rx.recv() => {
1662                    if let Err(e) = self
1663                        .handle_event(&relay, &event, self.shared_router.as_ref())
1664                        .await
1665                    {
1666                        debug!("Error handling event from {}: {}", relay, e);
1667                    }
1668                }
1669                Some(msg) = signaling_rx.recv() => {
1670                    self.dispatch_signaling_message(msg, &relay_write_tx).await;
1671                }
1672                Some(event) = state_event_rx.recv() => {
1673                    // Handle peer state events (connected, failed, disconnected)
1674                    self.handle_peer_state_event(event, &relay_write_tx).await;
1675                }
1676                Some((from_peer_id, frame)) = mesh_frame_rx.recv() => {
1677                    self.handle_mesh_frame(from_peer_id, frame).await;
1678                }
1679                _ = hello_ticker.tick(), if self.config.signaling_enabled => {
1680                    if let Some(shared_router) = self.shared_router.as_ref() {
1681                        let _ = shared_router.send_hello(Vec::new()).await;
1682                    } else {
1683                        self.dispatch_signaling_message(self.local_hello_message(), &relay_write_tx)
1684                            .await;
1685                    }
1686                }
1687                _ = cleanup_interval.tick() => {
1688                    // Periodic cleanup of stale peers and state sync (fallback)
1689                    self.cleanup_stale_peers().await;
1690                }
1691            }
1692        }
1693
1694        Ok(())
1695    }
1696
1697    /// Connect to a single relay and handle messages
1698    async fn relay_task(
1699        url: String,
1700        event_tx: mpsc::Sender<(String, nostr::Event)>,
1701        mut shutdown_rx: tokio::sync::watch::Receiver<bool>,
1702        keys: Keys,
1703        mut signaling_rx: tokio::sync::broadcast::Receiver<SignalingMessage>,
1704    ) -> Result<()> {
1705        info!("Connecting to relay: {}", url);
1706
1707        let (ws_stream, _) = connect_async(&url).await?;
1708        let (mut write, mut read) = ws_stream.split();
1709
1710        // Subscribe to webrtc events - two filters:
1711        // 1. Hello messages: kind 25050 with #l: "hello" tag
1712        // 2. Directed messages: kind 25050 with #p tag (our pubkey)
1713        let hello_filter = Filter::new()
1714            .kind(Kind::Ephemeral(WEBRTC_KIND as u16))
1715            .custom_tag(
1716                nostr::SingleLetterTag::lowercase(nostr::Alphabet::L),
1717                vec![HELLO_TAG],
1718            )
1719            .since(nostr::Timestamp::now() - Duration::from_secs(60));
1720
1721        let directed_filter = Filter::new()
1722            .kind(Kind::Ephemeral(WEBRTC_KIND as u16))
1723            .custom_tag(
1724                nostr::SingleLetterTag::lowercase(nostr::Alphabet::P),
1725                vec![keys.public_key().to_hex()],
1726            )
1727            .since(nostr::Timestamp::now() - Duration::from_secs(60));
1728
1729        let sub_id = nostr::SubscriptionId::generate();
1730        let sub_msg = ClientMessage::req(sub_id.clone(), vec![hello_filter, directed_filter]);
1731        write.send(Message::Text(sub_msg.as_json())).await?;
1732
1733        info!(
1734            "Subscribed to {} for WebRTC events (kind {})",
1735            url, WEBRTC_KIND
1736        );
1737
1738        loop {
1739            tokio::select! {
1740                _ = shutdown_rx.changed() => {
1741                    if *shutdown_rx.borrow() {
1742                        break;
1743                    }
1744                }
1745                // Handle outgoing signaling messages
1746                Ok(signaling_msg) = signaling_rx.recv() => {
1747                    info!("Sending {} via {}", signaling_msg.msg_type(), url);
1748                    if let Ok(event) = Self::create_signaling_event(&keys, &signaling_msg).await {
1749                        let event_id = event.id.to_string();
1750                        let msg = ClientMessage::event(event);
1751                        if write.send(Message::Text(msg.as_json())).await.is_ok() {
1752                            info!("Sent {} to {} (event id: {})", signaling_msg.msg_type(), url, &event_id[..16]);
1753                        }
1754                    }
1755                }
1756                msg = read.next() => {
1757                    match msg {
1758                        Some(Ok(Message::Text(text))) => {
1759                            if let Ok(RelayMessage::Event { event, .. }) =
1760                                RelayMessage::from_json(&text)
1761                            {
1762                                let _ = event_tx.send((url.clone(), *event)).await;
1763                            }
1764                        }
1765                        Some(Err(e)) => {
1766                            error!("WebSocket error from {}: {}", url, e);
1767                            break;
1768                        }
1769                        None => {
1770                            warn!("WebSocket closed: {}", url);
1771                            break;
1772                        }
1773                        _ => {}
1774                    }
1775                }
1776            }
1777        }
1778
1779        Ok(())
1780    }
1781
1782    async fn mark_seen_frame_id(&self, frame_id: String) -> bool {
1783        let mut seen = self.seen_frame_ids.lock().await;
1784        seen.insert_if_new(frame_id)
1785    }
1786
1787    async fn mark_seen_event_id(&self, event_id: String) -> bool {
1788        let mut seen = self.seen_event_ids.lock().await;
1789        seen.insert_if_new(event_id)
1790    }
1791
1792    async fn dispatch_signaling_message(
1793        &self,
1794        msg: SignalingMessage,
1795        relay_write_tx: &tokio::sync::broadcast::Sender<SignalingMessage>,
1796    ) {
1797        if !self.config.signaling_enabled {
1798            debug!(
1799                "Skipping signaling message {} because WebRTC signaling is disabled",
1800                msg.msg_type()
1801            );
1802            return;
1803        }
1804
1805        if relay_write_tx.send(msg.clone()).is_err() {
1806            debug!(
1807                "No relay subscribers for signaling message {}",
1808                msg.msg_type()
1809            );
1810        }
1811
1812        let event = match Self::create_signaling_event(&self.keys, &msg).await {
1813            Ok(event) => event,
1814            Err(e) => {
1815                debug!("Failed to create signaling event for mesh dispatch: {}", e);
1816                return;
1817            }
1818        };
1819
1820        for bus in &self.local_buses {
1821            if let Err(err) = bus.broadcast_event(&event).await {
1822                debug!(
1823                    "Failed to broadcast signaling event over {} ({}): {}",
1824                    bus.source_name(),
1825                    msg.msg_type(),
1826                    err
1827                );
1828            }
1829        }
1830
1831        let mut frame =
1832            MeshNostrFrame::new_event(event, &self.my_peer_id.to_string(), MESH_DEFAULT_HTL);
1833        if !self.mark_seen_frame_id(frame.frame_id.clone()).await {
1834            self.state.record_mesh_duplicate_drop();
1835            return;
1836        }
1837        if !self.mark_seen_event_id(frame.event().id.to_hex()).await {
1838            self.state.record_mesh_duplicate_drop();
1839            return;
1840        }
1841
1842        // Keep the sender peer id stable even if this is forwarded later.
1843        frame.sender_peer_id = self.my_peer_id.to_string();
1844        let forwarded = self.forward_mesh_frame(&frame, None).await;
1845        if forwarded > 0 {
1846            self.state.record_mesh_forwarded(forwarded as u64);
1847        }
1848    }
1849
1850    async fn forward_mesh_frame(
1851        &self,
1852        frame: &MeshNostrFrame,
1853        exclude_peer_id: Option<&str>,
1854    ) -> usize {
1855        let peers = self.state.peers.read().await;
1856        let peer_refs: Vec<_> = peers
1857            .values()
1858            .filter(|entry| entry.state == ConnectionState::Connected)
1859            .filter(|entry| {
1860                entry
1861                    .peer
1862                    .as_ref()
1863                    .map(|peer| peer.is_ready())
1864                    .unwrap_or(false)
1865            })
1866            .filter(|entry| {
1867                exclude_peer_id
1868                    .map(|exclude| exclude != entry.peer_id.to_string())
1869                    .unwrap_or(true)
1870            })
1871            .filter_map(|entry| {
1872                entry.peer.as_ref().map(|peer| {
1873                    (
1874                        entry.peer_id.to_string(),
1875                        entry.peer_id.short(),
1876                        peer.clone(),
1877                        peer.htl_config(),
1878                    )
1879                })
1880            })
1881            .collect();
1882        drop(peers);
1883
1884        let mut forwarded = 0usize;
1885        for (_peer_key, peer_short, peer, htl_cfg) in peer_refs {
1886            let next_htl = decrement_htl_with_policy(frame.htl, &MESH_EVENT_POLICY, &htl_cfg);
1887            if !should_forward_htl(next_htl) {
1888                continue;
1889            }
1890
1891            let mut outbound = frame.clone();
1892            outbound.htl = next_htl;
1893            if peer.send_mesh_frame_text(&outbound).await.is_ok() {
1894                forwarded += 1;
1895            } else {
1896                debug!("Failed to forward mesh frame to {}", peer_short);
1897            }
1898        }
1899
1900        forwarded
1901    }
1902
1903    async fn handle_mesh_frame(&self, from_peer_id: PeerId, frame: MeshNostrFrame) {
1904        if let Err(reason) = validate_mesh_frame(&frame) {
1905            debug!(
1906                "Ignoring mesh frame from {} (invalid: {})",
1907                from_peer_id.short(),
1908                reason
1909            );
1910            return;
1911        }
1912
1913        if !self.mark_seen_frame_id(frame.frame_id.clone()).await {
1914            self.state.record_mesh_duplicate_drop();
1915            return;
1916        }
1917
1918        let event = match &frame.payload {
1919            MeshNostrPayload::Event { event } => event.clone(),
1920        };
1921
1922        if !self.mark_seen_event_id(event.id.to_hex()).await {
1923            self.state.record_mesh_duplicate_drop();
1924            return;
1925        }
1926
1927        if event.verify().is_err() {
1928            debug!(
1929                "Ignoring mesh event from {} due to invalid signature",
1930                from_peer_id.short()
1931            );
1932            return;
1933        }
1934
1935        self.state.record_mesh_received();
1936
1937        if let Err(e) = self
1938            .handle_event("mesh", &event, self.shared_router.as_ref())
1939            .await
1940        {
1941            debug!(
1942                "Error handling mesh event from {}: {}",
1943                from_peer_id.short(),
1944                e
1945            );
1946        }
1947
1948        let forwarded = self
1949            .forward_mesh_frame(&frame, Some(&from_peer_id.to_string()))
1950            .await;
1951        if forwarded > 0 {
1952            self.state.record_mesh_forwarded(forwarded as u64);
1953        }
1954    }
1955
1956    /// Create a signaling event
1957    ///
1958    /// For directed messages (offer, answer, candidate, candidates), use NIP-17 style
1959    /// gift wrapping with ephemeral keys for privacy.
1960    /// Hello messages use kind 25050 with #l: "hello" tag and peerId.
1961    async fn create_signaling_event(keys: &Keys, msg: &SignalingMessage) -> Result<nostr::Event> {
1962        encode_signaling_event(
1963            keys,
1964            msg.peer_id(),
1965            msg,
1966            Kind::Ephemeral(WEBRTC_KIND as u16),
1967        )
1968        .map_err(|e| anyhow::anyhow!(e.to_string()))
1969    }
1970
1971    /// Handle an incoming event
1972    ///
1973    /// Messages may be:
1974    /// 1. Hello messages: kind 25050 with #l: "hello" tag and peerId
1975    /// 2. Gift-wrapped directed messages: kind 25050 with #p tag, encrypted with ephemeral key
1976    async fn handle_event(
1977        &self,
1978        relay: &str,
1979        event: &nostr::Event,
1980        shared_router: Option<&Arc<SharedProductionRouter>>,
1981    ) -> Result<()> {
1982        if !self.config.signaling_enabled {
1983            return Ok(());
1984        }
1985
1986        let Some(shared_router) = shared_router else {
1987            return Ok(());
1988        };
1989
1990        let Some(msg) = decode_signaling_event(
1991            event,
1992            &self.my_peer_id.to_string(),
1993            &self.keys.public_key().to_hex(),
1994            &self.keys,
1995        ) else {
1996            return Ok(());
1997        };
1998
1999        if matches!(
2000            msg,
2001            SignalingMessage::Hello { .. } | SignalingMessage::Offer { .. }
2002        ) {
2003            let peers = self.state.peers.read().await;
2004            if !self.can_track_local_bus_peer(relay, msg.peer_id(), &peers) {
2005                return Ok(());
2006            }
2007        }
2008
2009        debug!(
2010            "Received {} from {} via {}",
2011            msg.msg_type(),
2012            msg.peer_id(),
2013            relay
2014        );
2015        let peer_id = msg.peer_id().to_string();
2016        shared_router
2017            .handle_message(msg)
2018            .await
2019            .map_err(|e| anyhow::anyhow!(e.to_string()))?;
2020        remember_peer_signal_path(self.state.as_ref(), &peer_id, relay).await;
2021
2022        Ok(())
2023    }
2024
2025    /// Handle peer state change events from peer connections
2026    async fn handle_peer_state_event(
2027        &self,
2028        event: PeerStateEvent,
2029        relay_write_tx: &tokio::sync::broadcast::Sender<SignalingMessage>,
2030    ) {
2031        match event {
2032            PeerStateEvent::Connected(peer_id) => {
2033                let peer_key = peer_id.to_string();
2034                let mut emit_hello = false;
2035                let mut peers = self.state.peers.write().await;
2036                if let Some(entry) = peers.get_mut(&peer_key) {
2037                    if entry.state != ConnectionState::Connected {
2038                        info!("Peer {} connected (via state event)", peer_id.short());
2039                        entry.state = ConnectionState::Connected;
2040                        emit_hello = true;
2041                        // Update connected count
2042                        self.state
2043                            .connected_count
2044                            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
2045                    }
2046                }
2047                drop(peers);
2048                if emit_hello {
2049                    if let Some(shared_router) = self.shared_router.as_ref() {
2050                        let _ = shared_router.send_hello(Vec::new()).await;
2051                    } else {
2052                        self.dispatch_signaling_message(self.local_hello_message(), relay_write_tx)
2053                            .await;
2054                    }
2055                }
2056            }
2057            PeerStateEvent::Failed(peer_id) => {
2058                let peer_key = peer_id.to_string();
2059                info!(
2060                    "Peer {} connection failed - removing from pool",
2061                    peer_id.short()
2062                );
2063                let removed = {
2064                    let mut peers = self.state.peers.write().await;
2065                    peers.remove(&peer_key)
2066                };
2067                if let Some(entry) = removed {
2068                    // Decrement connected count if was connected
2069                    if entry.state == ConnectionState::Connected {
2070                        self.state
2071                            .connected_count
2072                            .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
2073                    }
2074                    // Close the peer connection if it exists
2075                    if let Some(peer) = entry.peer {
2076                        let _ = peer.close().await;
2077                    }
2078                }
2079                if let Some(shared_router) = self.shared_router.as_ref() {
2080                    if let Some(channel) = shared_router.remove_peer(&peer_key).await {
2081                        channel.close().await;
2082                    }
2083                }
2084            }
2085            PeerStateEvent::Disconnected(peer_id) => {
2086                let peer_key = peer_id.to_string();
2087                info!("Peer {} disconnected - removing from pool", peer_id.short());
2088                let removed = {
2089                    let mut peers = self.state.peers.write().await;
2090                    peers.remove(&peer_key)
2091                };
2092                if let Some(entry) = removed {
2093                    // Decrement connected count if was connected
2094                    if entry.state == ConnectionState::Connected {
2095                        self.state
2096                            .connected_count
2097                            .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
2098                    }
2099                    // Close the peer connection if it exists
2100                    if let Some(peer) = entry.peer {
2101                        let _ = peer.close().await;
2102                    }
2103                }
2104                if let Some(shared_router) = self.shared_router.as_ref() {
2105                    if let Some(channel) = shared_router.remove_peer(&peer_key).await {
2106                        channel.close().await;
2107                    }
2108                }
2109            }
2110        }
2111    }
2112
2113    /// Cleanup stale peers and sync connection states (fallback, runs every 30s)
2114    async fn cleanup_stale_peers(&self) {
2115        let mut peers = self.state.peers.write().await;
2116        let mut connected_count = 0;
2117        let mut to_remove = Vec::new();
2118        let stale_timeout = Duration::from_secs(60); // Remove peers stuck in Discovered/Connecting for 60s
2119
2120        for (key, entry) in peers.iter_mut() {
2121            if let Some(ref peer) = entry.peer {
2122                // Sync connected state as fallback (in case event was missed)
2123                if peer.is_connected() {
2124                    if entry.state != ConnectionState::Connected {
2125                        info!(
2126                            "Peer {} is now connected (sync fallback)",
2127                            entry.peer_id.short()
2128                        );
2129                        entry.state = ConnectionState::Connected;
2130                    }
2131                    connected_count += 1;
2132                } else if entry.state == ConnectionState::Connected {
2133                    info!(
2134                        "Removing disconnected peer {} after transport closed",
2135                        entry.peer_id.short()
2136                    );
2137                    to_remove.push(key.clone());
2138                } else if entry.state == ConnectionState::Connecting
2139                    && entry.last_seen.elapsed() > stale_timeout
2140                {
2141                    // Peer stuck in Connecting for too long - mark for removal
2142                    info!(
2143                        "Removing stale peer {} (stuck in Connecting for {:?})",
2144                        entry.peer_id.short(),
2145                        entry.last_seen.elapsed()
2146                    );
2147                    to_remove.push(key.clone());
2148                }
2149            } else if entry.state == ConnectionState::Discovered
2150                && entry.last_seen.elapsed() > stale_timeout
2151            {
2152                // Discovered peer with no actual connection - remove
2153                debug!("Removing stale discovered peer {}", entry.peer_id.short());
2154                to_remove.push(key.clone());
2155            }
2156        }
2157
2158        // Remove stale peers
2159        let mut removed_peers = Vec::new();
2160        for key in to_remove {
2161            if let Some(entry) = peers.remove(&key) {
2162                removed_peers.push(entry);
2163            }
2164        }
2165        drop(peers);
2166
2167        for entry in removed_peers {
2168            if let Some(peer) = entry.peer {
2169                let _ = peer.close().await;
2170            }
2171        }
2172
2173        self.state
2174            .connected_count
2175            .store(connected_count, std::sync::atomic::Ordering::Relaxed);
2176    }
2177}
2178
2179// Keep the old PeerState for backward compatibility with tests
2180#[allow(dead_code)]
2181#[derive(Debug, Clone)]
2182pub struct PeerState {
2183    pub peer_id: PeerId,
2184    pub direction: PeerDirection,
2185    pub state: String,
2186    pub last_seen: Instant,
2187}
2188
2189#[cfg(test)]
2190mod tests {
2191    use super::*;
2192    use crate::webrtc::root_events::PeerRootEvent;
2193    use crate::webrtc::session::TestMeshPeer;
2194    use crate::webrtc::SelectionStrategy;
2195    use anyhow::Result as AnyResult;
2196    use async_trait::async_trait;
2197    use hashtree_network::{build_hedged_wave_plan, normalize_dispatch_config};
2198    use nostr::{EventBuilder, Keys, Tag};
2199    use std::time::Duration;
2200
2201    struct TestLocalBus {
2202        source: &'static str,
2203        root: Option<PeerRootEvent>,
2204    }
2205
2206    #[async_trait]
2207    impl super::super::LocalNostrBus for TestLocalBus {
2208        fn source_name(&self) -> &'static str {
2209            self.source
2210        }
2211
2212        async fn broadcast_event(&self, _event: &nostr::Event) -> AnyResult<()> {
2213            Ok(())
2214        }
2215
2216        async fn query_root(
2217            &self,
2218            _owner_pubkey: &str,
2219            _tree_name: &str,
2220            _timeout: Duration,
2221        ) -> Option<PeerRootEvent> {
2222            self.root.clone()
2223        }
2224    }
2225
2226    #[test]
2227    fn root_event_from_peer_extracts_tags() {
2228        let keys = Keys::generate();
2229        let hash = "ab".repeat(32);
2230        let event = EventBuilder::new(
2231            Kind::Custom(super::super::root_events::HASHTREE_KIND),
2232            "",
2233            [
2234                Tag::parse(&["d", "repo"]).unwrap(),
2235                Tag::parse(&["l", super::super::root_events::HASHTREE_LABEL]).unwrap(),
2236                Tag::parse(&["hash", &hash]).unwrap(),
2237                Tag::parse(&["encryptedKey", &"11".repeat(32)]).unwrap(),
2238            ],
2239        )
2240        .to_event(&keys)
2241        .unwrap();
2242
2243        let parsed = root_event_from_peer(&event, "peer-a", "repo").unwrap();
2244        let expected_encrypted = "11".repeat(32);
2245        assert_eq!(parsed.hash, hash);
2246        assert_eq!(parsed.peer_id, "peer-a");
2247        assert_eq!(
2248            parsed.encrypted_key.as_deref(),
2249            Some(expected_encrypted.as_str())
2250        );
2251        assert!(parsed.key.is_none());
2252    }
2253
2254    #[test]
2255    fn pick_latest_event_prefers_higher_event_id_on_timestamp_tie() {
2256        let keys = Keys::generate();
2257        let created_at = nostr::Timestamp::from_secs(1_700_000_000);
2258        let event_a = EventBuilder::new(
2259            Kind::Custom(super::super::root_events::HASHTREE_KIND),
2260            "",
2261            [],
2262        )
2263        .custom_created_at(created_at)
2264        .to_event(&keys)
2265        .unwrap();
2266        let event_b = EventBuilder::new(
2267            Kind::Custom(super::super::root_events::HASHTREE_KIND),
2268            "",
2269            [],
2270        )
2271        .custom_created_at(created_at)
2272        .to_event(&keys)
2273        .unwrap();
2274
2275        let expected = if event_a.id > event_b.id {
2276            event_a.id
2277        } else {
2278            event_b.id
2279        };
2280        let picked = pick_latest_event([&event_a, &event_b]).unwrap();
2281        assert_eq!(picked.id, expected);
2282    }
2283
2284    #[tokio::test]
2285    async fn resolve_root_from_local_buses_returns_source_and_first_match() {
2286        let state = WebRTCState::new();
2287        let root = PeerRootEvent {
2288            hash: "ab".repeat(32),
2289            key: None,
2290            encrypted_key: None,
2291            self_encrypted_key: None,
2292            event_id: "event-1".to_string(),
2293            created_at: 1,
2294            peer_id: "bus-peer".to_string(),
2295        };
2296
2297        state
2298            .set_local_buses(vec![
2299                Arc::new(TestLocalBus {
2300                    source: "empty",
2301                    root: None,
2302                }),
2303                Arc::new(TestLocalBus {
2304                    source: "mock-bus",
2305                    root: Some(root.clone()),
2306                }),
2307            ])
2308            .await;
2309
2310        let resolved = state
2311            .resolve_root_from_local_buses_with_source("owner", "tree", Duration::from_millis(10))
2312            .await
2313            .expect("expected root from local bus");
2314
2315        assert_eq!(resolved.0, "mock-bus");
2316        assert_eq!(resolved.1.hash, root.hash);
2317        assert_eq!(resolved.1.peer_id, root.peer_id);
2318    }
2319
2320    #[tokio::test]
2321    async fn can_track_local_bus_peer_enforces_wifi_aware_limit() {
2322        let keys = Keys::generate();
2323        let mut config = WebRTCConfig::default();
2324        config.wifi_aware.enabled = true;
2325        config.wifi_aware.max_peers = 1;
2326        let manager = WebRTCManager::new(keys, config);
2327        let existing_peer = PeerId::new("peer-a".to_string());
2328        let existing_key = existing_peer.to_string();
2329        let mut peers = HashMap::new();
2330        peers.insert(
2331            existing_key.clone(),
2332            PeerEntry {
2333                peer_id: existing_peer,
2334                direction: PeerDirection::Outbound,
2335                state: ConnectionState::Discovered,
2336                last_seen: Instant::now(),
2337                peer: None,
2338                pool: PeerPool::Other,
2339                transport: PeerTransport::WebRtc,
2340                signal_paths: BTreeSet::from([PeerSignalPath::WifiAware]),
2341                bytes_sent: 0,
2342                bytes_received: 0,
2343            },
2344        );
2345
2346        assert!(manager.can_track_local_bus_peer(WIFI_AWARE_SOURCE, &existing_key, &peers,));
2347        assert!(!manager.can_track_local_bus_peer(WIFI_AWARE_SOURCE, "peer-b:sess-b", &peers,));
2348        assert!(manager.can_track_local_bus_peer("relay", "peer-c:sess-c", &peers));
2349    }
2350
2351    #[tokio::test]
2352    async fn request_from_peers_with_source_accepts_generic_mesh_peers() {
2353        let state = WebRTCState::new();
2354        let data = b"offline-over-ble".to_vec();
2355        let hash_hex = hex::encode(hashtree_core::sha256(&data));
2356
2357        state.peers.write().await.insert(
2358            "peer-a".to_string(),
2359            PeerEntry {
2360                peer_id: PeerId::new("peer-a-pub".to_string()),
2361                direction: PeerDirection::Outbound,
2362                state: ConnectionState::Connected,
2363                last_seen: Instant::now(),
2364                peer: Some(MeshPeer::mock_for_tests(TestMeshPeer::with_response(Some(
2365                    data.clone(),
2366                )))),
2367                pool: PeerPool::Other,
2368                transport: PeerTransport::Bluetooth,
2369                signal_paths: BTreeSet::from([PeerSignalPath::Bluetooth]),
2370                bytes_sent: 0,
2371                bytes_received: 0,
2372            },
2373        );
2374
2375        let resolved = state
2376            .request_from_peers_with_source(&hash_hex)
2377            .await
2378            .expect("expected mock mesh peer response");
2379
2380        assert_eq!(resolved.0, data);
2381        assert_eq!(resolved.1, "peer-a-pub");
2382    }
2383
2384    #[tokio::test]
2385    async fn request_from_peers_with_source_waits_full_timeout_for_last_generic_peer() {
2386        let state = WebRTCState::new_with_routing_and_cashu(
2387            SelectionStrategy::TitForTat,
2388            true,
2389            RequestDispatchConfig {
2390                initial_fanout: 1,
2391                hedge_fanout: 1,
2392                max_fanout: 1,
2393                hedge_interval_ms: 50,
2394            },
2395            Duration::from_millis(400),
2396            CashuRoutingConfig::default(),
2397            None,
2398            None,
2399        );
2400        let data = b"slow-offline-over-ble".to_vec();
2401        let hash_hex = hex::encode(hashtree_core::sha256(&data));
2402
2403        state.peers.write().await.insert(
2404            "peer-a".to_string(),
2405            PeerEntry {
2406                peer_id: PeerId::new("peer-a-pub".to_string()),
2407                direction: PeerDirection::Outbound,
2408                state: ConnectionState::Connected,
2409                last_seen: Instant::now(),
2410                peer: Some(MeshPeer::mock_for_tests(
2411                    TestMeshPeer::with_delayed_response(
2412                        Some(data.clone()),
2413                        Duration::from_millis(200),
2414                    ),
2415                )),
2416                pool: PeerPool::Other,
2417                transport: PeerTransport::Bluetooth,
2418                signal_paths: BTreeSet::from([PeerSignalPath::Bluetooth]),
2419                bytes_sent: 0,
2420                bytes_received: 0,
2421            },
2422        );
2423
2424        let resolved = state
2425            .request_from_peers_with_source(&hash_hex)
2426            .await
2427            .expect("expected delayed mock mesh peer response");
2428
2429        assert_eq!(resolved.0, data);
2430        assert_eq!(resolved.1, "peer-a-pub");
2431    }
2432
2433    #[tokio::test]
2434    async fn dispatch_signaling_message_is_noop_when_signaling_disabled() {
2435        let keys = Keys::generate();
2436        let mut config = WebRTCConfig::default();
2437        config.signaling_enabled = false;
2438        let manager = WebRTCManager::new(keys, config);
2439        let peer_id = PeerId::new("peer-a-pub".to_string());
2440        let peer_key = peer_id.to_string();
2441        let peer = MeshPeer::mock_for_tests(TestMeshPeer::with_response(None));
2442        let peer_ref = peer.mock_ref().expect("mock peer").clone();
2443
2444        manager.state.peers.write().await.insert(
2445            peer_key,
2446            PeerEntry {
2447                peer_id,
2448                direction: PeerDirection::Outbound,
2449                state: ConnectionState::Connected,
2450                last_seen: Instant::now(),
2451                peer: Some(peer),
2452                pool: PeerPool::Other,
2453                transport: PeerTransport::Bluetooth,
2454                signal_paths: BTreeSet::from([PeerSignalPath::Bluetooth]),
2455                bytes_sent: 0,
2456                bytes_received: 0,
2457            },
2458        );
2459
2460        let (relay_tx, _) = tokio::sync::broadcast::channel(4);
2461        manager
2462            .dispatch_signaling_message(
2463                SignalingMessage::Hello {
2464                    peer_id: manager.my_peer_id.to_string(),
2465                    roots: Vec::new(),
2466                },
2467                &relay_tx,
2468            )
2469            .await;
2470
2471        assert_eq!(peer_ref.sent_frame_count().await, 0);
2472    }
2473
2474    #[tokio::test]
2475    async fn failed_peer_cleanup_does_not_hold_peer_map_lock_while_closing() {
2476        let keys = Keys::generate();
2477        let manager = Arc::new(WebRTCManager::new(keys, WebRTCConfig::default()));
2478        let peer_id = PeerId::new("peer-a-pub".to_string());
2479        let peer_key = peer_id.to_string();
2480
2481        manager.state.peers.write().await.insert(
2482            peer_key.clone(),
2483            PeerEntry {
2484                peer_id: peer_id.clone(),
2485                direction: PeerDirection::Outbound,
2486                state: ConnectionState::Connected,
2487                last_seen: Instant::now(),
2488                peer: Some(MeshPeer::mock_for_tests(TestMeshPeer::with_delayed_close(
2489                    Duration::from_millis(200),
2490                ))),
2491                pool: PeerPool::Other,
2492                transport: PeerTransport::Bluetooth,
2493                signal_paths: BTreeSet::from([PeerSignalPath::Bluetooth]),
2494                bytes_sent: 0,
2495                bytes_received: 0,
2496            },
2497        );
2498
2499        let (relay_tx, _) = tokio::sync::broadcast::channel(4);
2500        let manager_for_task = manager.clone();
2501        let peer_id_for_task = peer_id.clone();
2502        let cleanup_task = tokio::spawn(async move {
2503            manager_for_task
2504                .handle_peer_state_event(PeerStateEvent::Failed(peer_id_for_task), &relay_tx)
2505                .await;
2506        });
2507
2508        tokio::time::sleep(Duration::from_millis(20)).await;
2509
2510        let remaining = tokio::time::timeout(Duration::from_millis(50), async {
2511            manager.state.peers.read().await.len()
2512        })
2513        .await
2514        .expect("peer map read should not block on close");
2515
2516        assert_eq!(remaining, 0);
2517        cleanup_task.await.expect("cleanup task");
2518    }
2519
2520    #[tokio::test]
2521    async fn resolve_root_from_peers_does_not_hold_peer_map_lock_while_querying() {
2522        let keys = Keys::generate();
2523        let manager = Arc::new(WebRTCManager::new(keys.clone(), WebRTCConfig::default()));
2524        let owner_keys = Keys::generate();
2525        let owner_pubkey = owner_keys.public_key().to_hex();
2526        let tree_name = "video";
2527        let hash = "ab".repeat(32);
2528        let event = EventBuilder::new(
2529            Kind::Custom(super::super::root_events::HASHTREE_KIND),
2530            "",
2531            [
2532                Tag::parse(&["d", tree_name]).unwrap(),
2533                Tag::parse(&["l", super::super::root_events::HASHTREE_LABEL]).unwrap(),
2534                Tag::parse(&["hash", &hash]).unwrap(),
2535            ],
2536        )
2537        .to_event(&owner_keys)
2538        .unwrap();
2539
2540        let peer_id = PeerId::new("peer-a-pub".to_string());
2541        let peer_key = peer_id.to_string();
2542
2543        manager.state.peers.write().await.insert(
2544            peer_key.clone(),
2545            PeerEntry {
2546                peer_id,
2547                direction: PeerDirection::Outbound,
2548                state: ConnectionState::Connected,
2549                last_seen: Instant::now(),
2550                peer: Some(MeshPeer::mock_for_tests(TestMeshPeer::with_delayed_events(
2551                    vec![event],
2552                    Duration::from_millis(200),
2553                ))),
2554                pool: PeerPool::Other,
2555                transport: PeerTransport::Bluetooth,
2556                signal_paths: BTreeSet::from([PeerSignalPath::Bluetooth]),
2557                bytes_sent: 0,
2558                bytes_received: 0,
2559            },
2560        );
2561
2562        let manager_for_task = manager.clone();
2563        let owner_pubkey_for_task = owner_pubkey.clone();
2564        let resolve_task = tokio::spawn(async move {
2565            manager_for_task
2566                .state
2567                .resolve_root_from_peers(
2568                    &owner_pubkey_for_task,
2569                    tree_name,
2570                    Duration::from_millis(500),
2571                )
2572                .await
2573        });
2574
2575        tokio::time::sleep(Duration::from_millis(20)).await;
2576
2577        let manager_for_writer = manager.clone();
2578        let peer_key_for_writer = peer_key.clone();
2579        let writer_task = tokio::spawn(async move {
2580            let mut peers = manager_for_writer.state.peers.write().await;
2581            if let Some(entry) = peers.get_mut(&peer_key_for_writer) {
2582                entry.bytes_received += 1;
2583            }
2584        });
2585
2586        tokio::time::sleep(Duration::from_millis(20)).await;
2587
2588        let status_count = tokio::time::timeout(Duration::from_millis(50), async {
2589            manager.state.peers.read().await.len()
2590        })
2591        .await
2592        .expect("peer map read should not block on root query");
2593
2594        assert_eq!(status_count, 1);
2595        assert!(resolve_task.await.expect("resolve task").is_some());
2596        writer_task.await.expect("writer task");
2597    }
2598
2599    #[test]
2600    fn test_formal_timed_seen_set_rejects_duplicates() {
2601        let mut seen = TimedSeenSet::new(4, Duration::from_secs(60));
2602        assert!(seen.insert_if_new("frame-1".to_string()));
2603        assert!(!seen.insert_if_new("frame-1".to_string()));
2604        assert!(seen.insert_if_new("frame-2".to_string()));
2605    }
2606
2607    #[test]
2608    fn test_formal_timed_seen_set_evicts_oldest_when_capacity_exceeded() {
2609        let mut seen = TimedSeenSet::new(2, Duration::from_secs(60));
2610        assert!(seen.insert_if_new("a".to_string()));
2611        assert!(seen.insert_if_new("b".to_string()));
2612        assert!(seen.insert_if_new("c".to_string()));
2613
2614        // "a" should be evicted due to cap=2, so re-insert becomes new again.
2615        assert!(seen.insert_if_new("a".to_string()));
2616        assert!(!seen.insert_if_new("a".to_string()));
2617    }
2618
2619    #[test]
2620    fn test_request_dispatch_normalization_caps_to_available_peers() {
2621        let normalized = normalize_dispatch_config(
2622            RequestDispatchConfig {
2623                initial_fanout: 8,
2624                hedge_fanout: 6,
2625                max_fanout: 5,
2626                hedge_interval_ms: 120,
2627            },
2628            3,
2629        );
2630        assert_eq!(normalized.max_fanout, 3);
2631        assert_eq!(normalized.initial_fanout, 3);
2632        assert_eq!(normalized.hedge_fanout, 3);
2633    }
2634
2635    #[test]
2636    fn test_hedged_wave_plan_matches_dispatch_policy() {
2637        let plan = build_hedged_wave_plan(
2638            7,
2639            RequestDispatchConfig {
2640                initial_fanout: 2,
2641                hedge_fanout: 3,
2642                max_fanout: 6,
2643                hedge_interval_ms: 120,
2644            },
2645        );
2646        assert_eq!(plan, vec![2, 3, 1]);
2647    }
2648}