Skip to main content

hashtree_network/manager/
mod.rs

1//! WebRTC signaling over Nostr relays
2//!
3//! Protocol (compatible with hashtree-ts):
4//! - All signaling uses ephemeral kind 25050
5//! - Hello messages: #l: "hello" tag, broadcast for peer discovery (unencrypted)
6//! - Directed signaling (offer, answer, candidate, candidates): NIP-17 style
7//!   gift wrap for privacy - wrapped with ephemeral key, #p tag with recipient
8//!
9//! Security: Directed messages use gift wrapping with ephemeral keys so that
10//! relays cannot see the actual sender or correlate messages.
11
12pub use crate::{ConnectionState, PeerSignalPath, PeerTransport};
13use anyhow::Result;
14use async_trait::async_trait;
15use cashu_service::CashuPaymentClient;
16use nostr_sdk::nostr::Keys;
17#[cfg(test)]
18use nostr_sdk::nostr::Kind;
19use std::collections::{BTreeSet, HashMap};
20use std::sync::Arc;
21use std::time::{Duration, Instant};
22use tokio::sync::{mpsc, Mutex, RwLock};
23use tracing::{debug, error, info, warn};
24
25use crate::bluetooth::{
26    BluetoothConfig, BluetoothMesh, BluetoothPeerRegistrar, BluetoothRuntimeContext,
27};
28use crate::cashu::{CashuMintMetadataStore, CashuQuoteState, CashuRoutingConfig, NegotiatedQuote};
29use crate::local_bus::SharedLocalNostrBus;
30use crate::mesh_session::{
31    resolve_root_from_peer_sessions as resolve_root_via_peer_sessions, MeshSession,
32};
33use crate::mesh_store_core::{
34    run_hedged_waves, sync_selector_peers, HedgedWaveAction, RequestDispatchConfig,
35};
36use crate::multicast::{MulticastConfig, MulticastNostrBus};
37use crate::nostr::NostrRelayTransport;
38use crate::peer::{ContentStore, Peer, PendingRequest};
39use crate::peer_selector::PeerSelector;
40use crate::protocol::{DataQuoteRequest, DataRequest};
41use crate::relay_bridge::SharedMeshRelayClient;
42use crate::root_events::PeerRootEvent;
43use crate::runtime_control::{can_track_source_peer, PeerStateEvent};
44use crate::runtime_peer::{
45    MeshPeerEntry as SharedPeerEntry, PeerClassifier as SharedPeerClassifier,
46};
47use crate::runtime_state::MeshRuntimeState;
48use crate::session::MeshPeer;
49use crate::signaling::MeshRouter;
50use crate::transport::{
51    PeerLink as SharedPeerLink, PeerLinkFactory as SharedPeerLinkFactory,
52    SignalingTransport as SharedSignalingTransport, TransportError as SharedTransportError,
53};
54use crate::types::{
55    validate_mesh_frame, MeshNostrFrame, MeshNostrPayload, SignalingMessage, TimedSeenSet,
56};
57use crate::wifi_aware::{
58    mobile_wifi_aware_bridge, WifiAwareConfig, WifiAwareNostrBus, WIFI_AWARE_SOURCE,
59};
60use crate::{
61    ClassifyRequest as SharedClassifyRequest, IceCandidate as SharedIceCandidate, PeerDirection,
62    PeerId, PeerPool, MESH_SIGNALING_EVENT_KIND,
63};
64
65/// Callback type for classifying peers into pools
66pub type PeerClassifier = SharedPeerClassifier;
67pub type PeerEntry = SharedPeerEntry<MeshPeer>;
68
69#[derive(Clone)]
70pub struct WebRTCConfig {
71    pub relays: Vec<String>,
72    pub signaling_enabled: bool,
73    pub max_outbound: usize,
74    pub max_inbound: usize,
75    pub hello_interval_ms: u64,
76    pub message_timeout_ms: u64,
77    pub stun_servers: Vec<String>,
78    pub debug: bool,
79    pub multicast: MulticastConfig,
80    pub wifi_aware: WifiAwareConfig,
81    pub bluetooth: BluetoothConfig,
82    pub pools: crate::PoolSettings,
83    pub request_selection_strategy: crate::SelectionStrategy,
84    pub request_fairness_enabled: bool,
85    pub request_dispatch: RequestDispatchConfig,
86}
87
88impl Default for WebRTCConfig {
89    fn default() -> Self {
90        Self {
91            relays: vec![
92                "wss://relay.damus.io".to_string(),
93                "wss://relay.primal.net".to_string(),
94                "wss://temp.iris.to".to_string(),
95                "wss://relay.snort.social".to_string(),
96            ],
97            signaling_enabled: true,
98            max_outbound: 6,
99            max_inbound: 6,
100            hello_interval_ms: 3000,
101            message_timeout_ms: 15000,
102            stun_servers: vec![
103                "stun:stun.iris.to:3478".to_string(),
104                "stun:stun.l.google.com:19302".to_string(),
105                "stun:stun.cloudflare.com:3478".to_string(),
106            ],
107            debug: false,
108            multicast: MulticastConfig::default(),
109            wifi_aware: WifiAwareConfig::default(),
110            bluetooth: BluetoothConfig::default(),
111            pools: crate::PoolSettings::default(),
112            request_selection_strategy: crate::SelectionStrategy::TitForTat,
113            request_fairness_enabled: true,
114            request_dispatch: RequestDispatchConfig {
115                initial_fanout: 2,
116                hedge_fanout: 1,
117                max_fanout: 8,
118                hedge_interval_ms: 120,
119            },
120        }
121    }
122}
123
124#[derive(Debug, Clone)]
125pub struct PeerStatus {
126    pub peer_id: String,
127    pub pubkey: String,
128    pub state: String,
129    pub direction: PeerDirection,
130    pub connected_at: Option<std::time::Instant>,
131    pub pool: PeerPool,
132}
133
134fn bluetooth_nostr_only_mode() -> bool {
135    matches!(
136        std::env::var("HTREE_BLUETOOTH_NOSTR_ONLY").ok().as_deref(),
137        Some("1" | "true" | "TRUE" | "yes" | "YES")
138    )
139}
140
141/// Shared state for the native mesh router.
142pub struct WebRTCState {
143    pub runtime: MeshRuntimeState<MeshPeer>,
144    /// Shared peer selector used by live retrieval; aligned with simulation strategies.
145    peer_selector: Arc<RwLock<PeerSelector>>,
146    /// Hedged dispatch policy for retrieval requests.
147    request_dispatch: RequestDispatchConfig,
148    /// Retrieval timeout for quote negotiation and single-peer fetches.
149    request_timeout: Duration,
150    /// Shared Cashu quote negotiation policy/state.
151    cashu_quotes: Arc<CashuQuoteState>,
152}
153const SEEN_FRAME_CAP: usize = 4096;
154const SEEN_FRAME_TTL: Duration = Duration::from_secs(120);
155const SEEN_EVENT_CAP: usize = 8192;
156const SEEN_EVENT_TTL: Duration = Duration::from_secs(600);
157
158type PendingRequestsMap = Arc<Mutex<HashMap<String, PendingRequest>>>;
159type ConnectedPeer = (
160    String,
161    PendingRequestsMap,
162    Arc<webrtc::data_channel::RTCDataChannel>,
163);
164type ConnectedSession = (String, MeshPeer, PeerTransport);
165type SharedProductionRouter = MeshRouter<RouterSignalingBridge, SharedRouterPeerFactory>;
166
167#[derive(Clone)]
168struct RouterSignalingBridge {
169    peer_id: String,
170    signaling_tx: mpsc::Sender<SignalingMessage>,
171}
172
173impl RouterSignalingBridge {
174    fn new(peer_id: String, signaling_tx: mpsc::Sender<SignalingMessage>) -> Self {
175        Self {
176            peer_id,
177            signaling_tx,
178        }
179    }
180}
181
182#[async_trait]
183impl SharedSignalingTransport for RouterSignalingBridge {
184    async fn connect(&self, _relays: &[String]) -> Result<(), SharedTransportError> {
185        Ok(())
186    }
187
188    async fn disconnect(&self) {}
189
190    async fn publish(&self, msg: SignalingMessage) -> Result<(), SharedTransportError> {
191        self.signaling_tx
192            .send(msg)
193            .await
194            .map_err(|e| SharedTransportError::SendFailed(e.to_string()))
195    }
196
197    async fn recv(&self) -> Option<SignalingMessage> {
198        None
199    }
200
201    fn try_recv(&self) -> Option<SignalingMessage> {
202        None
203    }
204
205    fn peer_id(&self) -> &str {
206        &self.peer_id
207    }
208}
209
210struct SharedRouterPeerFactory {
211    my_peer_id: PeerId,
212    signaling_tx: mpsc::Sender<SignalingMessage>,
213    stun_servers: Vec<String>,
214    store: Option<Arc<dyn ContentStore>>,
215    state: Arc<WebRTCState>,
216    state_event_tx: mpsc::Sender<PeerStateEvent>,
217    nostr_relay: Option<SharedMeshRelayClient>,
218    mesh_frame_tx: mpsc::Sender<(PeerId, MeshNostrFrame)>,
219    peer_classifier: PeerClassifier,
220    peers: RwLock<HashMap<String, Arc<Peer>>>,
221}
222
223impl SharedRouterPeerFactory {
224    fn new(
225        my_peer_id: PeerId,
226        signaling_tx: mpsc::Sender<SignalingMessage>,
227        stun_servers: Vec<String>,
228        store: Option<Arc<dyn ContentStore>>,
229        state: Arc<WebRTCState>,
230        state_event_tx: mpsc::Sender<PeerStateEvent>,
231        nostr_relay: Option<SharedMeshRelayClient>,
232        mesh_frame_tx: mpsc::Sender<(PeerId, MeshNostrFrame)>,
233        peer_classifier: PeerClassifier,
234    ) -> Self {
235        Self {
236            my_peer_id,
237            signaling_tx,
238            stun_servers,
239            store,
240            state,
241            state_event_tx,
242            nostr_relay,
243            mesh_frame_tx,
244            peer_classifier,
245            peers: RwLock::new(HashMap::new()),
246        }
247    }
248
249    async fn register_peer(&self, peer_id: PeerId, direction: PeerDirection, peer: Arc<Peer>) {
250        let peer_key = peer_id.to_string();
251        let pool = (self.peer_classifier)(&peer_id.pubkey);
252        self.peers
253            .write()
254            .await
255            .insert(peer_key.clone(), peer.clone());
256
257        let mut peers = self.state.runtime.peers.write().await;
258        peers.insert(
259            peer_key,
260            PeerEntry {
261                peer_id,
262                direction,
263                state: ConnectionState::Connecting,
264                last_seen: Instant::now(),
265                peer: Some(MeshPeer::WebRtc(peer)),
266                pool,
267                transport: PeerTransport::WebRtc,
268                signal_paths: BTreeSet::from([PeerSignalPath::Relay]),
269                bytes_sent: 0,
270                bytes_received: 0,
271            },
272        );
273    }
274
275    async fn create_peer(
276        &self,
277        peer_id: PeerId,
278        direction: PeerDirection,
279    ) -> Result<Peer, SharedTransportError> {
280        Peer::new_with_store_and_events(
281            peer_id,
282            direction,
283            self.my_peer_id.clone(),
284            self.signaling_tx.clone(),
285            self.stun_servers.clone(),
286            self.store.clone(),
287            Some(self.state_event_tx.clone()),
288            self.nostr_relay.clone(),
289            Some(self.mesh_frame_tx.clone()),
290            Some(self.state.cashu_quotes.clone()),
291        )
292        .await
293        .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))
294    }
295}
296
297#[async_trait]
298impl SharedPeerLinkFactory for SharedRouterPeerFactory {
299    async fn create_offer(
300        &self,
301        target_peer_id: &str,
302    ) -> Result<(Arc<dyn SharedPeerLink>, String), SharedTransportError> {
303        let target_peer = PeerId::from_string(target_peer_id).ok_or_else(|| {
304            SharedTransportError::ConnectionFailed(format!("invalid peer id {target_peer_id}"))
305        })?;
306        let peer = Arc::new(
307            self.create_peer(target_peer.clone(), PeerDirection::Outbound)
308                .await?,
309        );
310        peer.setup_handlers()
311            .await
312            .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))?;
313        let offer = peer
314            .connect()
315            .await
316            .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))?;
317        let sdp = offer
318            .get("sdp")
319            .and_then(|value| value.as_str())
320            .ok_or_else(|| {
321                SharedTransportError::ConnectionFailed("missing SDP in CLI peer offer".to_string())
322            })?
323            .to_string();
324        self.register_peer(target_peer, PeerDirection::Outbound, peer.clone())
325            .await;
326        Ok((peer as Arc<dyn SharedPeerLink>, sdp))
327    }
328
329    async fn accept_offer(
330        &self,
331        from_peer_id: &str,
332        offer_sdp: &str,
333    ) -> Result<(Arc<dyn SharedPeerLink>, String), SharedTransportError> {
334        let from_peer = PeerId::from_string(from_peer_id).ok_or_else(|| {
335            SharedTransportError::ConnectionFailed(format!("invalid peer id {from_peer_id}"))
336        })?;
337        let peer = Arc::new(
338            self.create_peer(from_peer.clone(), PeerDirection::Inbound)
339                .await?,
340        );
341        peer.setup_handlers()
342            .await
343            .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))?;
344        let answer = peer
345            .handle_offer(serde_json::json!({ "type": "offer", "sdp": offer_sdp }))
346            .await
347            .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))?;
348        let sdp = answer
349            .get("sdp")
350            .and_then(|value| value.as_str())
351            .ok_or_else(|| {
352                SharedTransportError::ConnectionFailed("missing SDP in CLI peer answer".to_string())
353            })?
354            .to_string();
355        self.register_peer(from_peer, PeerDirection::Inbound, peer.clone())
356            .await;
357        Ok((peer as Arc<dyn SharedPeerLink>, sdp))
358    }
359
360    async fn handle_answer(
361        &self,
362        target_peer_id: &str,
363        answer_sdp: &str,
364    ) -> Result<Arc<dyn SharedPeerLink>, SharedTransportError> {
365        let peer = self
366            .peers
367            .read()
368            .await
369            .get(target_peer_id)
370            .cloned()
371            .ok_or_else(|| {
372                SharedTransportError::ConnectionFailed(format!(
373                    "missing outbound peer for {target_peer_id}"
374                ))
375            })?;
376        peer.handle_answer(serde_json::json!({ "type": "answer", "sdp": answer_sdp }))
377            .await
378            .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))?;
379        Ok(peer as Arc<dyn SharedPeerLink>)
380    }
381
382    async fn handle_candidate(
383        &self,
384        peer_id: &str,
385        candidate: SharedIceCandidate,
386    ) -> Result<(), SharedTransportError> {
387        let peer = self.peers.read().await.get(peer_id).cloned();
388        if let Some(peer) = peer {
389            peer.handle_candidate(serde_json::json!({
390                "candidate": candidate.candidate,
391                "sdpMLineIndex": candidate.sdp_m_line_index,
392                "sdpMid": candidate.sdp_mid,
393            }))
394            .await
395            .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))?;
396        }
397        Ok(())
398    }
399
400    async fn remove_peer(&self, peer_id: &str) -> Result<(), SharedTransportError> {
401        self.peers.write().await.remove(peer_id);
402        Ok(())
403    }
404}
405
406impl WebRTCState {
407    pub fn new() -> Self {
408        let cfg = WebRTCConfig::default();
409        Self::new_with_routing_and_cashu(
410            cfg.request_selection_strategy,
411            cfg.request_fairness_enabled,
412            cfg.request_dispatch,
413            Duration::from_millis(cfg.message_timeout_ms),
414            CashuRoutingConfig::default(),
415            None,
416            None,
417        )
418    }
419
420    pub fn new_with_routing(
421        selection_strategy: crate::SelectionStrategy,
422        fairness_enabled: bool,
423        request_dispatch: RequestDispatchConfig,
424    ) -> Self {
425        let cfg = WebRTCConfig::default();
426        Self::new_with_routing_and_cashu(
427            selection_strategy,
428            fairness_enabled,
429            request_dispatch,
430            Duration::from_millis(cfg.message_timeout_ms),
431            CashuRoutingConfig::default(),
432            None,
433            None,
434        )
435    }
436
437    pub fn new_with_routing_and_cashu(
438        selection_strategy: crate::SelectionStrategy,
439        fairness_enabled: bool,
440        request_dispatch: RequestDispatchConfig,
441        request_timeout: Duration,
442        cashu_routing: CashuRoutingConfig,
443        payment_client: Option<Arc<dyn CashuPaymentClient>>,
444        mint_metadata: Option<Arc<CashuMintMetadataStore>>,
445    ) -> Self {
446        let mut selector = PeerSelector::with_strategy(selection_strategy);
447        selector.set_fairness(fairness_enabled);
448        let peer_selector = Arc::new(RwLock::new(selector));
449        let cashu_quotes = Arc::new(if let Some(mint_metadata) = mint_metadata {
450            CashuQuoteState::new_with_mint_metadata(
451                cashu_routing,
452                peer_selector.clone(),
453                payment_client,
454                mint_metadata,
455            )
456        } else {
457            CashuQuoteState::new(cashu_routing, peer_selector.clone(), payment_client)
458        });
459        Self {
460            runtime: MeshRuntimeState::new(),
461            peer_selector,
462            request_dispatch,
463            request_timeout,
464            cashu_quotes,
465        }
466    }
467
468    pub async fn set_local_buses(&self, buses: Vec<SharedLocalNostrBus>) {
469        self.runtime.set_local_buses(buses).await;
470    }
471
472    pub async fn add_local_bus(&self, bus: SharedLocalNostrBus) {
473        self.runtime.add_local_bus(bus).await;
474    }
475
476    pub async fn set_multicast_bus(&self, bus: Option<Arc<MulticastNostrBus>>) {
477        let buses = bus
478            .into_iter()
479            .map(|bus| bus as SharedLocalNostrBus)
480            .collect();
481        self.set_local_buses(buses).await;
482    }
483
484    /// Drop all live peer sessions and clear topology-specific state while
485    /// keeping cumulative bandwidth counters intact.
486    pub async fn reset_runtime_state(&self) {
487        self.runtime.reset().await;
488    }
489
490    /// Get current bandwidth stats (bytes sent/received)
491    pub fn get_bandwidth(&self) -> (u64, u64) {
492        self.runtime.get_bandwidth()
493    }
494
495    pub fn get_mesh_stats(&self) -> (u64, u64, u64) {
496        self.runtime.get_mesh_stats()
497    }
498
499    pub fn record_mesh_received(&self) {
500        self.runtime.record_mesh_received();
501    }
502
503    pub fn record_mesh_forwarded(&self, count: u64) {
504        self.runtime.record_mesh_forwarded(count);
505    }
506
507    pub fn record_mesh_duplicate_drop(&self) {
508        self.runtime.record_mesh_duplicate_drop();
509    }
510
511    /// Record bytes sent (global + per-peer)
512    pub async fn record_sent(&self, peer_id: &str, bytes: u64) {
513        self.runtime.record_sent(peer_id, bytes).await;
514    }
515
516    /// Record bytes received (global + per-peer)
517    pub async fn record_received(&self, peer_id: &str, bytes: u64) {
518        self.runtime.record_received(peer_id, bytes).await;
519    }
520
521    /// Request content by hash from connected peers
522    /// Queries peers in adaptive selector order with hedged fanout waves.
523    /// Returns the first successful response, or None if no peer has it
524    pub async fn request_from_peers(&self, hash_hex: &str) -> Option<Vec<u8>> {
525        self.request_from_peers_with_source(hash_hex)
526            .await
527            .map(|(data, _peer_id)| data)
528    }
529
530    /// Request content by hash from connected peers, returning data and source peer.
531    pub async fn request_from_peers_with_source(
532        &self,
533        hash_hex: &str,
534    ) -> Option<(Vec<u8>, String)> {
535        use crate::BLOB_REQUEST_POLICY;
536
537        let peers = self.runtime.peers.read().await;
538
539        let peer_refs: Vec<_> = peers
540            .values()
541            .filter(|p| p.state == ConnectionState::Connected && p.peer.is_some())
542            .filter_map(|p| {
543                p.peer
544                    .clone()
545                    .map(|peer| (p.peer_id.to_string(), peer, p.transport))
546            })
547            .collect();
548
549        drop(peers); // Release the read lock
550
551        let mut connected_peers: Vec<ConnectedPeer> = Vec::new();
552        let mut connected_sessions: Vec<ConnectedSession> = Vec::new();
553        for (peer_id, peer, transport) in peer_refs {
554            if !peer.is_ready() {
555                continue;
556            }
557            if bluetooth_nostr_only_mode() && transport == PeerTransport::Bluetooth {
558                continue;
559            }
560            if let Some(webrtc_peer) = peer.as_webrtc() {
561                let dc_guard = webrtc_peer.data_channel.lock().await;
562                if let Some(dc) = dc_guard.as_ref() {
563                    connected_peers.push((
564                        peer_id.clone(),
565                        webrtc_peer.pending_requests.clone(),
566                        dc.clone(),
567                    ));
568                }
569            }
570            connected_sessions.push((peer_id, peer, transport));
571        }
572
573        if connected_sessions.is_empty() {
574            debug!(
575                "No connected peers to query for {}",
576                &hash_hex[..8.min(hash_hex.len())]
577            );
578            return None;
579        }
580
581        // Convert hex to binary hash once
582        let hash_bytes = match hex::decode(hash_hex) {
583            Ok(b) => b,
584            Err(_) => return None,
585        };
586
587        let expected_hash: [u8; 32] = match hash_bytes.as_slice().try_into() {
588            Ok(h) => h,
589            Err(_) => {
590                debug!(
591                    "Invalid hash length {}, expected 32 bytes",
592                    hash_bytes.len()
593                );
594                return None;
595            }
596        };
597
598        let connected_peer_ids: Vec<String> = connected_sessions
599            .iter()
600            .map(|(peer_id, _, _)| peer_id.clone())
601            .collect();
602        sync_selector_peers(self.peer_selector.as_ref(), &connected_peer_ids).await;
603
604        let ordered_peer_ids = self.peer_selector.write().await.select_peers();
605        let mut quote_by_peer: HashMap<
606            String,
607            (
608                PendingRequestsMap,
609                Arc<webrtc::data_channel::RTCDataChannel>,
610            ),
611        > = connected_peers
612            .iter()
613            .cloned()
614            .map(|(peer_id, pending, dc)| (peer_id, (pending, dc)))
615            .collect();
616        let mut ordered_quote_peers: Vec<ConnectedPeer> = Vec::new();
617        for peer_id in &ordered_peer_ids {
618            if let Some((pending, dc)) = quote_by_peer.remove(peer_id) {
619                ordered_quote_peers.push((peer_id.clone(), pending, dc));
620            }
621        }
622        for (peer_id, (pending, dc)) in quote_by_peer {
623            ordered_quote_peers.push((peer_id, pending, dc));
624        }
625
626        let mut by_peer: HashMap<String, (MeshPeer, PeerTransport)> = connected_sessions
627            .into_iter()
628            .map(|(peer_id, peer, transport)| (peer_id, (peer, transport)))
629            .collect();
630
631        let mut ordered_peers: Vec<ConnectedSession> = Vec::new();
632        for peer_id in ordered_peer_ids {
633            if let Some((peer, transport)) = by_peer.remove(&peer_id) {
634                ordered_peers.push((peer_id, peer, transport));
635            }
636        }
637        for (peer_id, (peer, transport)) in by_peer {
638            ordered_peers.push((peer_id, peer, transport));
639        }
640
641        debug!(
642            "Querying {} peers for {} with shared hedged scheduler",
643            ordered_peers.len(),
644            &hash_hex[..8.min(hash_hex.len())],
645        );
646
647        if let Some((requested_mint, payment_sat, quote_ttl_ms)) =
648            self.cashu_quotes.requester_quote_terms().await
649        {
650            if let Some(quote) = self
651                .request_quote_from_peers(
652                    &hash_bytes,
653                    requested_mint,
654                    payment_sat,
655                    quote_ttl_ms,
656                    &ordered_quote_peers,
657                )
658                .await
659            {
660                if let Some(data) = self
661                    .request_from_single_peer(
662                        hash_hex,
663                        &hash_bytes,
664                        expected_hash,
665                        &quote.peer_id,
666                        Some(&quote),
667                        &ordered_quote_peers,
668                    )
669                    .await
670                {
671                    debug!(
672                        "Got quoted response from peer {} for {}",
673                        quote.peer_id,
674                        &hash_hex[..8.min(hash_hex.len())]
675                    );
676                    return Some((data, quote.peer_id));
677                }
678            }
679        }
680
681        let request = DataRequest {
682            h: hash_bytes.clone(),
683            htl: BLOB_REQUEST_POLICY.max_htl,
684            q: None,
685        };
686        let wire = crate::encode_request(&request);
687        let wire_len = wire.len() as u64;
688        let current_result_rx = Arc::new(Mutex::new(None));
689        if let Some((data, peer_id)) = run_hedged_waves(
690            ordered_peers.len(),
691            self.request_dispatch,
692            self.request_timeout,
693            |range| {
694                let wave_peers = ordered_peers[range].to_vec();
695                let (result_tx, result_rx) =
696                    mpsc::channel::<(String, Instant, Result<Option<Vec<u8>>>)>(wave_peers.len());
697                let current_result_rx = current_result_rx.clone();
698                let hash_hex = hash_hex.to_string();
699                async move {
700                    *current_result_rx.lock().await = Some(result_rx);
701                    let sent = wave_peers.len();
702                    for (peer_id, peer, transport) in wave_peers {
703                        if transport != PeerTransport::Bluetooth {
704                            self.record_sent(&peer_id, wire_len).await;
705                        }
706                        self.peer_selector
707                            .write()
708                            .await
709                            .record_request(&peer_id, wire_len);
710
711                        let result_tx = result_tx.clone();
712                        let peer_id_for_task = peer_id.clone();
713                        let peer = peer.clone();
714                        let hash_hex = hash_hex.clone();
715                        let per_request_timeout = self.request_timeout;
716                        tokio::spawn(async move {
717                            let started = Instant::now();
718                            let result = peer.request(&hash_hex, per_request_timeout).await;
719                            let _ = result_tx.send((peer_id_for_task, started, result)).await;
720                        });
721                    }
722                    drop(result_tx);
723                    sent
724                }
725            },
726            |wait| {
727                let current_result_rx = current_result_rx.clone();
728                async move {
729                    let mut current_result_rx = current_result_rx.lock().await;
730                    let Some(result_rx) = current_result_rx.as_mut() else {
731                        return HedgedWaveAction::Abort;
732                    };
733                    let deadline = Instant::now() + wait;
734                    loop {
735                        let now = Instant::now();
736                        if now >= deadline {
737                            return HedgedWaveAction::Continue;
738                        }
739                        let remaining = deadline.saturating_duration_since(now);
740                        match tokio::time::timeout(remaining, result_rx.recv()).await {
741                            Ok(Some((peer_id, started, Ok(Some(data))))) => {
742                                let rtt_ms = started.elapsed().as_millis() as u64;
743                                if hashtree_core::sha256(&data) == expected_hash {
744                                    let should_record = {
745                                        let peers = self.runtime.peers.read().await;
746                                        peers
747                                            .get(&peer_id)
748                                            .map(|entry| {
749                                                entry.transport != PeerTransport::Bluetooth
750                                            })
751                                            .unwrap_or(true)
752                                    };
753                                    if should_record {
754                                        self.record_received(&peer_id, data.len() as u64).await;
755                                    }
756                                    self.peer_selector.write().await.record_success(
757                                        &peer_id,
758                                        rtt_ms,
759                                        data.len() as u64,
760                                    );
761                                    return HedgedWaveAction::Success((data, peer_id));
762                                }
763                                self.peer_selector.write().await.record_failure(&peer_id);
764                            }
765                            Ok(Some((peer_id, _, Ok(None)))) | Ok(Some((peer_id, _, Err(_)))) => {
766                                self.peer_selector.write().await.record_timeout(&peer_id);
767                            }
768                            Ok(None) | Err(_) => return HedgedWaveAction::Continue,
769                        }
770                    }
771                }
772            },
773        )
774        .await
775        {
776            debug!(
777                "Got response from peer {} for {}",
778                peer_id,
779                &hash_hex[..8.min(hash_hex.len())]
780            );
781            return Some((data, peer_id));
782        }
783
784        debug!(
785            "No peer had data for {}",
786            &hash_hex[..8.min(hash_hex.len())]
787        );
788        None
789    }
790
791    async fn request_quote_from_peers(
792        &self,
793        hash_bytes: &[u8],
794        requested_mint: String,
795        payment_sat: u64,
796        quote_ttl_ms: u32,
797        ordered_peers: &[ConnectedPeer],
798    ) -> Option<NegotiatedQuote> {
799        if ordered_peers.is_empty() || quote_ttl_ms == 0 {
800            return None;
801        }
802
803        let hash_hex = hex::encode(hash_bytes);
804        let rx = self
805            .cashu_quotes
806            .register_pending_quote(hash_hex.clone(), Some(requested_mint.clone()), payment_sat)
807            .await;
808        let quote_request = DataQuoteRequest {
809            h: hash_bytes.to_vec(),
810            p: payment_sat,
811            t: quote_ttl_ms,
812            m: Some(requested_mint),
813        };
814        let wire = crate::encode_quote_request(&quote_request);
815        let rx = Arc::new(Mutex::new(rx));
816        let result = run_hedged_waves(
817            ordered_peers.len(),
818            self.request_dispatch,
819            self.request_timeout,
820            |range| {
821                let wave_peers = ordered_peers[range].to_vec();
822                let wire = wire.clone();
823                async move {
824                    let mut sent = 0usize;
825                    for (_, _, dc) in wave_peers {
826                        if dc.send(&bytes::Bytes::copy_from_slice(&wire)).await.is_ok() {
827                            sent += 1;
828                        }
829                    }
830                    sent
831                }
832            },
833            |wait| {
834                let rx = rx.clone();
835                async move {
836                    let mut rx = rx.lock().await;
837                    match tokio::time::timeout(wait, &mut *rx).await {
838                        Ok(Ok(Some(quote))) => HedgedWaveAction::Success(quote),
839                        Ok(Ok(None)) | Ok(Err(_)) => HedgedWaveAction::Abort,
840                        Err(_) => HedgedWaveAction::Continue,
841                    }
842                }
843            },
844        )
845        .await;
846
847        self.cashu_quotes.clear_pending_quote(&hash_hex).await;
848        result
849    }
850
851    async fn request_from_single_peer(
852        &self,
853        hash_hex: &str,
854        hash_bytes: &[u8],
855        expected_hash: [u8; 32],
856        target_peer_id: &str,
857        quote: Option<&NegotiatedQuote>,
858        ordered_peers: &[ConnectedPeer],
859    ) -> Option<Vec<u8>> {
860        use crate::BLOB_REQUEST_POLICY;
861
862        let (pending_requests, dc) = ordered_peers
863            .iter()
864            .find(|(peer_id, _, _)| peer_id == target_peer_id)
865            .map(|(_, pending_requests, dc)| (pending_requests.clone(), dc.clone()))?;
866
867        let request = DataRequest {
868            h: hash_bytes.to_vec(),
869            htl: BLOB_REQUEST_POLICY.max_htl,
870            q: quote.map(|quote| quote.quote_id),
871        };
872        let wire = crate::encode_request(&request);
873        let wire_len = wire.len() as u64;
874        let sent_at = Instant::now();
875        let (tx, mut rx) = tokio::sync::oneshot::channel();
876
877        {
878            let mut pending = pending_requests.lock().await;
879            pending.insert(
880                hash_hex.to_string(),
881                if let Some(quote) = quote {
882                    PendingRequest::quoted(
883                        hash_bytes.to_vec(),
884                        tx,
885                        quote.quote_id,
886                        quote.mint_url.clone().unwrap_or_default(),
887                        quote.payment_sat,
888                    )
889                } else {
890                    PendingRequest::standard(hash_bytes.to_vec(), tx)
891                },
892            );
893        }
894
895        if dc
896            .send(&bytes::Bytes::copy_from_slice(&wire))
897            .await
898            .is_err()
899        {
900            let mut pending = pending_requests.lock().await;
901            pending.remove(hash_hex);
902            self.peer_selector
903                .write()
904                .await
905                .record_failure(target_peer_id);
906            return None;
907        }
908
909        self.record_sent(target_peer_id, wire_len).await;
910        self.peer_selector
911            .write()
912            .await
913            .record_request(target_peer_id, wire_len);
914
915        let wait_timeout = if let Some(quote) = quote {
916            let multiplier = quote.payment_sat.clamp(1, 32) as u128;
917            let extra_ms = self
918                .cashu_quotes
919                .settlement_timeout()
920                .as_millis()
921                .saturating_mul(multiplier);
922            self.request_timeout + Duration::from_millis(extra_ms.min(u64::MAX as u128) as u64)
923        } else {
924            self.request_timeout
925        };
926
927        match tokio::time::timeout(wait_timeout, &mut rx).await {
928            Ok(Ok(Some(data))) if hashtree_core::sha256(&data) == expected_hash => {
929                let rtt_ms = sent_at.elapsed().as_millis() as u64;
930                self.record_received(target_peer_id, data.len() as u64)
931                    .await;
932                self.peer_selector.write().await.record_success(
933                    target_peer_id,
934                    rtt_ms,
935                    data.len() as u64,
936                );
937                Some(data)
938            }
939            Ok(Ok(Some(_))) => {
940                self.peer_selector
941                    .write()
942                    .await
943                    .record_failure(target_peer_id);
944                let pending = pending_requests.lock().await.remove(hash_hex);
945                if let Some(pending) = pending {
946                    if let Some(quoted) = pending.quoted {
947                        if let Some(in_flight) = quoted.in_flight_payment {
948                            let _ = self
949                                .cashu_quotes
950                                .revoke_payment_token(&in_flight.mint_url, &in_flight.operation_id)
951                                .await;
952                        }
953                    }
954                }
955                None
956            }
957            Ok(Ok(None)) | Ok(Err(_)) | Err(_) => {
958                let pending = pending_requests.lock().await.remove(hash_hex);
959                if let Some(pending) = pending {
960                    if let Some(quoted) = pending.quoted {
961                        if let Some(in_flight) = quoted.in_flight_payment {
962                            let _ = self
963                                .cashu_quotes
964                                .revoke_payment_token(&in_flight.mint_url, &in_flight.operation_id)
965                                .await;
966                        }
967                    }
968                }
969                self.peer_selector
970                    .write()
971                    .await
972                    .record_timeout(target_peer_id);
973                None
974            }
975        }
976    }
977
978    /// Resolve a hashtree root event through connected peers using Nostr REQ/EOSE over WebRTC.
979    pub async fn resolve_root_from_peers(
980        &self,
981        owner_pubkey: &str,
982        tree_name: &str,
983        per_peer_timeout: Duration,
984    ) -> Option<PeerRootEvent> {
985        let peer_refs: Vec<(String, Arc<dyn MeshSession>)> = {
986            let peers = self.runtime.peers.read().await;
987            peers
988                .values()
989                .filter(|entry| entry.state == ConnectionState::Connected)
990                .filter(|entry| {
991                    !bluetooth_nostr_only_mode() || entry.transport != PeerTransport::Bluetooth
992                })
993                .filter_map(|entry| {
994                    let peer = entry.peer.as_ref()?;
995                    Some((
996                        entry.peer_id.short(),
997                        Arc::new(peer.clone()) as Arc<dyn MeshSession>,
998                    ))
999                })
1000                .collect()
1001        };
1002
1003        let resolved =
1004            resolve_root_via_peer_sessions(peer_refs, owner_pubkey, tree_name, per_peer_timeout)
1005                .await;
1006        if let Some(root) = &resolved {
1007            debug!(
1008                "Resolved {}/{} via peer {} event {}",
1009                owner_pubkey, tree_name, root.peer_id, root.event_id
1010            );
1011        }
1012        resolved
1013    }
1014
1015    pub async fn resolve_root_from_local_buses_with_source(
1016        &self,
1017        owner_pubkey: &str,
1018        tree_name: &str,
1019        timeout: Duration,
1020    ) -> Option<(&'static str, PeerRootEvent)> {
1021        self.runtime
1022            .resolve_root_from_local_buses_with_source(owner_pubkey, tree_name, timeout)
1023            .await
1024    }
1025
1026    pub async fn resolve_root_from_local_buses(
1027        &self,
1028        owner_pubkey: &str,
1029        tree_name: &str,
1030        timeout: Duration,
1031    ) -> Option<PeerRootEvent> {
1032        self.resolve_root_from_local_buses_with_source(owner_pubkey, tree_name, timeout)
1033            .await
1034            .map(|(_, root)| root)
1035    }
1036
1037    pub async fn resolve_root_from_multicast(
1038        &self,
1039        owner_pubkey: &str,
1040        tree_name: &str,
1041        timeout: Duration,
1042    ) -> Option<PeerRootEvent> {
1043        self.resolve_root_from_local_buses(owner_pubkey, tree_name, timeout)
1044            .await
1045    }
1046}
1047
1048impl Default for WebRTCState {
1049    fn default() -> Self {
1050        Self::new()
1051    }
1052}
1053
1054/// Native mesh manager handles peer discovery and transport fan-out.
1055pub struct WebRTCManager {
1056    config: WebRTCConfig,
1057    my_peer_id: PeerId,
1058    keys: Keys,
1059    state: Arc<WebRTCState>,
1060    shutdown: Arc<tokio::sync::watch::Sender<bool>>,
1061    shutdown_rx: tokio::sync::watch::Receiver<bool>,
1062    /// Channel to send signaling messages to relays
1063    signaling_tx: mpsc::Sender<SignalingMessage>,
1064    signaling_rx: Option<mpsc::Receiver<SignalingMessage>>,
1065    /// Optional content store for serving hash requests
1066    store: Option<Arc<dyn ContentStore>>,
1067    /// Peer classifier for pool assignment
1068    peer_classifier: PeerClassifier,
1069    /// Optional Nostr relay for data-channel relay messages
1070    nostr_relay: Option<SharedMeshRelayClient>,
1071    local_buses: Vec<SharedLocalNostrBus>,
1072    /// Channel for peer state events (connection success/failure)
1073    state_event_tx: mpsc::Sender<PeerStateEvent>,
1074    state_event_rx: Option<mpsc::Receiver<PeerStateEvent>>,
1075    /// Channel for relayless mesh signaling frames received from peers.
1076    mesh_frame_tx: mpsc::Sender<(PeerId, MeshNostrFrame)>,
1077    mesh_frame_rx: Option<mpsc::Receiver<(PeerId, MeshNostrFrame)>>,
1078    shared_router: Option<Arc<SharedProductionRouter>>,
1079    seen_frame_ids: Arc<Mutex<TimedSeenSet>>,
1080    seen_event_ids: Arc<Mutex<TimedSeenSet>>,
1081}
1082
1083impl WebRTCManager {
1084    /// Create a new WebRTC manager
1085    pub fn new(keys: Keys, config: WebRTCConfig) -> Self {
1086        let pubkey = keys.public_key().to_hex();
1087        let my_peer_id = PeerId::new(pubkey);
1088        let (shutdown, shutdown_rx) = tokio::sync::watch::channel(false);
1089        let (signaling_tx, signaling_rx) = mpsc::channel(100);
1090        let (state_event_tx, state_event_rx) = mpsc::channel(100);
1091        let (mesh_frame_tx, mesh_frame_rx) = mpsc::channel(256);
1092        let state = Arc::new(WebRTCState::new_with_routing_and_cashu(
1093            config.request_selection_strategy,
1094            config.request_fairness_enabled,
1095            config.request_dispatch,
1096            Duration::from_millis(config.message_timeout_ms),
1097            CashuRoutingConfig::default(),
1098            None,
1099            None,
1100        ));
1101
1102        // Default classifier: all peers go to 'other' pool
1103        let peer_classifier: PeerClassifier = Arc::new(|_| PeerPool::Other);
1104
1105        Self {
1106            config,
1107            my_peer_id,
1108            keys,
1109            state,
1110            shutdown: Arc::new(shutdown),
1111            shutdown_rx,
1112            signaling_tx,
1113            signaling_rx: Some(signaling_rx),
1114            store: None,
1115            peer_classifier,
1116            nostr_relay: None,
1117            local_buses: Vec::new(),
1118            state_event_tx,
1119            state_event_rx: Some(state_event_rx),
1120            mesh_frame_tx,
1121            mesh_frame_rx: Some(mesh_frame_rx),
1122            shared_router: None,
1123            seen_frame_ids: Arc::new(Mutex::new(TimedSeenSet::new(
1124                SEEN_FRAME_CAP,
1125                SEEN_FRAME_TTL,
1126            ))),
1127            seen_event_ids: Arc::new(Mutex::new(TimedSeenSet::new(
1128                SEEN_EVENT_CAP,
1129                SEEN_EVENT_TTL,
1130            ))),
1131        }
1132    }
1133
1134    /// Create a new WebRTC manager reusing an existing shared state object.
1135    pub fn new_with_state(keys: Keys, config: WebRTCConfig, state: Arc<WebRTCState>) -> Self {
1136        let mut manager = Self::new(keys, config);
1137        manager.state = state;
1138        manager
1139    }
1140
1141    /// Create a new WebRTC manager with a peer classifier
1142    pub fn new_with_classifier(
1143        keys: Keys,
1144        config: WebRTCConfig,
1145        classifier: PeerClassifier,
1146    ) -> Self {
1147        let mut manager = Self::new(keys, config);
1148        manager.peer_classifier = classifier;
1149        manager
1150    }
1151
1152    /// Create a new WebRTC manager with a content store for serving hash requests
1153    pub fn new_with_store(keys: Keys, config: WebRTCConfig, store: Arc<dyn ContentStore>) -> Self {
1154        let mut manager = Self::new(keys, config);
1155        manager.store = Some(store);
1156        manager
1157    }
1158
1159    /// Create a new WebRTC manager with store and classifier
1160    pub fn new_with_store_and_classifier(
1161        keys: Keys,
1162        config: WebRTCConfig,
1163        store: Arc<dyn ContentStore>,
1164        classifier: PeerClassifier,
1165    ) -> Self {
1166        Self::new_with_store_and_classifier_and_cashu(
1167            keys,
1168            config,
1169            store,
1170            classifier,
1171            CashuRoutingConfig::default(),
1172            None,
1173            None,
1174        )
1175    }
1176
1177    pub fn new_with_state_and_store_and_classifier(
1178        keys: Keys,
1179        config: WebRTCConfig,
1180        state: Arc<WebRTCState>,
1181        store: Arc<dyn ContentStore>,
1182        classifier: PeerClassifier,
1183    ) -> Self {
1184        let mut manager = Self::new_with_state(keys, config, state);
1185        manager.store = Some(store);
1186        manager.peer_classifier = classifier;
1187        manager
1188    }
1189
1190    pub fn new_with_store_and_classifier_and_cashu(
1191        keys: Keys,
1192        config: WebRTCConfig,
1193        store: Arc<dyn ContentStore>,
1194        classifier: PeerClassifier,
1195        cashu_routing: CashuRoutingConfig,
1196        payment_client: Option<Arc<dyn CashuPaymentClient>>,
1197        mint_metadata: Option<Arc<CashuMintMetadataStore>>,
1198    ) -> Self {
1199        let mut manager = Self::new(keys, config);
1200        manager.state = Arc::new(WebRTCState::new_with_routing_and_cashu(
1201            manager.config.request_selection_strategy,
1202            manager.config.request_fairness_enabled,
1203            manager.config.request_dispatch,
1204            Duration::from_millis(manager.config.message_timeout_ms),
1205            cashu_routing,
1206            payment_client,
1207            mint_metadata,
1208        ));
1209        manager.store = Some(store);
1210        manager.peer_classifier = classifier;
1211        manager
1212    }
1213
1214    /// Set the content store for serving hash requests
1215    pub fn set_store(&mut self, store: Arc<dyn ContentStore>) {
1216        self.store = Some(store);
1217    }
1218
1219    /// Set the peer classifier
1220    pub fn set_peer_classifier(&mut self, classifier: PeerClassifier) {
1221        self.peer_classifier = classifier;
1222    }
1223
1224    /// Set the Nostr relay for data-channel relay messages
1225    pub fn set_nostr_relay(&mut self, relay: SharedMeshRelayClient) {
1226        self.nostr_relay = Some(relay);
1227    }
1228
1229    /// Get my peer ID
1230    pub fn my_peer_id(&self) -> &PeerId {
1231        &self.my_peer_id
1232    }
1233
1234    /// Get shared state for external access
1235    pub fn state(&self) -> Arc<WebRTCState> {
1236        self.state.clone()
1237    }
1238
1239    /// Cloneable shutdown handle for external lifecycle control.
1240    pub fn shutdown_signal(&self) -> Arc<tokio::sync::watch::Sender<bool>> {
1241        self.shutdown.clone()
1242    }
1243
1244    /// Signal shutdown
1245    pub fn shutdown(&self) {
1246        let _ = self.shutdown.send(true);
1247    }
1248
1249    /// Get connected peer count
1250    pub async fn connected_count(&self) -> usize {
1251        self.state
1252            .runtime
1253            .connected_count
1254            .load(std::sync::atomic::Ordering::Relaxed)
1255    }
1256
1257    /// Get all peer statuses
1258    pub async fn peer_statuses(&self) -> Vec<PeerStatus> {
1259        self.state
1260            .runtime
1261            .peers
1262            .read()
1263            .await
1264            .values()
1265            .map(|p| PeerStatus {
1266                peer_id: p.peer_id.to_string(),
1267                pubkey: p.peer_id.pubkey.clone(),
1268                state: p.state.to_string(),
1269                direction: p.direction,
1270                connected_at: Some(p.last_seen),
1271                pool: p.pool,
1272            })
1273            .collect()
1274    }
1275
1276    /// Get pool counts
1277    /// Returns (follows_connected, follows_active, other_connected, other_active)
1278    /// "active" = Connected or Connecting (excludes Discovered and Failed)
1279    pub async fn get_pool_counts(&self) -> (usize, usize, usize, usize) {
1280        let peers = self.state.runtime.peers.read().await;
1281        let mut follows_connected = 0;
1282        let mut follows_active = 0;
1283        let mut other_connected = 0;
1284        let mut other_active = 0;
1285
1286        for entry in peers.values() {
1287            // Only count Connected or Connecting as "active" connections
1288            // Discovered peers are just seen hellos, not real connections
1289            let is_active = entry.state == ConnectionState::Connected
1290                || entry.state == ConnectionState::Connecting;
1291
1292            match entry.pool {
1293                PeerPool::Follows => {
1294                    if is_active {
1295                        follows_active += 1;
1296                    }
1297                    if entry.state == ConnectionState::Connected {
1298                        follows_connected += 1;
1299                    }
1300                }
1301                PeerPool::Other => {
1302                    if is_active {
1303                        other_active += 1;
1304                    }
1305                    if entry.state == ConnectionState::Connected {
1306                        other_connected += 1;
1307                    }
1308                }
1309            }
1310        }
1311
1312        (
1313            follows_connected,
1314            follows_active,
1315            other_connected,
1316            other_active,
1317        )
1318    }
1319
1320    fn local_bus_max_peers(&self, source: &str) -> Option<usize> {
1321        match source {
1322            "multicast" => Some(self.config.multicast.max_peers),
1323            WIFI_AWARE_SOURCE => Some(self.config.wifi_aware.max_peers),
1324            _ => None,
1325        }
1326    }
1327
1328    #[cfg_attr(not(test), allow(dead_code))]
1329    fn can_track_local_bus_peer(
1330        &self,
1331        source: &str,
1332        peer_key: &str,
1333        peers: &HashMap<String, PeerEntry>,
1334    ) -> bool {
1335        can_track_source_peer(source, peer_key, peers, self.local_bus_max_peers(source))
1336    }
1337}
1338
1339mod runtime;