Skip to main content

hashtree_network/manager/
mod.rs

//! WebRTC signaling over Nostr relays.
//!
//! Protocol (compatible with hashtree-ts):
//! - All signaling uses ephemeral kind 25050.
//! - Hello messages: `#l: "hello"` tag, broadcast unencrypted for peer discovery.
//! - Directed signaling (offer, answer, candidate, candidates): NIP-17 style
//!   gift wrap for privacy - wrapped with an ephemeral key, `#p` tag with the recipient.
//!
//! Security: Directed messages use gift wrapping with ephemeral keys so that
//! relays cannot see the actual sender or correlate messages.
11
12pub use crate::{ConnectionState, PeerSignalPath, PeerTransport};
13use anyhow::Result;
14use async_trait::async_trait;
15use cashu_service::CashuPaymentClient;
16#[cfg(test)]
17use nostr_sdk::nostr::Kind;
18use nostr_sdk::nostr::{Event, Keys};
19use std::collections::{BTreeSet, HashMap};
20use std::sync::Arc;
21use std::time::{Duration, Instant};
22use tokio::sync::{mpsc, Mutex, RwLock};
23use tracing::{debug, error, info, warn};
24
25use crate::bluetooth::{
26    BluetoothConfig, BluetoothMesh, BluetoothPeerRegistrar, BluetoothRuntimeContext,
27};
28use crate::cashu::{CashuMintMetadataStore, CashuQuoteState, CashuRoutingConfig, NegotiatedQuote};
29use crate::local_bus::SharedLocalNostrBus;
30use crate::mesh_session::{
31    resolve_root_from_peer_sessions as resolve_root_via_peer_sessions, MeshSession,
32};
33use crate::mesh_store_core::{
34    run_hedged_waves, sync_selector_peers, HedgedWaveAction, RequestDispatchConfig,
35};
36use crate::multicast::{MulticastConfig, MulticastNostrBus};
37use crate::nostr::NostrRelayTransport;
38use crate::peer::{ContentStore, Peer, PendingRequest};
39use crate::peer_selector::{PeerMetadataSnapshot, PeerSelector, SelectorSummary};
40use crate::protocol::{DataQuoteRequest, DataRequest};
41use crate::relay_bridge::SharedMeshRelayClient;
42use crate::root_events::PeerRootEvent;
43use crate::runtime_control::{can_track_source_peer, PeerStateEvent};
44use crate::runtime_peer::{
45    MeshPeerEntry as SharedPeerEntry, PeerClassifier as SharedPeerClassifier,
46};
47use crate::runtime_state::MeshRuntimeState;
48use crate::session::MeshPeer;
49use crate::signaling::MeshRouter;
50use crate::transport::{
51    PeerLink as SharedPeerLink, PeerLinkFactory as SharedPeerLinkFactory,
52    SignalingTransport as SharedSignalingTransport, TransportError as SharedTransportError,
53};
54use crate::types::{
55    validate_mesh_frame, KnownPeerRecord, KnownPeerSnapshot, MeshNostrFrame, MeshNostrPayload,
56    SignalingMessage, TimedSeenSet,
57};
58use crate::wifi_aware::{
59    mobile_wifi_aware_bridge, WifiAwareConfig, WifiAwareNostrBus, WIFI_AWARE_SOURCE,
60};
61use crate::{
62    ClassifyRequest as SharedClassifyRequest, IceCandidate as SharedIceCandidate, PeerDirection,
63    PeerId, PeerPool, MESH_SIGNALING_EVENT_KIND,
64};
65
/// Callback type for classifying peers into pools.
///
/// Alias of the crate-level `runtime_peer::PeerClassifier`.
pub type PeerClassifier = SharedPeerClassifier;
/// Peer-table entry (`runtime_peer::MeshPeerEntry`) specialised to [`MeshPeer`] sessions.
pub type PeerEntry = SharedPeerEntry<MeshPeer>;
69
/// User-facing configuration for the WebRTC mesh manager.
///
/// See the `Default` implementation for the stock values.
#[derive(Clone)]
pub struct WebRTCConfig {
    /// Relay URLs (the defaults are `wss://` Nostr relays).
    pub relays: Vec<String>,
    /// Signaling on/off switch (enabled by default).
    pub signaling_enabled: bool,
    /// Hash-get on/off switch (enabled by default).
    pub hash_get_enabled: bool,
    /// Signal URLs; empty by default.
    // NOTE(review): presumably the list handed to `Peer::set_signal_urls` — confirm at the call site.
    pub signal_urls: Vec<String>,
    /// Outbound limit (default 6).
    // NOTE(review): presumably a cap on outbound peer connections — confirm where it is enforced.
    pub max_outbound: usize,
    /// Inbound limit (default 6).
    // NOTE(review): presumably a cap on inbound peer connections — confirm where it is enforced.
    pub max_inbound: usize,
    /// Hello interval in milliseconds (default 3000).
    pub hello_interval_ms: u64,
    /// Message timeout in milliseconds (default 15000).
    ///
    /// `WebRTCState::new` turns this into the retrieval request timeout.
    pub message_timeout_ms: u64,
    /// STUN server URLs (the defaults use the `stun:` scheme).
    pub stun_servers: Vec<String>,
    /// Debug flag (off by default).
    pub debug: bool,
    /// Multicast settings.
    pub multicast: MulticastConfig,
    /// Wi-Fi Aware settings.
    pub wifi_aware: WifiAwareConfig,
    /// Bluetooth settings.
    pub bluetooth: BluetoothConfig,
    /// Peer pool settings.
    pub pools: crate::PoolSettings,
    /// Peer selection strategy used when constructing the shared `PeerSelector`.
    pub request_selection_strategy: crate::SelectionStrategy,
    /// Whether the peer selector runs with fairness enabled.
    pub request_fairness_enabled: bool,
    /// Hedged dispatch parameters (initial/hedge/max fanout and hedge interval).
    pub request_dispatch: RequestDispatchConfig,
}
90
91impl Default for WebRTCConfig {
92    fn default() -> Self {
93        Self {
94            relays: vec![
95                "wss://relay.damus.io".to_string(),
96                "wss://relay.primal.net".to_string(),
97                "wss://temp.iris.to".to_string(),
98                "wss://relay.snort.social".to_string(),
99            ],
100            signaling_enabled: true,
101            hash_get_enabled: true,
102            signal_urls: Vec::new(),
103            max_outbound: 6,
104            max_inbound: 6,
105            hello_interval_ms: 3000,
106            message_timeout_ms: 15000,
107            stun_servers: vec![
108                "stun:stun.iris.to:3478".to_string(),
109                "stun:stun.l.google.com:19302".to_string(),
110                "stun:stun.cloudflare.com:3478".to_string(),
111            ],
112            debug: false,
113            multicast: MulticastConfig::default(),
114            wifi_aware: WifiAwareConfig::default(),
115            bluetooth: BluetoothConfig::default(),
116            pools: crate::PoolSettings::default(),
117            request_selection_strategy: crate::SelectionStrategy::Weighted,
118            request_fairness_enabled: true,
119            request_dispatch: RequestDispatchConfig {
120                initial_fanout: 2,
121                hedge_fanout: 1,
122                max_fanout: 8,
123                hedge_interval_ms: 120,
124            },
125        }
126    }
127}
128
129#[derive(Debug, Clone)]
130pub struct PeerStatus {
131    pub peer_id: String,
132    pub pubkey: String,
133    pub state: String,
134    pub direction: PeerDirection,
135    pub connected_at: Option<std::time::Instant>,
136    pub pool: PeerPool,
137}
138
/// Reports whether the `HTREE_BLUETOOTH_NOSTR_ONLY` environment variable is
/// switched on.
///
/// Returns `true` when the variable is set to `1`, `true`, or `yes`. The
/// comparison ignores ASCII case, so mixed-case spellings such as `True` or
/// `Yes` are treated the same as the lower/upper-case forms. An unset
/// variable, a non-Unicode value, or any other value yields `false`.
fn bluetooth_nostr_only_mode() -> bool {
    const TRUTHY: [&str; 3] = ["1", "true", "yes"];

    std::env::var("HTREE_BLUETOOTH_NOSTR_ONLY")
        .map(|raw| {
            TRUTHY
                .iter()
                .any(|accepted| raw.eq_ignore_ascii_case(accepted))
        })
        .unwrap_or(false)
}
145
/// Shared state for the native mesh router.
pub struct WebRTCState {
    /// Runtime state (peer table, local buses, bandwidth and mesh counters)
    /// that most `WebRTCState` methods delegate to.
    pub runtime: MeshRuntimeState<MeshPeer>,
    /// Shared peer selector used by live retrieval; aligned with simulation strategies.
    peer_selector: Arc<RwLock<PeerSelector>>,
    /// Optional channel for `(source, event)` pairs; installed via
    /// `set_direct_signaling_sender` and fed by `submit_direct_signaling_event`.
    direct_signaling_tx: RwLock<Option<mpsc::Sender<(String, Event)>>>,
    /// Hedged dispatch policy for retrieval requests.
    request_dispatch: RequestDispatchConfig,
    /// Retrieval timeout for quote negotiation and single-peer fetches.
    request_timeout: Duration,
    /// Shared Cashu quote negotiation policy/state.
    cashu_quotes: Arc<CashuQuoteState>,
}
// NOTE(review): the four limits below are not referenced in the visible part
// of this file; presumably they size the seen-frame / seen-event dedup sets
// (`TimedSeenSet`) — confirm at the use sites.

