Skip to main content

hashtree_network/manager/
mod.rs

1//! WebRTC signaling over Nostr relays
2//!
3//! Protocol (compatible with hashtree-ts):
4//! - All signaling uses ephemeral kind 25050
5//! - Hello messages: #l: "hello" tag, broadcast for peer discovery (unencrypted)
6//! - Directed signaling (offer, answer, candidate, candidates): NIP-17 style
7//!   gift wrap for privacy - wrapped with ephemeral key, #p tag with recipient
8//!
9//! Security: Directed messages use gift wrapping with ephemeral keys so that
10//! relays cannot see the actual sender or correlate messages.
11
12pub use crate::{ConnectionState, PeerSignalPath, PeerTransport};
13use anyhow::Result;
14use async_trait::async_trait;
15use cashu_service::CashuPaymentClient;
16use nostr_sdk::nostr::Keys;
17#[cfg(test)]
18use nostr_sdk::nostr::Kind;
19use std::collections::{BTreeSet, HashMap};
20use std::sync::Arc;
21use std::time::{Duration, Instant};
22use tokio::sync::{mpsc, Mutex, RwLock};
23use tracing::{debug, error, info, warn};
24
25use crate::bluetooth::{
26    BluetoothConfig, BluetoothMesh, BluetoothPeerRegistrar, BluetoothRuntimeContext,
27};
28use crate::cashu::{CashuMintMetadataStore, CashuQuoteState, CashuRoutingConfig, NegotiatedQuote};
29use crate::local_bus::SharedLocalNostrBus;
30use crate::mesh_session::{
31    resolve_root_from_peer_sessions as resolve_root_via_peer_sessions, MeshSession,
32};
33use crate::mesh_store_core::{
34    run_hedged_waves, sync_selector_peers, HedgedWaveAction, RequestDispatchConfig,
35};
36use crate::multicast::{MulticastConfig, MulticastNostrBus};
37use crate::nostr::NostrRelayTransport;
38use crate::peer::{ContentStore, Peer, PendingRequest};
39use crate::peer_selector::PeerSelector;
40use crate::protocol::{DataQuoteRequest, DataRequest};
41use crate::relay_bridge::SharedMeshRelayClient;
42use crate::root_events::PeerRootEvent;
43use crate::runtime_control::{can_track_source_peer, PeerStateEvent};
44use crate::runtime_peer::{
45    MeshPeerEntry as SharedPeerEntry, PeerClassifier as SharedPeerClassifier,
46};
47use crate::runtime_state::MeshRuntimeState;
48use crate::session::MeshPeer;
49use crate::signaling::MeshRouter;
50use crate::transport::{
51    PeerLink as SharedPeerLink, PeerLinkFactory as SharedPeerLinkFactory,
52    SignalingTransport as SharedSignalingTransport, TransportError as SharedTransportError,
53};
54use crate::types::{
55    validate_mesh_frame, MeshNostrFrame, MeshNostrPayload, SignalingMessage, TimedSeenSet,
56};
57use crate::wifi_aware::{
58    mobile_wifi_aware_bridge, WifiAwareConfig, WifiAwareNostrBus, WIFI_AWARE_SOURCE,
59};
60use crate::{
61    ClassifyRequest as SharedClassifyRequest, IceCandidate as SharedIceCandidate, PeerDirection,
62    PeerId, PeerPool, MESH_SIGNALING_EVENT_KIND,
63};
64
65/// Callback type for classifying peers into pools
66pub type PeerClassifier = SharedPeerClassifier;
67pub type PeerEntry = SharedPeerEntry<MeshPeer>;
68
69#[derive(Clone)]
70pub struct WebRTCConfig {
71    pub relays: Vec<String>,
72    pub signaling_enabled: bool,
73    pub hash_get_enabled: bool,
74    pub max_outbound: usize,
75    pub max_inbound: usize,
76    pub hello_interval_ms: u64,
77    pub message_timeout_ms: u64,
78    pub stun_servers: Vec<String>,
79    pub debug: bool,
80    pub multicast: MulticastConfig,
81    pub wifi_aware: WifiAwareConfig,
82    pub bluetooth: BluetoothConfig,
83    pub pools: crate::PoolSettings,
84    pub request_selection_strategy: crate::SelectionStrategy,
85    pub request_fairness_enabled: bool,
86    pub request_dispatch: RequestDispatchConfig,
87}
88
89impl Default for WebRTCConfig {
90    fn default() -> Self {
91        Self {
92            relays: vec![
93                "wss://relay.damus.io".to_string(),
94                "wss://relay.primal.net".to_string(),
95                "wss://temp.iris.to".to_string(),
96                "wss://relay.snort.social".to_string(),
97            ],
98            signaling_enabled: true,
99            hash_get_enabled: true,
100            max_outbound: 6,
101            max_inbound: 6,
102            hello_interval_ms: 3000,
103            message_timeout_ms: 15000,
104            stun_servers: vec![
105                "stun:stun.iris.to:3478".to_string(),
106                "stun:stun.l.google.com:19302".to_string(),
107                "stun:stun.cloudflare.com:3478".to_string(),
108            ],
109            debug: false,
110            multicast: MulticastConfig::default(),
111            wifi_aware: WifiAwareConfig::default(),
112            bluetooth: BluetoothConfig::default(),
113            pools: crate::PoolSettings::default(),
114            request_selection_strategy: crate::SelectionStrategy::TitForTat,
115            request_fairness_enabled: true,
116            request_dispatch: RequestDispatchConfig {
117                initial_fanout: 2,
118                hedge_fanout: 1,
119                max_fanout: 8,
120                hedge_interval_ms: 120,
121            },
122        }
123    }
124}
125
126#[derive(Debug, Clone)]
127pub struct PeerStatus {
128    pub peer_id: String,
129    pub pubkey: String,
130    pub state: String,
131    pub direction: PeerDirection,
132    pub connected_at: Option<std::time::Instant>,
133    pub pool: PeerPool,
134}
135
136fn bluetooth_nostr_only_mode() -> bool {
137    matches!(
138        std::env::var("HTREE_BLUETOOTH_NOSTR_ONLY").ok().as_deref(),
139        Some("1" | "true" | "TRUE" | "yes" | "YES")
140    )
141}
142
143/// Shared state for the native mesh router.
144pub struct WebRTCState {
145    pub runtime: MeshRuntimeState<MeshPeer>,
146    /// Shared peer selector used by live retrieval; aligned with simulation strategies.
147    peer_selector: Arc<RwLock<PeerSelector>>,
148    /// Hedged dispatch policy for retrieval requests.
149    request_dispatch: RequestDispatchConfig,
150    /// Retrieval timeout for quote negotiation and single-peer fetches.
151    request_timeout: Duration,
152    /// Shared Cashu quote negotiation policy/state.
153    cashu_quotes: Arc<CashuQuoteState>,
154}
155const SEEN_FRAME_CAP: usize = 4096;
156const SEEN_FRAME_TTL: Duration = Duration::from_secs(120);
157const SEEN_EVENT_CAP: usize = 8192;
158const SEEN_EVENT_TTL: Duration = Duration::from_secs(600);
159
160type PendingRequestsMap = Arc<Mutex<HashMap<String, PendingRequest>>>;
161type ConnectedPeer = (
162    String,
163    PendingRequestsMap,
164    Arc<webrtc::data_channel::RTCDataChannel>,
165);
166type ConnectedSession = (String, MeshPeer, PeerTransport);
167type SharedProductionRouter = MeshRouter<RouterSignalingBridge, SharedRouterPeerFactory>;
168
169#[derive(Clone)]
170struct RouterSignalingBridge {
171    peer_id: String,
172    signaling_tx: mpsc::Sender<SignalingMessage>,
173}
174
175impl RouterSignalingBridge {
176    fn new(peer_id: String, signaling_tx: mpsc::Sender<SignalingMessage>) -> Self {
177        Self {
178            peer_id,
179            signaling_tx,
180        }
181    }
182}
183
184#[async_trait]
185impl SharedSignalingTransport for RouterSignalingBridge {
186    async fn connect(&self, _relays: &[String]) -> Result<(), SharedTransportError> {
187        Ok(())
188    }
189
190    async fn disconnect(&self) {}
191
192    async fn publish(&self, msg: SignalingMessage) -> Result<(), SharedTransportError> {
193        self.signaling_tx
194            .send(msg)
195            .await
196            .map_err(|e| SharedTransportError::SendFailed(e.to_string()))
197    }
198
199    async fn recv(&self) -> Option<SignalingMessage> {
200        None
201    }
202
203    fn try_recv(&self) -> Option<SignalingMessage> {
204        None
205    }
206
207    fn peer_id(&self) -> &str {
208        &self.peer_id
209    }
210}
211
212struct SharedRouterPeerFactory {
213    my_peer_id: PeerId,
214    signaling_tx: mpsc::Sender<SignalingMessage>,
215    stun_servers: Vec<String>,
216    store: Option<Arc<dyn ContentStore>>,
217    state: Arc<WebRTCState>,
218    state_event_tx: mpsc::Sender<PeerStateEvent>,
219    nostr_relay: Option<SharedMeshRelayClient>,
220    mesh_frame_tx: mpsc::Sender<(PeerId, MeshNostrFrame)>,
221    peer_classifier: PeerClassifier,
222    peers: RwLock<HashMap<String, Arc<Peer>>>,
223}
224
225impl SharedRouterPeerFactory {
226    fn new(
227        my_peer_id: PeerId,
228        signaling_tx: mpsc::Sender<SignalingMessage>,
229        stun_servers: Vec<String>,
230        store: Option<Arc<dyn ContentStore>>,
231        state: Arc<WebRTCState>,
232        state_event_tx: mpsc::Sender<PeerStateEvent>,
233        nostr_relay: Option<SharedMeshRelayClient>,
234        mesh_frame_tx: mpsc::Sender<(PeerId, MeshNostrFrame)>,
235        peer_classifier: PeerClassifier,
236    ) -> Self {
237        Self {
238            my_peer_id,
239            signaling_tx,
240            stun_servers,
241            store,
242            state,
243            state_event_tx,
244            nostr_relay,
245            mesh_frame_tx,
246            peer_classifier,
247            peers: RwLock::new(HashMap::new()),
248        }
249    }
250
251    async fn register_peer(&self, peer_id: PeerId, direction: PeerDirection, peer: Arc<Peer>) {
252        let peer_key = peer_id.to_string();
253        let pool = (self.peer_classifier)(&peer_id.pubkey);
254        self.peers
255            .write()
256            .await
257            .insert(peer_key.clone(), peer.clone());
258
259        let mut peers = self.state.runtime.peers.write().await;
260        peers.insert(
261            peer_key,
262            PeerEntry {
263                peer_id,
264                direction,
265                state: ConnectionState::Connecting,
266                last_seen: Instant::now(),
267                peer: Some(MeshPeer::WebRtc(peer)),
268                pool,
269                transport: PeerTransport::WebRtc,
270                signal_paths: BTreeSet::from([PeerSignalPath::Relay]),
271                bytes_sent: 0,
272                bytes_received: 0,
273            },
274        );
275    }
276
277    async fn create_peer(
278        &self,
279        peer_id: PeerId,
280        direction: PeerDirection,
281    ) -> Result<Peer, SharedTransportError> {
282        Peer::new_with_store_and_events(
283            peer_id,
284            direction,
285            self.my_peer_id.clone(),
286            self.signaling_tx.clone(),
287            self.stun_servers.clone(),
288            self.store.clone(),
289            Some(self.state_event_tx.clone()),
290            self.nostr_relay.clone(),
291            Some(self.mesh_frame_tx.clone()),
292            Some(self.state.cashu_quotes.clone()),
293        )
294        .await
295        .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))
296    }
297}
298
299#[async_trait]
300impl SharedPeerLinkFactory for SharedRouterPeerFactory {
301    async fn create_offer(
302        &self,
303        target_peer_id: &str,
304    ) -> Result<(Arc<dyn SharedPeerLink>, String), SharedTransportError> {
305        let target_peer = PeerId::from_string(target_peer_id).ok_or_else(|| {
306            SharedTransportError::ConnectionFailed(format!("invalid peer id {target_peer_id}"))
307        })?;
308        let peer = Arc::new(
309            self.create_peer(target_peer.clone(), PeerDirection::Outbound)
310                .await?,
311        );
312        peer.setup_handlers()
313            .await
314            .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))?;
315        let offer = peer
316            .connect()
317            .await
318            .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))?;
319        let sdp = offer
320            .get("sdp")
321            .and_then(|value| value.as_str())
322            .ok_or_else(|| {
323                SharedTransportError::ConnectionFailed("missing SDP in CLI peer offer".to_string())
324            })?
325            .to_string();
326        self.register_peer(target_peer, PeerDirection::Outbound, peer.clone())
327            .await;
328        Ok((peer as Arc<dyn SharedPeerLink>, sdp))
329    }
330
331    async fn accept_offer(
332        &self,
333        from_peer_id: &str,
334        offer_sdp: &str,
335    ) -> Result<(Arc<dyn SharedPeerLink>, String), SharedTransportError> {
336        let from_peer = PeerId::from_string(from_peer_id).ok_or_else(|| {
337            SharedTransportError::ConnectionFailed(format!("invalid peer id {from_peer_id}"))
338        })?;
339        let peer = Arc::new(
340            self.create_peer(from_peer.clone(), PeerDirection::Inbound)
341                .await?,
342        );
343        peer.setup_handlers()
344            .await
345            .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))?;
346        let answer = peer
347            .handle_offer(serde_json::json!({ "type": "offer", "sdp": offer_sdp }))
348            .await
349            .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))?;
350        let sdp = answer
351            .get("sdp")
352            .and_then(|value| value.as_str())
353            .ok_or_else(|| {
354                SharedTransportError::ConnectionFailed("missing SDP in CLI peer answer".to_string())
355            })?
356            .to_string();
357        self.register_peer(from_peer, PeerDirection::Inbound, peer.clone())
358            .await;
359        Ok((peer as Arc<dyn SharedPeerLink>, sdp))
360    }
361
362    async fn handle_answer(
363        &self,
364        target_peer_id: &str,
365        answer_sdp: &str,
366    ) -> Result<Arc<dyn SharedPeerLink>, SharedTransportError> {
367        let peer = self
368            .peers
369            .read()
370            .await
371            .get(target_peer_id)
372            .cloned()
373            .ok_or_else(|| {
374                SharedTransportError::ConnectionFailed(format!(
375                    "missing outbound peer for {target_peer_id}"
376                ))
377            })?;
378        peer.handle_answer(serde_json::json!({ "type": "answer", "sdp": answer_sdp }))
379            .await
380            .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))?;
381        Ok(peer as Arc<dyn SharedPeerLink>)
382    }
383
384    async fn handle_candidate(
385        &self,
386        peer_id: &str,
387        candidate: SharedIceCandidate,
388    ) -> Result<(), SharedTransportError> {
389        let peer = self.peers.read().await.get(peer_id).cloned();
390        if let Some(peer) = peer {
391            peer.handle_candidate(serde_json::json!({
392                "candidate": candidate.candidate,
393                "sdpMLineIndex": candidate.sdp_m_line_index,
394                "sdpMid": candidate.sdp_mid,
395            }))
396            .await
397            .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))?;
398        }
399        Ok(())
400    }
401
402    async fn remove_peer(&self, peer_id: &str) -> Result<(), SharedTransportError> {
403        self.peers.write().await.remove(peer_id);
404        Ok(())
405    }
406}
407
408impl WebRTCState {
409    pub fn new() -> Self {
410        let cfg = WebRTCConfig::default();
411        Self::new_with_routing_and_cashu(
412            cfg.request_selection_strategy,
413            cfg.request_fairness_enabled,
414            cfg.request_dispatch,
415            Duration::from_millis(cfg.message_timeout_ms),
416            CashuRoutingConfig::default(),
417            None,
418            None,
419        )
420    }
421
422    pub fn new_with_routing(
423        selection_strategy: crate::SelectionStrategy,
424        fairness_enabled: bool,
425        request_dispatch: RequestDispatchConfig,
426    ) -> Self {
427        let cfg = WebRTCConfig::default();
428        Self::new_with_routing_and_cashu(
429            selection_strategy,
430            fairness_enabled,
431            request_dispatch,
432            Duration::from_millis(cfg.message_timeout_ms),
433            CashuRoutingConfig::default(),
434            None,
435            None,
436        )
437    }
438
439    pub fn new_with_routing_and_cashu(
440        selection_strategy: crate::SelectionStrategy,
441        fairness_enabled: bool,
442        request_dispatch: RequestDispatchConfig,
443        request_timeout: Duration,
444        cashu_routing: CashuRoutingConfig,
445        payment_client: Option<Arc<dyn CashuPaymentClient>>,
446        mint_metadata: Option<Arc<CashuMintMetadataStore>>,
447    ) -> Self {
448        let mut selector = PeerSelector::with_strategy(selection_strategy);
449        selector.set_fairness(fairness_enabled);
450        let peer_selector = Arc::new(RwLock::new(selector));
451        let cashu_quotes = Arc::new(if let Some(mint_metadata) = mint_metadata {
452            CashuQuoteState::new_with_mint_metadata(
453                cashu_routing,
454                peer_selector.clone(),
455                payment_client,
456                mint_metadata,
457            )
458        } else {
459            CashuQuoteState::new(cashu_routing, peer_selector.clone(), payment_client)
460        });
461        Self {
462            runtime: MeshRuntimeState::new(),
463            peer_selector,
464            request_dispatch,
465            request_timeout,
466            cashu_quotes,
467        }
468    }
469
470    pub async fn set_local_buses(&self, buses: Vec<SharedLocalNostrBus>) {
471        self.runtime.set_local_buses(buses).await;
472    }
473
474    pub async fn add_local_bus(&self, bus: SharedLocalNostrBus) {
475        self.runtime.add_local_bus(bus).await;
476    }
477
478    pub async fn set_multicast_bus(&self, bus: Option<Arc<MulticastNostrBus>>) {
479        let buses = bus
480            .into_iter()
481            .map(|bus| bus as SharedLocalNostrBus)
482            .collect();
483        self.set_local_buses(buses).await;
484    }
485
486    /// Drop all live peer sessions and clear topology-specific state while
487    /// keeping cumulative bandwidth counters intact.
488    pub async fn reset_runtime_state(&self) {
489        self.runtime.reset().await;
490    }
491
492    /// Get current bandwidth stats (bytes sent/received)
493    pub fn get_bandwidth(&self) -> (u64, u64) {
494        self.runtime.get_bandwidth()
495    }
496
497    pub fn get_mesh_stats(&self) -> (u64, u64, u64) {
498        self.runtime.get_mesh_stats()
499    }
500
501    pub fn record_mesh_received(&self) {
502        self.runtime.record_mesh_received();
503    }
504
505    pub fn record_mesh_forwarded(&self, count: u64) {
506        self.runtime.record_mesh_forwarded(count);
507    }
508
509    pub fn record_mesh_duplicate_drop(&self) {
510        self.runtime.record_mesh_duplicate_drop();
511    }
512
513    /// Record bytes sent (global + per-peer)
514    pub async fn record_sent(&self, peer_id: &str, bytes: u64) {
515        self.runtime.record_sent(peer_id, bytes).await;
516    }
517
518    /// Record bytes received (global + per-peer)
519    pub async fn record_received(&self, peer_id: &str, bytes: u64) {
520        self.runtime.record_received(peer_id, bytes).await;
521    }
522
523    /// Request content by hash from connected peers
524    /// Queries peers in adaptive selector order with hedged fanout waves.
525    /// Returns the first successful response, or None if no peer has it
526    pub async fn request_from_peers(&self, hash_hex: &str) -> Option<Vec<u8>> {
527        self.request_from_peers_with_source(hash_hex)
528            .await
529            .map(|(data, _peer_id)| data)
530    }
531
532    /// Request content by hash from connected peers, returning data and source peer.
533    pub async fn request_from_peers_with_source(
534        &self,
535        hash_hex: &str,
536    ) -> Option<(Vec<u8>, String)> {
537        use crate::BLOB_REQUEST_POLICY;
538
539        let peer_hash_get = self.runtime.peer_hash_get_snapshot().await;
540        let peers = self.runtime.peers.read().await;
541
542        let peer_refs: Vec<_> = peers
543            .values()
544            .filter(|p| p.state == ConnectionState::Connected && p.peer.is_some())
545            .filter_map(|p| {
546                if !peer_hash_get
547                    .get(&p.peer_id.to_string())
548                    .copied()
549                    .unwrap_or(true)
550                {
551                    return None;
552                }
553                p.peer
554                    .clone()
555                    .map(|peer| (p.peer_id.to_string(), peer, p.transport))
556            })
557            .collect();
558
559        drop(peers); // Release the read lock
560
561        let mut connected_peers: Vec<ConnectedPeer> = Vec::new();
562        let mut connected_sessions: Vec<ConnectedSession> = Vec::new();
563        for (peer_id, peer, transport) in peer_refs {
564            if !peer.is_ready() {
565                continue;
566            }
567            if bluetooth_nostr_only_mode() && transport == PeerTransport::Bluetooth {
568                continue;
569            }
570            if let Some(webrtc_peer) = peer.as_webrtc() {
571                let dc_guard = webrtc_peer.data_channel.lock().await;
572                if let Some(dc) = dc_guard.as_ref() {
573                    connected_peers.push((
574                        peer_id.clone(),
575                        webrtc_peer.pending_requests.clone(),
576                        dc.clone(),
577                    ));
578                }
579            }
580            connected_sessions.push((peer_id, peer, transport));
581        }
582
583        if connected_sessions.is_empty() {
584            debug!(
585                "No connected peers to query for {}",
586                &hash_hex[..8.min(hash_hex.len())]
587            );
588            return None;
589        }
590
591        // Convert hex to binary hash once
592        let hash_bytes = match hex::decode(hash_hex) {
593            Ok(b) => b,
594            Err(_) => return None,
595        };
596
597        let expected_hash: [u8; 32] = match hash_bytes.as_slice().try_into() {
598            Ok(h) => h,
599            Err(_) => {
600                debug!(
601                    "Invalid hash length {}, expected 32 bytes",
602                    hash_bytes.len()
603                );
604                return None;
605            }
606        };
607
608        let connected_peer_ids: Vec<String> = connected_sessions
609            .iter()
610            .map(|(peer_id, _, _)| peer_id.clone())
611            .collect();
612        sync_selector_peers(self.peer_selector.as_ref(), &connected_peer_ids).await;
613
614        let ordered_peer_ids = self.peer_selector.write().await.select_peers();
615        let mut quote_by_peer: HashMap<
616            String,
617            (
618                PendingRequestsMap,
619                Arc<webrtc::data_channel::RTCDataChannel>,
620            ),
621        > = connected_peers
622            .iter()
623            .cloned()
624            .map(|(peer_id, pending, dc)| (peer_id, (pending, dc)))
625            .collect();
626        let mut ordered_quote_peers: Vec<ConnectedPeer> = Vec::new();
627        for peer_id in &ordered_peer_ids {
628            if let Some((pending, dc)) = quote_by_peer.remove(peer_id) {
629                ordered_quote_peers.push((peer_id.clone(), pending, dc));
630            }
631        }
632        for (peer_id, (pending, dc)) in quote_by_peer {
633            ordered_quote_peers.push((peer_id, pending, dc));
634        }
635
636        let mut by_peer: HashMap<String, (MeshPeer, PeerTransport)> = connected_sessions
637            .into_iter()
638            .map(|(peer_id, peer, transport)| (peer_id, (peer, transport)))
639            .collect();
640
641        let mut ordered_peers: Vec<ConnectedSession> = Vec::new();
642        for peer_id in ordered_peer_ids {
643            if let Some((peer, transport)) = by_peer.remove(&peer_id) {
644                ordered_peers.push((peer_id, peer, transport));
645            }
646        }
647        for (peer_id, (peer, transport)) in by_peer {
648            ordered_peers.push((peer_id, peer, transport));
649        }
650
651        debug!(
652            "Querying {} peers for {} with shared hedged scheduler",
653            ordered_peers.len(),
654            &hash_hex[..8.min(hash_hex.len())],
655        );
656
657        if let Some((requested_mint, payment_sat, quote_ttl_ms)) =
658            self.cashu_quotes.requester_quote_terms().await
659        {
660            if let Some(quote) = self
661                .request_quote_from_peers(
662                    &hash_bytes,
663                    requested_mint,
664                    payment_sat,
665                    quote_ttl_ms,
666                    &ordered_quote_peers,
667                )
668                .await
669            {
670                if let Some(data) = self
671                    .request_from_single_peer(
672                        hash_hex,
673                        &hash_bytes,
674                        expected_hash,
675                        &quote.peer_id,
676                        Some(&quote),
677                        &ordered_quote_peers,
678                    )
679                    .await
680                {
681                    debug!(
682                        "Got quoted response from peer {} for {}",
683                        quote.peer_id,
684                        &hash_hex[..8.min(hash_hex.len())]
685                    );
686                    return Some((data, quote.peer_id));
687                }
688            }
689        }
690
691        let request = DataRequest {
692            h: hash_bytes.clone(),
693            htl: BLOB_REQUEST_POLICY.max_htl,
694            q: None,
695        };
696        let wire = crate::encode_request(&request);
697        let wire_len = wire.len() as u64;
698        let current_result_rx = Arc::new(Mutex::new(None));
699        if let Some((data, peer_id)) = run_hedged_waves(
700            ordered_peers.len(),
701            self.request_dispatch,
702            self.request_timeout,
703            |range| {
704                let wave_peers = ordered_peers[range].to_vec();
705                let (result_tx, result_rx) =
706                    mpsc::channel::<(String, Instant, Result<Option<Vec<u8>>>)>(wave_peers.len());
707                let current_result_rx = current_result_rx.clone();
708                let hash_hex = hash_hex.to_string();
709                async move {
710                    *current_result_rx.lock().await = Some(result_rx);
711                    let sent = wave_peers.len();
712                    for (peer_id, peer, transport) in wave_peers {
713                        if transport != PeerTransport::Bluetooth {
714                            self.record_sent(&peer_id, wire_len).await;
715                        }
716                        self.peer_selector
717                            .write()
718                            .await
719                            .record_request(&peer_id, wire_len);
720
721                        let result_tx = result_tx.clone();
722                        let peer_id_for_task = peer_id.clone();
723                        let peer = peer.clone();
724                        let hash_hex = hash_hex.clone();
725                        let per_request_timeout = self.request_timeout;
726                        tokio::spawn(async move {
727                            let started = Instant::now();
728                            let result = peer.request(&hash_hex, per_request_timeout).await;
729                            let _ = result_tx.send((peer_id_for_task, started, result)).await;
730                        });
731                    }
732                    drop(result_tx);
733                    sent
734                }
735            },
736            |wait| {
737                let current_result_rx = current_result_rx.clone();
738                async move {
739                    let mut current_result_rx = current_result_rx.lock().await;
740                    let Some(result_rx) = current_result_rx.as_mut() else {
741                        return HedgedWaveAction::Abort;
742                    };
743                    let deadline = Instant::now() + wait;
744                    loop {
745                        let now = Instant::now();
746                        if now >= deadline {
747                            return HedgedWaveAction::Continue;
748                        }
749                        let remaining = deadline.saturating_duration_since(now);
750                        match tokio::time::timeout(remaining, result_rx.recv()).await {
751                            Ok(Some((peer_id, started, Ok(Some(data))))) => {
752                                let rtt_ms = started.elapsed().as_millis() as u64;
753                                if hashtree_core::sha256(&data) == expected_hash {
754                                    let should_record = {
755                                        let peers = self.runtime.peers.read().await;
756                                        peers
757                                            .get(&peer_id)
758                                            .map(|entry| {
759                                                entry.transport != PeerTransport::Bluetooth
760                                            })
761                                            .unwrap_or(true)
762                                    };
763                                    if should_record {
764                                        self.record_received(&peer_id, data.len() as u64).await;
765                                    }
766                                    self.peer_selector.write().await.record_success(
767                                        &peer_id,
768                                        rtt_ms,
769                                        data.len() as u64,
770                                    );
771                                    return HedgedWaveAction::Success((data, peer_id));
772                                }
773                                self.peer_selector.write().await.record_failure(&peer_id);
774                            }
775                            Ok(Some((peer_id, _, Ok(None)))) | Ok(Some((peer_id, _, Err(_)))) => {
776                                self.peer_selector.write().await.record_timeout(&peer_id);
777                            }
778                            Ok(None) | Err(_) => return HedgedWaveAction::Continue,
779                        }
780                    }
781                }
782            },
783        )
784        .await
785        {
786            debug!(
787                "Got response from peer {} for {}",
788                peer_id,
789                &hash_hex[..8.min(hash_hex.len())]
790            );
791            return Some((data, peer_id));
792        }
793
794        debug!(
795            "No peer had data for {}",
796            &hash_hex[..8.min(hash_hex.len())]
797        );
798        None
799    }
800
801    async fn request_quote_from_peers(
802        &self,
803        hash_bytes: &[u8],
804        requested_mint: String,
805        payment_sat: u64,
806        quote_ttl_ms: u32,
807        ordered_peers: &[ConnectedPeer],
808    ) -> Option<NegotiatedQuote> {
809        if ordered_peers.is_empty() || quote_ttl_ms == 0 {
810            return None;
811        }
812
813        let hash_hex = hex::encode(hash_bytes);
814        let rx = self
815            .cashu_quotes
816            .register_pending_quote(hash_hex.clone(), Some(requested_mint.clone()), payment_sat)
817            .await;
818        let quote_request = DataQuoteRequest {
819            h: hash_bytes.to_vec(),
820            p: payment_sat,
821            t: quote_ttl_ms,
822            m: Some(requested_mint),
823        };
824        let wire = crate::encode_quote_request(&quote_request);
825        let rx = Arc::new(Mutex::new(rx));
826        let result = run_hedged_waves(
827            ordered_peers.len(),
828            self.request_dispatch,
829            self.request_timeout,
830            |range| {
831                let wave_peers = ordered_peers[range].to_vec();
832                let wire = wire.clone();
833                async move {
834                    let mut sent = 0usize;
835                    for (_, _, dc) in wave_peers {
836                        if dc.send(&bytes::Bytes::copy_from_slice(&wire)).await.is_ok() {
837                            sent += 1;
838                        }
839                    }
840                    sent
841                }
842            },
843            |wait| {
844                let rx = rx.clone();
845                async move {
846                    let mut rx = rx.lock().await;
847                    match tokio::time::timeout(wait, &mut *rx).await {
848                        Ok(Ok(Some(quote))) => HedgedWaveAction::Success(quote),
849                        Ok(Ok(None)) | Ok(Err(_)) => HedgedWaveAction::Abort,
850                        Err(_) => HedgedWaveAction::Continue,
851                    }
852                }
853            },
854        )
855        .await;
856
857        self.cashu_quotes.clear_pending_quote(&hash_hex).await;
858        result
859    }
860
861    async fn request_from_single_peer(
862        &self,
863        hash_hex: &str,
864        hash_bytes: &[u8],
865        expected_hash: [u8; 32],
866        target_peer_id: &str,
867        quote: Option<&NegotiatedQuote>,
868        ordered_peers: &[ConnectedPeer],
869    ) -> Option<Vec<u8>> {
870        use crate::BLOB_REQUEST_POLICY;
871
872        let (pending_requests, dc) = ordered_peers
873            .iter()
874            .find(|(peer_id, _, _)| peer_id == target_peer_id)
875            .map(|(_, pending_requests, dc)| (pending_requests.clone(), dc.clone()))?;
876
877        let request = DataRequest {
878            h: hash_bytes.to_vec(),
879            htl: BLOB_REQUEST_POLICY.max_htl,
880            q: quote.map(|quote| quote.quote_id),
881        };
882        let wire = crate::encode_request(&request);
883        let wire_len = wire.len() as u64;
884        let sent_at = Instant::now();
885        let (tx, mut rx) = tokio::sync::oneshot::channel();
886
887        {
888            let mut pending = pending_requests.lock().await;
889            pending.insert(
890                hash_hex.to_string(),
891                if let Some(quote) = quote {
892                    PendingRequest::quoted(
893                        hash_bytes.to_vec(),
894                        tx,
895                        quote.quote_id,
896                        quote.mint_url.clone().unwrap_or_default(),
897                        quote.payment_sat,
898                    )
899                } else {
900                    PendingRequest::standard(hash_bytes.to_vec(), tx)
901                },
902            );
903        }
904
905        if dc
906            .send(&bytes::Bytes::copy_from_slice(&wire))
907            .await
908            .is_err()
909        {
910            let mut pending = pending_requests.lock().await;
911            pending.remove(hash_hex);
912            self.peer_selector
913                .write()
914                .await
915                .record_failure(target_peer_id);
916            return None;
917        }
918
919        self.record_sent(target_peer_id, wire_len).await;
920        self.peer_selector
921            .write()
922            .await
923            .record_request(target_peer_id, wire_len);
924
925        let wait_timeout = if let Some(quote) = quote {
926            let multiplier = quote.payment_sat.clamp(1, 32) as u128;
927            let extra_ms = self
928                .cashu_quotes
929                .settlement_timeout()
930                .as_millis()
931                .saturating_mul(multiplier);
932            self.request_timeout + Duration::from_millis(extra_ms.min(u64::MAX as u128) as u64)
933        } else {
934            self.request_timeout
935        };
936
937        match tokio::time::timeout(wait_timeout, &mut rx).await {
938            Ok(Ok(Some(data))) if hashtree_core::sha256(&data) == expected_hash => {
939                let rtt_ms = sent_at.elapsed().as_millis() as u64;
940                self.record_received(target_peer_id, data.len() as u64)
941                    .await;
942                self.peer_selector.write().await.record_success(
943                    target_peer_id,
944                    rtt_ms,
945                    data.len() as u64,
946                );
947                Some(data)
948            }
949            Ok(Ok(Some(_))) => {
950                self.peer_selector
951                    .write()
952                    .await
953                    .record_failure(target_peer_id);
954                let pending = pending_requests.lock().await.remove(hash_hex);
955                if let Some(pending) = pending {
956                    if let Some(quoted) = pending.quoted {
957                        if let Some(in_flight) = quoted.in_flight_payment {
958                            let _ = self
959                                .cashu_quotes
960                                .revoke_payment_token(&in_flight.mint_url, &in_flight.operation_id)
961                                .await;
962                        }
963                    }
964                }
965                None
966            }
967            Ok(Ok(None)) | Ok(Err(_)) | Err(_) => {
968                let pending = pending_requests.lock().await.remove(hash_hex);
969                if let Some(pending) = pending {
970                    if let Some(quoted) = pending.quoted {
971                        if let Some(in_flight) = quoted.in_flight_payment {
972                            let _ = self
973                                .cashu_quotes
974                                .revoke_payment_token(&in_flight.mint_url, &in_flight.operation_id)
975                                .await;
976                        }
977                    }
978                }
979                self.peer_selector
980                    .write()
981                    .await
982                    .record_timeout(target_peer_id);
983                None
984            }
985        }
986    }
987
988    /// Resolve a hashtree root event through connected peers using Nostr REQ/EOSE over WebRTC.
989    pub async fn resolve_root_from_peers(
990        &self,
991        owner_pubkey: &str,
992        tree_name: &str,
993        per_peer_timeout: Duration,
994    ) -> Option<PeerRootEvent> {
995        let peer_refs: Vec<(String, Arc<dyn MeshSession>)> = {
996            let peers = self.runtime.peers.read().await;
997            peers
998                .values()
999                .filter(|entry| entry.state == ConnectionState::Connected)
1000                .filter(|entry| {
1001                    !bluetooth_nostr_only_mode() || entry.transport != PeerTransport::Bluetooth
1002                })
1003                .filter_map(|entry| {
1004                    let peer = entry.peer.as_ref()?;
1005                    Some((
1006                        entry.peer_id.short(),
1007                        Arc::new(peer.clone()) as Arc<dyn MeshSession>,
1008                    ))
1009                })
1010                .collect()
1011        };
1012
1013        let resolved =
1014            resolve_root_via_peer_sessions(peer_refs, owner_pubkey, tree_name, per_peer_timeout)
1015                .await;
1016        if let Some(root) = &resolved {
1017            debug!(
1018                "Resolved {}/{} via peer {} event {}",
1019                owner_pubkey, tree_name, root.peer_id, root.event_id
1020            );
1021        }
1022        resolved
1023    }
1024
1025    pub async fn resolve_root_from_local_buses_with_source(
1026        &self,
1027        owner_pubkey: &str,
1028        tree_name: &str,
1029        timeout: Duration,
1030    ) -> Option<(&'static str, PeerRootEvent)> {
1031        self.runtime
1032            .resolve_root_from_local_buses_with_source(owner_pubkey, tree_name, timeout)
1033            .await
1034    }
1035
1036    pub async fn resolve_root_from_local_buses(
1037        &self,
1038        owner_pubkey: &str,
1039        tree_name: &str,
1040        timeout: Duration,
1041    ) -> Option<PeerRootEvent> {
1042        self.resolve_root_from_local_buses_with_source(owner_pubkey, tree_name, timeout)
1043            .await
1044            .map(|(_, root)| root)
1045    }
1046
1047    pub async fn resolve_root_from_multicast(
1048        &self,
1049        owner_pubkey: &str,
1050        tree_name: &str,
1051        timeout: Duration,
1052    ) -> Option<PeerRootEvent> {
1053        self.resolve_root_from_local_buses(owner_pubkey, tree_name, timeout)
1054            .await
1055    }
1056}
1057
1058impl Default for WebRTCState {
1059    fn default() -> Self {
1060        Self::new()
1061    }
1062}
1063
1064/// Native mesh manager handles peer discovery and transport fan-out.
1065pub struct WebRTCManager {
1066    config: WebRTCConfig,
1067    my_peer_id: PeerId,
1068    keys: Keys,
1069    state: Arc<WebRTCState>,
1070    shutdown: Arc<tokio::sync::watch::Sender<bool>>,
1071    shutdown_rx: tokio::sync::watch::Receiver<bool>,
1072    /// Channel to send signaling messages to relays
1073    signaling_tx: mpsc::Sender<SignalingMessage>,
1074    signaling_rx: Option<mpsc::Receiver<SignalingMessage>>,
1075    /// Optional content store for serving hash requests
1076    store: Option<Arc<dyn ContentStore>>,
1077    /// Peer classifier for pool assignment
1078    peer_classifier: PeerClassifier,
1079    /// Optional Nostr relay for data-channel relay messages
1080    nostr_relay: Option<SharedMeshRelayClient>,
1081    local_buses: Vec<SharedLocalNostrBus>,
1082    /// Channel for peer state events (connection success/failure)
1083    state_event_tx: mpsc::Sender<PeerStateEvent>,
1084    state_event_rx: Option<mpsc::Receiver<PeerStateEvent>>,
1085    /// Channel for relayless mesh signaling frames received from peers.
1086    mesh_frame_tx: mpsc::Sender<(PeerId, MeshNostrFrame)>,
1087    mesh_frame_rx: Option<mpsc::Receiver<(PeerId, MeshNostrFrame)>>,
1088    shared_router: Option<Arc<SharedProductionRouter>>,
1089    seen_frame_ids: Arc<Mutex<TimedSeenSet>>,
1090    seen_event_ids: Arc<Mutex<TimedSeenSet>>,
1091}
1092
1093impl WebRTCManager {
1094    /// Create a new WebRTC manager
1095    pub fn new(keys: Keys, config: WebRTCConfig) -> Self {
1096        let pubkey = keys.public_key().to_hex();
1097        let my_peer_id = PeerId::new(pubkey);
1098        let (shutdown, shutdown_rx) = tokio::sync::watch::channel(false);
1099        let (signaling_tx, signaling_rx) = mpsc::channel(100);
1100        let (state_event_tx, state_event_rx) = mpsc::channel(100);
1101        let (mesh_frame_tx, mesh_frame_rx) = mpsc::channel(256);
1102        let state = Arc::new(WebRTCState::new_with_routing_and_cashu(
1103            config.request_selection_strategy,
1104            config.request_fairness_enabled,
1105            config.request_dispatch,
1106            Duration::from_millis(config.message_timeout_ms),
1107            CashuRoutingConfig::default(),
1108            None,
1109            None,
1110        ));
1111
1112        // Default classifier: all peers go to 'other' pool
1113        let peer_classifier: PeerClassifier = Arc::new(|_| PeerPool::Other);
1114
1115        Self {
1116            config,
1117            my_peer_id,
1118            keys,
1119            state,
1120            shutdown: Arc::new(shutdown),
1121            shutdown_rx,
1122            signaling_tx,
1123            signaling_rx: Some(signaling_rx),
1124            store: None,
1125            peer_classifier,
1126            nostr_relay: None,
1127            local_buses: Vec::new(),
1128            state_event_tx,
1129            state_event_rx: Some(state_event_rx),
1130            mesh_frame_tx,
1131            mesh_frame_rx: Some(mesh_frame_rx),
1132            shared_router: None,
1133            seen_frame_ids: Arc::new(Mutex::new(TimedSeenSet::new(
1134                SEEN_FRAME_CAP,
1135                SEEN_FRAME_TTL,
1136            ))),
1137            seen_event_ids: Arc::new(Mutex::new(TimedSeenSet::new(
1138                SEEN_EVENT_CAP,
1139                SEEN_EVENT_TTL,
1140            ))),
1141        }
1142    }
1143
1144    /// Create a new WebRTC manager reusing an existing shared state object.
1145    pub fn new_with_state(keys: Keys, config: WebRTCConfig, state: Arc<WebRTCState>) -> Self {
1146        let mut manager = Self::new(keys, config);
1147        manager.state = state;
1148        manager
1149    }
1150
1151    /// Create a new WebRTC manager with a peer classifier
1152    pub fn new_with_classifier(
1153        keys: Keys,
1154        config: WebRTCConfig,
1155        classifier: PeerClassifier,
1156    ) -> Self {
1157        let mut manager = Self::new(keys, config);
1158        manager.peer_classifier = classifier;
1159        manager
1160    }
1161
1162    /// Create a new WebRTC manager with a content store for serving hash requests
1163    pub fn new_with_store(keys: Keys, config: WebRTCConfig, store: Arc<dyn ContentStore>) -> Self {
1164        let mut manager = Self::new(keys, config);
1165        manager.store = Some(store);
1166        manager
1167    }
1168
1169    /// Create a new WebRTC manager with store and classifier
1170    pub fn new_with_store_and_classifier(
1171        keys: Keys,
1172        config: WebRTCConfig,
1173        store: Arc<dyn ContentStore>,
1174        classifier: PeerClassifier,
1175    ) -> Self {
1176        Self::new_with_store_and_classifier_and_cashu(
1177            keys,
1178            config,
1179            store,
1180            classifier,
1181            CashuRoutingConfig::default(),
1182            None,
1183            None,
1184        )
1185    }
1186
1187    pub fn new_with_state_and_store_and_classifier(
1188        keys: Keys,
1189        config: WebRTCConfig,
1190        state: Arc<WebRTCState>,
1191        store: Arc<dyn ContentStore>,
1192        classifier: PeerClassifier,
1193    ) -> Self {
1194        let mut manager = Self::new_with_state(keys, config, state);
1195        manager.store = Some(store);
1196        manager.peer_classifier = classifier;
1197        manager
1198    }
1199
1200    pub fn new_with_store_and_classifier_and_cashu(
1201        keys: Keys,
1202        config: WebRTCConfig,
1203        store: Arc<dyn ContentStore>,
1204        classifier: PeerClassifier,
1205        cashu_routing: CashuRoutingConfig,
1206        payment_client: Option<Arc<dyn CashuPaymentClient>>,
1207        mint_metadata: Option<Arc<CashuMintMetadataStore>>,
1208    ) -> Self {
1209        let mut manager = Self::new(keys, config);
1210        manager.state = Arc::new(WebRTCState::new_with_routing_and_cashu(
1211            manager.config.request_selection_strategy,
1212            manager.config.request_fairness_enabled,
1213            manager.config.request_dispatch,
1214            Duration::from_millis(manager.config.message_timeout_ms),
1215            cashu_routing,
1216            payment_client,
1217            mint_metadata,
1218        ));
1219        manager.store = Some(store);
1220        manager.peer_classifier = classifier;
1221        manager
1222    }
1223
1224    /// Set the content store for serving hash requests
1225    pub fn set_store(&mut self, store: Arc<dyn ContentStore>) {
1226        self.store = Some(store);
1227    }
1228
1229    /// Set the peer classifier
1230    pub fn set_peer_classifier(&mut self, classifier: PeerClassifier) {
1231        self.peer_classifier = classifier;
1232    }
1233
1234    /// Set the Nostr relay for data-channel relay messages
1235    pub fn set_nostr_relay(&mut self, relay: SharedMeshRelayClient) {
1236        self.nostr_relay = Some(relay);
1237    }
1238
1239    /// Get my peer ID
1240    pub fn my_peer_id(&self) -> &PeerId {
1241        &self.my_peer_id
1242    }
1243
1244    /// Get shared state for external access
1245    pub fn state(&self) -> Arc<WebRTCState> {
1246        self.state.clone()
1247    }
1248
1249    /// Cloneable shutdown handle for external lifecycle control.
1250    pub fn shutdown_signal(&self) -> Arc<tokio::sync::watch::Sender<bool>> {
1251        self.shutdown.clone()
1252    }
1253
1254    /// Signal shutdown
1255    pub fn shutdown(&self) {
1256        let _ = self.shutdown.send(true);
1257    }
1258
1259    /// Get connected peer count
1260    pub async fn connected_count(&self) -> usize {
1261        self.state
1262            .runtime
1263            .connected_count
1264            .load(std::sync::atomic::Ordering::Relaxed)
1265    }
1266
1267    /// Get all peer statuses
1268    pub async fn peer_statuses(&self) -> Vec<PeerStatus> {
1269        self.state
1270            .runtime
1271            .peers
1272            .read()
1273            .await
1274            .values()
1275            .map(|p| PeerStatus {
1276                peer_id: p.peer_id.to_string(),
1277                pubkey: p.peer_id.pubkey.clone(),
1278                state: p.state.to_string(),
1279                direction: p.direction,
1280                connected_at: Some(p.last_seen),
1281                pool: p.pool,
1282            })
1283            .collect()
1284    }
1285
1286    /// Get pool counts
1287    /// Returns (follows_connected, follows_active, other_connected, other_active)
1288    /// "active" = Connected or Connecting (excludes Discovered and Failed)
1289    pub async fn get_pool_counts(&self) -> (usize, usize, usize, usize) {
1290        let peers = self.state.runtime.peers.read().await;
1291        let mut follows_connected = 0;
1292        let mut follows_active = 0;
1293        let mut other_connected = 0;
1294        let mut other_active = 0;
1295
1296        for entry in peers.values() {
1297            // Only count Connected or Connecting as "active" connections
1298            // Discovered peers are just seen hellos, not real connections
1299            let is_active = entry.state == ConnectionState::Connected
1300                || entry.state == ConnectionState::Connecting;
1301
1302            match entry.pool {
1303                PeerPool::Follows => {
1304                    if is_active {
1305                        follows_active += 1;
1306                    }
1307                    if entry.state == ConnectionState::Connected {
1308                        follows_connected += 1;
1309                    }
1310                }
1311                PeerPool::Other => {
1312                    if is_active {
1313                        other_active += 1;
1314                    }
1315                    if entry.state == ConnectionState::Connected {
1316                        other_connected += 1;
1317                    }
1318                }
1319            }
1320        }
1321
1322        (
1323            follows_connected,
1324            follows_active,
1325            other_connected,
1326            other_active,
1327        )
1328    }
1329
1330    fn local_bus_max_peers(&self, source: &str) -> Option<usize> {
1331        match source {
1332            "multicast" => Some(self.config.multicast.max_peers),
1333            WIFI_AWARE_SOURCE => Some(self.config.wifi_aware.max_peers),
1334            _ => None,
1335        }
1336    }
1337
1338    #[cfg_attr(not(test), allow(dead_code))]
1339    fn can_track_local_bus_peer(
1340        &self,
1341        source: &str,
1342        peer_key: &str,
1343        peers: &HashMap<String, PeerEntry>,
1344    ) -> bool {
1345        can_track_source_peer(source, peer_key, peers, self.local_bus_max_peers(source))
1346    }
1347}
1348
1349mod runtime;