// hashtree_cli/webrtc/signaling.rs
//! WebRTC signaling over Nostr relays
//!
//! Protocol (compatible with hashtree-ts):
//! - All signaling uses ephemeral kind 25050
//! - Hello messages: #l: "hello" tag, broadcast for peer discovery (unencrypted)
//! - Directed signaling (offer, answer, candidate, candidates): NIP-17 style
//!   gift wrap for privacy - wrapped with ephemeral key, #p tag with recipient
//!
//! Security: Directed messages use gift wrapping with ephemeral keys so that
//! relays cannot see the actual sender or correlate messages.

12use anyhow::Result;
13use futures::{SinkExt, StreamExt};
14use hashtree_webrtc::{
15    build_hedged_wave_plan, normalize_dispatch_config, sync_selector_peers, PeerSelector,
16};
17use nostr::{
18    nips::nip44, ClientMessage, EventBuilder, Filter, JsonUtil, Keys, Kind, PublicKey,
19    RelayMessage, Tag,
20};
21use std::collections::{BTreeSet, HashMap};
22use std::sync::Arc;
23use std::time::{Duration, Instant};
24use tokio::sync::{mpsc, Mutex, RwLock};
25use tokio_tungstenite::{connect_async, tungstenite::Message};
26use tracing::{debug, error, info, warn};
27
28use super::bluetooth::{BluetoothMesh, BluetoothPeerRegistrar, BluetoothRuntimeContext};
29use super::cashu::{CashuMintMetadataStore, CashuQuoteState, CashuRoutingConfig, NegotiatedQuote};
30use super::local_bus::SharedLocalNostrBus;
31use super::multicast::MulticastNostrBus;
32use super::peer::{ContentStore, Peer, PendingRequest};
33use super::root_events::{
34    build_root_filter, hashtree_event_identifier, is_hashtree_labeled_event, pick_latest_event,
35    root_event_from_peer, PeerRootEvent,
36};
37use super::session::MeshPeer;
38use super::types::{
39    decrement_htl_with_policy, encode_quote_request, encode_request, should_forward_htl,
40    validate_mesh_frame, DataQuoteRequest, DataRequest, MeshNostrFrame, MeshNostrPayload,
41    PeerDirection, PeerId, PeerPool, PeerStateEvent, PeerStatus, RequestDispatchConfig,
42    SignalingMessage, TimedSeenSet, WebRTCConfig, HELLO_TAG, MESH_DEFAULT_HTL, MESH_EVENT_POLICY,
43    WEBRTC_KIND,
44};
45use super::wifi_aware::{mobile_wifi_aware_bridge, WifiAwareNostrBus, WIFI_AWARE_SOURCE};
46use crate::cashu_helper::CashuPaymentClient;
47use crate::nostr_relay::NostrRelay;
48
/// Callback type for classifying peers into pools
///
/// Takes a peer identifier string and returns the [`PeerPool`] the peer
/// belongs to; shared across tasks, hence `Arc + Send + Sync`.
pub type PeerClassifier = Arc<dyn Fn(&str) -> PeerPool + Send + Sync>;
51
/// Active data transport used for a peer session.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub enum PeerTransport {
    WebRtc,
    Bluetooth,
}

impl PeerTransport {
    /// Stable lowercase label for logs, metrics, and wire-visible tags.
    pub const fn as_str(self) -> &'static str {
        match self {
            Self::WebRtc => "webrtc",
            Self::Bluetooth => "bluetooth",
        }
    }
}

impl std::fmt::Display for PeerTransport {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.as_str())
    }
}
73
/// Returns true when the `HTREE_BLUETOOTH_NOSTR_ONLY` environment variable
/// requests that Bluetooth peers carry only Nostr signaling traffic (no bulk
/// data transfer).
///
/// Accepts "1", "true", or "yes" in any letter case, ignoring surrounding
/// whitespace. The previous literal list ("1"/"true"/"TRUE"/"yes"/"YES")
/// silently rejected common spellings like "True" or "Yes". An unset or
/// non-UTF-8 variable disables the mode.
fn bluetooth_nostr_only_mode() -> bool {
    std::env::var("HTREE_BLUETOOTH_NOSTR_ONLY")
        .map(|raw| {
            let value = raw.trim();
            value == "1"
                || value.eq_ignore_ascii_case("true")
                || value.eq_ignore_ascii_case("yes")
        })
        .unwrap_or(false)
}
80
81/// Signaling/discovery path through which a peer was seen.
82#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
83pub enum PeerSignalPath {
84    Relay,
85    Multicast,
86    WifiAware,
87    Bluetooth,
88}
89
90impl PeerSignalPath {
91    pub const fn as_str(self) -> &'static str {
92        match self {
93            PeerSignalPath::Relay => "relay",
94            PeerSignalPath::Multicast => "multicast",
95            PeerSignalPath::WifiAware => WIFI_AWARE_SOURCE,
96            PeerSignalPath::Bluetooth => "bluetooth",
97        }
98    }
99
100    pub fn from_source_name(source: &str) -> Self {
101        match source {
102            "multicast" => PeerSignalPath::Multicast,
103            WIFI_AWARE_SOURCE => PeerSignalPath::WifiAware,
104            "bluetooth" => PeerSignalPath::Bluetooth,
105            _ => PeerSignalPath::Relay,
106        }
107    }
108}
109
110impl std::fmt::Display for PeerSignalPath {
111    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
112        f.write_str((*self).as_str())
113    }
114}
115
/// Connection state for a peer
#[derive(Debug, Clone, PartialEq)]
pub enum ConnectionState {
    Discovered,
    Connecting,
    Connected,
    Failed,
}