/// Seen-frame capacity: 4096 entries.
const SEEN_FRAME_CAP: usize = 4 * 1024;
/// Seen-frame time-to-live: two minutes.
const SEEN_FRAME_TTL: Duration = Duration::from_secs(2 * 60);
/// Seen-event capacity: 8192 entries.
const SEEN_EVENT_CAP: usize = 8 * 1024;
/// Seen-event time-to-live: ten minutes.
const SEEN_EVENT_TTL: Duration = Duration::from_secs(10 * 60);
163
/// Shared, mutex-guarded map of pending requests keyed by string.
type PendingRequestsMap = Arc<Mutex<HashMap<String, PendingRequest>>>;
/// A WebRTC peer usable for requests: peer id string, its pending-request
/// map, and its data channel.
type ConnectedPeer = (
    String,
    PendingRequestsMap,
    Arc<webrtc::data_channel::RTCDataChannel>,
);
/// A connected session of any transport: peer id string, the peer handle,
/// and the transport it runs over.
type ConnectedSession = (String, MeshPeer, PeerTransport);
/// The mesh router instantiated with this module's signaling bridge and peer factory.
type SharedProductionRouter = MeshRouter<RouterSignalingBridge, SharedRouterPeerFactory>;
172
173#[derive(Clone)]
174struct RouterSignalingBridge {
175    peer_id: String,
176    signaling_tx: mpsc::Sender<SignalingMessage>,
177}
178
179impl RouterSignalingBridge {
180    fn new(peer_id: String, signaling_tx: mpsc::Sender<SignalingMessage>) -> Self {
181        Self {
182            peer_id,
183            signaling_tx,
184        }
185    }
186}
187
188#[async_trait]
189impl SharedSignalingTransport for RouterSignalingBridge {
190    async fn connect(&self, _relays: &[String]) -> Result<(), SharedTransportError> {
191        Ok(())
192    }
193
194    async fn disconnect(&self) {}
195
196    async fn publish(&self, msg: SignalingMessage) -> Result<(), SharedTransportError> {
197        self.signaling_tx
198            .send(msg)
199            .await
200            .map_err(|e| SharedTransportError::SendFailed(e.to_string()))
201    }
202
203    async fn recv(&self) -> Option<SignalingMessage> {
204        None
205    }
206
207    fn try_recv(&self) -> Option<SignalingMessage> {
208        None
209    }
210
211    fn peer_id(&self) -> &str {
212        &self.peer_id
213    }
214}
215
/// Peer-link factory: creates `Peer` instances, remembers them by peer id,
/// and registers them in the shared runtime peer table.
struct SharedRouterPeerFactory {
    /// Local peer id handed to every `Peer` this factory creates.
    my_peer_id: PeerId,
    /// Signaling channel cloned into every created peer.
    signaling_tx: mpsc::Sender<SignalingMessage>,
    /// STUN server list cloned into every created peer.
    stun_servers: Vec<String>,
    /// Optional content store passed to created peers.
    store: Option<Arc<dyn ContentStore>>,
    /// Shared mesh state; supplies the runtime peer table and Cashu quote state.
    state: Arc<WebRTCState>,
    /// Peer state event channel passed to created peers.
    state_event_tx: mpsc::Sender<PeerStateEvent>,
    /// Optional relay client passed to created peers.
    nostr_relay: Option<SharedMeshRelayClient>,
    /// Mesh frame channel passed to created peers.
    mesh_frame_tx: mpsc::Sender<(PeerId, MeshNostrFrame)>,
    /// Signal URLs applied to each created peer via `set_signal_urls`.
    signal_urls: Vec<String>,
    /// Classifier that maps a peer's pubkey to its pool at registration time.
    peer_classifier: PeerClassifier,
    /// Peers created by this factory, keyed by the peer id string.
    peers: RwLock<HashMap<String, Arc<Peer>>>,
}
229
230impl SharedRouterPeerFactory {
231    fn new(
232        my_peer_id: PeerId,
233        signaling_tx: mpsc::Sender<SignalingMessage>,
234        stun_servers: Vec<String>,
235        store: Option<Arc<dyn ContentStore>>,
236        state: Arc<WebRTCState>,
237        state_event_tx: mpsc::Sender<PeerStateEvent>,
238        nostr_relay: Option<SharedMeshRelayClient>,
239        mesh_frame_tx: mpsc::Sender<(PeerId, MeshNostrFrame)>,
240        signal_urls: Vec<String>,
241        peer_classifier: PeerClassifier,
242    ) -> Self {
243        Self {
244            my_peer_id,
245            signaling_tx,
246            stun_servers,
247            store,
248            state,
249            state_event_tx,
250            nostr_relay,
251            mesh_frame_tx,
252            signal_urls,
253            peer_classifier,
254            peers: RwLock::new(HashMap::new()),
255        }
256    }
257
258    async fn register_peer(&self, peer_id: PeerId, direction: PeerDirection, peer: Arc<Peer>) {
259        let peer_key = peer_id.to_string();
260        let pool = (self.peer_classifier)(&peer_id.pubkey);
261        self.peers
262            .write()
263            .await
264            .insert(peer_key.clone(), peer.clone());
265
266        let mut peers = self.state.runtime.peers.write().await;
267        peers.insert(
268            peer_key,
269            PeerEntry {
270                peer_id,
271                direction,
272                state: ConnectionState::Connecting,
273                last_seen: Instant::now(),
274                peer: Some(MeshPeer::WebRtc(peer)),
275                pool,
276                transport: PeerTransport::WebRtc,
277                signal_paths: BTreeSet::from([PeerSignalPath::Relay]),
278                bytes_sent: 0,
279                bytes_received: 0,
280            },
281        );
282    }
283
284    async fn create_peer(
285        &self,
286        peer_id: PeerId,
287        direction: PeerDirection,
288    ) -> Result<Peer, SharedTransportError> {
289        let mut peer = Peer::new_with_store_and_events(
290            peer_id,
291            direction,
292            self.my_peer_id.clone(),
293            self.signaling_tx.clone(),
294            self.stun_servers.clone(),
295            self.store.clone(),
296            Some(self.state_event_tx.clone()),
297            self.nostr_relay.clone(),
298            Some(self.mesh_frame_tx.clone()),
299            Some(self.state.cashu_quotes.clone()),
300        )
301        .await
302        .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))?;
303        peer.set_signal_urls(self.signal_urls.clone());
304        Ok(peer)
305    }
306}
307
308#[async_trait]
309impl SharedPeerLinkFactory for SharedRouterPeerFactory {
310    async fn create_offer(
311        &self,
312        target_peer_id: &str,
313    ) -> Result<(Arc<dyn SharedPeerLink>, String), SharedTransportError> {
314        let target_peer = PeerId::from_string(target_peer_id).ok_or_else(|| {
315            SharedTransportError::ConnectionFailed(format!("invalid peer id {target_peer_id}"))
316        })?;
317        let peer = Arc::new(
318            self.create_peer(target_peer.clone(), PeerDirection::Outbound)
319                .await?,
320        );
321        peer.setup_handlers()
322            .await
323            .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))?;
324        let offer = peer
325            .connect()
326            .await
327            .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))?;
328        let sdp = offer
329            .get("sdp")
330            .and_then(|value| value.as_str())
331            .ok_or_else(|| {
332                SharedTransportError::ConnectionFailed("missing SDP in CLI peer offer".to_string())
333            })?
334            .to_string();
335        self.register_peer(target_peer, PeerDirection::Outbound, peer.clone())
336            .await;
337        Ok((peer as Arc<dyn SharedPeerLink>, sdp))
338    }
339
340    async fn accept_offer(
341        &self,
342        from_peer_id: &str,
343        offer_sdp: &str,
344    ) -> Result<(Arc<dyn SharedPeerLink>, String), SharedTransportError> {
345        let from_peer = PeerId::from_string(from_peer_id).ok_or_else(|| {
346            SharedTransportError::ConnectionFailed(format!("invalid peer id {from_peer_id}"))
347        })?;
348        let peer = Arc::new(
349            self.create_peer(from_peer.clone(), PeerDirection::Inbound)
350                .await?,
351        );
352        peer.setup_handlers()
353            .await
354            .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))?;
355        let answer = peer
356            .handle_offer(serde_json::json!({ "type": "offer", "sdp": offer_sdp }))
357            .await
358            .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))?;
359        let sdp = answer
360            .get("sdp")
361            .and_then(|value| value.as_str())
362            .ok_or_else(|| {
363                SharedTransportError::ConnectionFailed("missing SDP in CLI peer answer".to_string())
364            })?
365            .to_string();
366        self.register_peer(from_peer, PeerDirection::Inbound, peer.clone())
367            .await;
368        Ok((peer as Arc<dyn SharedPeerLink>, sdp))
369    }
370
371    async fn handle_answer(
372        &self,
373        target_peer_id: &str,
374        answer_sdp: &str,
375    ) -> Result<Arc<dyn SharedPeerLink>, SharedTransportError> {
376        let peer = self
377            .peers
378            .read()
379            .await
380            .get(target_peer_id)
381            .cloned()
382            .ok_or_else(|| {
383                SharedTransportError::ConnectionFailed(format!(
384                    "missing outbound peer for {target_peer_id}"
385                ))
386            })?;
387        peer.handle_answer(serde_json::json!({ "type": "answer", "sdp": answer_sdp }))
388            .await
389            .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))?;
390        Ok(peer as Arc<dyn SharedPeerLink>)
391    }
392
393    async fn handle_candidate(
394        &self,
395        peer_id: &str,
396        candidate: SharedIceCandidate,
397    ) -> Result<(), SharedTransportError> {
398        let peer = self.peers.read().await.get(peer_id).cloned();
399        if let Some(peer) = peer {
400            peer.handle_candidate(serde_json::json!({
401                "candidate": candidate.candidate,
402                "sdpMLineIndex": candidate.sdp_m_line_index,
403                "sdpMid": candidate.sdp_mid,
404            }))
405            .await
406            .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))?;
407        }
408        Ok(())
409    }
410
411    async fn remove_peer(&self, peer_id: &str) -> Result<(), SharedTransportError> {
412        self.peers.write().await.remove(peer_id);
413        Ok(())
414    }
415}
416
417impl WebRTCState {
418    pub fn new() -> Self {
419        let cfg = WebRTCConfig::default();
420        Self::new_with_routing_and_cashu(
421            cfg.request_selection_strategy,
422            cfg.request_fairness_enabled,
423            cfg.request_dispatch,
424            Duration::from_millis(cfg.message_timeout_ms),
425            CashuRoutingConfig::default(),
426            None,
427            None,
428        )
429    }
430
431    pub fn new_with_routing(
432        selection_strategy: crate::SelectionStrategy,
433        fairness_enabled: bool,
434        request_dispatch: RequestDispatchConfig,
435    ) -> Self {
436        let cfg = WebRTCConfig::default();
437        Self::new_with_routing_and_cashu(
438            selection_strategy,
439            fairness_enabled,
440            request_dispatch,
441            Duration::from_millis(cfg.message_timeout_ms),
442            CashuRoutingConfig::default(),
443            None,
444            None,
445        )
446    }
447
448    pub fn new_with_routing_and_cashu(
449        selection_strategy: crate::SelectionStrategy,
450        fairness_enabled: bool,
451        request_dispatch: RequestDispatchConfig,
452        request_timeout: Duration,
453        cashu_routing: CashuRoutingConfig,
454        payment_client: Option<Arc<dyn CashuPaymentClient>>,
455        mint_metadata: Option<Arc<CashuMintMetadataStore>>,
456    ) -> Self {
457        let mut selector = PeerSelector::with_strategy(selection_strategy);
458        selector.set_fairness(fairness_enabled);
459        let peer_selector = Arc::new(RwLock::new(selector));
460        let cashu_quotes = Arc::new(if let Some(mint_metadata) = mint_metadata {
461            CashuQuoteState::new_with_mint_metadata(
462                cashu_routing,
463                peer_selector.clone(),
464                payment_client,
465                mint_metadata,
466            )
467        } else {
468            CashuQuoteState::new(cashu_routing, peer_selector.clone(), payment_client)
469        });
470        Self {
471            runtime: MeshRuntimeState::new(),
472            peer_selector,
473            direct_signaling_tx: RwLock::new(None),
474            request_dispatch,
475            request_timeout,
476            cashu_quotes,
477        }
478    }
479
    /// Hands the given list of local buses to the runtime state
    /// (`MeshRuntimeState::set_local_buses`).
    pub async fn set_local_buses(&self, buses: Vec<SharedLocalNostrBus>) {
        self.runtime.set_local_buses(buses).await;
    }
483
    /// Hands a single local bus to the runtime state
    /// (`MeshRuntimeState::add_local_bus`).
    pub async fn add_local_bus(&self, bus: SharedLocalNostrBus) {
        self.runtime.add_local_bus(bus).await;
    }
487
488    pub async fn set_multicast_bus(&self, bus: Option<Arc<MulticastNostrBus>>) {
489        let buses = bus
490            .into_iter()
491            .map(|bus| bus as SharedLocalNostrBus)
492            .collect();
493        self.set_local_buses(buses).await;
494    }
495
    /// Drop all live peer sessions and clear topology-specific state while
    /// keeping cumulative bandwidth counters intact.
    ///
    /// Implemented entirely by `MeshRuntimeState::reset`.
    pub async fn reset_runtime_state(&self) {
        self.runtime.reset().await;
    }
501
502    pub async fn peer_metadata_snapshot(&self) -> PeerMetadataSnapshot {
503        self.peer_selector
504            .read()
505            .await
506            .export_peer_metadata_snapshot()
507    }
508
509    pub async fn import_peer_metadata_snapshot(&self, snapshot: &PeerMetadataSnapshot) {
510        self.peer_selector
511            .write()
512            .await
513            .import_peer_metadata_snapshot(snapshot);
514    }
515
516    pub async fn selector_summary(&self) -> SelectorSummary {
517        self.peer_selector.read().await.summary()
518    }
519
    /// Returns the runtime state's known-peer snapshot
    /// (`MeshRuntimeState::known_peer_snapshot`).
    pub async fn known_peer_snapshot(&self) -> KnownPeerSnapshot {
        self.runtime.known_peer_snapshot().await
    }
523
    /// Imports a known-peer snapshot into the runtime state
    /// (`MeshRuntimeState::import_known_peer_snapshot`).
    pub async fn import_known_peer_snapshot(&self, snapshot: &KnownPeerSnapshot) {
        self.runtime.import_known_peer_snapshot(snapshot).await;
    }
527
528    pub async fn ordered_known_peers(&self) -> Vec<KnownPeerRecord> {
529        let snapshot = self.runtime.known_peer_snapshot().await;
530        let mut by_peer = snapshot
531            .peers
532            .into_iter()
533            .map(|peer| (peer.peer_id.clone(), peer))
534            .collect::<HashMap<_, _>>();
535
536        let ordered_ids = {
537            let mut selector = self.peer_selector.write().await;
538            for peer_id in by_peer.keys() {
539                selector.add_peer(peer_id);
540            }
541            selector.select_peers()
542        };
543
544        let mut ordered = Vec::new();
545        for peer_id in ordered_ids {
546            if let Some(peer) = by_peer.remove(&peer_id) {
547                ordered.push(peer);
548            }
549        }
550        let mut remaining = by_peer.into_values().collect::<Vec<_>>();
551        remaining.sort_by(|a, b| a.peer_id.cmp(&b.peer_id));
552        ordered.extend(remaining);
553        ordered
554    }
555
556    pub async fn set_direct_signaling_sender(&self, tx: Option<mpsc::Sender<(String, Event)>>) {
557        *self.direct_signaling_tx.write().await = tx;
558    }
559
560    pub async fn submit_direct_signaling_event(&self, source: String, event: Event) -> bool {
561        let tx = self.direct_signaling_tx.read().await.clone();
562        let Some(tx) = tx else {
563            return false;
564        };
565        tx.send((source, event)).await.is_ok()
566    }
567
    /// Get current bandwidth stats (bytes sent/received).
    ///
    /// Returns whatever `MeshRuntimeState::get_bandwidth` reports.
    pub fn get_bandwidth(&self) -> (u64, u64) {
        self.runtime.get_bandwidth()
    }
572
    /// Returns the three mesh counters reported by `MeshRuntimeState::get_mesh_stats`.
    // NOTE(review): presumably (received, forwarded, duplicate drops), matching the
    // `record_mesh_*` methods — confirm the tuple order against `MeshRuntimeState`.
    pub fn get_mesh_stats(&self) -> (u64, u64, u64) {
        self.runtime.get_mesh_stats()
    }
576
    /// Records one received mesh item via `MeshRuntimeState::record_mesh_received`.
    pub fn record_mesh_received(&self) {
        self.runtime.record_mesh_received();
    }
580
    /// Records `count` forwarded mesh items via `MeshRuntimeState::record_mesh_forwarded`.
    pub fn record_mesh_forwarded(&self, count: u64) {
        self.runtime.record_mesh_forwarded(count);
    }
584
    /// Records one duplicate drop via `MeshRuntimeState::record_mesh_duplicate_drop`.
    pub fn record_mesh_duplicate_drop(&self) {
        self.runtime.record_mesh_duplicate_drop();
    }
588
    /// Record bytes sent (global + per-peer).
    ///
    /// Forwards `peer_id` and `bytes` to `MeshRuntimeState::record_sent`.
    pub async fn record_sent(&self, peer_id: &str, bytes: u64) {
        self.runtime.record_sent(peer_id, bytes).await;
    }
593
    /// Record bytes received (global + per-peer).
    ///
    /// Forwards `peer_id` and `bytes` to `MeshRuntimeState::record_received`.
    pub async fn record_received(&self, peer_id: &str, bytes: u64) {
        self.runtime.record_received(peer_id, bytes).await;
    }
598
599    /// Request content by hash from connected peers
600    /// Queries peers in adaptive selector order with hedged fanout waves.
601    /// Returns the first successful response, or None if no peer has it
602    pub async fn request_from_peers(&self, hash_hex: &str) -> Option<Vec<u8>> {
603        self.request_from_peers_with_source(hash_hex)
604            .await
605            .map(|(data, _peer_id)| data)
606    }
607
608    /// Request content by hash from connected peers, returning data and source peer.
609    pub async fn request_from_peers_with_source(
610        &self,
611        hash_hex: &str,
612    ) -> Option<(Vec<u8>, String)> {
613        use crate::BLOB_REQUEST_POLICY;
614
615        let peer_hash_get = self.runtime.peer_hash_get_snapshot().await;
616        let peers = self.runtime.peers.read().await;
617
618        let peer_refs: Vec<_> = peers
619            .values()
620            .filter(|p| p.state == ConnectionState::Connected && p.peer.is_some())
621            .filter_map(|p| {
622                if !peer_hash_get
623                    .get(&p.peer_id.to_string())
624                    .copied()
625                    .unwrap_or(true)
626                {
627                    return None;
628                }
629                p.peer
630                    .clone()
631                    .map(|peer| (p.peer_id.to_string(), peer, p.transport))
632            })
633            .collect();
634
635        drop(peers); // Release the read lock
636
637        let mut connected_peers: Vec<ConnectedPeer> = Vec::new();
638        let mut connected_sessions: Vec<ConnectedSession> = Vec::new();
639        for (peer_id, peer, transport) in peer_refs {
640            if !peer.is_ready() {
641                continue;
642            }
643            if bluetooth_nostr_only_mode() && transport == PeerTransport::Bluetooth {
644                continue;
645            }
646            if let Some(webrtc_peer) = peer.as_webrtc() {
647                let dc_guard = webrtc_peer.data_channel.lock().await;
648                if let Some(dc) = dc_guard.as_ref() {
649                    connected_peers.push((
650                        peer_id.clone(),
651                        webrtc_peer.pending_requests.clone(),
652                        dc.clone(),
653                    ));
654                }
655            }
656            connected_sessions.push((peer_id, peer, transport));
657        }
658
659        if connected_sessions.is_empty() {
660            debug!(
661                "No connected peers to query for {}",
662                &hash_hex[..8.min(hash_hex.len())]
663            );
664            return None;
665        }
666
667        // Convert hex to binary hash once
668        let hash_bytes = match hex::decode(hash_hex) {
669            Ok(b) => b,
670            Err(_) => return None,
671        };
672
673        let expected_hash: [u8; 32] = match hash_bytes.as_slice().try_into() {
674            Ok(h) => h,
675            Err(_) => {
676                debug!(
677                    "Invalid hash length {}, expected 32 bytes",
678                    hash_bytes.len()
679                );
680                return None;
681            }
682        };
683
684        let connected_peer_ids: Vec<String> = connected_sessions
685            .iter()
686            .map(|(peer_id, _, _)| peer_id.clone())
687            .collect();
688        sync_selector_peers(self.peer_selector.as_ref(), &connected_peer_ids).await;
689
690        let ordered_peer_ids = self.peer_selector.write().await.select_peers();
691        let mut quote_by_peer: HashMap<
692            String,
693            (
694                PendingRequestsMap,
695                Arc<webrtc::data_channel::RTCDataChannel>,
696            ),
697        > = connected_peers
698            .iter()
699            .cloned()
700            .map(|(peer_id, pending, dc)| (peer_id, (pending, dc)))
701            .collect();
702        let mut ordered_quote_peers: Vec<ConnectedPeer> = Vec::new();
703        for peer_id in &ordered_peer_ids {
704            if let Some((pending, dc)) = quote_by_peer.remove(peer_id) {
705                ordered_quote_peers.push((peer_id.clone(), pending, dc));
706            }
707        }
708        for (peer_id, (pending, dc)) in quote_by_peer {
709            ordered_quote_peers.push((peer_id, pending, dc));
710        }
711
712        let mut by_peer: HashMap<String, (MeshPeer, PeerTransport)> = connected_sessions
713            .into_iter()
714            .map(|(peer_id, peer, transport)| (peer_id, (peer, transport)))
715            .collect();
716
717        let mut ordered_peers: Vec<ConnectedSession> = Vec::new();
718        for peer_id in ordered_peer_ids {
719            if let Some((peer, transport)) = by_peer.remove(&peer_id) {
720                ordered_peers.push((peer_id, peer, transport));
721            }
722        }
723        for (peer_id, (peer, transport)) in by_peer {
724            ordered_peers.push((peer_id, peer, transport));
725        }
726
727        debug!(
728            "Querying {} peers for {} with shared hedged scheduler",
729            ordered_peers.len(),
730            &hash_hex[..8.min(hash_hex.len())],
731        );
732
733        if let Some((requested_mint, payment_sat, quote_ttl_ms)) =
734            self.cashu_quotes.requester_quote_terms().await
735        {
736            if let Some(quote) = self
737                .request_quote_from_peers(
738                    &hash_bytes,
739                    requested_mint,
740                    payment_sat,
741                    quote_ttl_ms,
742                    &ordered_quote_peers,
743                )
744                .await
745            {
746                if let Some(data) = self
747                    .request_from_single_peer(
748                        hash_hex,
749                        &hash_bytes,
750                        expected_hash,
751                        &quote.peer_id,
752                        Some(&quote),
753                        &ordered_quote_peers,
754                    )
755                    .await
756                {
757                    debug!(
758                        "Got quoted response from peer {} for {}",
759                        quote.peer_id,
760                        &hash_hex[..8.min(hash_hex.len())]
761                    );
762                    return Some((data, quote.peer_id));
763                }
764            }
765        }
766
767        let request = DataRequest {
768            h: hash_bytes.clone(),
769            htl: BLOB_REQUEST_POLICY.max_htl,
770            q: None,
771        };
772        let wire = crate::encode_request(&request);
773        let wire_len = wire.len() as u64;
774        let current_result_rx = Arc::new(Mutex::new(None));
775        if let Some((data, peer_id)) = run_hedged_waves(
776            ordered_peers.len(),
777            self.request_dispatch,
778            self.request_timeout,
779            |range| {
780                let wave_peers = ordered_peers[range].to_vec();
781                let (result_tx, result_rx) =
782                    mpsc::channel::<(String, Instant, Result<Option<Vec<u8>>>)>(wave_peers.len());
783                let current_result_rx = current_result_rx.clone();
784                let hash_hex = hash_hex.to_string();
785                async move {
786                    *current_result_rx.lock().await = Some(result_rx);
787                    let sent = wave_peers.len();
788                    for (peer_id, peer, transport) in wave_peers {
789                        if transport != PeerTransport::Bluetooth {
790                            self.record_sent(&peer_id, wire_len).await;
791                        }
792                        self.peer_selector
793                            .write()
794                            .await
795                            .record_request(&peer_id, wire_len);
796
797                        let result_tx = result_tx.clone();
798                        let peer_id_for_task = peer_id.clone();
799                        let peer = peer.clone();
800                        let hash_hex = hash_hex.clone();
801                        let per_request_timeout = self.request_timeout;
802                        tokio::spawn(async move {
803                            let started = Instant::now();
804                            let result = peer.request(&hash_hex, per_request_timeout).await;
805                            let _ = result_tx.send((peer_id_for_task, started, result)).await;
806                        });
807                    }
808                    drop(result_tx);
809                    sent
810                }
811            },
812            |wait| {
813                let current_result_rx = current_result_rx.clone();
814                async move {
815                    let mut current_result_rx = current_result_rx.lock().await;
816                    let Some(result_rx) = current_result_rx.as_mut() else {
817                        return HedgedWaveAction::Abort;
818                    };
819                    let deadline = Instant::now() + wait;
820                    loop {
821                        let now = Instant::now();
822                        if now >= deadline {
823                            return HedgedWaveAction::Continue;
824                        }
825                        let remaining = deadline.saturating_duration_since(now);
826                        match tokio::time::timeout(remaining, result_rx.recv()).await {
827                            Ok(Some((peer_id, started, Ok(Some(data))))) => {
828                                let rtt_ms = started.elapsed().as_millis() as u64;
829                                if hashtree_core::sha256(&data) == expected_hash {
830                                    let should_record = {
831                                        let peers = self.runtime.peers.read().await;
832                                        peers
833                                            .get(&peer_id)
834                                            .map(|entry| {
835                                                entry.transport != PeerTransport::Bluetooth
836                                            })
837                                            .unwrap_or(true)
838                                    };
839                                    if should_record {
840                                        self.record_received(&peer_id, data.len() as u64).await;
841                                    }
842                                    self.peer_selector.write().await.record_success(
843                                        &peer_id,
844                                        rtt_ms,
845                                        data.len() as u64,
846                                    );
847                                    return HedgedWaveAction::Success((data, peer_id));
848                                }
849                                self.peer_selector.write().await.record_failure(&peer_id);
850                            }
851                            Ok(Some((peer_id, _, Ok(None)))) | Ok(Some((peer_id, _, Err(_)))) => {
852                                self.peer_selector.write().await.record_timeout(&peer_id);
853                            }
854                            Ok(None) | Err(_) => return HedgedWaveAction::Continue,
855                        }
856                    }
857                }
858            },
859        )
860        .await
861        {
862            debug!(
863                "Got response from peer {} for {}",
864                peer_id,
865                &hash_hex[..8.min(hash_hex.len())]
866            );
867            return Some((data, peer_id));
868        }
869
870        debug!(
871            "No peer had data for {}",
872            &hash_hex[..8.min(hash_hex.len())]
873        );
874        None
875    }
876
877    async fn request_quote_from_peers(
878        &self,
879        hash_bytes: &[u8],
880        requested_mint: String,
881        payment_sat: u64,
882        quote_ttl_ms: u32,
883        ordered_peers: &[ConnectedPeer],
884    ) -> Option<NegotiatedQuote> {
885        if ordered_peers.is_empty() || quote_ttl_ms == 0 {
886            return None;
887        }
888
889        let hash_hex = hex::encode(hash_bytes);
890        let rx = self
891            .cashu_quotes
892            .register_pending_quote(hash_hex.clone(), Some(requested_mint.clone()), payment_sat)
893            .await;
894        let quote_request = DataQuoteRequest {
895            h: hash_bytes.to_vec(),
896            p: payment_sat,
897            t: quote_ttl_ms,
898            m: Some(requested_mint),
899        };
900        let wire = crate::encode_quote_request(&quote_request);
901        let rx = Arc::new(Mutex::new(rx));
902        let result = run_hedged_waves(
903            ordered_peers.len(),
904            self.request_dispatch,
905            self.request_timeout,
906            |range| {
907                let wave_peers = ordered_peers[range].to_vec();
908                let wire = wire.clone();
909                async move {
910                    let mut sent = 0usize;
911                    for (_, _, dc) in wave_peers {
912                        if dc.send(&bytes::Bytes::copy_from_slice(&wire)).await.is_ok() {
913                            sent += 1;
914                        }
915                    }
916                    sent
917                }
918            },
919            |wait| {
920                let rx = rx.clone();
921                async move {
922                    let mut rx = rx.lock().await;
923                    match tokio::time::timeout(wait, &mut *rx).await {
924                        Ok(Ok(Some(quote))) => HedgedWaveAction::Success(quote),
925                        Ok(Ok(None)) | Ok(Err(_)) => HedgedWaveAction::Abort,
926                        Err(_) => HedgedWaveAction::Continue,
927                    }
928                }
929            },
930        )
931        .await;
932
933        self.cashu_quotes.clear_pending_quote(&hash_hex).await;
934        result
935    }
936
937    async fn request_from_single_peer(
938        &self,
939        hash_hex: &str,
940        hash_bytes: &[u8],
941        expected_hash: [u8; 32],
942        target_peer_id: &str,
943        quote: Option<&NegotiatedQuote>,
944        ordered_peers: &[ConnectedPeer],
945    ) -> Option<Vec<u8>> {
946        use crate::BLOB_REQUEST_POLICY;
947
948        let (pending_requests, dc) = ordered_peers
949            .iter()
950            .find(|(peer_id, _, _)| peer_id == target_peer_id)
951            .map(|(_, pending_requests, dc)| (pending_requests.clone(), dc.clone()))?;
952
953        let request = DataRequest {
954            h: hash_bytes.to_vec(),
955            htl: BLOB_REQUEST_POLICY.max_htl,
956            q: quote.map(|quote| quote.quote_id),
957        };
958        let wire = crate::encode_request(&request);
959        let wire_len = wire.len() as u64;
960        let sent_at = Instant::now();
961        let (tx, mut rx) = tokio::sync::oneshot::channel();
962
963        {
964            let mut pending = pending_requests.lock().await;
965            pending.insert(
966                hash_hex.to_string(),
967                if let Some(quote) = quote {
968                    PendingRequest::quoted(
969                        hash_bytes.to_vec(),
970                        tx,
971                        quote.quote_id,
972                        quote.mint_url.clone().unwrap_or_default(),
973                        quote.payment_sat,
974                    )
975                } else {
976                    PendingRequest::standard(hash_bytes.to_vec(), tx)
977                },
978            );
979        }
980
981        if dc
982            .send(&bytes::Bytes::copy_from_slice(&wire))
983            .await
984            .is_err()
985        {
986            let mut pending = pending_requests.lock().await;
987            pending.remove(hash_hex);
988            self.peer_selector
989                .write()
990                .await
991                .record_failure(target_peer_id);
992            return None;
993        }
994
995        self.record_sent(target_peer_id, wire_len).await;
996        self.peer_selector
997            .write()
998            .await
999            .record_request(target_peer_id, wire_len);
1000
1001        let wait_timeout = if let Some(quote) = quote {
1002            let multiplier = quote.payment_sat.clamp(1, 32) as u128;
1003            let extra_ms = self
1004                .cashu_quotes
1005                .settlement_timeout()
1006                .as_millis()
1007                .saturating_mul(multiplier);
1008            self.request_timeout + Duration::from_millis(extra_ms.min(u64::MAX as u128) as u64)
1009        } else {
1010            self.request_timeout
1011        };
1012
1013        match tokio::time::timeout(wait_timeout, &mut rx).await {
1014            Ok(Ok(Some(data))) if hashtree_core::sha256(&data) == expected_hash => {
1015                let rtt_ms = sent_at.elapsed().as_millis() as u64;
1016                self.record_received(target_peer_id, data.len() as u64)
1017                    .await;
1018                self.peer_selector.write().await.record_success(
1019                    target_peer_id,
1020                    rtt_ms,
1021                    data.len() as u64,
1022                );
1023                Some(data)
1024            }
1025            Ok(Ok(Some(_))) => {
1026                self.peer_selector
1027                    .write()
1028                    .await
1029                    .record_failure(target_peer_id);
1030                let pending = pending_requests.lock().await.remove(hash_hex);
1031                if let Some(pending) = pending {
1032                    if let Some(quoted) = pending.quoted {
1033                        if let Some(in_flight) = quoted.in_flight_payment {
1034                            let _ = self
1035                                .cashu_quotes
1036                                .revoke_payment_token(&in_flight.mint_url, &in_flight.operation_id)
1037                                .await;
1038                        }
1039                    }
1040                }
1041                None
1042            }
1043            Ok(Ok(None)) | Ok(Err(_)) | Err(_) => {
1044                let pending = pending_requests.lock().await.remove(hash_hex);
1045                if let Some(pending) = pending {
1046                    if let Some(quoted) = pending.quoted {
1047                        if let Some(in_flight) = quoted.in_flight_payment {
1048                            let _ = self
1049                                .cashu_quotes
1050                                .revoke_payment_token(&in_flight.mint_url, &in_flight.operation_id)
1051                                .await;
1052                        }
1053                    }
1054                }
1055                self.peer_selector
1056                    .write()
1057                    .await
1058                    .record_timeout(target_peer_id);
1059                None
1060            }
1061        }
1062    }
1063
1064    /// Resolve a hashtree root event through connected peers using Nostr REQ/EOSE over WebRTC.
1065    pub async fn resolve_root_from_peers(
1066        &self,
1067        owner_pubkey: &str,
1068        tree_name: &str,
1069        per_peer_timeout: Duration,
1070    ) -> Option<PeerRootEvent> {
1071        let peer_refs: Vec<(String, Arc<dyn MeshSession>)> = {
1072            let peers = self.runtime.peers.read().await;
1073            peers
1074                .values()
1075                .filter(|entry| entry.state == ConnectionState::Connected)
1076                .filter(|entry| {
1077                    !bluetooth_nostr_only_mode() || entry.transport != PeerTransport::Bluetooth
1078                })
1079                .filter_map(|entry| {
1080                    let peer = entry.peer.as_ref()?;
1081                    Some((
1082                        entry.peer_id.short(),
1083                        Arc::new(peer.clone()) as Arc<dyn MeshSession>,
1084                    ))
1085                })
1086                .collect()
1087        };
1088
1089        let resolved =
1090            resolve_root_via_peer_sessions(peer_refs, owner_pubkey, tree_name, per_peer_timeout)
1091                .await;
1092        if let Some(root) = &resolved {
1093            debug!(
1094                "Resolved {}/{} via peer {} event {}",
1095                owner_pubkey, tree_name, root.peer_id, root.event_id
1096            );
1097        }
1098        resolved
1099    }
1100
1101    pub async fn resolve_root_from_local_buses_with_source(
1102        &self,
1103        owner_pubkey: &str,
1104        tree_name: &str,
1105        timeout: Duration,
1106    ) -> Option<(&'static str, PeerRootEvent)> {
1107        self.runtime
1108            .resolve_root_from_local_buses_with_source(owner_pubkey, tree_name, timeout)
1109            .await
1110    }
1111
1112    pub async fn resolve_root_from_local_buses(
1113        &self,
1114        owner_pubkey: &str,
1115        tree_name: &str,
1116        timeout: Duration,
1117    ) -> Option<PeerRootEvent> {
1118        self.resolve_root_from_local_buses_with_source(owner_pubkey, tree_name, timeout)
1119            .await
1120            .map(|(_, root)| root)
1121    }
1122
1123    pub async fn resolve_root_from_multicast(
1124        &self,
1125        owner_pubkey: &str,
1126        tree_name: &str,
1127        timeout: Duration,
1128    ) -> Option<PeerRootEvent> {
1129        self.resolve_root_from_local_buses(owner_pubkey, tree_name, timeout)
1130            .await
1131    }
1132}
1133
1134impl Default for WebRTCState {
1135    fn default() -> Self {
1136        Self::new()
1137    }
1138}
1139
1140/// Native mesh manager handles peer discovery and transport fan-out.
1141pub struct WebRTCManager {
1142    config: WebRTCConfig,
1143    my_peer_id: PeerId,
1144    keys: Keys,
1145    state: Arc<WebRTCState>,
1146    shutdown: Arc<tokio::sync::watch::Sender<bool>>,
1147    shutdown_rx: tokio::sync::watch::Receiver<bool>,
1148    /// Channel to send signaling messages to relays
1149    signaling_tx: mpsc::Sender<SignalingMessage>,
1150    signaling_rx: Option<mpsc::Receiver<SignalingMessage>>,
1151    /// Optional content store for serving hash requests
1152    store: Option<Arc<dyn ContentStore>>,
1153    /// Peer classifier for pool assignment
1154    peer_classifier: PeerClassifier,
1155    /// Optional Nostr relay for data-channel relay messages
1156    nostr_relay: Option<SharedMeshRelayClient>,
1157    local_buses: Vec<SharedLocalNostrBus>,
1158    /// Channel for peer state events (connection success/failure)
1159    state_event_tx: mpsc::Sender<PeerStateEvent>,
1160    state_event_rx: Option<mpsc::Receiver<PeerStateEvent>>,
1161    /// Channel for relayless mesh signaling frames received from peers.
1162    mesh_frame_tx: mpsc::Sender<(PeerId, MeshNostrFrame)>,
1163    mesh_frame_rx: Option<mpsc::Receiver<(PeerId, MeshNostrFrame)>>,
1164    shared_router: Option<Arc<SharedProductionRouter>>,
1165    seen_frame_ids: Arc<Mutex<TimedSeenSet>>,
1166    seen_event_ids: Arc<Mutex<TimedSeenSet>>,
1167}
1168
1169impl WebRTCManager {
1170    /// Create a new WebRTC manager
1171    pub fn new(keys: Keys, config: WebRTCConfig) -> Self {
1172        let pubkey = keys.public_key().to_hex();
1173        let my_peer_id = PeerId::new(pubkey);
1174        let (shutdown, shutdown_rx) = tokio::sync::watch::channel(false);
1175        let (signaling_tx, signaling_rx) = mpsc::channel(100);
1176        let (state_event_tx, state_event_rx) = mpsc::channel(100);
1177        let (mesh_frame_tx, mesh_frame_rx) = mpsc::channel(256);
1178        let state = Arc::new(WebRTCState::new_with_routing_and_cashu(
1179            config.request_selection_strategy,
1180            config.request_fairness_enabled,
1181            config.request_dispatch,
1182            Duration::from_millis(config.message_timeout_ms),
1183            CashuRoutingConfig::default(),
1184            None,
1185            None,
1186        ));
1187
1188        // Default classifier: all peers go to 'other' pool
1189        let peer_classifier: PeerClassifier = Arc::new(|_| PeerPool::Other);
1190
1191        Self {
1192            config,
1193            my_peer_id,
1194            keys,
1195            state,
1196            shutdown: Arc::new(shutdown),
1197            shutdown_rx,
1198            signaling_tx,
1199            signaling_rx: Some(signaling_rx),
1200            store: None,
1201            peer_classifier,
1202            nostr_relay: None,
1203            local_buses: Vec::new(),
1204            state_event_tx,
1205            state_event_rx: Some(state_event_rx),
1206            mesh_frame_tx,
1207            mesh_frame_rx: Some(mesh_frame_rx),
1208            shared_router: None,
1209            seen_frame_ids: Arc::new(Mutex::new(TimedSeenSet::new(
1210                SEEN_FRAME_CAP,
1211                SEEN_FRAME_TTL,
1212            ))),
1213            seen_event_ids: Arc::new(Mutex::new(TimedSeenSet::new(
1214                SEEN_EVENT_CAP,
1215                SEEN_EVENT_TTL,
1216            ))),
1217        }
1218    }
1219
1220    /// Create a new WebRTC manager reusing an existing shared state object.
1221    pub fn new_with_state(keys: Keys, config: WebRTCConfig, state: Arc<WebRTCState>) -> Self {
1222        let mut manager = Self::new(keys, config);
1223        manager.state = state;
1224        manager
1225    }
1226
1227    /// Create a new WebRTC manager with a peer classifier
1228    pub fn new_with_classifier(
1229        keys: Keys,
1230        config: WebRTCConfig,
1231        classifier: PeerClassifier,
1232    ) -> Self {
1233        let mut manager = Self::new(keys, config);
1234        manager.peer_classifier = classifier;
1235        manager
1236    }
1237
1238    /// Create a new WebRTC manager with a content store for serving hash requests
1239    pub fn new_with_store(keys: Keys, config: WebRTCConfig, store: Arc<dyn ContentStore>) -> Self {
1240        let mut manager = Self::new(keys, config);
1241        manager.store = Some(store);
1242        manager
1243    }
1244
1245    /// Create a new WebRTC manager with store and classifier
1246    pub fn new_with_store_and_classifier(
1247        keys: Keys,
1248        config: WebRTCConfig,
1249        store: Arc<dyn ContentStore>,
1250        classifier: PeerClassifier,
1251    ) -> Self {
1252        Self::new_with_store_and_classifier_and_cashu(
1253            keys,
1254            config,
1255            store,
1256            classifier,
1257            CashuRoutingConfig::default(),
1258            None,
1259            None,
1260        )
1261    }
1262
1263    pub fn new_with_state_and_store_and_classifier(
1264        keys: Keys,
1265        config: WebRTCConfig,
1266        state: Arc<WebRTCState>,
1267        store: Arc<dyn ContentStore>,
1268        classifier: PeerClassifier,
1269    ) -> Self {
1270        let mut manager = Self::new_with_state(keys, config, state);
1271        manager.store = Some(store);
1272        manager.peer_classifier = classifier;
1273        manager
1274    }
1275
1276    pub fn new_with_store_and_classifier_and_cashu(
1277        keys: Keys,
1278        config: WebRTCConfig,
1279        store: Arc<dyn ContentStore>,
1280        classifier: PeerClassifier,
1281        cashu_routing: CashuRoutingConfig,
1282        payment_client: Option<Arc<dyn CashuPaymentClient>>,
1283        mint_metadata: Option<Arc<CashuMintMetadataStore>>,
1284    ) -> Self {
1285        let mut manager = Self::new(keys, config);
1286        manager.state = Arc::new(WebRTCState::new_with_routing_and_cashu(
1287            manager.config.request_selection_strategy,
1288            manager.config.request_fairness_enabled,
1289            manager.config.request_dispatch,
1290            Duration::from_millis(manager.config.message_timeout_ms),
1291            cashu_routing,
1292            payment_client,
1293            mint_metadata,
1294        ));
1295        manager.store = Some(store);
1296        manager.peer_classifier = classifier;
1297        manager
1298    }
1299
1300    /// Set the content store for serving hash requests
1301    pub fn set_store(&mut self, store: Arc<dyn ContentStore>) {
1302        self.store = Some(store);
1303    }
1304
1305    /// Set the peer classifier
1306    pub fn set_peer_classifier(&mut self, classifier: PeerClassifier) {
1307        self.peer_classifier = classifier;
1308    }
1309
1310    /// Set the Nostr relay for data-channel relay messages
1311    pub fn set_nostr_relay(&mut self, relay: SharedMeshRelayClient) {
1312        self.nostr_relay = Some(relay);
1313    }
1314
1315    /// Get my peer ID
1316    pub fn my_peer_id(&self) -> &PeerId {
1317        &self.my_peer_id
1318    }
1319
1320    /// Get shared state for external access
1321    pub fn state(&self) -> Arc<WebRTCState> {
1322        self.state.clone()
1323    }
1324
1325    /// Cloneable shutdown handle for external lifecycle control.
1326    pub fn shutdown_signal(&self) -> Arc<tokio::sync::watch::Sender<bool>> {
1327        self.shutdown.clone()
1328    }
1329
1330    /// Signal shutdown
1331    pub fn shutdown(&self) {
1332        let _ = self.shutdown.send(true);
1333    }
1334
1335    /// Get connected peer count
1336    pub async fn connected_count(&self) -> usize {
1337        self.state
1338            .runtime
1339            .connected_count
1340            .load(std::sync::atomic::Ordering::Relaxed)
1341    }
1342
1343    /// Get all peer statuses
1344    pub async fn peer_statuses(&self) -> Vec<PeerStatus> {
1345        self.state
1346            .runtime
1347            .peers
1348            .read()
1349            .await
1350            .values()
1351            .map(|p| PeerStatus {
1352                peer_id: p.peer_id.to_string(),
1353                pubkey: p.peer_id.pubkey.clone(),
1354                state: p.state.to_string(),
1355                direction: p.direction,
1356                connected_at: Some(p.last_seen),
1357                pool: p.pool,
1358            })
1359            .collect()
1360    }
1361
1362    /// Get pool counts
1363    /// Returns (follows_connected, follows_active, other_connected, other_active)
1364    /// "active" = Connected or Connecting (excludes Discovered and Failed)
1365    pub async fn get_pool_counts(&self) -> (usize, usize, usize, usize) {
1366        let peers = self.state.runtime.peers.read().await;
1367        let mut follows_connected = 0;
1368        let mut follows_active = 0;
1369        let mut other_connected = 0;
1370        let mut other_active = 0;
1371
1372        for entry in peers.values() {
1373            // Only count Connected or Connecting as "active" connections
1374            // Discovered peers are just seen hellos, not real connections
1375            let is_active = entry.state == ConnectionState::Connected
1376                || entry.state == ConnectionState::Connecting;
1377
1378            match entry.pool {
1379                PeerPool::Follows => {
1380                    if is_active {
1381                        follows_active += 1;
1382                    }
1383                    if entry.state == ConnectionState::Connected {
1384                        follows_connected += 1;
1385                    }
1386                }
1387                PeerPool::Other => {
1388                    if is_active {
1389                        other_active += 1;
1390                    }
1391                    if entry.state == ConnectionState::Connected {
1392                        other_connected += 1;
1393                    }
1394                }
1395            }
1396        }
1397
1398        (
1399            follows_connected,
1400            follows_active,
1401            other_connected,
1402            other_active,
1403        )
1404    }
1405
1406    fn local_bus_max_peers(&self, source: &str) -> Option<usize> {
1407        match source {
1408            "multicast" => Some(self.config.multicast.max_peers),
1409            WIFI_AWARE_SOURCE => Some(self.config.wifi_aware.max_peers),
1410            _ => None,
1411        }
1412    }
1413
1414    #[cfg_attr(not(test), allow(dead_code))]
1415    fn can_track_local_bus_peer(
1416        &self,
1417        source: &str,
1418        peer_key: &str,
1419        peers: &HashMap<String, PeerEntry>,
1420    ) -> bool {
1421        can_track_source_peer(source, peer_key, peers, self.local_bus_max_peers(source))
1422    }
1423}
1424
1425mod runtime;