Skip to main content

hashtree_network/manager/
mod.rs

//! WebRTC signaling over Nostr relays
//!
//! Protocol (compatible with hashtree-ts):
//! - All signaling uses ephemeral kind 25050
//! - Hello messages: `#l: "hello"` tag, broadcast for peer discovery (unencrypted)
//! - Directed signaling (offer, answer, candidate, candidates): NIP-17 style
//!   gift wrap for privacy - wrapped with an ephemeral key, `#p` tag with recipient
//!
//! Security: Directed messages use gift wrapping with ephemeral keys so that
//! relays cannot see the actual sender or correlate messages.
11
12pub use crate::{ConnectionState, PeerSignalPath, PeerTransport};
13use anyhow::Result;
14use async_trait::async_trait;
15use cashu_service::CashuPaymentClient;
16#[cfg(test)]
17use nostr_sdk::nostr::Kind;
18use nostr_sdk::nostr::{Event, Keys};
19use std::collections::{BTreeSet, HashMap};
20use std::sync::Arc;
21use std::time::{Duration, Instant};
22use tokio::sync::{mpsc, Mutex, RwLock};
23use tracing::{debug, error, info, warn};
24
25use crate::bluetooth::{
26    BluetoothConfig, BluetoothMesh, BluetoothPeerRegistrar, BluetoothRuntimeContext,
27};
28use crate::cashu::{CashuMintMetadataStore, CashuQuoteState, CashuRoutingConfig, NegotiatedQuote};
29use crate::local_bus::SharedLocalNostrBus;
30use crate::mesh_session::{
31    resolve_root_from_peer_sessions as resolve_root_via_peer_sessions, MeshSession,
32};
33use crate::mesh_store_core::{
34    run_hedged_waves, sync_selector_peers, HedgedWaveAction, RequestDispatchConfig,
35};
36use crate::multicast::{MulticastConfig, MulticastNostrBus};
37use crate::nostr::NostrRelayTransport;
38use crate::peer::{ContentStore, Peer, PendingRequest};
39use crate::peer_selector::{PeerMetadataSnapshot, PeerSelector, SelectorSummary};
40use crate::protocol::{DataQuoteRequest, DataRequest};
41use crate::relay_bridge::SharedMeshRelayClient;
42use crate::root_events::PeerRootEvent;
43use crate::runtime_control::{can_track_source_peer, PeerStateEvent};
44use crate::runtime_peer::{
45    MeshPeerEntry as SharedPeerEntry, PeerClassifier as SharedPeerClassifier,
46};
47use crate::runtime_state::MeshRuntimeState;
48use crate::session::MeshPeer;
49use crate::signaling::MeshRouter;
50use crate::transport::{
51    PeerLink as SharedPeerLink, PeerLinkFactory as SharedPeerLinkFactory,
52    SignalingTransport as SharedSignalingTransport, TransportError as SharedTransportError,
53};
54use crate::types::{
55    validate_mesh_frame, KnownPeerRecord, KnownPeerSnapshot, MeshNostrFrame, MeshNostrPayload,
56    SignalingMessage, TimedSeenSet,
57};
58use crate::wifi_aware::{
59    mobile_wifi_aware_bridge, WifiAwareConfig, WifiAwareNostrBus, WIFI_AWARE_SOURCE,
60};
61use crate::{
62    ClassifyRequest as SharedClassifyRequest, IceCandidate as SharedIceCandidate, PeerDirection,
63    PeerId, PeerPool, MESH_SIGNALING_EVENT_KIND,
64};
65
66/// Callback type for classifying peers into pools
67pub type PeerClassifier = SharedPeerClassifier;
68pub type PeerEntry = SharedPeerEntry<MeshPeer>;
69
70#[derive(Clone)]
71pub struct WebRTCConfig {
72    pub relays: Vec<String>,
73    pub signaling_enabled: bool,
74    pub hash_get_enabled: bool,
75    pub signal_urls: Vec<String>,
76    pub max_outbound: usize,
77    pub max_inbound: usize,
78    pub hello_interval_ms: u64,
79    pub message_timeout_ms: u64,
80    pub stun_servers: Vec<String>,
81    pub debug: bool,
82    pub multicast: MulticastConfig,
83    pub wifi_aware: WifiAwareConfig,
84    pub bluetooth: BluetoothConfig,
85    pub pools: crate::PoolSettings,
86    pub request_selection_strategy: crate::SelectionStrategy,
87    pub request_fairness_enabled: bool,
88    pub request_dispatch: RequestDispatchConfig,
89}
90
91impl Default for WebRTCConfig {
92    fn default() -> Self {
93        Self {
94            relays: vec![
95                "wss://relay.damus.io".to_string(),
96                "wss://relay.primal.net".to_string(),
97                "wss://temp.iris.to".to_string(),
98                "wss://relay.snort.social".to_string(),
99            ],
100            signaling_enabled: true,
101            hash_get_enabled: true,
102            signal_urls: Vec::new(),
103            max_outbound: 6,
104            max_inbound: 6,
105            hello_interval_ms: 3000,
106            message_timeout_ms: 15000,
107            stun_servers: vec![
108                "stun:stun.iris.to:3478".to_string(),
109                "stun:stun.l.google.com:19302".to_string(),
110                "stun:stun.cloudflare.com:3478".to_string(),
111            ],
112            debug: false,
113            multicast: MulticastConfig::default(),
114            wifi_aware: WifiAwareConfig::default(),
115            bluetooth: BluetoothConfig::default(),
116            pools: crate::PoolSettings::default(),
117            request_selection_strategy: crate::SelectionStrategy::Weighted,
118            request_fairness_enabled: true,
119            request_dispatch: RequestDispatchConfig {
120                initial_fanout: 2,
121                hedge_fanout: 1,
122                max_fanout: 8,
123                hedge_interval_ms: 120,
124            },
125        }
126    }
127}
128
129#[derive(Debug, Clone)]
130pub struct PeerStatus {
131    pub peer_id: String,
132    pub pubkey: String,
133    pub state: String,
134    pub direction: PeerDirection,
135    pub connected_at: Option<std::time::Instant>,
136    pub pool: PeerPool,
137}
138
/// Reports whether the `HTREE_BLUETOOTH_NOSTR_ONLY` environment flag is enabled.
///
/// The flag is enabled when the variable is set to `1`, `true` or `yes`.
/// The comparison is ASCII case-insensitive, so mixed-case spellings such as
/// `True` or `Yes` are honoured just like `true`/`TRUE`/`yes`/`YES`.
///
/// Returns `false` when the variable is unset, is not valid Unicode, or holds
/// any other value (no whitespace trimming is performed).
fn bluetooth_nostr_only_mode() -> bool {
    const TRUTHY: [&str; 3] = ["1", "true", "yes"];
    std::env::var("HTREE_BLUETOOTH_NOSTR_ONLY")
        .map(|value| {
            TRUTHY
                .iter()
                .any(|truthy| value.eq_ignore_ascii_case(truthy))
        })
        .unwrap_or(false)
}
145
146/// Shared state for the native mesh router.
147pub struct WebRTCState {
148    pub runtime: MeshRuntimeState<MeshPeer>,
149    /// Shared peer selector used by live retrieval; aligned with simulation strategies.
150    peer_selector: Arc<RwLock<PeerSelector>>,
151    direct_signaling_tx: RwLock<Option<mpsc::Sender<(String, Event)>>>,
152    /// Hedged dispatch policy for retrieval requests.
153    request_dispatch: RequestDispatchConfig,
154    /// Retrieval timeout for quote negotiation and single-peer fetches.
155    request_timeout: Duration,
156    /// Shared Cashu quote negotiation policy/state.
157    cashu_quotes: Arc<CashuQuoteState>,
158}
159const SEEN_FRAME_CAP: usize = 4096;
160const SEEN_FRAME_TTL: Duration = Duration::from_secs(120);
161const SEEN_EVENT_CAP: usize = 8192;
162const SEEN_EVENT_TTL: Duration = Duration::from_secs(600);
163
164type PendingRequestsMap = Arc<Mutex<HashMap<String, PendingRequest>>>;
165type ConnectedPeer = (
166    String,
167    PendingRequestsMap,
168    Arc<webrtc::data_channel::RTCDataChannel>,
169);
170type ConnectedSession = (String, MeshPeer, PeerTransport);
171type SharedProductionRouter = MeshRouter<RouterSignalingBridge, SharedRouterPeerFactory>;
172
173#[derive(Clone)]
174struct RouterSignalingBridge {
175    peer_id: String,
176    signaling_tx: mpsc::Sender<SignalingMessage>,
177}
178
179impl RouterSignalingBridge {
180    fn new(peer_id: String, signaling_tx: mpsc::Sender<SignalingMessage>) -> Self {
181        Self {
182            peer_id,
183            signaling_tx,
184        }
185    }
186}
187
188#[async_trait]
189impl SharedSignalingTransport for RouterSignalingBridge {
190    async fn connect(&self, _relays: &[String]) -> Result<(), SharedTransportError> {
191        Ok(())
192    }
193
194    async fn disconnect(&self) {}
195
196    async fn publish(&self, msg: SignalingMessage) -> Result<(), SharedTransportError> {
197        self.signaling_tx
198            .send(msg)
199            .await
200            .map_err(|e| SharedTransportError::SendFailed(e.to_string()))
201    }
202
203    async fn recv(&self) -> Option<SignalingMessage> {
204        None
205    }
206
207    fn try_recv(&self) -> Option<SignalingMessage> {
208        None
209    }
210
211    fn peer_id(&self) -> &str {
212        &self.peer_id
213    }
214}
215
216struct SharedRouterPeerFactory {
217    my_peer_id: PeerId,
218    signaling_tx: mpsc::Sender<SignalingMessage>,
219    stun_servers: Vec<String>,
220    store: Option<Arc<dyn ContentStore>>,
221    state: Arc<WebRTCState>,
222    state_event_tx: mpsc::Sender<PeerStateEvent>,
223    nostr_relay: Option<SharedMeshRelayClient>,
224    mesh_frame_tx: mpsc::Sender<(PeerId, MeshNostrFrame)>,
225    signal_urls: Vec<String>,
226    peer_classifier: PeerClassifier,
227    peers: RwLock<HashMap<String, Arc<Peer>>>,
228}
229
230impl SharedRouterPeerFactory {
231    #[allow(clippy::too_many_arguments)]
232    fn new(
233        my_peer_id: PeerId,
234        signaling_tx: mpsc::Sender<SignalingMessage>,
235        stun_servers: Vec<String>,
236        store: Option<Arc<dyn ContentStore>>,
237        state: Arc<WebRTCState>,
238        state_event_tx: mpsc::Sender<PeerStateEvent>,
239        nostr_relay: Option<SharedMeshRelayClient>,
240        mesh_frame_tx: mpsc::Sender<(PeerId, MeshNostrFrame)>,
241        signal_urls: Vec<String>,
242        peer_classifier: PeerClassifier,
243    ) -> Self {
244        Self {
245            my_peer_id,
246            signaling_tx,
247            stun_servers,
248            store,
249            state,
250            state_event_tx,
251            nostr_relay,
252            mesh_frame_tx,
253            signal_urls,
254            peer_classifier,
255            peers: RwLock::new(HashMap::new()),
256        }
257    }
258
259    async fn register_peer(&self, peer_id: PeerId, direction: PeerDirection, peer: Arc<Peer>) {
260        let peer_key = peer_id.to_string();
261        let pool = (self.peer_classifier)(&peer_id.pubkey);
262        self.peers
263            .write()
264            .await
265            .insert(peer_key.clone(), peer.clone());
266
267        let mut peers = self.state.runtime.peers.write().await;
268        peers.insert(
269            peer_key,
270            PeerEntry {
271                peer_id,
272                direction,
273                state: ConnectionState::Connecting,
274                last_seen: Instant::now(),
275                peer: Some(MeshPeer::WebRtc(peer)),
276                pool,
277                transport: PeerTransport::WebRtc,
278                signal_paths: BTreeSet::from([PeerSignalPath::Relay]),
279                bytes_sent: 0,
280                bytes_received: 0,
281            },
282        );
283    }
284
285    async fn create_peer(
286        &self,
287        peer_id: PeerId,
288        direction: PeerDirection,
289    ) -> Result<Peer, SharedTransportError> {
290        let mut peer = Peer::new_with_store_and_events(
291            peer_id,
292            direction,
293            self.my_peer_id.clone(),
294            self.signaling_tx.clone(),
295            self.stun_servers.clone(),
296            self.store.clone(),
297            Some(self.state_event_tx.clone()),
298            self.nostr_relay.clone(),
299            Some(self.mesh_frame_tx.clone()),
300            Some(self.state.cashu_quotes.clone()),
301        )
302        .await
303        .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))?;
304        peer.set_signal_urls(self.signal_urls.clone());
305        Ok(peer)
306    }
307}
308
309#[async_trait]
310impl SharedPeerLinkFactory for SharedRouterPeerFactory {
311    async fn create_offer(
312        &self,
313        target_peer_id: &str,
314    ) -> Result<(Arc<dyn SharedPeerLink>, String), SharedTransportError> {
315        let target_peer = PeerId::from_string(target_peer_id).ok_or_else(|| {
316            SharedTransportError::ConnectionFailed(format!("invalid peer id {target_peer_id}"))
317        })?;
318        let peer = Arc::new(
319            self.create_peer(target_peer.clone(), PeerDirection::Outbound)
320                .await?,
321        );
322        peer.setup_handlers()
323            .await
324            .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))?;
325        let offer = peer
326            .connect()
327            .await
328            .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))?;
329        let sdp = offer
330            .get("sdp")
331            .and_then(|value| value.as_str())
332            .ok_or_else(|| {
333                SharedTransportError::ConnectionFailed("missing SDP in CLI peer offer".to_string())
334            })?
335            .to_string();
336        self.register_peer(target_peer, PeerDirection::Outbound, peer.clone())
337            .await;
338        Ok((peer as Arc<dyn SharedPeerLink>, sdp))
339    }
340
341    async fn accept_offer(
342        &self,
343        from_peer_id: &str,
344        offer_sdp: &str,
345    ) -> Result<(Arc<dyn SharedPeerLink>, String), SharedTransportError> {
346        let from_peer = PeerId::from_string(from_peer_id).ok_or_else(|| {
347            SharedTransportError::ConnectionFailed(format!("invalid peer id {from_peer_id}"))
348        })?;
349        let peer = Arc::new(
350            self.create_peer(from_peer.clone(), PeerDirection::Inbound)
351                .await?,
352        );
353        peer.setup_handlers()
354            .await
355            .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))?;
356        let answer = peer
357            .handle_offer(serde_json::json!({ "type": "offer", "sdp": offer_sdp }))
358            .await
359            .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))?;
360        let sdp = answer
361            .get("sdp")
362            .and_then(|value| value.as_str())
363            .ok_or_else(|| {
364                SharedTransportError::ConnectionFailed("missing SDP in CLI peer answer".to_string())
365            })?
366            .to_string();
367        self.register_peer(from_peer, PeerDirection::Inbound, peer.clone())
368            .await;
369        Ok((peer as Arc<dyn SharedPeerLink>, sdp))
370    }
371
372    async fn handle_answer(
373        &self,
374        target_peer_id: &str,
375        answer_sdp: &str,
376    ) -> Result<Arc<dyn SharedPeerLink>, SharedTransportError> {
377        let peer = self
378            .peers
379            .read()
380            .await
381            .get(target_peer_id)
382            .cloned()
383            .ok_or_else(|| {
384                SharedTransportError::ConnectionFailed(format!(
385                    "missing outbound peer for {target_peer_id}"
386                ))
387            })?;
388        peer.handle_answer(serde_json::json!({ "type": "answer", "sdp": answer_sdp }))
389            .await
390            .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))?;
391        Ok(peer as Arc<dyn SharedPeerLink>)
392    }
393
394    async fn handle_candidate(
395        &self,
396        peer_id: &str,
397        candidate: SharedIceCandidate,
398    ) -> Result<(), SharedTransportError> {
399        let peer = self.peers.read().await.get(peer_id).cloned();
400        if let Some(peer) = peer {
401            peer.handle_candidate(serde_json::json!({
402                "candidate": candidate.candidate,
403                "sdpMLineIndex": candidate.sdp_m_line_index,
404                "sdpMid": candidate.sdp_mid,
405            }))
406            .await
407            .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))?;
408        }
409        Ok(())
410    }
411
412    async fn remove_peer(&self, peer_id: &str) -> Result<(), SharedTransportError> {
413        self.peers.write().await.remove(peer_id);
414        Ok(())
415    }
416}
417
418impl WebRTCState {
419    pub fn new() -> Self {
420        let cfg = WebRTCConfig::default();
421        Self::new_with_routing_and_cashu(
422            cfg.request_selection_strategy,
423            cfg.request_fairness_enabled,
424            cfg.request_dispatch,
425            Duration::from_millis(cfg.message_timeout_ms),
426            CashuRoutingConfig::default(),
427            None,
428            None,
429        )
430    }
431
432    pub fn new_with_routing(
433        selection_strategy: crate::SelectionStrategy,
434        fairness_enabled: bool,
435        request_dispatch: RequestDispatchConfig,
436    ) -> Self {
437        let cfg = WebRTCConfig::default();
438        Self::new_with_routing_and_cashu(
439            selection_strategy,
440            fairness_enabled,
441            request_dispatch,
442            Duration::from_millis(cfg.message_timeout_ms),
443            CashuRoutingConfig::default(),
444            None,
445            None,
446        )
447    }
448
449    pub fn new_with_routing_and_cashu(
450        selection_strategy: crate::SelectionStrategy,
451        fairness_enabled: bool,
452        request_dispatch: RequestDispatchConfig,
453        request_timeout: Duration,
454        cashu_routing: CashuRoutingConfig,
455        payment_client: Option<Arc<dyn CashuPaymentClient>>,
456        mint_metadata: Option<Arc<CashuMintMetadataStore>>,
457    ) -> Self {
458        let mut selector = PeerSelector::with_strategy(selection_strategy);
459        selector.set_fairness(fairness_enabled);
460        let peer_selector = Arc::new(RwLock::new(selector));
461        let cashu_quotes = Arc::new(if let Some(mint_metadata) = mint_metadata {
462            CashuQuoteState::new_with_mint_metadata(
463                cashu_routing,
464                peer_selector.clone(),
465                payment_client,
466                mint_metadata,
467            )
468        } else {
469            CashuQuoteState::new(cashu_routing, peer_selector.clone(), payment_client)
470        });
471        Self {
472            runtime: MeshRuntimeState::new(),
473            peer_selector,
474            direct_signaling_tx: RwLock::new(None),
475            request_dispatch,
476            request_timeout,
477            cashu_quotes,
478        }
479    }
480
481    pub async fn set_local_buses(&self, buses: Vec<SharedLocalNostrBus>) {
482        self.runtime.set_local_buses(buses).await;
483    }
484
485    pub async fn add_local_bus(&self, bus: SharedLocalNostrBus) {
486        self.runtime.add_local_bus(bus).await;
487    }
488
489    pub async fn set_multicast_bus(&self, bus: Option<Arc<MulticastNostrBus>>) {
490        let buses = bus
491            .into_iter()
492            .map(|bus| bus as SharedLocalNostrBus)
493            .collect();
494        self.set_local_buses(buses).await;
495    }
496
497    /// Drop all live peer sessions and clear topology-specific state while
498    /// keeping cumulative bandwidth counters intact.
499    pub async fn reset_runtime_state(&self) {
500        self.runtime.reset().await;
501    }
502
503    pub async fn peer_metadata_snapshot(&self) -> PeerMetadataSnapshot {
504        self.peer_selector
505            .read()
506            .await
507            .export_peer_metadata_snapshot()
508    }
509
510    pub async fn import_peer_metadata_snapshot(&self, snapshot: &PeerMetadataSnapshot) {
511        self.peer_selector
512            .write()
513            .await
514            .import_peer_metadata_snapshot(snapshot);
515    }
516
517    pub async fn selector_summary(&self) -> SelectorSummary {
518        self.peer_selector.read().await.summary()
519    }
520
521    pub async fn known_peer_snapshot(&self) -> KnownPeerSnapshot {
522        self.runtime.known_peer_snapshot().await
523    }
524
525    pub async fn import_known_peer_snapshot(&self, snapshot: &KnownPeerSnapshot) {
526        self.runtime.import_known_peer_snapshot(snapshot).await;
527    }
528
529    pub async fn ordered_known_peers(&self) -> Vec<KnownPeerRecord> {
530        let snapshot = self.runtime.known_peer_snapshot().await;
531        let mut by_peer = snapshot
532            .peers
533            .into_iter()
534            .map(|peer| (peer.peer_id.clone(), peer))
535            .collect::<HashMap<_, _>>();
536
537        let ordered_ids = {
538            let mut selector = self.peer_selector.write().await;
539            for peer_id in by_peer.keys() {
540                selector.add_peer(peer_id);
541            }
542            selector.select_peers()
543        };
544
545        let mut ordered = Vec::new();
546        for peer_id in ordered_ids {
547            if let Some(peer) = by_peer.remove(&peer_id) {
548                ordered.push(peer);
549            }
550        }
551        let mut remaining = by_peer.into_values().collect::<Vec<_>>();
552        remaining.sort_by(|a, b| a.peer_id.cmp(&b.peer_id));
553        ordered.extend(remaining);
554        ordered
555    }
556
557    pub async fn set_direct_signaling_sender(&self, tx: Option<mpsc::Sender<(String, Event)>>) {
558        *self.direct_signaling_tx.write().await = tx;
559    }
560
561    pub async fn submit_direct_signaling_event(&self, source: String, event: Event) -> bool {
562        let tx = self.direct_signaling_tx.read().await.clone();
563        let Some(tx) = tx else {
564            return false;
565        };
566        tx.send((source, event)).await.is_ok()
567    }
568
569    /// Get current bandwidth stats (bytes sent/received)
570    pub fn get_bandwidth(&self) -> (u64, u64) {
571        self.runtime.get_bandwidth()
572    }
573
574    pub fn get_mesh_stats(&self) -> (u64, u64, u64) {
575        self.runtime.get_mesh_stats()
576    }
577
578    pub fn record_mesh_received(&self) {
579        self.runtime.record_mesh_received();
580    }
581
582    pub fn record_mesh_forwarded(&self, count: u64) {
583        self.runtime.record_mesh_forwarded(count);
584    }
585
586    pub fn record_mesh_duplicate_drop(&self) {
587        self.runtime.record_mesh_duplicate_drop();
588    }
589
590    /// Record bytes sent (global + per-peer)
591    pub async fn record_sent(&self, peer_id: &str, bytes: u64) {
592        self.runtime.record_sent(peer_id, bytes).await;
593    }
594
595    /// Record bytes received (global + per-peer)
596    pub async fn record_received(&self, peer_id: &str, bytes: u64) {
597        self.runtime.record_received(peer_id, bytes).await;
598    }
599
600    /// Request content by hash from connected peers
601    /// Queries peers in adaptive selector order with hedged fanout waves.
602    /// Returns the first successful response, or None if no peer has it
603    pub async fn request_from_peers(&self, hash_hex: &str) -> Option<Vec<u8>> {
604        self.request_from_peers_with_source(hash_hex)
605            .await
606            .map(|(data, _peer_id)| data)
607    }
608
609    /// Request content by hash from connected peers, returning data and source peer.
610    pub async fn request_from_peers_with_source(
611        &self,
612        hash_hex: &str,
613    ) -> Option<(Vec<u8>, String)> {
614        use crate::BLOB_REQUEST_POLICY;
615
616        let peer_hash_get = self.runtime.peer_hash_get_snapshot().await;
617        let peers = self.runtime.peers.read().await;
618
619        let peer_refs: Vec<_> = peers
620            .values()
621            .filter(|p| p.state == ConnectionState::Connected && p.peer.is_some())
622            .filter_map(|p| {
623                if !peer_hash_get
624                    .get(&p.peer_id.to_string())
625                    .copied()
626                    .unwrap_or(true)
627                {
628                    return None;
629                }
630                p.peer
631                    .clone()
632                    .map(|peer| (p.peer_id.to_string(), peer, p.transport))
633            })
634            .collect();
635
636        drop(peers); // Release the read lock
637
638        let mut connected_peers: Vec<ConnectedPeer> = Vec::new();
639        let mut connected_sessions: Vec<ConnectedSession> = Vec::new();
640        for (peer_id, peer, transport) in peer_refs {
641            if !peer.is_ready() {
642                continue;
643            }
644            if bluetooth_nostr_only_mode() && transport == PeerTransport::Bluetooth {
645                continue;
646            }
647            if let Some(webrtc_peer) = peer.as_webrtc() {
648                let dc_guard = webrtc_peer.data_channel.lock().await;
649                if let Some(dc) = dc_guard.as_ref() {
650                    connected_peers.push((
651                        peer_id.clone(),
652                        webrtc_peer.pending_requests.clone(),
653                        dc.clone(),
654                    ));
655                }
656            }
657            connected_sessions.push((peer_id, peer, transport));
658        }
659
660        if connected_sessions.is_empty() {
661            debug!(
662                "No connected peers to query for {}",
663                &hash_hex[..8.min(hash_hex.len())]
664            );
665            return None;
666        }
667
668        // Convert hex to binary hash once
669        let hash_bytes = match hex::decode(hash_hex) {
670            Ok(b) => b,
671            Err(_) => return None,
672        };
673
674        let expected_hash: [u8; 32] = match hash_bytes.as_slice().try_into() {
675            Ok(h) => h,
676            Err(_) => {
677                debug!(
678                    "Invalid hash length {}, expected 32 bytes",
679                    hash_bytes.len()
680                );
681                return None;
682            }
683        };
684
685        let connected_peer_ids: Vec<String> = connected_sessions
686            .iter()
687            .map(|(peer_id, _, _)| peer_id.clone())
688            .collect();
689        sync_selector_peers(self.peer_selector.as_ref(), &connected_peer_ids).await;
690
691        let ordered_peer_ids = self.peer_selector.write().await.select_peers();
692        let mut quote_by_peer: HashMap<
693            String,
694            (
695                PendingRequestsMap,
696                Arc<webrtc::data_channel::RTCDataChannel>,
697            ),
698        > = connected_peers
699            .iter()
700            .cloned()
701            .map(|(peer_id, pending, dc)| (peer_id, (pending, dc)))
702            .collect();
703        let mut ordered_quote_peers: Vec<ConnectedPeer> = Vec::new();
704        for peer_id in &ordered_peer_ids {
705            if let Some((pending, dc)) = quote_by_peer.remove(peer_id) {
706                ordered_quote_peers.push((peer_id.clone(), pending, dc));
707            }
708        }
709        for (peer_id, (pending, dc)) in quote_by_peer {
710            ordered_quote_peers.push((peer_id, pending, dc));
711        }
712
713        let mut by_peer: HashMap<String, (MeshPeer, PeerTransport)> = connected_sessions
714            .into_iter()
715            .map(|(peer_id, peer, transport)| (peer_id, (peer, transport)))
716            .collect();
717
718        let mut ordered_peers: Vec<ConnectedSession> = Vec::new();
719        for peer_id in ordered_peer_ids {
720            if let Some((peer, transport)) = by_peer.remove(&peer_id) {
721                ordered_peers.push((peer_id, peer, transport));
722            }
723        }
724        for (peer_id, (peer, transport)) in by_peer {
725            ordered_peers.push((peer_id, peer, transport));
726        }
727
728        debug!(
729            "Querying {} peers for {} with shared hedged scheduler",
730            ordered_peers.len(),
731            &hash_hex[..8.min(hash_hex.len())],
732        );
733
734        if let Some((requested_mint, payment_sat, quote_ttl_ms)) =
735            self.cashu_quotes.requester_quote_terms().await
736        {
737            if let Some(quote) = self
738                .request_quote_from_peers(
739                    &hash_bytes,
740                    requested_mint,
741                    payment_sat,
742                    quote_ttl_ms,
743                    &ordered_quote_peers,
744                )
745                .await
746            {
747                if let Some(data) = self
748                    .request_from_single_peer(
749                        hash_hex,
750                        &hash_bytes,
751                        expected_hash,
752                        &quote.peer_id,
753                        Some(&quote),
754                        &ordered_quote_peers,
755                    )
756                    .await
757                {
758                    debug!(
759                        "Got quoted response from peer {} for {}",
760                        quote.peer_id,
761                        &hash_hex[..8.min(hash_hex.len())]
762                    );
763                    return Some((data, quote.peer_id));
764                }
765            }
766        }
767
768        let request = DataRequest {
769            h: hash_bytes.clone(),
770            htl: BLOB_REQUEST_POLICY.max_htl,
771            q: None,
772        };
773        let wire = crate::encode_request(&request);
774        let wire_len = wire.len() as u64;
775        let current_result_rx = Arc::new(Mutex::new(None));
776        if let Some((data, peer_id)) = run_hedged_waves(
777            ordered_peers.len(),
778            self.request_dispatch,
779            self.request_timeout,
780            |range| {
781                let wave_peers = ordered_peers[range].to_vec();
782                let (result_tx, result_rx) =
783                    mpsc::channel::<(String, Instant, Result<Option<Vec<u8>>>)>(wave_peers.len());
784                let current_result_rx = current_result_rx.clone();
785                let hash_hex = hash_hex.to_string();
786                async move {
787                    *current_result_rx.lock().await = Some(result_rx);
788                    let sent = wave_peers.len();
789                    for (peer_id, peer, transport) in wave_peers {
790                        if transport != PeerTransport::Bluetooth {
791                            self.record_sent(&peer_id, wire_len).await;
792                        }
793                        self.peer_selector
794                            .write()
795                            .await
796                            .record_request(&peer_id, wire_len);
797
798                        let result_tx = result_tx.clone();
799                        let peer_id_for_task = peer_id.clone();
800                        let peer = peer.clone();
801                        let hash_hex = hash_hex.clone();
802                        let per_request_timeout = self.request_timeout;
803                        tokio::spawn(async move {
804                            let started = Instant::now();
805                            let result = peer.request(&hash_hex, per_request_timeout).await;
806                            let _ = result_tx.send((peer_id_for_task, started, result)).await;
807                        });
808                    }
809                    drop(result_tx);
810                    sent
811                }
812            },
813            |wait| {
814                let current_result_rx = current_result_rx.clone();
815                async move {
816                    let mut current_result_rx = current_result_rx.lock().await;
817                    let Some(result_rx) = current_result_rx.as_mut() else {
818                        return HedgedWaveAction::Abort;
819                    };
820                    let deadline = Instant::now() + wait;
821                    loop {
822                        let now = Instant::now();
823                        if now >= deadline {
824                            return HedgedWaveAction::Continue;
825                        }
826                        let remaining = deadline.saturating_duration_since(now);
827                        match tokio::time::timeout(remaining, result_rx.recv()).await {
828                            Ok(Some((peer_id, started, Ok(Some(data))))) => {
829                                let rtt_ms = started.elapsed().as_millis() as u64;
830                                if hashtree_core::sha256(&data) == expected_hash {
831                                    let should_record = {
832                                        let peers = self.runtime.peers.read().await;
833                                        peers
834                                            .get(&peer_id)
835                                            .map(|entry| {
836                                                entry.transport != PeerTransport::Bluetooth
837                                            })
838                                            .unwrap_or(true)
839                                    };
840                                    if should_record {
841                                        self.record_received(&peer_id, data.len() as u64).await;
842                                    }
843                                    self.peer_selector.write().await.record_success(
844                                        &peer_id,
845                                        rtt_ms,
846                                        data.len() as u64,
847                                    );
848                                    return HedgedWaveAction::Success((data, peer_id));
849                                }
850                                self.peer_selector.write().await.record_failure(&peer_id);
851                            }
852                            Ok(Some((peer_id, _, Ok(None)))) | Ok(Some((peer_id, _, Err(_)))) => {
853                                self.peer_selector.write().await.record_timeout(&peer_id);
854                            }
855                            Ok(None) | Err(_) => return HedgedWaveAction::Continue,
856                        }
857                    }
858                }
859            },
860        )
861        .await
862        {
863            debug!(
864                "Got response from peer {} for {}",
865                peer_id,
866                &hash_hex[..8.min(hash_hex.len())]
867            );
868            return Some((data, peer_id));
869        }
870
871        debug!(
872            "No peer had data for {}",
873            &hash_hex[..8.min(hash_hex.len())]
874        );
875        None
876    }
877
878    async fn request_quote_from_peers(
879        &self,
880        hash_bytes: &[u8],
881        requested_mint: String,
882        payment_sat: u64,
883        quote_ttl_ms: u32,
884        ordered_peers: &[ConnectedPeer],
885    ) -> Option<NegotiatedQuote> {
886        if ordered_peers.is_empty() || quote_ttl_ms == 0 {
887            return None;
888        }
889
890        let hash_hex = hex::encode(hash_bytes);
891        let rx = self
892            .cashu_quotes
893            .register_pending_quote(hash_hex.clone(), Some(requested_mint.clone()), payment_sat)
894            .await;
895        let quote_request = DataQuoteRequest {
896            h: hash_bytes.to_vec(),
897            p: payment_sat,
898            t: quote_ttl_ms,
899            m: Some(requested_mint),
900        };
901        let wire = crate::encode_quote_request(&quote_request);
902        let rx = Arc::new(Mutex::new(rx));
903        let result = run_hedged_waves(
904            ordered_peers.len(),
905            self.request_dispatch,
906            self.request_timeout,
907            |range| {
908                let wave_peers = ordered_peers[range].to_vec();
909                let wire = wire.clone();
910                async move {
911                    let mut sent = 0usize;
912                    for (_, _, dc) in wave_peers {
913                        if dc.send(&bytes::Bytes::copy_from_slice(&wire)).await.is_ok() {
914                            sent += 1;
915                        }
916                    }
917                    sent
918                }
919            },
920            |wait| {
921                let rx = rx.clone();
922                async move {
923                    let mut rx = rx.lock().await;
924                    match tokio::time::timeout(wait, &mut *rx).await {
925                        Ok(Ok(Some(quote))) => HedgedWaveAction::Success(quote),
926                        Ok(Ok(None)) | Ok(Err(_)) => HedgedWaveAction::Abort,
927                        Err(_) => HedgedWaveAction::Continue,
928                    }
929                }
930            },
931        )
932        .await;
933
934        self.cashu_quotes.clear_pending_quote(&hash_hex).await;
935        result
936    }
937
938    async fn request_from_single_peer(
939        &self,
940        hash_hex: &str,
941        hash_bytes: &[u8],
942        expected_hash: [u8; 32],
943        target_peer_id: &str,
944        quote: Option<&NegotiatedQuote>,
945        ordered_peers: &[ConnectedPeer],
946    ) -> Option<Vec<u8>> {
947        use crate::BLOB_REQUEST_POLICY;
948
949        let (pending_requests, dc) = ordered_peers
950            .iter()
951            .find(|(peer_id, _, _)| peer_id == target_peer_id)
952            .map(|(_, pending_requests, dc)| (pending_requests.clone(), dc.clone()))?;
953
954        let request = DataRequest {
955            h: hash_bytes.to_vec(),
956            htl: BLOB_REQUEST_POLICY.max_htl,
957            q: quote.map(|quote| quote.quote_id),
958        };
959        let wire = crate::encode_request(&request);
960        let wire_len = wire.len() as u64;
961        let sent_at = Instant::now();
962        let (tx, mut rx) = tokio::sync::oneshot::channel();
963
964        {
965            let mut pending = pending_requests.lock().await;
966            pending.insert(
967                hash_hex.to_string(),
968                if let Some(quote) = quote {
969                    PendingRequest::quoted(
970                        hash_bytes.to_vec(),
971                        tx,
972                        quote.quote_id,
973                        quote.mint_url.clone().unwrap_or_default(),
974                        quote.payment_sat,
975                    )
976                } else {
977                    PendingRequest::standard(hash_bytes.to_vec(), tx)
978                },
979            );
980        }
981
982        if dc
983            .send(&bytes::Bytes::copy_from_slice(&wire))
984            .await
985            .is_err()
986        {
987            let mut pending = pending_requests.lock().await;
988            pending.remove(hash_hex);
989            self.peer_selector
990                .write()
991                .await
992                .record_failure(target_peer_id);
993            return None;
994        }
995
996        self.record_sent(target_peer_id, wire_len).await;
997        self.peer_selector
998            .write()
999            .await
1000            .record_request(target_peer_id, wire_len);
1001
1002        let wait_timeout = if let Some(quote) = quote {
1003            let multiplier = quote.payment_sat.clamp(1, 32) as u128;
1004            let extra_ms = self
1005                .cashu_quotes
1006                .settlement_timeout()
1007                .as_millis()
1008                .saturating_mul(multiplier);
1009            self.request_timeout + Duration::from_millis(extra_ms.min(u64::MAX as u128) as u64)
1010        } else {
1011            self.request_timeout
1012        };
1013
1014        match tokio::time::timeout(wait_timeout, &mut rx).await {
1015            Ok(Ok(Some(data))) if hashtree_core::sha256(&data) == expected_hash => {
1016                let rtt_ms = sent_at.elapsed().as_millis() as u64;
1017                self.record_received(target_peer_id, data.len() as u64)
1018                    .await;
1019                self.peer_selector.write().await.record_success(
1020                    target_peer_id,
1021                    rtt_ms,
1022                    data.len() as u64,
1023                );
1024                Some(data)
1025            }
1026            Ok(Ok(Some(_))) => {
1027                self.peer_selector
1028                    .write()
1029                    .await
1030                    .record_failure(target_peer_id);
1031                let pending = pending_requests.lock().await.remove(hash_hex);
1032                if let Some(pending) = pending {
1033                    if let Some(quoted) = pending.quoted {
1034                        if let Some(in_flight) = quoted.in_flight_payment {
1035                            let _ = self
1036                                .cashu_quotes
1037                                .revoke_payment_token(&in_flight.mint_url, &in_flight.operation_id)
1038                                .await;
1039                        }
1040                    }
1041                }
1042                None
1043            }
1044            Ok(Ok(None)) | Ok(Err(_)) | Err(_) => {
1045                let pending = pending_requests.lock().await.remove(hash_hex);
1046                if let Some(pending) = pending {
1047                    if let Some(quoted) = pending.quoted {
1048                        if let Some(in_flight) = quoted.in_flight_payment {
1049                            let _ = self
1050                                .cashu_quotes
1051                                .revoke_payment_token(&in_flight.mint_url, &in_flight.operation_id)
1052                                .await;
1053                        }
1054                    }
1055                }
1056                self.peer_selector
1057                    .write()
1058                    .await
1059                    .record_timeout(target_peer_id);
1060                None
1061            }
1062        }
1063    }
1064
1065    /// Resolve a hashtree root event through connected peers using Nostr REQ/EOSE over WebRTC.
1066    pub async fn resolve_root_from_peers(
1067        &self,
1068        owner_pubkey: &str,
1069        tree_name: &str,
1070        per_peer_timeout: Duration,
1071    ) -> Option<PeerRootEvent> {
1072        let peer_refs: Vec<(String, Arc<dyn MeshSession>)> = {
1073            let peers = self.runtime.peers.read().await;
1074            peers
1075                .values()
1076                .filter(|entry| entry.state == ConnectionState::Connected)
1077                .filter(|entry| {
1078                    !bluetooth_nostr_only_mode() || entry.transport != PeerTransport::Bluetooth
1079                })
1080                .filter_map(|entry| {
1081                    let peer = entry.peer.as_ref()?;
1082                    Some((
1083                        entry.peer_id.short(),
1084                        Arc::new(peer.clone()) as Arc<dyn MeshSession>,
1085                    ))
1086                })
1087                .collect()
1088        };
1089
1090        let resolved =
1091            resolve_root_via_peer_sessions(peer_refs, owner_pubkey, tree_name, per_peer_timeout)
1092                .await;
1093        if let Some(root) = &resolved {
1094            debug!(
1095                "Resolved {}/{} via peer {} event {}",
1096                owner_pubkey, tree_name, root.peer_id, root.event_id
1097            );
1098        }
1099        resolved
1100    }
1101
1102    pub async fn resolve_root_from_local_buses_with_source(
1103        &self,
1104        owner_pubkey: &str,
1105        tree_name: &str,
1106        timeout: Duration,
1107    ) -> Option<(&'static str, PeerRootEvent)> {
1108        self.runtime
1109            .resolve_root_from_local_buses_with_source(owner_pubkey, tree_name, timeout)
1110            .await
1111    }
1112
1113    pub async fn resolve_root_from_local_buses(
1114        &self,
1115        owner_pubkey: &str,
1116        tree_name: &str,
1117        timeout: Duration,
1118    ) -> Option<PeerRootEvent> {
1119        self.resolve_root_from_local_buses_with_source(owner_pubkey, tree_name, timeout)
1120            .await
1121            .map(|(_, root)| root)
1122    }
1123
1124    pub async fn resolve_root_from_multicast(
1125        &self,
1126        owner_pubkey: &str,
1127        tree_name: &str,
1128        timeout: Duration,
1129    ) -> Option<PeerRootEvent> {
1130        self.resolve_root_from_local_buses(owner_pubkey, tree_name, timeout)
1131            .await
1132    }
1133}
1134
1135impl Default for WebRTCState {
1136    fn default() -> Self {
1137        Self::new()
1138    }
1139}
1140
/// Native mesh manager handles peer discovery and transport fan-out.
///
/// Construct it with `WebRTCManager::new` or one of the `new_with_*` variants; optional
/// collaborators (content store, relay client, classifier) can also be installed afterwards
/// through the `set_*` methods.
pub struct WebRTCManager {
    /// Configuration supplied at construction time.
    config: WebRTCConfig,
    /// This node's peer id, built from the hex-encoded public key of `keys`.
    my_peer_id: PeerId,
    /// Nostr keys this manager was created with.
    keys: Keys,
    /// Shared state handed out by `state()`.
    state: Arc<WebRTCState>,
    /// Sending half of the shutdown watch channel; `shutdown()` sends `true` on it.
    shutdown: Arc<tokio::sync::watch::Sender<bool>>,
    /// Receiving half of the shutdown watch channel, created together with `shutdown`.
    shutdown_rx: tokio::sync::watch::Receiver<bool>,
    /// Channel to send signaling messages to relays
    signaling_tx: mpsc::Sender<SignalingMessage>,
    /// Receiving half paired with `signaling_tx`.
    /// NOTE(review): stored as `Option`, presumably so a run loop can take it - confirm.
    signaling_rx: Option<mpsc::Receiver<SignalingMessage>>,
    /// Optional content store for serving hash requests
    store: Option<Arc<dyn ContentStore>>,
    /// Peer classifier for pool assignment
    peer_classifier: PeerClassifier,
    /// Optional Nostr relay for data-channel relay messages
    nostr_relay: Option<SharedMeshRelayClient>,
    /// Local Nostr buses attached to this manager; empty right after construction.
    local_buses: Vec<SharedLocalNostrBus>,
    /// Channel for peer state events (connection success/failure)
    state_event_tx: mpsc::Sender<PeerStateEvent>,
    /// Receiving half paired with `state_event_tx`.
    state_event_rx: Option<mpsc::Receiver<PeerStateEvent>>,
    /// Channel for relayless mesh signaling frames received from peers.
    mesh_frame_tx: mpsc::Sender<(PeerId, MeshNostrFrame)>,
    /// Receiving half paired with `mesh_frame_tx`.
    mesh_frame_rx: Option<mpsc::Receiver<(PeerId, MeshNostrFrame)>>,
    /// Optional shared router; `None` right after construction.
    shared_router: Option<Arc<SharedProductionRouter>>,
    /// Timed set created with `SEEN_FRAME_CAP` / `SEEN_FRAME_TTL`.
    /// NOTE(review): presumably de-duplicates mesh frame ids - confirm at the use site.
    seen_frame_ids: Arc<Mutex<TimedSeenSet>>,
    /// Timed set created with `SEEN_EVENT_CAP` / `SEEN_EVENT_TTL`.
    /// NOTE(review): presumably de-duplicates event ids - confirm at the use site.
    seen_event_ids: Arc<Mutex<TimedSeenSet>>,
}
1169
1170impl WebRTCManager {
1171    /// Create a new WebRTC manager
1172    pub fn new(keys: Keys, config: WebRTCConfig) -> Self {
1173        let pubkey = keys.public_key().to_hex();
1174        let my_peer_id = PeerId::new(pubkey);
1175        let (shutdown, shutdown_rx) = tokio::sync::watch::channel(false);
1176        let (signaling_tx, signaling_rx) = mpsc::channel(100);
1177        let (state_event_tx, state_event_rx) = mpsc::channel(100);
1178        let (mesh_frame_tx, mesh_frame_rx) = mpsc::channel(256);
1179        let state = Arc::new(WebRTCState::new_with_routing_and_cashu(
1180            config.request_selection_strategy,
1181            config.request_fairness_enabled,
1182            config.request_dispatch,
1183            Duration::from_millis(config.message_timeout_ms),
1184            CashuRoutingConfig::default(),
1185            None,
1186            None,
1187        ));
1188
1189        // Default classifier: all peers go to 'other' pool
1190        let peer_classifier: PeerClassifier = Arc::new(|_| PeerPool::Other);
1191
1192        Self {
1193            config,
1194            my_peer_id,
1195            keys,
1196            state,
1197            shutdown: Arc::new(shutdown),
1198            shutdown_rx,
1199            signaling_tx,
1200            signaling_rx: Some(signaling_rx),
1201            store: None,
1202            peer_classifier,
1203            nostr_relay: None,
1204            local_buses: Vec::new(),
1205            state_event_tx,
1206            state_event_rx: Some(state_event_rx),
1207            mesh_frame_tx,
1208            mesh_frame_rx: Some(mesh_frame_rx),
1209            shared_router: None,
1210            seen_frame_ids: Arc::new(Mutex::new(TimedSeenSet::new(
1211                SEEN_FRAME_CAP,
1212                SEEN_FRAME_TTL,
1213            ))),
1214            seen_event_ids: Arc::new(Mutex::new(TimedSeenSet::new(
1215                SEEN_EVENT_CAP,
1216                SEEN_EVENT_TTL,
1217            ))),
1218        }
1219    }
1220
1221    /// Create a new WebRTC manager reusing an existing shared state object.
1222    pub fn new_with_state(keys: Keys, config: WebRTCConfig, state: Arc<WebRTCState>) -> Self {
1223        let mut manager = Self::new(keys, config);
1224        manager.state = state;
1225        manager
1226    }
1227
1228    /// Create a new WebRTC manager with a peer classifier
1229    pub fn new_with_classifier(
1230        keys: Keys,
1231        config: WebRTCConfig,
1232        classifier: PeerClassifier,
1233    ) -> Self {
1234        let mut manager = Self::new(keys, config);
1235        manager.peer_classifier = classifier;
1236        manager
1237    }
1238
1239    /// Create a new WebRTC manager with a content store for serving hash requests
1240    pub fn new_with_store(keys: Keys, config: WebRTCConfig, store: Arc<dyn ContentStore>) -> Self {
1241        let mut manager = Self::new(keys, config);
1242        manager.store = Some(store);
1243        manager
1244    }
1245
1246    /// Create a new WebRTC manager with store and classifier
1247    pub fn new_with_store_and_classifier(
1248        keys: Keys,
1249        config: WebRTCConfig,
1250        store: Arc<dyn ContentStore>,
1251        classifier: PeerClassifier,
1252    ) -> Self {
1253        Self::new_with_store_and_classifier_and_cashu(
1254            keys,
1255            config,
1256            store,
1257            classifier,
1258            CashuRoutingConfig::default(),
1259            None,
1260            None,
1261        )
1262    }
1263
1264    pub fn new_with_state_and_store_and_classifier(
1265        keys: Keys,
1266        config: WebRTCConfig,
1267        state: Arc<WebRTCState>,
1268        store: Arc<dyn ContentStore>,
1269        classifier: PeerClassifier,
1270    ) -> Self {
1271        let mut manager = Self::new_with_state(keys, config, state);
1272        manager.store = Some(store);
1273        manager.peer_classifier = classifier;
1274        manager
1275    }
1276
1277    pub fn new_with_store_and_classifier_and_cashu(
1278        keys: Keys,
1279        config: WebRTCConfig,
1280        store: Arc<dyn ContentStore>,
1281        classifier: PeerClassifier,
1282        cashu_routing: CashuRoutingConfig,
1283        payment_client: Option<Arc<dyn CashuPaymentClient>>,
1284        mint_metadata: Option<Arc<CashuMintMetadataStore>>,
1285    ) -> Self {
1286        let mut manager = Self::new(keys, config);
1287        manager.state = Arc::new(WebRTCState::new_with_routing_and_cashu(
1288            manager.config.request_selection_strategy,
1289            manager.config.request_fairness_enabled,
1290            manager.config.request_dispatch,
1291            Duration::from_millis(manager.config.message_timeout_ms),
1292            cashu_routing,
1293            payment_client,
1294            mint_metadata,
1295        ));
1296        manager.store = Some(store);
1297        manager.peer_classifier = classifier;
1298        manager
1299    }
1300
1301    /// Set the content store for serving hash requests
1302    pub fn set_store(&mut self, store: Arc<dyn ContentStore>) {
1303        self.store = Some(store);
1304    }
1305
1306    /// Set the peer classifier
1307    pub fn set_peer_classifier(&mut self, classifier: PeerClassifier) {
1308        self.peer_classifier = classifier;
1309    }
1310
1311    /// Set the Nostr relay for data-channel relay messages
1312    pub fn set_nostr_relay(&mut self, relay: SharedMeshRelayClient) {
1313        self.nostr_relay = Some(relay);
1314    }
1315
1316    /// Get my peer ID
1317    pub fn my_peer_id(&self) -> &PeerId {
1318        &self.my_peer_id
1319    }
1320
1321    /// Get shared state for external access
1322    pub fn state(&self) -> Arc<WebRTCState> {
1323        self.state.clone()
1324    }
1325
1326    /// Cloneable shutdown handle for external lifecycle control.
1327    pub fn shutdown_signal(&self) -> Arc<tokio::sync::watch::Sender<bool>> {
1328        self.shutdown.clone()
1329    }
1330
1331    /// Signal shutdown
1332    pub fn shutdown(&self) {
1333        let _ = self.shutdown.send(true);
1334    }
1335
1336    /// Get connected peer count
1337    pub async fn connected_count(&self) -> usize {
1338        self.state
1339            .runtime
1340            .connected_count
1341            .load(std::sync::atomic::Ordering::Relaxed)
1342    }
1343
1344    /// Get all peer statuses
1345    pub async fn peer_statuses(&self) -> Vec<PeerStatus> {
1346        self.state
1347            .runtime
1348            .peers
1349            .read()
1350            .await
1351            .values()
1352            .map(|p| PeerStatus {
1353                peer_id: p.peer_id.to_string(),
1354                pubkey: p.peer_id.pubkey.clone(),
1355                state: p.state.to_string(),
1356                direction: p.direction,
1357                connected_at: Some(p.last_seen),
1358                pool: p.pool,
1359            })
1360            .collect()
1361    }
1362
1363    /// Get pool counts
1364    /// Returns (follows_connected, follows_active, other_connected, other_active)
1365    /// "active" = Connected or Connecting (excludes Discovered and Failed)
1366    pub async fn get_pool_counts(&self) -> (usize, usize, usize, usize) {
1367        let peers = self.state.runtime.peers.read().await;
1368        let mut follows_connected = 0;
1369        let mut follows_active = 0;
1370        let mut other_connected = 0;
1371        let mut other_active = 0;
1372
1373        for entry in peers.values() {
1374            // Only count Connected or Connecting as "active" connections
1375            // Discovered peers are just seen hellos, not real connections
1376            let is_active = entry.state == ConnectionState::Connected
1377                || entry.state == ConnectionState::Connecting;
1378
1379            match entry.pool {
1380                PeerPool::Follows => {
1381                    if is_active {
1382                        follows_active += 1;
1383                    }
1384                    if entry.state == ConnectionState::Connected {
1385                        follows_connected += 1;
1386                    }
1387                }
1388                PeerPool::Other => {
1389                    if is_active {
1390                        other_active += 1;
1391                    }
1392                    if entry.state == ConnectionState::Connected {
1393                        other_connected += 1;
1394                    }
1395                }
1396            }
1397        }
1398
1399        (
1400            follows_connected,
1401            follows_active,
1402            other_connected,
1403            other_active,
1404        )
1405    }
1406
1407    fn local_bus_max_peers(&self, source: &str) -> Option<usize> {
1408        match source {
1409            "multicast" => Some(self.config.multicast.max_peers),
1410            WIFI_AWARE_SOURCE => Some(self.config.wifi_aware.max_peers),
1411            _ => None,
1412        }
1413    }
1414
1415    #[cfg_attr(not(test), allow(dead_code))]
1416    fn can_track_local_bus_peer(
1417        &self,
1418        source: &str,
1419        peer_key: &str,
1420        peers: &HashMap<String, PeerEntry>,
1421    ) -> bool {
1422        can_track_source_peer(source, peer_key, peers, self.local_bus_max_peers(source))
1423    }
1424}
1425
1426mod runtime;