impl std::fmt::Display for ConnectionState {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Map to a static label first, then emit it in one call.
        let label = match self {
            ConnectionState::Discovered => "discovered",
            ConnectionState::Connecting => "connecting",
            ConnectionState::Connected => "connected",
            ConnectionState::Failed => "failed",
        };
        f.write_str(label)
    }
}
135
/// Peer entry in the manager
pub struct PeerEntry {
    // Identifier of the remote peer.
    pub peer_id: PeerId,
    // Session direction (see `PeerDirection` in `types`).
    pub direction: PeerDirection,
    // Current lifecycle state of the connection.
    pub state: ConnectionState,
    // When this peer was last seen/updated.
    pub last_seen: Instant,
    // Live session handle once established; None while only discovered.
    pub peer: Option<MeshPeer>,
    // Pool the peer was classified into (see `PeerClassifier`).
    pub pool: PeerPool,
    // Transport carrying the peer's data traffic.
    pub transport: PeerTransport,
    // Every signaling/discovery path this peer has been observed on.
    pub signal_paths: BTreeSet<PeerSignalPath>,
    // Cumulative bytes sent to this peer.
    pub bytes_sent: u64,
    // Cumulative bytes received from this peer.
    pub bytes_received: u64,
}
149
/// Shared state for the native mesh router.
pub struct WebRTCState {
    /// All known peers keyed by peer-id string, in any connection state.
    pub peers: RwLock<HashMap<String, PeerEntry>>,
    /// Number of currently connected peers; zeroed by `reset_runtime_state`
    /// (incremented/decremented by connection lifecycle code outside this chunk).
    pub connected_count: std::sync::atomic::AtomicUsize,
    /// Total bytes sent across all peers (cumulative)
    pub bytes_sent: std::sync::atomic::AtomicU64,
    /// Total bytes received across all peers (cumulative)
    pub bytes_received: std::sync::atomic::AtomicU64,
    /// Relayless mesh frames received and accepted.
    pub mesh_received: std::sync::atomic::AtomicU64,
    /// Relayless mesh frames forwarded to peers.
    pub mesh_forwarded: std::sync::atomic::AtomicU64,
    /// Relayless mesh frames/events dropped due to dedupe.
    pub mesh_dropped_duplicate: std::sync::atomic::AtomicU64,
    /// Shared peer selector used by live retrieval; aligned with simulation strategies.
    peer_selector: Arc<RwLock<PeerSelector>>,
    /// Hedged dispatch policy for retrieval requests.
    request_dispatch: RequestDispatchConfig,
    /// Retrieval timeout for quote negotiation and single-peer fetches.
    request_timeout: Duration,
    /// Shared Cashu quote negotiation policy/state.
    cashu_quotes: Arc<CashuQuoteState>,
    /// Optional local buses such as multicast or BLE that carry signed Nostr
    /// envelopes for nearby/offline peers.
    local_buses: RwLock<Vec<SharedLocalNostrBus>>,
}
// NOTE(review): these caps/TTLs are presumably consumed by `TimedSeenSet`
// dedupe windows — the use sites are outside this chunk; confirm there.
/// Max number of recently-seen mesh frame ids kept for duplicate suppression.
const SEEN_FRAME_CAP: usize = 4096;
/// How long a seen mesh frame id stays in the dedupe window.
const SEEN_FRAME_TTL: Duration = Duration::from_secs(120);
/// Max number of recently-seen Nostr event ids kept for duplicate suppression.
const SEEN_EVENT_CAP: usize = 8192;
/// How long a seen Nostr event id stays in the dedupe window.
const SEEN_EVENT_TTL: Duration = Duration::from_secs(600);
180
/// In-flight content requests keyed by hash hex, shared with the data-channel
/// receive path so responses can resolve their waiting callers.
type PendingRequestsMap = Arc<Mutex<HashMap<String, PendingRequest>>>;
/// A quote-capable WebRTC peer: (peer id, pending-request map, raw data channel).
type ConnectedPeer = (
    String,
    PendingRequestsMap,
    Arc<webrtc::data_channel::RTCDataChannel>,
);
/// A connected session over any transport: (peer id, session handle, transport).
type ConnectedSession = (String, MeshPeer, PeerTransport);
188
189impl WebRTCState {
190    pub fn new() -> Self {
191        let cfg = WebRTCConfig::default();
192        Self::new_with_routing_and_cashu(
193            cfg.request_selection_strategy,
194            cfg.request_fairness_enabled,
195            cfg.request_dispatch,
196            Duration::from_millis(cfg.message_timeout_ms),
197            CashuRoutingConfig::default(),
198            None,
199            None,
200        )
201    }
202
203    pub fn new_with_routing(
204        selection_strategy: super::types::SelectionStrategy,
205        fairness_enabled: bool,
206        request_dispatch: RequestDispatchConfig,
207    ) -> Self {
208        let cfg = WebRTCConfig::default();
209        Self::new_with_routing_and_cashu(
210            selection_strategy,
211            fairness_enabled,
212            request_dispatch,
213            Duration::from_millis(cfg.message_timeout_ms),
214            CashuRoutingConfig::default(),
215            None,
216            None,
217        )
218    }
219
220    pub fn new_with_routing_and_cashu(
221        selection_strategy: super::types::SelectionStrategy,
222        fairness_enabled: bool,
223        request_dispatch: RequestDispatchConfig,
224        request_timeout: Duration,
225        cashu_routing: CashuRoutingConfig,
226        payment_client: Option<Arc<dyn CashuPaymentClient>>,
227        mint_metadata: Option<Arc<CashuMintMetadataStore>>,
228    ) -> Self {
229        let mut selector = PeerSelector::with_strategy(selection_strategy);
230        selector.set_fairness(fairness_enabled);
231        let peer_selector = Arc::new(RwLock::new(selector));
232        let cashu_quotes = Arc::new(if let Some(mint_metadata) = mint_metadata {
233            CashuQuoteState::new_with_mint_metadata(
234                cashu_routing,
235                peer_selector.clone(),
236                payment_client,
237                mint_metadata,
238            )
239        } else {
240            CashuQuoteState::new(cashu_routing, peer_selector.clone(), payment_client)
241        });
242        Self {
243            peers: RwLock::new(HashMap::new()),
244            connected_count: std::sync::atomic::AtomicUsize::new(0),
245            bytes_sent: std::sync::atomic::AtomicU64::new(0),
246            bytes_received: std::sync::atomic::AtomicU64::new(0),
247            mesh_received: std::sync::atomic::AtomicU64::new(0),
248            mesh_forwarded: std::sync::atomic::AtomicU64::new(0),
249            mesh_dropped_duplicate: std::sync::atomic::AtomicU64::new(0),
250            peer_selector,
251            request_dispatch,
252            request_timeout,
253            cashu_quotes,
254            local_buses: RwLock::new(Vec::new()),
255        }
256    }
257
258    pub async fn set_local_buses(&self, buses: Vec<SharedLocalNostrBus>) {
259        *self.local_buses.write().await = buses;
260    }
261
262    pub async fn add_local_bus(&self, bus: SharedLocalNostrBus) {
263        self.local_buses.write().await.push(bus);
264    }
265
266    pub async fn set_multicast_bus(&self, bus: Option<Arc<MulticastNostrBus>>) {
267        let buses = bus
268            .into_iter()
269            .map(|bus| bus as SharedLocalNostrBus)
270            .collect();
271        self.set_local_buses(buses).await;
272    }
273
274    /// Drop all live peer sessions and clear topology-specific state while
275    /// keeping cumulative bandwidth counters intact.
276    pub async fn reset_runtime_state(&self) {
277        self.set_local_buses(Vec::new()).await;
278        let peers = {
279            let mut peers = self.peers.write().await;
280            std::mem::take(&mut *peers)
281        };
282        self.connected_count
283            .store(0, std::sync::atomic::Ordering::Relaxed);
284        for entry in peers.into_values() {
285            if let Some(peer) = entry.peer {
286                let _ = peer.close().await;
287            }
288        }
289    }
290
291    /// Get current bandwidth stats (bytes sent/received)
292    pub fn get_bandwidth(&self) -> (u64, u64) {
293        (
294            self.bytes_sent.load(std::sync::atomic::Ordering::Relaxed),
295            self.bytes_received
296                .load(std::sync::atomic::Ordering::Relaxed),
297        )
298    }
299
300    pub fn get_mesh_stats(&self) -> (u64, u64, u64) {
301        (
302            self.mesh_received
303                .load(std::sync::atomic::Ordering::Relaxed),
304            self.mesh_forwarded
305                .load(std::sync::atomic::Ordering::Relaxed),
306            self.mesh_dropped_duplicate
307                .load(std::sync::atomic::Ordering::Relaxed),
308        )
309    }
310
    /// Count one relayless mesh frame that was received and accepted.
    pub fn record_mesh_received(&self) {
        self.mesh_received
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    }
315
    /// Count `count` relayless mesh frames forwarded to peers.
    pub fn record_mesh_forwarded(&self, count: u64) {
        self.mesh_forwarded
            .fetch_add(count, std::sync::atomic::Ordering::Relaxed);
    }
320
    /// Count one relayless mesh frame/event dropped by the dedupe filter.
    pub fn record_mesh_duplicate_drop(&self) {
        self.mesh_dropped_duplicate
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    }
325
326    /// Record bytes sent (global + per-peer)
327    pub async fn record_sent(&self, peer_id: &str, bytes: u64) {
328        self.bytes_sent
329            .fetch_add(bytes, std::sync::atomic::Ordering::Relaxed);
330        if let Some(entry) = self.peers.write().await.get_mut(peer_id) {
331            entry.bytes_sent += bytes;
332        }
333    }
334
335    /// Record bytes received (global + per-peer)
336    pub async fn record_received(&self, peer_id: &str, bytes: u64) {
337        self.bytes_received
338            .fetch_add(bytes, std::sync::atomic::Ordering::Relaxed);
339        if let Some(entry) = self.peers.write().await.get_mut(peer_id) {
340            entry.bytes_received += bytes;
341        }
342    }
343
344    /// Request content by hash from connected peers
345    /// Queries peers in adaptive selector order with hedged fanout waves.
346    /// Returns the first successful response, or None if no peer has it
347    pub async fn request_from_peers(&self, hash_hex: &str) -> Option<Vec<u8>> {
348        self.request_from_peers_with_source(hash_hex)
349            .await
350            .map(|(data, _peer_id)| data)
351    }
352
    /// Request content by hash from connected peers, returning data and source peer.
    ///
    /// Flow:
    /// 1. Snapshot all connected, ready peer sessions (Bluetooth peers are
    ///    skipped for data transfer when `HTREE_BLUETOOTH_NOSTR_ONLY` is set).
    /// 2. Order peers via the shared adaptive `PeerSelector`.
    /// 3. If Cashu payment terms are configured, try a quote-negotiated fetch
    ///    from a single peer first.
    /// 4. Otherwise broadcast plain requests in hedged waves, returning the
    ///    first response whose SHA-256 matches `hash_hex`.
    ///
    /// Returns `None` when no peer is connected, `hash_hex` is not a valid
    /// 32-byte hex digest, or every peer times out / returns bad data.
    pub async fn request_from_peers_with_source(
        &self,
        hash_hex: &str,
    ) -> Option<(Vec<u8>, String)> {
        use super::types::BLOB_REQUEST_POLICY;

        let peers = self.peers.read().await;

        // Clone session handles out of the peer table so the read lock is not
        // held across the (potentially long) network round-trips below.
        let peer_refs: Vec<_> = peers
            .values()
            .filter(|p| p.state == ConnectionState::Connected && p.peer.is_some())
            .filter_map(|p| {
                p.peer
                    .clone()
                    .map(|peer| (p.peer_id.to_string(), peer, p.transport))
            })
            .collect();

        drop(peers); // Release the read lock

        let mut connected_peers: Vec<ConnectedPeer> = Vec::new();
        let mut connected_sessions: Vec<ConnectedSession> = Vec::new();
        for (peer_id, peer, transport) in peer_refs {
            if !peer.is_ready() {
                continue;
            }
            // Nostr-only mode: Bluetooth peers carry signaling, never data.
            if bluetooth_nostr_only_mode() && transport == PeerTransport::Bluetooth {
                continue;
            }
            // Only WebRTC sessions expose the raw data channel needed for the
            // quote path; other transports still join `connected_sessions`.
            if let Some(webrtc_peer) = peer.as_webrtc() {
                let dc_guard = webrtc_peer.data_channel.lock().await;
                if let Some(dc) = dc_guard.as_ref() {
                    connected_peers.push((
                        peer_id.clone(),
                        webrtc_peer.pending_requests.clone(),
                        dc.clone(),
                    ));
                }
            }
            connected_sessions.push((peer_id, peer, transport));
        }

        if connected_sessions.is_empty() {
            debug!(
                "No connected peers to query for {}",
                &hash_hex[..8.min(hash_hex.len())]
            );
            return None;
        }

        // Convert hex to binary hash once
        let hash_bytes = match hex::decode(hash_hex) {
            Ok(b) => b,
            Err(_) => return None,
        };

        let expected_hash: [u8; 32] = match hash_bytes.as_slice().try_into() {
            Ok(h) => h,
            Err(_) => {
                debug!(
                    "Invalid hash length {}, expected 32 bytes",
                    hash_bytes.len()
                );
                return None;
            }
        };

        // Keep the adaptive selector's peer set in sync with the live
        // connections, then let it produce a preference ordering.
        let connected_peer_ids: Vec<String> = connected_sessions
            .iter()
            .map(|(peer_id, _, _)| peer_id.clone())
            .collect();
        sync_selector_peers(self.peer_selector.as_ref(), &connected_peer_ids).await;

        let ordered_peer_ids = self.peer_selector.write().await.select_peers();
        // Re-order the quote-capable (WebRTC) peers by selector preference;
        // peers the selector did not rank are appended in arbitrary map order.
        let mut quote_by_peer: HashMap<
            String,
            (
                PendingRequestsMap,
                Arc<webrtc::data_channel::RTCDataChannel>,
            ),
        > = connected_peers
            .iter()
            .cloned()
            .map(|(peer_id, pending, dc)| (peer_id, (pending, dc)))
            .collect();
        let mut ordered_quote_peers: Vec<ConnectedPeer> = Vec::new();
        for peer_id in &ordered_peer_ids {
            if let Some((pending, dc)) = quote_by_peer.remove(peer_id) {
                ordered_quote_peers.push((peer_id.clone(), pending, dc));
            }
        }
        for (peer_id, (pending, dc)) in quote_by_peer {
            ordered_quote_peers.push((peer_id, pending, dc));
        }

        // Same re-ordering for the full session list (all transports).
        let mut by_peer: HashMap<String, (MeshPeer, PeerTransport)> = connected_sessions
            .into_iter()
            .map(|(peer_id, peer, transport)| (peer_id, (peer, transport)))
            .collect();

        let mut ordered_peers: Vec<ConnectedSession> = Vec::new();
        for peer_id in ordered_peer_ids {
            if let Some((peer, transport)) = by_peer.remove(&peer_id) {
                ordered_peers.push((peer_id, peer, transport));
            }
        }
        for (peer_id, (peer, transport)) in by_peer {
            ordered_peers.push((peer_id, peer, transport));
        }

        let dispatch = normalize_dispatch_config(self.request_dispatch, ordered_peers.len());
        let wave_plan = build_hedged_wave_plan(ordered_peers.len(), dispatch);
        if wave_plan.is_empty() {
            return None;
        }

        debug!(
            "Querying {} peers for {} (strategy order + hedged waves {:?})",
            ordered_peers.len(),
            &hash_hex[..8.min(hash_hex.len())],
            wave_plan
        );

        // Paid path: negotiate a Cashu quote and fetch from the quoted peer.
        // On any failure we fall through to the unquoted hedged broadcast.
        if let Some((requested_mint, payment_sat, quote_ttl_ms)) =
            self.cashu_quotes.requester_quote_terms().await
        {
            if let Some(quote) = self
                .request_quote_from_peers(
                    &hash_bytes,
                    requested_mint,
                    payment_sat,
                    quote_ttl_ms,
                    &ordered_quote_peers,
                )
                .await
            {
                if let Some(data) = self
                    .request_from_single_peer(
                        hash_hex,
                        &hash_bytes,
                        expected_hash,
                        &quote.peer_id,
                        Some(&quote),
                        &ordered_quote_peers,
                    )
                    .await
                {
                    debug!(
                        "Got quoted response from peer {} for {}",
                        quote.peer_id,
                        &hash_hex[..8.min(hash_hex.len())]
                    );
                    return Some((data, quote.peer_id));
                }
            }
        }

        let request = DataRequest {
            h: hash_bytes.clone(),
            htl: BLOB_REQUEST_POLICY.max_htl,
            q: None,
        };
        let wire = match encode_request(&request) {
            Ok(w) => w,
            Err(_) => return None,
        };
        let wire_len = wire.len() as u64;
        let hedge_wait_window = Duration::from_millis(dispatch.hedge_interval_ms.max(1));
        let mut next_peer_idx = 0usize;
        for wave_size in wave_plan {
            let from = next_peer_idx;
            let to = (next_peer_idx + wave_size).min(ordered_peers.len());
            next_peer_idx = to;

            if from == to {
                continue;
            }

            // One channel per wave, sized to the wave so senders never block.
            // NOTE(review): responses from an earlier wave's spawned tasks are
            // dropped once we move to the next wave — a late winner from a
            // previous wave is not observed.
            let (result_tx, mut result_rx) =
                mpsc::channel::<(String, Instant, Result<Option<Vec<u8>>>)>(to - from);

            for (peer_id, peer, transport) in &ordered_peers[from..to] {
                // Bluetooth bandwidth is not tallied here — presumably tracked
                // by the BLE transport itself; TODO confirm.
                if *transport != PeerTransport::Bluetooth {
                    self.record_sent(peer_id, wire_len).await;
                }
                self.peer_selector
                    .write()
                    .await
                    .record_request(peer_id, wire_len);

                let peer_id = peer_id.clone();
                let peer = peer.clone();
                let hash_hex = hash_hex.to_string();
                let result_tx = result_tx.clone();
                let per_request_timeout = self.request_timeout;
                tokio::spawn(async move {
                    let started = Instant::now();
                    let result = peer.request(&hash_hex, per_request_timeout).await;
                    let _ = result_tx.send((peer_id, started, result)).await;
                });
            }
            drop(result_tx);

            // The last wave waits the full request timeout; earlier waves wait
            // only one hedge interval before widening the fanout.
            let is_last_wave = next_peer_idx >= ordered_peers.len();
            let deadline = Instant::now()
                + if is_last_wave {
                    self.request_timeout
                } else {
                    hedge_wait_window
                };
            loop {
                let now = Instant::now();
                if now >= deadline {
                    break;
                }
                let remaining = deadline.saturating_duration_since(now);
                match tokio::time::timeout(remaining, result_rx.recv()).await {
                    Ok(Some((peer_id, started, Ok(Some(data))))) => {
                        let rtt_ms = started.elapsed().as_millis() as u64;
                        // Integrity check before accepting the response.
                        if hashtree_core::sha256(&data) == expected_hash {
                            let should_record = {
                                let peers = self.peers.read().await;
                                peers
                                    .get(&peer_id)
                                    .map(|entry| entry.transport != PeerTransport::Bluetooth)
                                    .unwrap_or(true)
                            };
                            if should_record {
                                self.record_received(&peer_id, data.len() as u64).await;
                            }
                            self.peer_selector.write().await.record_success(
                                &peer_id,
                                rtt_ms,
                                data.len() as u64,
                            );
                            debug!(
                                "Got response from peer {} for {}",
                                peer_id,
                                &hash_hex[..8.min(hash_hex.len())]
                            );
                            return Some((data, peer_id));
                        }
                        // Hash mismatch: peer returned corrupt/wrong data.
                        self.peer_selector.write().await.record_failure(&peer_id);
                    }
                    Ok(Some((peer_id, _, Ok(None)))) | Ok(Some((peer_id, _, Err(_)))) => {
                        self.peer_selector.write().await.record_timeout(&peer_id);
                    }
                    // Channel drained (all wave tasks done) or deadline hit.
                    Ok(None) | Err(_) => break,
                }
            }
        }

        debug!(
            "No peer had data for {}",
            &hash_hex[..8.min(hash_hex.len())]
        );
        None
    }
612
    /// Broadcast a Cashu quote request for `hash_bytes` in hedged waves over
    /// the given (already selector-ordered) WebRTC peers and wait for the
    /// first negotiated quote.
    ///
    /// A pending-quote slot keyed by the hash hex is registered before sending
    /// and cleared again on every exit path so stale slots don't accumulate.
    ///
    /// Returns `None` when there are no peers, `quote_ttl_ms` is zero, nothing
    /// could be sent, or no quote arrives before `request_timeout`.
    async fn request_quote_from_peers(
        &self,
        hash_bytes: &[u8],
        requested_mint: String,
        payment_sat: u64,
        quote_ttl_ms: u32,
        ordered_peers: &[ConnectedPeer],
    ) -> Option<NegotiatedQuote> {
        if ordered_peers.is_empty() || quote_ttl_ms == 0 {
            return None;
        }

        let dispatch = normalize_dispatch_config(self.request_dispatch, ordered_peers.len());
        let wave_plan = build_hedged_wave_plan(ordered_peers.len(), dispatch);
        if wave_plan.is_empty() {
            return None;
        }

        // Register the receiver that the Cashu quote handler resolves when a
        // peer answers; keyed by the hash hex.
        let hash_hex = hex::encode(hash_bytes);
        let mut rx = self
            .cashu_quotes
            .register_pending_quote(hash_hex.clone(), Some(requested_mint.clone()), payment_sat)
            .await;
        let quote_request = DataQuoteRequest {
            h: hash_bytes.to_vec(),
            p: payment_sat,
            t: quote_ttl_ms,
            m: Some(requested_mint),
        };
        let wire = match encode_quote_request(&quote_request) {
            Ok(wire) => wire,
            Err(_) => {
                self.cashu_quotes.clear_pending_quote(&hash_hex).await;
                return None;
            }
        };
        let deadline = Instant::now() + self.request_timeout;
        let mut sent_total = 0usize;
        let mut next_peer_idx = 0usize;

        for (wave_idx, wave_size) in wave_plan.iter().copied().enumerate() {
            let from = next_peer_idx;
            let to = (next_peer_idx + wave_size).min(ordered_peers.len());
            for (_, _, dc) in &ordered_peers[from..to] {
                if dc.send(&bytes::Bytes::copy_from_slice(&wire)).await.is_ok() {
                    sent_total += 1;
                }
            }
            next_peer_idx = to;

            // Nothing delivered to anyone yet: widen immediately rather than
            // waiting a hedge interval for a reply that cannot come.
            if sent_total == 0 {
                if next_peer_idx >= ordered_peers.len() {
                    break;
                }
                continue;
            }

            let now = Instant::now();
            if now >= deadline {
                break;
            }
            let remaining = deadline.saturating_duration_since(now);
            // The final wave may use the whole remaining budget; earlier waves
            // wait at most one hedge interval before fanning out further.
            let is_last_wave =
                wave_idx + 1 == wave_plan.len() || next_peer_idx >= ordered_peers.len();
            let wait = if is_last_wave {
                remaining
            } else if dispatch.hedge_interval_ms == 0 {
                Duration::ZERO
            } else {
                Duration::from_millis(dispatch.hedge_interval_ms).min(remaining)
            };

            if wait.is_zero() {
                continue;
            }

            match tokio::time::timeout(wait, &mut rx).await {
                Ok(Ok(Some(quote))) => {
                    self.cashu_quotes.clear_pending_quote(&hash_hex).await;
                    return Some(quote);
                }
                // Resolved with no quote, or the sender side was dropped.
                Ok(Ok(None)) | Ok(Err(_)) => break,
                // Timed out this wave; hedge with the next one.
                Err(_) => {}
            }
        }

        self.cashu_quotes.clear_pending_quote(&hash_hex).await;
        None
    }
702
    /// Fetch `hash_hex` directly from `target_peer_id` over its WebRTC data
    /// channel, optionally attaching a negotiated Cashu quote id.
    ///
    /// Registers a `PendingRequest` (quoted or standard) keyed by the hash hex
    /// before sending so a fast response cannot race the registration, then
    /// waits with a timeout that is extended by the Cashu settlement budget
    /// when a quote is attached. On failure, timeout, or a hash-mismatched
    /// payload, the pending entry is removed and any in-flight payment token
    /// is revoked.
    ///
    /// Returns the payload only when its SHA-256 equals `expected_hash`.
    async fn request_from_single_peer(
        &self,
        hash_hex: &str,
        hash_bytes: &[u8],
        expected_hash: [u8; 32],
        target_peer_id: &str,
        quote: Option<&NegotiatedQuote>,
        ordered_peers: &[ConnectedPeer],
    ) -> Option<Vec<u8>> {
        use super::types::BLOB_REQUEST_POLICY;

        // Locate the target peer's pending-request map and data channel; bail
        // out if the peer is not among the quote-capable sessions.
        let (pending_requests, dc) = ordered_peers
            .iter()
            .find(|(peer_id, _, _)| peer_id == target_peer_id)
            .map(|(_, pending_requests, dc)| (pending_requests.clone(), dc.clone()))?;

        let request = DataRequest {
            h: hash_bytes.to_vec(),
            htl: BLOB_REQUEST_POLICY.max_htl,
            q: quote.map(|quote| quote.quote_id),
        };
        let wire = encode_request(&request).ok()?;
        let wire_len = wire.len() as u64;
        let sent_at = Instant::now();
        let (tx, mut rx) = tokio::sync::oneshot::channel();

        // Register the pending request before sending the wire bytes.
        {
            let mut pending = pending_requests.lock().await;
            pending.insert(
                hash_hex.to_string(),
                if let Some(quote) = quote {
                    PendingRequest::quoted(
                        hash_bytes.to_vec(),
                        tx,
                        quote.quote_id,
                        quote.mint_url.clone().unwrap_or_default(),
                        quote.payment_sat,
                    )
                } else {
                    PendingRequest::standard(hash_bytes.to_vec(), tx)
                },
            );
        }

        if dc
            .send(&bytes::Bytes::copy_from_slice(&wire))
            .await
            .is_err()
        {
            // Send failed: unregister and penalize the peer in the selector.
            let mut pending = pending_requests.lock().await;
            pending.remove(hash_hex);
            self.peer_selector
                .write()
                .await
                .record_failure(target_peer_id);
            return None;
        }

        self.record_sent(target_peer_id, wire_len).await;
        self.peer_selector
            .write()
            .await
            .record_request(target_peer_id, wire_len);

        // Quoted transfers may involve a mint payment round-trip, so extend
        // the wait by the settlement timeout scaled by the (clamped) sat
        // amount, saturating instead of overflowing.
        let wait_timeout = if let Some(quote) = quote {
            let multiplier = quote.payment_sat.clamp(1, 32) as u128;
            let extra_ms = self
                .cashu_quotes
                .settlement_timeout()
                .as_millis()
                .saturating_mul(multiplier);
            self.request_timeout + Duration::from_millis(extra_ms.min(u64::MAX as u128) as u64)
        } else {
            self.request_timeout
        };

        match tokio::time::timeout(wait_timeout, &mut rx).await {
            // Success: payload passed the integrity check.
            // NOTE(review): the pending entry is not removed on this path —
            // presumably the response handler that fired `tx` already cleaned
            // it up; confirm against the data-channel receive code.
            Ok(Ok(Some(data))) if hashtree_core::sha256(&data) == expected_hash => {
                let rtt_ms = sent_at.elapsed().as_millis() as u64;
                self.record_received(target_peer_id, data.len() as u64)
                    .await;
                self.peer_selector.write().await.record_success(
                    target_peer_id,
                    rtt_ms,
                    data.len() as u64,
                );
                Some(data)
            }
            // Peer answered, but the payload fails the hash check.
            Ok(Ok(Some(_))) => {
                self.peer_selector
                    .write()
                    .await
                    .record_failure(target_peer_id);
                // Roll back any in-flight payment for the failed transfer.
                let pending = pending_requests.lock().await.remove(hash_hex);
                if let Some(pending) = pending {
                    if let Some(quoted) = pending.quoted {
                        if let Some(in_flight) = quoted.in_flight_payment {
                            let _ = self
                                .cashu_quotes
                                .revoke_payment_token(&in_flight.mint_url, &in_flight.operation_id)
                                .await;
                        }
                    }
                }
                None
            }
            // Negative answer, dropped sender, or timeout: clean up, revoke
            // any in-flight payment, and record a timeout for the peer.
            Ok(Ok(None)) | Ok(Err(_)) | Err(_) => {
                let pending = pending_requests.lock().await.remove(hash_hex);
                if let Some(pending) = pending {
                    if let Some(quoted) = pending.quoted {
                        if let Some(in_flight) = quoted.in_flight_payment {
                            let _ = self
                                .cashu_quotes
                                .revoke_payment_token(&in_flight.mint_url, &in_flight.operation_id)
                                .await;
                        }
                    }
                }
                self.peer_selector
                    .write()
                    .await
                    .record_timeout(target_peer_id);
                None
            }
        }
    }
829
    /// Resolve a hashtree root event through connected peers using Nostr REQ/EOSE over WebRTC.
    ///
    /// Queries each connected, ready peer sequentially with `per_peer_timeout` and
    /// returns the first root event that matches `owner_pubkey`/`tree_name`, or
    /// `None` if no peer yields a usable event.
    pub async fn resolve_root_from_peers(
        &self,
        owner_pubkey: &str,
        tree_name: &str,
        per_peer_timeout: Duration,
    ) -> Option<PeerRootEvent> {
        // Bail out early if the owner/tree pair cannot be turned into a filter.
        let filter = build_root_filter(owner_pubkey, tree_name)?;

        // Snapshot eligible peers up front so the read lock is not held across awaits.
        let peer_refs: Vec<_> = {
            let peers = self.peers.read().await;
            peers
                .values()
                .filter(|entry| entry.state == ConnectionState::Connected)
                // In Bluetooth Nostr-only mode, Bluetooth transports are excluded
                // from content queries.
                .filter(|entry| {
                    !bluetooth_nostr_only_mode() || entry.transport != PeerTransport::Bluetooth
                })
                .filter_map(|entry| {
                    let peer = entry.peer.as_ref()?;
                    if !peer.is_ready() {
                        return None;
                    }
                    Some((entry.peer_id.short(), peer.clone()))
                })
                .collect()
        };

        // Ask peers one at a time; the first peer that yields a matching root wins.
        for (peer_short, peer) in peer_refs {
            debug!(
                "Querying peer {} for root event {}/{}",
                peer_short, owner_pubkey, tree_name
            );
            let events = match peer
                .query_nostr_events(vec![filter.clone()], per_peer_timeout)
                .await
            {
                Ok(events) => events,
                Err(e) => {
                    // A failed query is not fatal; just move on to the next peer.
                    debug!(
                        "Peer {} Nostr query failed for {}/{}: {}",
                        peer_short, owner_pubkey, tree_name, e
                    );
                    continue;
                }
            };
            debug!(
                "Peer {} returned {} Nostr event(s) for {}/{}",
                peer_short,
                events.len(),
                owner_pubkey,
                tree_name
            );

            // Keep only events labeled as hashtree events for this tree name,
            // then pick the most recent one.
            let latest = pick_latest_event(events.iter().filter(|event| {
                hashtree_event_identifier(event).as_deref() == Some(tree_name)
                    && is_hashtree_labeled_event(event)
            }));
            if let Some(event) = latest {
                if let Some(root) = root_event_from_peer(event, &peer_short, tree_name) {
                    debug!(
                        "Resolved {}/{} via peer {} event {}",
                        owner_pubkey,
                        tree_name,
                        peer_short,
                        event.id.to_hex()
                    );
                    return Some(root);
                }
            }
        }

        None
    }
903
904    pub async fn resolve_root_from_local_buses_with_source(
905        &self,
906        owner_pubkey: &str,
907        tree_name: &str,
908        timeout: Duration,
909    ) -> Option<(&'static str, PeerRootEvent)> {
910        let buses = self.local_buses.read().await.clone();
911        for bus in buses {
912            if let Some(root) = bus.query_root(owner_pubkey, tree_name, timeout).await {
913                return Some((bus.source_name(), root));
914            }
915        }
916        None
917    }
918
919    pub async fn resolve_root_from_local_buses(
920        &self,
921        owner_pubkey: &str,
922        tree_name: &str,
923        timeout: Duration,
924    ) -> Option<PeerRootEvent> {
925        self.resolve_root_from_local_buses_with_source(owner_pubkey, tree_name, timeout)
926            .await
927            .map(|(_, root)| root)
928    }
929
930    pub async fn resolve_root_from_multicast(
931        &self,
932        owner_pubkey: &str,
933        tree_name: &str,
934        timeout: Duration,
935    ) -> Option<PeerRootEvent> {
936        self.resolve_root_from_local_buses(owner_pubkey, tree_name, timeout)
937            .await
938    }
939}
940
941impl Default for WebRTCState {
942    fn default() -> Self {
943        Self::new()
944    }
945}
946
/// Alias for [`WebRTCState`] under its peer-router role name.
pub type PeerRouterState = WebRTCState;
948
/// Native peer router handles peer discovery and transport fan-out.
pub struct WebRTCManager {
    /// Router configuration (relays, pools, transports, intervals).
    config: WebRTCConfig,
    /// Our own peer identity (pubkey-derived, see `new`).
    my_peer_id: PeerId,
    /// Nostr keys used to sign outgoing signaling events.
    keys: Keys,
    /// Shared router state (peers, selector, counters, local buses).
    state: Arc<WebRTCState>,
    /// Shutdown broadcast sender; `run` exits once the value flips to `true`.
    shutdown: Arc<tokio::sync::watch::Sender<bool>>,
    /// Template receiver cloned into each spawned task.
    shutdown_rx: tokio::sync::watch::Receiver<bool>,
    /// Channel to send signaling messages to relays
    signaling_tx: mpsc::Sender<SignalingMessage>,
    /// Receiver half; taken exactly once by `run` (None afterwards).
    signaling_rx: Option<mpsc::Receiver<SignalingMessage>>,
    /// Optional content store for serving hash requests
    store: Option<Arc<dyn ContentStore>>,
    /// Peer classifier for pool assignment
    peer_classifier: PeerClassifier,
    /// Optional Nostr relay for data-channel relay messages
    nostr_relay: Option<Arc<NostrRelay>>,
    /// Local transports (multicast, Wi-Fi Aware) registered during `run`.
    local_buses: Vec<SharedLocalNostrBus>,
    /// Channel for peer state events (connection success/failure)
    state_event_tx: mpsc::Sender<PeerStateEvent>,
    /// Receiver half; taken exactly once by `run`.
    state_event_rx: Option<mpsc::Receiver<PeerStateEvent>>,
    /// Channel for relayless mesh signaling frames received from peers.
    mesh_frame_tx: mpsc::Sender<(PeerId, MeshNostrFrame)>,
    /// Receiver half; taken exactly once by `run`.
    mesh_frame_rx: Option<mpsc::Receiver<(PeerId, MeshNostrFrame)>>,
    /// Dedup set for mesh frame ids (bounded by SEEN_FRAME_CAP / SEEN_FRAME_TTL).
    seen_frame_ids: Arc<Mutex<TimedSeenSet>>,
    /// Dedup set for Nostr event ids (bounded by SEEN_EVENT_CAP / SEEN_EVENT_TTL).
    seen_event_ids: Arc<Mutex<TimedSeenSet>>,
}
976
977impl WebRTCManager {
978    /// Create a new WebRTC manager
979    pub fn new(keys: Keys, config: WebRTCConfig) -> Self {
980        let pubkey = keys.public_key().to_hex();
981        let my_peer_id = PeerId::new(pubkey, None);
982        let (shutdown, shutdown_rx) = tokio::sync::watch::channel(false);
983        let (signaling_tx, signaling_rx) = mpsc::channel(100);
984        let (state_event_tx, state_event_rx) = mpsc::channel(100);
985        let (mesh_frame_tx, mesh_frame_rx) = mpsc::channel(256);
986        let state = Arc::new(WebRTCState::new_with_routing_and_cashu(
987            config.request_selection_strategy,
988            config.request_fairness_enabled,
989            config.request_dispatch,
990            Duration::from_millis(config.message_timeout_ms),
991            CashuRoutingConfig::default(),
992            None,
993            None,
994        ));
995
996        // Default classifier: all peers go to 'other' pool
997        let peer_classifier: PeerClassifier = Arc::new(|_| PeerPool::Other);
998
999        Self {
1000            config,
1001            my_peer_id,
1002            keys,
1003            state,
1004            shutdown: Arc::new(shutdown),
1005            shutdown_rx,
1006            signaling_tx,
1007            signaling_rx: Some(signaling_rx),
1008            store: None,
1009            peer_classifier,
1010            nostr_relay: None,
1011            local_buses: Vec::new(),
1012            state_event_tx,
1013            state_event_rx: Some(state_event_rx),
1014            mesh_frame_tx,
1015            mesh_frame_rx: Some(mesh_frame_rx),
1016            seen_frame_ids: Arc::new(Mutex::new(TimedSeenSet::new(
1017                SEEN_FRAME_CAP,
1018                SEEN_FRAME_TTL,
1019            ))),
1020            seen_event_ids: Arc::new(Mutex::new(TimedSeenSet::new(
1021                SEEN_EVENT_CAP,
1022                SEEN_EVENT_TTL,
1023            ))),
1024        }
1025    }
1026
1027    /// Create a new WebRTC manager reusing an existing shared state object.
1028    pub fn new_with_state(keys: Keys, config: WebRTCConfig, state: Arc<WebRTCState>) -> Self {
1029        let mut manager = Self::new(keys, config);
1030        manager.state = state;
1031        manager
1032    }
1033
1034    /// Create a new WebRTC manager with a peer classifier
1035    pub fn new_with_classifier(
1036        keys: Keys,
1037        config: WebRTCConfig,
1038        classifier: PeerClassifier,
1039    ) -> Self {
1040        let mut manager = Self::new(keys, config);
1041        manager.peer_classifier = classifier;
1042        manager
1043    }
1044
1045    /// Create a new WebRTC manager with a content store for serving hash requests
1046    pub fn new_with_store(keys: Keys, config: WebRTCConfig, store: Arc<dyn ContentStore>) -> Self {
1047        let mut manager = Self::new(keys, config);
1048        manager.store = Some(store);
1049        manager
1050    }
1051
1052    /// Create a new WebRTC manager with store and classifier
1053    pub fn new_with_store_and_classifier(
1054        keys: Keys,
1055        config: WebRTCConfig,
1056        store: Arc<dyn ContentStore>,
1057        classifier: PeerClassifier,
1058    ) -> Self {
1059        Self::new_with_store_and_classifier_and_cashu(
1060            keys,
1061            config,
1062            store,
1063            classifier,
1064            CashuRoutingConfig::default(),
1065            None,
1066            None,
1067        )
1068    }
1069
1070    pub fn new_with_state_and_store_and_classifier(
1071        keys: Keys,
1072        config: WebRTCConfig,
1073        state: Arc<WebRTCState>,
1074        store: Arc<dyn ContentStore>,
1075        classifier: PeerClassifier,
1076    ) -> Self {
1077        let mut manager = Self::new_with_state(keys, config, state);
1078        manager.store = Some(store);
1079        manager.peer_classifier = classifier;
1080        manager
1081    }
1082
1083    pub fn new_with_store_and_classifier_and_cashu(
1084        keys: Keys,
1085        config: WebRTCConfig,
1086        store: Arc<dyn ContentStore>,
1087        classifier: PeerClassifier,
1088        cashu_routing: CashuRoutingConfig,
1089        payment_client: Option<Arc<dyn CashuPaymentClient>>,
1090        mint_metadata: Option<Arc<CashuMintMetadataStore>>,
1091    ) -> Self {
1092        let mut manager = Self::new(keys, config);
1093        manager.state = Arc::new(WebRTCState::new_with_routing_and_cashu(
1094            manager.config.request_selection_strategy,
1095            manager.config.request_fairness_enabled,
1096            manager.config.request_dispatch,
1097            Duration::from_millis(manager.config.message_timeout_ms),
1098            cashu_routing,
1099            payment_client,
1100            mint_metadata,
1101        ));
1102        manager.store = Some(store);
1103        manager.peer_classifier = classifier;
1104        manager
1105    }
1106
1107    /// Set the content store for serving hash requests
1108    pub fn set_store(&mut self, store: Arc<dyn ContentStore>) {
1109        self.store = Some(store);
1110    }
1111
1112    /// Set the peer classifier
1113    pub fn set_peer_classifier(&mut self, classifier: PeerClassifier) {
1114        self.peer_classifier = classifier;
1115    }
1116
1117    /// Set the Nostr relay for data-channel relay messages
1118    pub fn set_nostr_relay(&mut self, relay: Arc<NostrRelay>) {
1119        self.nostr_relay = Some(relay);
1120    }
1121
1122    /// Get my peer ID
1123    pub fn my_peer_id(&self) -> &PeerId {
1124        &self.my_peer_id
1125    }
1126
1127    /// Get shared state for external access
1128    pub fn state(&self) -> Arc<WebRTCState> {
1129        self.state.clone()
1130    }
1131
1132    /// Cloneable shutdown handle for external lifecycle control.
1133    pub fn shutdown_signal(&self) -> Arc<tokio::sync::watch::Sender<bool>> {
1134        self.shutdown.clone()
1135    }
1136
1137    /// Signal shutdown
1138    pub fn shutdown(&self) {
1139        let _ = self.shutdown.send(true);
1140    }
1141
1142    /// Get connected peer count
1143    pub async fn connected_count(&self) -> usize {
1144        self.state
1145            .connected_count
1146            .load(std::sync::atomic::Ordering::Relaxed)
1147    }
1148
1149    /// Get all peer statuses
1150    pub async fn peer_statuses(&self) -> Vec<PeerStatus> {
1151        self.state
1152            .peers
1153            .read()
1154            .await
1155            .values()
1156            .map(|p| PeerStatus {
1157                peer_id: p.peer_id.to_string(),
1158                pubkey: p.peer_id.pubkey.clone(),
1159                state: p.state.to_string(),
1160                direction: p.direction,
1161                connected_at: Some(p.last_seen),
1162                pool: p.pool,
1163            })
1164            .collect()
1165    }
1166
1167    /// Get pool counts
1168    /// Returns (follows_connected, follows_active, other_connected, other_active)
1169    /// "active" = Connected or Connecting (excludes Discovered and Failed)
1170    pub async fn get_pool_counts(&self) -> (usize, usize, usize, usize) {
1171        let peers = self.state.peers.read().await;
1172        let mut follows_connected = 0;
1173        let mut follows_active = 0;
1174        let mut other_connected = 0;
1175        let mut other_active = 0;
1176
1177        for entry in peers.values() {
1178            // Only count Connected or Connecting as "active" connections
1179            // Discovered peers are just seen hellos, not real connections
1180            let is_active = entry.state == ConnectionState::Connected
1181                || entry.state == ConnectionState::Connecting;
1182
1183            match entry.pool {
1184                PeerPool::Follows => {
1185                    if is_active {
1186                        follows_active += 1;
1187                    }
1188                    if entry.state == ConnectionState::Connected {
1189                        follows_connected += 1;
1190                    }
1191                }
1192                PeerPool::Other => {
1193                    if is_active {
1194                        other_active += 1;
1195                    }
1196                    if entry.state == ConnectionState::Connected {
1197                        other_connected += 1;
1198                    }
1199                }
1200            }
1201        }
1202
1203        (
1204            follows_connected,
1205            follows_active,
1206            other_connected,
1207            other_active,
1208        )
1209    }
1210
1211    /// Check if we can accept a peer in a given pool
1212    fn can_accept_peer(&self, pool: PeerPool, pool_counts: &(usize, usize, usize, usize)) -> bool {
1213        let (_, follows_active, _, other_active) = *pool_counts;
1214        match pool {
1215            PeerPool::Follows => follows_active < self.config.pools.follows.max_connections,
1216            PeerPool::Other => other_active < self.config.pools.other.max_connections,
1217        }
1218    }
1219
1220    /// Check if a pool is satisfied
1221    #[allow(dead_code)]
1222    fn is_pool_satisfied(
1223        &self,
1224        pool: PeerPool,
1225        pool_counts: &(usize, usize, usize, usize),
1226    ) -> bool {
1227        let (follows_connected, _, other_connected, _) = *pool_counts;
1228        match pool {
1229            PeerPool::Follows => {
1230                follows_connected >= self.config.pools.follows.satisfied_connections
1231            }
1232            PeerPool::Other => other_connected >= self.config.pools.other.satisfied_connections,
1233        }
1234    }
1235
1236    /// Check if both pools are satisfied
1237    #[allow(dead_code)]
1238    fn is_satisfied(&self, pool_counts: &(usize, usize, usize, usize)) -> bool {
1239        self.is_pool_satisfied(PeerPool::Follows, pool_counts)
1240            && self.is_pool_satisfied(PeerPool::Other, pool_counts)
1241    }
1242
1243    /// Check if we should initiate connection (tie-breaking)
1244    /// Lower UUID initiates - same as iris-client/hashtree-ts
1245    fn should_initiate(&self, their_uuid: &str) -> bool {
1246        self.my_peer_id.uuid.as_str() < their_uuid
1247    }
1248
1249    fn local_bus_max_peers(&self, source: &str) -> Option<usize> {
1250        match source {
1251            "multicast" => Some(self.config.multicast.max_peers),
1252            WIFI_AWARE_SOURCE => Some(self.config.wifi_aware.max_peers),
1253            _ => None,
1254        }
1255    }
1256
1257    fn can_track_local_bus_peer(
1258        &self,
1259        source: &str,
1260        peer_key: &str,
1261        peers: &HashMap<String, PeerEntry>,
1262    ) -> bool {
1263        let Some(max_peers) = self.local_bus_max_peers(source) else {
1264            return true;
1265        };
1266        if peers.contains_key(peer_key) {
1267            return true;
1268        }
1269        if max_peers == 0 {
1270            return false;
1271        }
1272        let signal_path = PeerSignalPath::from_source_name(source);
1273        peers
1274            .values()
1275            .filter(|entry| {
1276                entry.signal_paths.contains(&signal_path) && entry.state != ConnectionState::Failed
1277            })
1278            .count()
1279            < max_peers
1280    }
1281
    /// Start the native peer router - connects transports and handles signaling.
    ///
    /// Setup, in order: take the one-shot receiver halves, optionally start the
    /// Bluetooth mesh, spawn one relay task per configured relay URL, then
    /// optionally bind the multicast and Wi-Fi Aware buses. After setup this
    /// runs the main `select!` loop until the shutdown watch flips to `true`.
    ///
    /// Panics if called twice: the receiver halves can only be taken once.
    pub async fn run(&mut self) -> Result<()> {
        info!(
            "Starting peer router with peer ID: {}",
            self.my_peer_id.short()
        );

        // (relay url, event) pairs flow from every transport into this channel.
        let (event_tx, mut event_rx) = mpsc::channel::<(String, nostr::Event)>(100);

        // Take the signaling receiver
        let mut signaling_rx = self
            .signaling_rx
            .take()
            .expect("signaling_rx already taken");

        // Take the state event receiver
        let mut state_event_rx = self
            .state_event_rx
            .take()
            .expect("state_event_rx already taken");
        let mut mesh_frame_rx = self
            .mesh_frame_rx
            .take()
            .expect("mesh_frame_rx already taken");

        if self.config.bluetooth.is_enabled() {
            let bluetooth = BluetoothMesh::new(self.config.bluetooth.clone());
            let context = BluetoothRuntimeContext {
                my_peer_id: self.my_peer_id.clone(),
                // In Nostr-only mode Bluetooth peers cannot serve content.
                store: if bluetooth_nostr_only_mode() {
                    None
                } else {
                    self.store.clone()
                },
                nostr_relay: self.nostr_relay.clone(),
                mesh_frame_tx: self.mesh_frame_tx.clone(),
                registrar: BluetoothPeerRegistrar::new(
                    self.state.clone(),
                    self.peer_classifier.clone(),
                    self.config.pools.clone(),
                    self.config.bluetooth.max_peers,
                ),
            };
            // Best-effort start; Bluetooth failure does not stop the router.
            let _ = bluetooth.start(context).await;
        }

        // Create a shared write channel for all relay tasks
        let (relay_write_tx, _) = tokio::sync::broadcast::channel::<SignalingMessage>(100);

        // Spawn relay connections
        for relay_url in &self.config.relays {
            let url = relay_url.clone();
            let event_tx = event_tx.clone();
            let shutdown_rx = self.shutdown_rx.clone();
            let keys = self.keys.clone();
            let relay_write_rx = relay_write_tx.subscribe();

            tokio::spawn(async move {
                if let Err(e) =
                    Self::relay_task(url.clone(), event_tx, shutdown_rx, keys, relay_write_rx).await
                {
                    error!("Relay {} error: {}", url, e);
                }
            });
        }

        // Multicast bus: local-network Nostr transport, registered as a local bus.
        if self.config.multicast.is_enabled() {
            if let Some(relay) = self.nostr_relay.clone() {
                match MulticastNostrBus::bind(
                    self.config.multicast.clone(),
                    self.keys.clone(),
                    relay,
                )
                .await
                {
                    Ok(bus) => {
                        let local_bus: SharedLocalNostrBus = bus.clone();
                        self.state.add_local_bus(local_bus.clone()).await;
                        self.local_buses.push(local_bus);
                        let shutdown_rx = self.shutdown_rx.clone();
                        let signaling_tx = event_tx.clone();
                        tokio::spawn(async move {
                            if let Err(err) = bus.run(shutdown_rx, signaling_tx).await {
                                error!("Multicast bus error: {}", err);
                            }
                        });
                    }
                    Err(err) => {
                        warn!("Failed to start multicast bus: {}", err);
                    }
                }
            } else {
                warn!("Multicast enabled but Nostr relay is unavailable");
            }
        }

        // Wi-Fi Aware bus: requires both a Nostr relay and a mobile bridge.
        if self.config.wifi_aware.is_enabled() {
            if let Some(relay) = self.nostr_relay.clone() {
                if let Some(bridge) = mobile_wifi_aware_bridge() {
                    let bus = WifiAwareNostrBus::new(
                        self.config.wifi_aware.clone(),
                        self.keys.clone(),
                        relay,
                        bridge,
                    );
                    let local_bus: SharedLocalNostrBus = bus.clone();
                    self.state.add_local_bus(local_bus.clone()).await;
                    self.local_buses.push(local_bus);
                    let shutdown_rx = self.shutdown_rx.clone();
                    let signaling_tx = event_tx.clone();
                    let local_peer_id = self.my_peer_id.to_string();
                    tokio::spawn(async move {
                        if let Err(err) = bus.run(local_peer_id, shutdown_rx, signaling_tx).await {
                            error!("Wi-Fi Aware bus error: {}", err);
                        }
                    });
                } else {
                    warn!("Wi-Fi Aware enabled but no mobile bridge is installed");
                }
            } else {
                warn!("Wi-Fi Aware enabled but Nostr relay is unavailable");
            }
        }

        // Process incoming events and outgoing signaling messages
        let mut shutdown_rx = self.shutdown_rx.clone();
        // Cleanup interval - run every 30 seconds as a fallback (not for real-time sync)
        let mut cleanup_interval = tokio::time::interval(Duration::from_secs(30));
        let mut hello_ticker =
            tokio::time::interval(Duration::from_millis(self.config.hello_interval_ms));
        // Announce ourselves immediately instead of waiting for the first tick.
        if self.config.signaling_enabled {
            self.dispatch_signaling_message(
                SignalingMessage::hello(&self.my_peer_id.uuid),
                &relay_write_tx,
            )
            .await;
        }
        loop {
            tokio::select! {
                _ = shutdown_rx.changed() => {
                    if *shutdown_rx.borrow() {
                        info!("WebRTC manager shutting down");
                        break;
                    }
                }
                Some((relay, event)) = event_rx.recv() => {
                    if let Err(e) = self.handle_event(&relay, &event, &relay_write_tx).await {
                        debug!("Error handling event from {}: {}", relay, e);
                    }
                }
                Some(msg) = signaling_rx.recv() => {
                    self.dispatch_signaling_message(msg, &relay_write_tx).await;
                }
                Some(event) = state_event_rx.recv() => {
                    // Handle peer state events (connected, failed, disconnected)
                    self.handle_peer_state_event(event, &relay_write_tx).await;
                }
                Some((from_peer_id, frame)) = mesh_frame_rx.recv() => {
                    self.handle_mesh_frame(from_peer_id, frame, &relay_write_tx).await;
                }
                _ = hello_ticker.tick(), if self.config.signaling_enabled => {
                    self.dispatch_signaling_message(
                        SignalingMessage::hello(&self.my_peer_id.uuid),
                        &relay_write_tx,
                    ).await;
                }
                _ = cleanup_interval.tick() => {
                    // Periodic cleanup of stale peers and state sync (fallback)
                    self.cleanup_stale_peers().await;
                }
            }
        }

        Ok(())
    }
1457
    /// Connect to a single relay and handle messages
    ///
    /// Opens a websocket to `url`, subscribes with two filters (broadcast
    /// hellos and messages directed at our pubkey), then loops: forwarding
    /// incoming relay events to `event_tx`, publishing signaling messages
    /// received on `signaling_rx`, and exiting on shutdown or socket loss.
    ///
    /// # Errors
    /// Returns an error if the initial connect or subscription send fails;
    /// errors after that point break the loop and return `Ok(())`.
    async fn relay_task(
        url: String,
        event_tx: mpsc::Sender<(String, nostr::Event)>,
        mut shutdown_rx: tokio::sync::watch::Receiver<bool>,
        keys: Keys,
        mut signaling_rx: tokio::sync::broadcast::Receiver<SignalingMessage>,
    ) -> Result<()> {
        info!("Connecting to relay: {}", url);

        let (ws_stream, _) = connect_async(&url).await?;
        let (mut write, mut read) = ws_stream.split();

        // Subscribe to webrtc events - two filters:
        // 1. Hello messages: kind 25050 with #l: "hello" tag
        // 2. Directed messages: kind 25050 with #p tag (our pubkey)
        let hello_filter = Filter::new()
            .kind(Kind::Ephemeral(WEBRTC_KIND as u16))
            .custom_tag(
                nostr::SingleLetterTag::lowercase(nostr::Alphabet::L),
                vec![HELLO_TAG],
            )
            // Only fetch recent events, not the full relay backlog.
            .since(nostr::Timestamp::now() - Duration::from_secs(60));

        let directed_filter = Filter::new()
            .kind(Kind::Ephemeral(WEBRTC_KIND as u16))
            .custom_tag(
                nostr::SingleLetterTag::lowercase(nostr::Alphabet::P),
                vec![keys.public_key().to_hex()],
            )
            .since(nostr::Timestamp::now() - Duration::from_secs(60));

        let sub_id = nostr::SubscriptionId::generate();
        let sub_msg = ClientMessage::req(sub_id.clone(), vec![hello_filter, directed_filter]);
        write.send(Message::Text(sub_msg.as_json())).await?;

        info!(
            "Subscribed to {} for WebRTC events (kind {})",
            url, WEBRTC_KIND
        );

        loop {
            tokio::select! {
                _ = shutdown_rx.changed() => {
                    if *shutdown_rx.borrow() {
                        break;
                    }
                }
                // Handle outgoing signaling messages
                Ok(signaling_msg) = signaling_rx.recv() => {
                    info!("Sending {} via {}", signaling_msg.msg_type(), url);
                    // Sign the message as a Nostr event; failures are dropped silently.
                    if let Ok(event) = Self::create_signaling_event(&keys, &signaling_msg).await {
                        let event_id = event.id.to_string();
                        let msg = ClientMessage::event(event);
                        if write.send(Message::Text(msg.as_json())).await.is_ok() {
                            info!("Sent {} to {} (event id: {})", signaling_msg.msg_type(), url, &event_id[..16]);
                        }
                    }
                }
                msg = read.next() => {
                    match msg {
                        Some(Ok(Message::Text(text))) => {
                            // Only EVENT relay messages matter; everything else
                            // (EOSE, NOTICE, OK, ...) is ignored.
                            if let Ok(RelayMessage::Event { event, .. }) =
                                RelayMessage::from_json(&text)
                            {
                                let _ = event_tx.send((url.clone(), *event)).await;
                            }
                        }
                        Some(Err(e)) => {
                            error!("WebSocket error from {}: {}", url, e);
                            break;
                        }
                        None => {
                            warn!("WebSocket closed: {}", url);
                            break;
                        }
                        _ => {}
                    }
                }
            }
        }

        Ok(())
    }
1542
1543    async fn mark_seen_frame_id(&self, frame_id: String) -> bool {
1544        let mut seen = self.seen_frame_ids.lock().await;
1545        seen.insert_if_new(frame_id)
1546    }
1547
1548    async fn mark_seen_event_id(&self, event_id: String) -> bool {
1549        let mut seen = self.seen_event_ids.lock().await;
1550        seen.insert_if_new(event_id)
1551    }
1552
    /// Publish a signaling message across every available transport.
    ///
    /// Order of operations: (1) no-op when signaling is disabled; (2) broadcast
    /// the raw message to all relay tasks; (3) build the signed event once and
    /// send it over each local bus; (4) wrap the event in a mesh frame and
    /// forward it to peers, marking our own frame/event ids as seen first so
    /// echoes that loop back through the mesh are dropped.
    async fn dispatch_signaling_message(
        &self,
        msg: SignalingMessage,
        relay_write_tx: &tokio::sync::broadcast::Sender<SignalingMessage>,
    ) {
        if !self.config.signaling_enabled {
            debug!(
                "Skipping signaling message {} because WebRTC signaling is disabled",
                msg.msg_type()
            );
            return;
        }

        // send() errs only when no relay task is subscribed; not fatal.
        if relay_write_tx.send(msg.clone()).is_err() {
            debug!(
                "No relay subscribers for signaling message {}",
                msg.msg_type()
            );
        }

        let event = match Self::create_signaling_event(&self.keys, &msg).await {
            Ok(event) => event,
            Err(e) => {
                debug!("Failed to create signaling event for mesh dispatch: {}", e);
                return;
            }
        };

        // Local buses (multicast / Wi-Fi Aware) receive the signed event directly.
        for bus in &self.local_buses {
            if let Err(err) = bus.broadcast_event(&event).await {
                debug!(
                    "Failed to broadcast signaling event over {} ({}): {}",
                    bus.source_name(),
                    msg.msg_type(),
                    err
                );
            }
        }

        let mut frame =
            MeshNostrFrame::new_event(event, &self.my_peer_id.to_string(), MESH_DEFAULT_HTL);
        // Pre-mark our own ids so a copy that loops back is treated as a duplicate.
        if !self.mark_seen_frame_id(frame.frame_id.clone()).await {
            self.state.record_mesh_duplicate_drop();
            return;
        }
        if !self.mark_seen_event_id(frame.event().id.to_hex()).await {
            self.state.record_mesh_duplicate_drop();
            return;
        }

        // Keep the sender peer id stable even if this is forwarded later.
        frame.sender_peer_id = self.my_peer_id.to_string();
        let forwarded = self.forward_mesh_frame(&frame, None).await;
        if forwarded > 0 {
            self.state.record_mesh_forwarded(forwarded as u64);
        }
    }
1610
1611    async fn forward_mesh_frame(
1612        &self,
1613        frame: &MeshNostrFrame,
1614        exclude_peer_id: Option<&str>,
1615    ) -> usize {
1616        let peers = self.state.peers.read().await;
1617        let peer_refs: Vec<_> = peers
1618            .values()
1619            .filter(|entry| entry.state == ConnectionState::Connected)
1620            .filter(|entry| {
1621                entry
1622                    .peer
1623                    .as_ref()
1624                    .map(|peer| peer.is_ready())
1625                    .unwrap_or(false)
1626            })
1627            .filter(|entry| {
1628                exclude_peer_id
1629                    .map(|exclude| exclude != entry.peer_id.to_string())
1630                    .unwrap_or(true)
1631            })
1632            .filter_map(|entry| {
1633                entry.peer.as_ref().map(|peer| {
1634                    (
1635                        entry.peer_id.to_string(),
1636                        entry.peer_id.short(),
1637                        peer.clone(),
1638                        peer.htl_config(),
1639                    )
1640                })
1641            })
1642            .collect();
1643        drop(peers);
1644
1645        let mut forwarded = 0usize;
1646        for (_peer_key, peer_short, peer, htl_cfg) in peer_refs {
1647            let next_htl = decrement_htl_with_policy(frame.htl, &MESH_EVENT_POLICY, &htl_cfg);
1648            if !should_forward_htl(next_htl) {
1649                continue;
1650            }
1651
1652            let mut outbound = frame.clone();
1653            outbound.htl = next_htl;
1654            if peer.send_mesh_frame_text(&outbound).await.is_ok() {
1655                forwarded += 1;
1656            } else {
1657                debug!("Failed to forward mesh frame to {}", peer_short);
1658            }
1659        }
1660
1661        forwarded
1662    }
1663
1664    async fn handle_mesh_frame(
1665        &self,
1666        from_peer_id: PeerId,
1667        frame: MeshNostrFrame,
1668        relay_write_tx: &tokio::sync::broadcast::Sender<SignalingMessage>,
1669    ) {
1670        if let Err(reason) = validate_mesh_frame(&frame) {
1671            debug!(
1672                "Ignoring mesh frame from {} (invalid: {})",
1673                from_peer_id.short(),
1674                reason
1675            );
1676            return;
1677        }
1678
1679        if !self.mark_seen_frame_id(frame.frame_id.clone()).await {
1680            self.state.record_mesh_duplicate_drop();
1681            return;
1682        }
1683
1684        let event = match &frame.payload {
1685            MeshNostrPayload::Event { event } => event.clone(),
1686        };
1687
1688        if !self.mark_seen_event_id(event.id.to_hex()).await {
1689            self.state.record_mesh_duplicate_drop();
1690            return;
1691        }
1692
1693        if event.verify().is_err() {
1694            debug!(
1695                "Ignoring mesh event from {} due to invalid signature",
1696                from_peer_id.short()
1697            );
1698            return;
1699        }
1700
1701        self.state.record_mesh_received();
1702
1703        if let Err(e) = self.handle_event("mesh", &event, relay_write_tx).await {
1704            debug!(
1705                "Error handling mesh event from {}: {}",
1706                from_peer_id.short(),
1707                e
1708            );
1709        }
1710
1711        let forwarded = self
1712            .forward_mesh_frame(&frame, Some(&from_peer_id.to_string()))
1713            .await;
1714        if forwarded > 0 {
1715            self.state.record_mesh_forwarded(forwarded as u64);
1716        }
1717    }
1718
    /// Create a signaling event
    ///
    /// For directed messages (offer, answer, candidate, candidates), use NIP-17 style
    /// gift wrapping with ephemeral keys for privacy.
    /// Hello messages use kind 25050 with #l: "hello" tag and peerId.
    ///
    /// # Errors
    /// Returns an error if the recipient pubkey cannot be parsed, if NIP-44
    /// encryption fails, or if tag/event construction fails.
    async fn create_signaling_event(keys: &Keys, msg: &SignalingMessage) -> Result<nostr::Event> {
        // Check if message has a recipient (needs gift wrapping)
        if let Some(recipient_str) = msg.recipient() {
            // Parse recipient to get their pubkey
            if let Some(peer_id) = PeerId::from_string(recipient_str) {
                let recipient_pubkey = PublicKey::from_hex(&peer_id.pubkey)?;

                // Create seal with sender's actual pubkey (the "rumor").
                // NOTE(review): this is a simplified rumor (no id/sig/created_at),
                // matching the hashtree-ts wire format rather than full NIP-59.
                let seal = serde_json::json!({
                    "pubkey": keys.public_key().to_hex(),
                    "kind": WEBRTC_KIND,
                    "content": serde_json::to_string(msg)?,
                    "tags": []
                });

                // Generate ephemeral keypair for the wrapper so relays cannot
                // correlate the wrapper with our long-lived identity.
                let ephemeral_keys = Keys::generate();

                // Encrypt the seal for the recipient using ephemeral key (NIP-44)
                let encrypted_content = nip44::encrypt(
                    ephemeral_keys.secret_key(),
                    &recipient_pubkey,
                    seal.to_string(),
                    nip44::Version::V2,
                )?;

                // Create wrapper event with ephemeral key; the expiration tag
                // lets relays drop stale signaling traffic.
                let created_at = nostr::Timestamp::now();
                let expiration = created_at + Duration::from_secs(5 * 60); // 5 minutes

                let tags = vec![
                    Tag::parse(&["p", &recipient_pubkey.to_hex()])?,
                    Tag::parse(&["expiration", &expiration.as_u64().to_string()])?,
                ];

                let event =
                    EventBuilder::new(Kind::Ephemeral(WEBRTC_KIND as u16), encrypted_content, tags)
                        .to_event(&ephemeral_keys)?;

                return Ok(event);
            }
            // NOTE(review): an unparseable recipient silently falls through to
            // the hello path below — confirm this is intended.
        }

        // Hello messages - kind 25050 with #l: "hello" tag and peerId,
        // broadcast unencrypted and signed with our real key so peers can
        // discover us.
        let tags = vec![
            Tag::parse(&["l", HELLO_TAG])?,
            Tag::parse(&["peerId", msg.peer_id()])?,
        ];

        let event =
            EventBuilder::new(Kind::Ephemeral(WEBRTC_KIND as u16), "", tags).to_event(keys)?;

        Ok(event)
    }
1778
1779    /// Handle an incoming event
1780    ///
1781    /// Messages may be:
1782    /// 1. Hello messages: kind 25050 with #l: "hello" tag and peerId
1783    /// 2. Gift-wrapped directed messages: kind 25050 with #p tag, encrypted with ephemeral key
1784    async fn handle_event(
1785        &self,
1786        relay: &str,
1787        event: &nostr::Event,
1788        relay_write_tx: &tokio::sync::broadcast::Sender<SignalingMessage>,
1789    ) -> Result<()> {
1790        if !self.config.signaling_enabled {
1791            return Ok(());
1792        }
1793
1794        // Must be kind 25050
1795        if event.kind != Kind::Ephemeral(WEBRTC_KIND as u16) {
1796            return Ok(());
1797        }
1798
1799        // Helper to get tag value
1800        let get_tag = |name: &str| -> Option<String> {
1801            event.tags.iter().find_map(|tag| {
1802                let v: Vec<String> = tag.clone().to_vec();
1803                if v.len() >= 2 && v[0] == name {
1804                    Some(v[1].clone())
1805                } else {
1806                    None
1807                }
1808            })
1809        };
1810
1811        // Check if this is a hello message (#l: "hello" tag)
1812        let l_tag = get_tag("l");
1813        if l_tag.as_deref() == Some(HELLO_TAG) {
1814            let sender_pubkey = event.pubkey.to_hex();
1815
1816            // Skip our own hello messages
1817            if sender_pubkey == self.my_peer_id.pubkey {
1818                return Ok(());
1819            }
1820
1821            if let Some(their_uuid) = get_tag("peerId") {
1822                debug!("Received hello from {} via {}", &sender_pubkey[..8], relay);
1823                self.handle_hello(&sender_pubkey, &their_uuid, relay, relay_write_tx)
1824                    .await?;
1825            }
1826            return Ok(());
1827        }
1828
1829        // Check if this is a directed message for us (#p tag with our pubkey)
1830        let p_tag = get_tag("p");
1831        if p_tag.as_deref() != Some(&self.keys.public_key().to_hex()) {
1832            // Not for us - ignore silently
1833            return Ok(());
1834        }
1835
1836        // Gift-wrapped directed message - decrypt using our key and ephemeral sender's pubkey
1837        if event.content.is_empty() {
1838            return Ok(());
1839        }
1840
1841        // Try to unwrap the gift - decrypt with our key and the ephemeral sender's pubkey
1842        let seal: serde_json::Value =
1843            match nip44::decrypt(self.keys.secret_key(), &event.pubkey, &event.content) {
1844                Ok(plaintext) => match serde_json::from_str(&plaintext) {
1845                    Ok(v) => v,
1846                    Err(_) => return Ok(()),
1847                },
1848                Err(_) => {
1849                    // Can't decrypt - not for us or invalid
1850                    return Ok(());
1851                }
1852            };
1853
1854        // Extract the actual sender's pubkey and content from the seal
1855        let sender_pubkey = seal
1856            .get("pubkey")
1857            .and_then(|v| v.as_str())
1858            .ok_or_else(|| anyhow::anyhow!("Missing pubkey in seal"))?;
1859
1860        // Skip our own messages
1861        if sender_pubkey == self.my_peer_id.pubkey {
1862            return Ok(());
1863        }
1864
1865        let content = seal
1866            .get("content")
1867            .and_then(|v| v.as_str())
1868            .ok_or_else(|| anyhow::anyhow!("Missing content in seal"))?;
1869
1870        let raw_msg: serde_json::Value = serde_json::from_str(content)?;
1871        let msg_type = raw_msg.get("type").and_then(|v| v.as_str()).unwrap_or("");
1872
1873        // Support hashtree-ts format: { type, peerId, targetPeerId, sdp/candidate/candidates }
1874        if raw_msg.get("targetPeerId").is_some() {
1875            let target_peer = raw_msg
1876                .get("targetPeerId")
1877                .and_then(|v| v.as_str())
1878                .unwrap_or("");
1879            if target_peer != self.my_peer_id.to_string() {
1880                return Ok(());
1881            }
1882
1883            let peer_id = raw_msg.get("peerId").and_then(|v| v.as_str()).unwrap_or("");
1884            let their_uuid = peer_id.split(':').nth(1).unwrap_or(peer_id);
1885
1886            match msg_type {
1887                "offer" => {
1888                    let sdp = raw_msg
1889                        .get("sdp")
1890                        .and_then(|v| v.as_str())
1891                        .ok_or_else(|| anyhow::anyhow!("Missing SDP in offer"))?;
1892                    let offer = serde_json::json!({ "type": "offer", "sdp": sdp });
1893                    self.handle_offer(sender_pubkey, their_uuid, offer, relay, relay_write_tx)
1894                        .await?;
1895                }
1896                "answer" => {
1897                    let sdp = raw_msg
1898                        .get("sdp")
1899                        .and_then(|v| v.as_str())
1900                        .ok_or_else(|| anyhow::anyhow!("Missing SDP in answer"))?;
1901                    let answer = serde_json::json!({ "type": "answer", "sdp": sdp });
1902                    self.handle_answer(sender_pubkey, their_uuid, answer)
1903                        .await?;
1904                }
1905                "candidate" => {
1906                    let candidate = raw_msg
1907                        .get("candidate")
1908                        .and_then(|v| v.as_str())
1909                        .unwrap_or("");
1910                    if !candidate.is_empty() {
1911                        let candidate_json = serde_json::json!({
1912                            "candidate": candidate,
1913                            "sdpMid": raw_msg.get("sdpMid"),
1914                            "sdpMLineIndex": raw_msg.get("sdpMLineIndex"),
1915                        });
1916                        self.handle_candidate(sender_pubkey, their_uuid, candidate_json)
1917                            .await?;
1918                    }
1919                }
1920                "candidates" => {
1921                    let candidates = raw_msg
1922                        .get("candidates")
1923                        .and_then(|v| v.as_array())
1924                        .map(|entries| {
1925                            entries
1926                                .iter()
1927                                .filter_map(|entry| {
1928                                    entry
1929                                        .get("candidate")
1930                                        .and_then(|v| v.as_str())
1931                                        .or_else(|| entry.as_str())
1932                                        .map(|candidate_str| {
1933                                            serde_json::json!({
1934                                                "candidate": candidate_str,
1935                                                "sdpMid": entry.get("sdpMid"),
1936                                                "sdpMLineIndex": entry.get("sdpMLineIndex"),
1937                                            })
1938                                        })
1939                                })
1940                                .collect::<Vec<_>>()
1941                        })
1942                        .unwrap_or_default();
1943                    self.handle_candidates(sender_pubkey, their_uuid, candidates)
1944                        .await?;
1945                }
1946                _ => {}
1947            }
1948
1949            return Ok(());
1950        }
1951
1952        let msg: SignalingMessage = serde_json::from_value(raw_msg)?;
1953
1954        debug!(
1955            "Received {} from {} via {} (gift-wrapped)",
1956            msg.msg_type(),
1957            &sender_pubkey[..8],
1958            relay
1959        );
1960
1961        match msg {
1962            SignalingMessage::Hello { .. } => {
1963                // Hello messages should come via tags, not gift wrap
1964                return Ok(());
1965            }
1966            SignalingMessage::Offer {
1967                recipient,
1968                peer_id: their_uuid,
1969                offer,
1970            } => {
1971                if recipient != self.my_peer_id.to_string() {
1972                    return Ok(()); // Not for us
1973                }
1974                if let Err(e) = self
1975                    .handle_offer(sender_pubkey, &their_uuid, offer, relay, relay_write_tx)
1976                    .await
1977                {
1978                    error!(
1979                        "handle_offer FAILED: sender={}, uuid={}, error={:?}",
1980                        &sender_pubkey[..8.min(sender_pubkey.len())],
1981                        their_uuid,
1982                        e
1983                    );
1984                    return Err(e);
1985                }
1986            }
1987            SignalingMessage::Answer {
1988                recipient,
1989                peer_id: their_uuid,
1990                answer,
1991            } => {
1992                if recipient != self.my_peer_id.to_string() {
1993                    return Ok(());
1994                }
1995                self.handle_answer(sender_pubkey, &their_uuid, answer)
1996                    .await?;
1997            }
1998            SignalingMessage::Candidate {
1999                recipient,
2000                peer_id: their_uuid,
2001                candidate,
2002            } => {
2003                if recipient != self.my_peer_id.to_string() {
2004                    return Ok(());
2005                }
2006                self.handle_candidate(sender_pubkey, &their_uuid, candidate)
2007                    .await?;
2008            }
2009            SignalingMessage::Candidates {
2010                recipient,
2011                peer_id: their_uuid,
2012                candidates,
2013            } => {
2014                if recipient != self.my_peer_id.to_string() {
2015                    return Ok(());
2016                }
2017                self.handle_candidates(sender_pubkey, &their_uuid, candidates)
2018                    .await?;
2019            }
2020        }
2021
2022        Ok(())
2023    }
2024
    /// Handle incoming hello message
    ///
    /// Flow: build the full peer id (pubkey + uuid), apply local-bus and pool
    /// admission limits, decide via the uuid tie-break whether we initiate,
    /// then either track the peer and send an offer, or track it (and echo a
    /// hello) while waiting for the remote side's offer.
    async fn handle_hello(
        &self,
        sender_pubkey: &str,
        their_uuid: &str,
        source: &str,
        relay_write_tx: &tokio::sync::broadcast::Sender<SignalingMessage>,
    ) -> Result<()> {
        let full_peer_id = PeerId::new(sender_pubkey.to_string(), Some(their_uuid.to_string()));
        let peer_key = full_peer_id.to_string();
        let mut already_discovered = false;
        // Start with the path this hello arrived on; merged with any paths we
        // already knew for this peer below.
        let mut signal_paths = BTreeSet::from([PeerSignalPath::from_source_name(source)]);

        // Check if we already have this peer
        {
            let peers = self.state.peers.read().await;
            if !self.can_track_local_bus_peer(source, &peer_key, &peers) {
                debug!(
                    "Ignoring hello from {} via {} - local bus peer limit reached",
                    full_peer_id.short(),
                    source
                );
                return Ok(());
            }
            if let Some(entry) = peers.get(&peer_key) {
                // Already connected or connecting - nothing more to do.
                // NOTE(review): last_seen is NOT refreshed here despite the
                // hello; confirm that is intended.
                if entry.state == ConnectionState::Connected
                    || entry.state == ConnectionState::Connecting
                {
                    return Ok(());
                }
                already_discovered = true;
                signal_paths.extend(entry.signal_paths.iter().copied());
            }
        }

        // Classify the peer into a pool
        let pool = (self.peer_classifier)(sender_pubkey);

        // Check pool limits
        let pool_counts = self.get_pool_counts().await;
        if !self.can_accept_peer(pool, &pool_counts) {
            debug!(
                "Ignoring hello from {} - pool {:?} is full",
                full_peer_id.short(),
                pool
            );
            return Ok(());
        }

        // Decide if we should initiate based on tie-breaking
        let should_initiate = self.should_initiate(their_uuid);

        // If pool is already satisfied, don't initiate new outbound connections
        // This reserves space for inbound connections
        let pool_satisfied = self.is_pool_satisfied(pool, &pool_counts);
        let will_initiate = should_initiate && !pool_satisfied;

        info!(
            "Discovered peer: {} (pool: {:?}, initiate: {}, pool_satisfied: {})",
            full_peer_id.short(),
            pool,
            will_initiate,
            pool_satisfied
        );

        // If we're not initiating and pool is satisfied, don't even add to discovered
        // (we won't accept their offer either since pool check happens in handle_offer)
        if !will_initiate && pool_satisfied {
            debug!(
                "Pool {:?} is satisfied, not tracking peer {}",
                pool,
                full_peer_id.short()
            );
            return Ok(());
        }

        // Create peer entry with pool assignment (no connection object yet;
        // it is attached by initiate_connection or handle_offer later)
        {
            let mut peers = self.state.peers.write().await;
            peers.insert(
                peer_key.clone(),
                PeerEntry {
                    peer_id: full_peer_id.clone(),
                    direction: if will_initiate {
                        PeerDirection::Outbound
                    } else {
                        PeerDirection::Inbound
                    },
                    state: ConnectionState::Discovered,
                    last_seen: Instant::now(),
                    peer: None,
                    pool,
                    transport: PeerTransport::WebRtc,
                    signal_paths,
                    bytes_sent: 0,
                    bytes_received: 0,
                },
            );
        }

        // If we discovered a peer but are not the initiator, send one immediate hello
        // to accelerate reciprocal discovery over relayless mesh paths.
        if !will_initiate && !already_discovered {
            self.dispatch_signaling_message(
                SignalingMessage::hello(&self.my_peer_id.uuid),
                relay_write_tx,
            )
            .await;
        }

        // If we should initiate, create offer
        if will_initiate {
            self.initiate_connection(&full_peer_id, pool, relay_write_tx)
                .await?;
        }

        Ok(())
    }
2144
2145    /// Initiate a connection to a peer (create and send offer)
2146    async fn initiate_connection(
2147        &self,
2148        peer_id: &PeerId,
2149        pool: PeerPool,
2150        relay_write_tx: &tokio::sync::broadcast::Sender<SignalingMessage>,
2151    ) -> Result<()> {
2152        let peer_key = peer_id.to_string();
2153
2154        info!(
2155            "Initiating connection to {} (pool: {:?})",
2156            peer_id.short(),
2157            pool
2158        );
2159
2160        // Create peer connection with content store and state events
2161        let peer = Peer::new_with_store_and_events(
2162            peer_id.clone(),
2163            PeerDirection::Outbound,
2164            self.my_peer_id.clone(),
2165            self.signaling_tx.clone(),
2166            self.config.stun_servers.clone(),
2167            self.store.clone(),
2168            Some(self.state_event_tx.clone()),
2169            self.nostr_relay.clone(),
2170            Some(self.mesh_frame_tx.clone()),
2171            Some(self.state.cashu_quotes.clone()),
2172        )
2173        .await?;
2174
2175        peer.setup_handlers().await?;
2176
2177        // Create offer
2178        let offer = peer.connect().await?;
2179
2180        // Update state
2181        {
2182            let mut peers = self.state.peers.write().await;
2183            if let Some(entry) = peers.get_mut(&peer_key) {
2184                entry.state = ConnectionState::Connecting;
2185                entry.peer = Some(MeshPeer::WebRtc(Arc::new(peer)));
2186                entry.pool = pool;
2187            }
2188        }
2189
2190        // Send offer
2191        let offer_msg = SignalingMessage::Offer {
2192            offer,
2193            recipient: peer_id.to_string(),
2194            peer_id: self.my_peer_id.uuid.clone(),
2195        };
2196        self.dispatch_signaling_message(offer_msg, relay_write_tx)
2197            .await;
2198
2199        info!("Sent offer to {}", peer_id.short());
2200
2201        Ok(())
2202    }
2203
    /// Handle incoming offer
    ///
    /// Creates an inbound `Peer`, applies the remote SDP offer, stores the
    /// Connecting entry (replacing any connection-less placeholder from a
    /// prior hello), and dispatches our SDP answer back to the sender.
    /// Offers are silently dropped when the local-bus limit or the peer's
    /// pool is full, or when we already hold a live connection to this peer.
    async fn handle_offer(
        &self,
        sender_pubkey: &str,
        their_uuid: &str,
        offer: serde_json::Value,
        source: &str,
        relay_write_tx: &tokio::sync::broadcast::Sender<SignalingMessage>,
    ) -> Result<()> {
        debug!(
            "handle_offer ENTRY: sender={}, uuid={}",
            &sender_pubkey[..8.min(sender_pubkey.len())],
            their_uuid
        );
        let full_peer_id = PeerId::new(sender_pubkey.to_string(), Some(their_uuid.to_string()));
        let peer_key = full_peer_id.to_string();
        // Start with the path this offer arrived on; merged with known paths below.
        let mut signal_paths = BTreeSet::from([PeerSignalPath::from_source_name(source)]);

        // Classify the peer into a pool
        let pool = (self.peer_classifier)(sender_pubkey);

        info!(
            "Received offer from {} (pool: {:?})",
            full_peer_id.short(),
            pool
        );

        // Check if we already have this peer with an actual connection
        {
            let peers = self.state.peers.read().await;
            if !self.can_track_local_bus_peer(source, &peer_key, &peers) {
                warn!(
                    "Rejecting offer from {} via {} - local bus peer limit reached",
                    full_peer_id.short(),
                    source
                );
                return Ok(());
            }
            debug!(
                "Checking for existing peer, peer_key: {}, known_peers: {}",
                peer_key,
                peers.len()
            );
            if let Some(entry) = peers.get(&peer_key) {
                // Only skip if we have an actual peer connection (not just discovered)
                if entry.peer.is_some() {
                    debug!(
                        "Already have peer {} with connection, skipping offer",
                        full_peer_id.short()
                    );
                    return Ok(());
                }
                signal_paths.extend(entry.signal_paths.iter().copied());
                debug!(
                    "Peer {} exists but has no connection, proceeding",
                    full_peer_id.short()
                );
            } else {
                debug!(
                    "Peer {} not found in peers map, will create new entry",
                    full_peer_id.short()
                );
            }
        }

        // Check pool limits
        let pool_counts = self.get_pool_counts().await;
        debug!(
            "Pool counts: {:?}, checking can_accept_peer for {:?}",
            pool_counts, pool
        );
        if !self.can_accept_peer(pool, &pool_counts) {
            warn!(
                "Rejecting offer from {} - pool {:?} is full",
                full_peer_id.short(),
                pool
            );
            return Ok(());
        }
        debug!("Pool check passed for {}", full_peer_id.short());

        // Create peer connection with content store and state events
        debug!("Creating peer connection for {}", full_peer_id.short());
        let peer = Peer::new_with_store_and_events(
            full_peer_id.clone(),
            PeerDirection::Inbound,
            self.my_peer_id.clone(),
            self.signaling_tx.clone(),
            self.config.stun_servers.clone(),
            self.store.clone(),
            Some(self.state_event_tx.clone()),
            self.nostr_relay.clone(),
            Some(self.mesh_frame_tx.clone()),
            Some(self.state.cashu_quotes.clone()),
        )
        .await?;
        debug!("Peer connection created for {}", full_peer_id.short());

        peer.setup_handlers().await?;
        debug!("Handlers set up for {}", full_peer_id.short());

        // Handle offer and create answer
        let answer = peer.handle_offer(offer).await?;
        debug!("Answer created for {}", full_peer_id.short());

        // Update state (insert replaces any placeholder entry for this key)
        {
            let mut peers = self.state.peers.write().await;
            peers.insert(
                peer_key,
                PeerEntry {
                    peer_id: full_peer_id.clone(),
                    direction: PeerDirection::Inbound,
                    state: ConnectionState::Connecting,
                    last_seen: Instant::now(),
                    peer: Some(MeshPeer::WebRtc(Arc::new(peer))),
                    pool,
                    transport: PeerTransport::WebRtc,
                    signal_paths,
                    bytes_sent: 0,
                    bytes_received: 0,
                },
            );
        }

        // Send answer
        // Note: peer_id is just the UUID, not full pubkey:uuid
        // The recipient will construct full peer_id from sender pubkey + this UUID
        let answer_msg = SignalingMessage::Answer {
            answer,
            recipient: full_peer_id.to_string(),
            peer_id: self.my_peer_id.uuid.clone(),
        };
        self.dispatch_signaling_message(answer_msg, relay_write_tx)
            .await;
        info!("Sent answer to {}", full_peer_id.short());

        Ok(())
    }
2343
2344    /// Handle incoming answer
2345    async fn handle_answer(
2346        &self,
2347        sender_pubkey: &str,
2348        their_uuid: &str,
2349        answer: serde_json::Value,
2350    ) -> Result<()> {
2351        let full_peer_id = PeerId::new(sender_pubkey.to_string(), Some(their_uuid.to_string()));
2352        let peer_key = full_peer_id.to_string();
2353
2354        info!("Received answer from {}", full_peer_id.short());
2355
2356        let maybe_peer = {
2357            let peers = self.state.peers.read().await;
2358            peers.get(&peer_key).and_then(|entry| {
2359                // Skip if already connected - duplicate answers from multiple relays
2360                if entry.state == ConnectionState::Connected {
2361                    debug!(
2362                        "Ignoring duplicate answer from {} - already connected",
2363                        full_peer_id.short()
2364                    );
2365                    return None;
2366                }
2367                entry
2368                    .peer
2369                    .as_ref()
2370                    .and_then(|peer| peer.as_webrtc().cloned())
2371            })
2372        };
2373
2374        if let Some(webrtc_peer) = maybe_peer {
2375            // Skip if already connected - duplicate answers from multiple relays
2376            use webrtc::peer_connection::signaling_state::RTCSignalingState;
2377            let signaling_state = webrtc_peer.signaling_state();
2378            if signaling_state != RTCSignalingState::HaveLocalOffer {
2379                debug!(
2380                    "Ignoring answer from {} - signaling state is {:?}, not HaveLocalOffer",
2381                    full_peer_id.short(),
2382                    signaling_state
2383                );
2384                return Ok(());
2385            }
2386            webrtc_peer.handle_answer(answer).await?;
2387            info!("Applied answer from {}", full_peer_id.short());
2388        } else {
2389            let peers = self.state.peers.read().await;
2390            if let Some(entry) = peers.get(&peer_key) {
2391                if entry.peer.is_some() {
2392                    debug!(
2393                        "Peer {} does not use answer signaling",
2394                        full_peer_id.short()
2395                    );
2396                } else {
2397                    debug!("Peer {} has no connection object", full_peer_id.short());
2398                }
2399            } else {
2400                debug!("No peer found for key: {}", peer_key);
2401            }
2402        }
2403
2404        Ok(())
2405    }
2406
2407    /// Handle incoming ICE candidate
2408    async fn handle_candidate(
2409        &self,
2410        sender_pubkey: &str,
2411        their_uuid: &str,
2412        candidate: serde_json::Value,
2413    ) -> Result<()> {
2414        let full_peer_id = PeerId::new(sender_pubkey.to_string(), Some(their_uuid.to_string()));
2415        let peer_key = full_peer_id.to_string();
2416
2417        info!("Received ICE candidate from {}", full_peer_id.short());
2418
2419        let maybe_peer = {
2420            let peers = self.state.peers.read().await;
2421            peers.get(&peer_key).and_then(|entry| {
2422                entry
2423                    .peer
2424                    .as_ref()
2425                    .and_then(|peer| peer.as_webrtc().cloned())
2426            })
2427        };
2428        if let Some(peer) = maybe_peer {
2429            peer.handle_candidate(candidate).await?;
2430        }
2431
2432        Ok(())
2433    }
2434
2435    /// Handle batched ICE candidates
2436    async fn handle_candidates(
2437        &self,
2438        sender_pubkey: &str,
2439        their_uuid: &str,
2440        candidates: Vec<serde_json::Value>,
2441    ) -> Result<()> {
2442        let full_peer_id = PeerId::new(sender_pubkey.to_string(), Some(their_uuid.to_string()));
2443        let peer_key = full_peer_id.to_string();
2444
2445        debug!(
2446            "Received {} candidates from {}",
2447            candidates.len(),
2448            full_peer_id.short()
2449        );
2450
2451        let maybe_peer = {
2452            let peers = self.state.peers.read().await;
2453            peers.get(&peer_key).and_then(|entry| {
2454                entry
2455                    .peer
2456                    .as_ref()
2457                    .and_then(|peer| peer.as_webrtc().cloned())
2458            })
2459        };
2460        if let Some(peer) = maybe_peer {
2461            for candidate in candidates {
2462                if let Err(e) = peer.handle_candidate(candidate).await {
2463                    debug!("Failed to add candidate: {}", e);
2464                }
2465            }
2466        }
2467
2468        Ok(())
2469    }
2470
2471    /// Handle peer state change events from peer connections
2472    async fn handle_peer_state_event(
2473        &self,
2474        event: PeerStateEvent,
2475        relay_write_tx: &tokio::sync::broadcast::Sender<SignalingMessage>,
2476    ) {
2477        match event {
2478            PeerStateEvent::Connected(peer_id) => {
2479                let peer_key = peer_id.to_string();
2480                let mut emit_hello = false;
2481                let mut peers = self.state.peers.write().await;
2482                if let Some(entry) = peers.get_mut(&peer_key) {
2483                    if entry.state != ConnectionState::Connected {
2484                        info!("Peer {} connected (via state event)", peer_id.short());
2485                        entry.state = ConnectionState::Connected;
2486                        emit_hello = true;
2487                        // Update connected count
2488                        self.state
2489                            .connected_count
2490                            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
2491                    }
2492                }
2493                drop(peers);
2494                if emit_hello {
2495                    self.dispatch_signaling_message(
2496                        SignalingMessage::hello(&self.my_peer_id.uuid),
2497                        relay_write_tx,
2498                    )
2499                    .await;
2500                }
2501            }
2502            PeerStateEvent::Failed(peer_id) => {
2503                let peer_key = peer_id.to_string();
2504                info!(
2505                    "Peer {} connection failed - removing from pool",
2506                    peer_id.short()
2507                );
2508                let removed = {
2509                    let mut peers = self.state.peers.write().await;
2510                    peers.remove(&peer_key)
2511                };
2512                if let Some(entry) = removed {
2513                    // Decrement connected count if was connected
2514                    if entry.state == ConnectionState::Connected {
2515                        self.state
2516                            .connected_count
2517                            .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
2518                    }
2519                    // Close the peer connection if it exists
2520                    if let Some(peer) = entry.peer {
2521                        let _ = peer.close().await;
2522                    }
2523                }
2524            }
2525            PeerStateEvent::Disconnected(peer_id) => {
2526                let peer_key = peer_id.to_string();
2527                info!("Peer {} disconnected - removing from pool", peer_id.short());
2528                let removed = {
2529                    let mut peers = self.state.peers.write().await;
2530                    peers.remove(&peer_key)
2531                };
2532                if let Some(entry) = removed {
2533                    // Decrement connected count if was connected
2534                    if entry.state == ConnectionState::Connected {
2535                        self.state
2536                            .connected_count
2537                            .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
2538                    }
2539                    // Close the peer connection if it exists
2540                    if let Some(peer) = entry.peer {
2541                        let _ = peer.close().await;
2542                    }
2543                }
2544            }
2545        }
2546    }
2547
2548    /// Cleanup stale peers and sync connection states (fallback, runs every 30s)
2549    async fn cleanup_stale_peers(&self) {
2550        let mut peers = self.state.peers.write().await;
2551        let mut connected_count = 0;
2552        let mut to_remove = Vec::new();
2553        let stale_timeout = Duration::from_secs(60); // Remove peers stuck in Discovered/Connecting for 60s
2554
2555        for (key, entry) in peers.iter_mut() {
2556            if let Some(ref peer) = entry.peer {
2557                // Sync connected state as fallback (in case event was missed)
2558                if peer.is_connected() {
2559                    if entry.state != ConnectionState::Connected {
2560                        info!(
2561                            "Peer {} is now connected (sync fallback)",
2562                            entry.peer_id.short()
2563                        );
2564                        entry.state = ConnectionState::Connected;
2565                    }
2566                    connected_count += 1;
2567                } else if entry.state == ConnectionState::Connected {
2568                    info!(
2569                        "Removing disconnected peer {} after transport closed",
2570                        entry.peer_id.short()
2571                    );
2572                    to_remove.push(key.clone());
2573                } else if entry.state == ConnectionState::Connecting
2574                    && entry.last_seen.elapsed() > stale_timeout
2575                {
2576                    // Peer stuck in Connecting for too long - mark for removal
2577                    info!(
2578                        "Removing stale peer {} (stuck in Connecting for {:?})",
2579                        entry.peer_id.short(),
2580                        entry.last_seen.elapsed()
2581                    );
2582                    to_remove.push(key.clone());
2583                }
2584            } else if entry.state == ConnectionState::Discovered
2585                && entry.last_seen.elapsed() > stale_timeout
2586            {
2587                // Discovered peer with no actual connection - remove
2588                debug!("Removing stale discovered peer {}", entry.peer_id.short());
2589                to_remove.push(key.clone());
2590            }
2591        }
2592
2593        // Remove stale peers
2594        let mut removed_peers = Vec::new();
2595        for key in to_remove {
2596            if let Some(entry) = peers.remove(&key) {
2597                removed_peers.push(entry);
2598            }
2599        }
2600        drop(peers);
2601
2602        for entry in removed_peers {
2603            if let Some(peer) = entry.peer {
2604                let _ = peer.close().await;
2605            }
2606        }
2607
2608        self.state
2609            .connected_count
2610            .store(connected_count, std::sync::atomic::Ordering::Relaxed);
2611    }
2612}
2613
2614pub type PeerRouter = WebRTCManager;
2615
// Keep the old PeerState for backward compatibility with tests
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub struct PeerState {
    /// Identity of the remote peer (pubkey plus optional session uuid).
    pub peer_id: PeerId,
    /// Connection direction (e.g. inbound vs. outbound).
    pub direction: PeerDirection,
    /// Connection state as a free-form string (legacy form; the current
    /// `PeerEntry` uses the `ConnectionState` enum instead).
    pub state: String,
    /// Timestamp of the most recent observation of this peer.
    pub last_seen: Instant,
}
2625
#[cfg(test)]
mod tests {
    use super::*;
    use crate::webrtc::root_events::PeerRootEvent;
    use crate::webrtc::session::TestMeshPeer;
    use crate::webrtc::SelectionStrategy;
    use anyhow::Result as AnyResult;
    use async_trait::async_trait;
    use nostr::{EventBuilder, Keys, Tag};
    use std::time::Duration;

    /// Minimal `LocalNostrBus` stub: reports a fixed source name, discards
    /// broadcast events, and answers `query_root` with a pre-canned root
    /// event (or `None`).
    struct TestLocalBus {
        source: &'static str,
        root: Option<PeerRootEvent>,
    }

    #[async_trait]
    impl super::super::LocalNostrBus for TestLocalBus {
        fn source_name(&self) -> &'static str {
            self.source
        }

        // Events are dropped; the stub only needs to satisfy the trait.
        async fn broadcast_event(&self, _event: &nostr::Event) -> AnyResult<()> {
            Ok(())
        }

        async fn query_root(
            &self,
            _owner_pubkey: &str,
            _tree_name: &str,
            _timeout: Duration,
        ) -> Option<PeerRootEvent> {
            self.root.clone()
        }
    }

    // Parsing a hashtree root event extracts hash/encryptedKey tags and
    // leaves the plaintext key absent when only an encrypted key is present.
    #[test]
    fn root_event_from_peer_extracts_tags() {
        let keys = Keys::generate();
        let hash = "ab".repeat(32);
        let event = EventBuilder::new(
            Kind::Custom(super::super::root_events::HASHTREE_KIND),
            "",
            [
                Tag::parse(&["d", "repo"]).unwrap(),
                Tag::parse(&["l", super::super::root_events::HASHTREE_LABEL]).unwrap(),
                Tag::parse(&["hash", &hash]).unwrap(),
                Tag::parse(&["encryptedKey", &"11".repeat(32)]).unwrap(),
            ],
        )
        .to_event(&keys)
        .unwrap();

        let parsed = root_event_from_peer(&event, "peer-a", "repo").unwrap();
        let expected_encrypted = "11".repeat(32);
        assert_eq!(parsed.hash, hash);
        assert_eq!(parsed.peer_id, "peer-a");
        assert_eq!(
            parsed.encrypted_key.as_deref(),
            Some(expected_encrypted.as_str())
        );
        assert!(parsed.key.is_none());
    }

    // Tie-break rule: with equal created_at, the event with the
    // lexicographically higher id wins.
    #[test]
    fn pick_latest_event_prefers_higher_event_id_on_timestamp_tie() {
        let keys = Keys::generate();
        let created_at = nostr::Timestamp::from_secs(1_700_000_000);
        let event_a = EventBuilder::new(
            Kind::Custom(super::super::root_events::HASHTREE_KIND),
            "",
            [],
        )
        .custom_created_at(created_at)
        .to_event(&keys)
        .unwrap();
        let event_b = EventBuilder::new(
            Kind::Custom(super::super::root_events::HASHTREE_KIND),
            "",
            [],
        )
        .custom_created_at(created_at)
        .to_event(&keys)
        .unwrap();

        let expected = if event_a.id > event_b.id {
            event_a.id
        } else {
            event_b.id
        };
        let picked = pick_latest_event([&event_a, &event_b]).unwrap();
        assert_eq!(picked.id, expected);
    }

    // Buses are queried in registration order; the first bus that returns a
    // root wins and its source name is reported alongside the root.
    #[tokio::test]
    async fn resolve_root_from_local_buses_returns_source_and_first_match() {
        let state = WebRTCState::new();
        let root = PeerRootEvent {
            hash: "ab".repeat(32),
            key: None,
            encrypted_key: None,
            self_encrypted_key: None,
            event_id: "event-1".to_string(),
            created_at: 1,
            peer_id: "bus-peer".to_string(),
        };

        state
            .set_local_buses(vec![
                Arc::new(TestLocalBus {
                    source: "empty",
                    root: None,
                }),
                Arc::new(TestLocalBus {
                    source: "mock-bus",
                    root: Some(root.clone()),
                }),
            ])
            .await;

        let resolved = state
            .resolve_root_from_local_buses_with_source("owner", "tree", Duration::from_millis(10))
            .await
            .expect("expected root from local bus");

        assert_eq!(resolved.0, "mock-bus");
        assert_eq!(resolved.1.hash, root.hash);
        assert_eq!(resolved.1.peer_id, root.peer_id);
    }

    // The Wi-Fi Aware peer cap applies per source: an already-tracked key is
    // still allowed, a new key over the cap is rejected, and other sources
    // are unaffected.
    #[tokio::test]
    async fn can_track_local_bus_peer_enforces_wifi_aware_limit() {
        let keys = Keys::generate();
        let mut config = WebRTCConfig::default();
        config.wifi_aware.enabled = true;
        config.wifi_aware.max_peers = 1;
        let manager = WebRTCManager::new(keys, config);
        let existing_peer = PeerId::new("peer-a".to_string(), Some("sess-a".to_string()));
        let existing_key = existing_peer.to_string();
        let mut peers = HashMap::new();
        peers.insert(
            existing_key.clone(),
            PeerEntry {
                peer_id: existing_peer,
                direction: PeerDirection::Outbound,
                state: ConnectionState::Discovered,
                last_seen: Instant::now(),
                peer: None,
                pool: PeerPool::Other,
                transport: PeerTransport::WebRtc,
                signal_paths: BTreeSet::from([PeerSignalPath::WifiAware]),
                bytes_sent: 0,
                bytes_received: 0,
            },
        );

        assert!(manager.can_track_local_bus_peer(WIFI_AWARE_SOURCE, &existing_key, &peers,));
        assert!(!manager.can_track_local_bus_peer(WIFI_AWARE_SOURCE, "peer-b:sess-b", &peers,));
        assert!(manager.can_track_local_bus_peer("relay", "peer-c:sess-c", &peers));
    }

    // Non-WebRTC (e.g. Bluetooth) mesh peers can serve content requests; the
    // returned source is the responding peer's full "pubkey:session" key.
    #[tokio::test]
    async fn request_from_peers_with_source_accepts_generic_mesh_peers() {
        let state = WebRTCState::new();
        let data = b"offline-over-ble".to_vec();
        let hash_hex = hex::encode(hashtree_core::sha256(&data));

        state.peers.write().await.insert(
            "peer-a".to_string(),
            PeerEntry {
                peer_id: PeerId::new("peer-a-pub".to_string(), Some("session-a".to_string())),
                direction: PeerDirection::Outbound,
                state: ConnectionState::Connected,
                last_seen: Instant::now(),
                peer: Some(MeshPeer::mock_for_tests(TestMeshPeer::with_response(Some(
                    data.clone(),
                )))),
                pool: PeerPool::Other,
                transport: PeerTransport::Bluetooth,
                signal_paths: BTreeSet::from([PeerSignalPath::Bluetooth]),
                bytes_sent: 0,
                bytes_received: 0,
            },
        );

        let resolved = state
            .request_from_peers_with_source(&hash_hex)
            .await
            .expect("expected mock mesh peer response");

        assert_eq!(resolved.0, data);
        assert_eq!(resolved.1, "peer-a-pub:session-a");
    }

    // A single slow peer (200ms response, 50ms hedge interval, 400ms budget)
    // must still be awaited for the full timeout rather than abandoned when
    // there is no other peer to hedge to.
    #[tokio::test]
    async fn request_from_peers_with_source_waits_full_timeout_for_last_generic_peer() {
        let state = WebRTCState::new_with_routing_and_cashu(
            SelectionStrategy::TitForTat,
            true,
            RequestDispatchConfig {
                initial_fanout: 1,
                hedge_fanout: 1,
                max_fanout: 1,
                hedge_interval_ms: 50,
            },
            Duration::from_millis(400),
            CashuRoutingConfig::default(),
            None,
            None,
        );
        let data = b"slow-offline-over-ble".to_vec();
        let hash_hex = hex::encode(hashtree_core::sha256(&data));

        state.peers.write().await.insert(
            "peer-a".to_string(),
            PeerEntry {
                peer_id: PeerId::new("peer-a-pub".to_string(), Some("session-a".to_string())),
                direction: PeerDirection::Outbound,
                state: ConnectionState::Connected,
                last_seen: Instant::now(),
                peer: Some(MeshPeer::mock_for_tests(
                    TestMeshPeer::with_delayed_response(
                        Some(data.clone()),
                        Duration::from_millis(200),
                    ),
                )),
                pool: PeerPool::Other,
                transport: PeerTransport::Bluetooth,
                signal_paths: BTreeSet::from([PeerSignalPath::Bluetooth]),
                bytes_sent: 0,
                bytes_received: 0,
            },
        );

        let resolved = state
            .request_from_peers_with_source(&hash_hex)
            .await
            .expect("expected delayed mock mesh peer response");

        assert_eq!(resolved.0, data);
        assert_eq!(resolved.1, "peer-a-pub:session-a");
    }

    // With signaling disabled in the config, dispatch must not push any
    // frames to connected peers (verified via the mock's sent-frame counter).
    #[tokio::test]
    async fn dispatch_signaling_message_is_noop_when_signaling_disabled() {
        let keys = Keys::generate();
        let mut config = WebRTCConfig::default();
        config.signaling_enabled = false;
        let manager = WebRTCManager::new(keys, config);
        let peer_id = PeerId::new("peer-a-pub".to_string(), Some("session-a".to_string()));
        let peer_key = peer_id.to_string();
        let peer = MeshPeer::mock_for_tests(TestMeshPeer::with_response(None));
        let peer_ref = peer.mock_ref().expect("mock peer").clone();

        manager.state.peers.write().await.insert(
            peer_key,
            PeerEntry {
                peer_id,
                direction: PeerDirection::Outbound,
                state: ConnectionState::Connected,
                last_seen: Instant::now(),
                peer: Some(peer),
                pool: PeerPool::Other,
                transport: PeerTransport::Bluetooth,
                signal_paths: BTreeSet::from([PeerSignalPath::Bluetooth]),
                bytes_sent: 0,
                bytes_received: 0,
            },
        );

        let (relay_tx, _) = tokio::sync::broadcast::channel(4);
        manager
            .dispatch_signaling_message(
                SignalingMessage::hello(&manager.my_peer_id.uuid),
                &relay_tx,
            )
            .await;

        assert_eq!(peer_ref.sent_frame_count().await, 0);
    }

    // Regression: cleanup of a Failed peer uses a mock whose close() takes
    // 200ms; a concurrent read of the peer map within 50ms proves the write
    // lock is released before close() is awaited.
    #[tokio::test]
    async fn failed_peer_cleanup_does_not_hold_peer_map_lock_while_closing() {
        let keys = Keys::generate();
        let manager = Arc::new(WebRTCManager::new(keys, WebRTCConfig::default()));
        let peer_id = PeerId::new("peer-a-pub".to_string(), Some("session-a".to_string()));
        let peer_key = peer_id.to_string();

        manager.state.peers.write().await.insert(
            peer_key.clone(),
            PeerEntry {
                peer_id: peer_id.clone(),
                direction: PeerDirection::Outbound,
                state: ConnectionState::Connected,
                last_seen: Instant::now(),
                peer: Some(MeshPeer::mock_for_tests(TestMeshPeer::with_delayed_close(
                    Duration::from_millis(200),
                ))),
                pool: PeerPool::Other,
                transport: PeerTransport::Bluetooth,
                signal_paths: BTreeSet::from([PeerSignalPath::Bluetooth]),
                bytes_sent: 0,
                bytes_received: 0,
            },
        );

        let (relay_tx, _) = tokio::sync::broadcast::channel(4);
        let manager_for_task = manager.clone();
        let peer_id_for_task = peer_id.clone();
        let cleanup_task = tokio::spawn(async move {
            manager_for_task
                .handle_peer_state_event(PeerStateEvent::Failed(peer_id_for_task), &relay_tx)
                .await;
        });

        tokio::time::sleep(Duration::from_millis(20)).await;

        let remaining = tokio::time::timeout(Duration::from_millis(50), async {
            manager.state.peers.read().await.len()
        })
        .await
        .expect("peer map read should not block on close");

        assert_eq!(remaining, 0);
        cleanup_task.await.expect("cleanup task");
    }

    // Regression: while a root query is in flight against a slow peer
    // (200ms delayed events), both a writer and a reader of the peer map must
    // make progress — the query must not hold the map lock across its await.
    #[tokio::test]
    async fn resolve_root_from_peers_does_not_hold_peer_map_lock_while_querying() {
        let keys = Keys::generate();
        let manager = Arc::new(WebRTCManager::new(keys.clone(), WebRTCConfig::default()));
        let owner_keys = Keys::generate();
        let owner_pubkey = owner_keys.public_key().to_hex();
        let tree_name = "video";
        let hash = "ab".repeat(32);
        let event = EventBuilder::new(
            Kind::Custom(super::super::root_events::HASHTREE_KIND),
            "",
            [
                Tag::parse(&["d", tree_name]).unwrap(),
                Tag::parse(&["l", super::super::root_events::HASHTREE_LABEL]).unwrap(),
                Tag::parse(&["hash", &hash]).unwrap(),
            ],
        )
        .to_event(&owner_keys)
        .unwrap();

        let peer_id = PeerId::new("peer-a-pub".to_string(), Some("session-a".to_string()));
        let peer_key = peer_id.to_string();

        manager.state.peers.write().await.insert(
            peer_key.clone(),
            PeerEntry {
                peer_id,
                direction: PeerDirection::Outbound,
                state: ConnectionState::Connected,
                last_seen: Instant::now(),
                peer: Some(MeshPeer::mock_for_tests(TestMeshPeer::with_delayed_events(
                    vec![event],
                    Duration::from_millis(200),
                ))),
                pool: PeerPool::Other,
                transport: PeerTransport::Bluetooth,
                signal_paths: BTreeSet::from([PeerSignalPath::Bluetooth]),
                bytes_sent: 0,
                bytes_received: 0,
            },
        );

        let manager_for_task = manager.clone();
        let owner_pubkey_for_task = owner_pubkey.clone();
        let resolve_task = tokio::spawn(async move {
            manager_for_task
                .state
                .resolve_root_from_peers(
                    &owner_pubkey_for_task,
                    tree_name,
                    Duration::from_millis(500),
                )
                .await
        });

        tokio::time::sleep(Duration::from_millis(20)).await;

        let manager_for_writer = manager.clone();
        let peer_key_for_writer = peer_key.clone();
        let writer_task = tokio::spawn(async move {
            let mut peers = manager_for_writer.state.peers.write().await;
            if let Some(entry) = peers.get_mut(&peer_key_for_writer) {
                entry.bytes_received += 1;
            }
        });

        tokio::time::sleep(Duration::from_millis(20)).await;

        let status_count = tokio::time::timeout(Duration::from_millis(50), async {
            manager.state.peers.read().await.len()
        })
        .await
        .expect("peer map read should not block on root query");

        assert_eq!(status_count, 1);
        assert!(resolve_task.await.expect("resolve task").is_some());
        writer_task.await.expect("writer task");
    }

    // First insert of a key succeeds, the duplicate is rejected, and other
    // keys are unaffected.
    #[test]
    fn test_formal_timed_seen_set_rejects_duplicates() {
        let mut seen = TimedSeenSet::new(4, Duration::from_secs(60));
        assert!(seen.insert_if_new("frame-1".to_string()));
        assert!(!seen.insert_if_new("frame-1".to_string()));
        assert!(seen.insert_if_new("frame-2".to_string()));
    }

    // With capacity 2, inserting a third key evicts the oldest ("a"), which
    // can then be re-inserted as if new.
    #[test]
    fn test_formal_timed_seen_set_evicts_oldest_when_capacity_exceeded() {
        let mut seen = TimedSeenSet::new(2, Duration::from_secs(60));
        assert!(seen.insert_if_new("a".to_string()));
        assert!(seen.insert_if_new("b".to_string()));
        assert!(seen.insert_if_new("c".to_string()));

        // "a" should be evicted due to cap=2, so re-insert becomes new again.
        assert!(seen.insert_if_new("a".to_string()));
        assert!(!seen.insert_if_new("a".to_string()));
    }

    // All fanout values are clamped to the number of available peers (3).
    #[test]
    fn test_request_dispatch_normalization_caps_to_available_peers() {
        let normalized = normalize_dispatch_config(
            RequestDispatchConfig {
                initial_fanout: 8,
                hedge_fanout: 6,
                max_fanout: 5,
                hedge_interval_ms: 120,
            },
            3,
        );
        assert_eq!(normalized.max_fanout, 3);
        assert_eq!(normalized.initial_fanout, 3);
        assert_eq!(normalized.hedge_fanout, 3);
    }

    // 7 peers, initial wave of 2, hedge waves of 3, capped at 6 total:
    // waves are 2 + 3 + 1.
    #[test]
    fn test_hedged_wave_plan_matches_dispatch_policy() {
        let plan = build_hedged_wave_plan(
            7,
            RequestDispatchConfig {
                initial_fanout: 2,
                hedge_fanout: 3,
                max_fanout: 6,
                hedge_interval_ms: 120,
            },
        );
        assert_eq!(plan, vec![2, 3, 1]);
    }
}