1pub use crate::{ConnectionState, PeerSignalPath, PeerTransport};
13use anyhow::Result;
14use async_trait::async_trait;
15use cashu_service::CashuPaymentClient;
16use nostr_sdk::nostr::Keys;
17#[cfg(test)]
18use nostr_sdk::nostr::Kind;
19use std::collections::{BTreeSet, HashMap};
20use std::sync::Arc;
21use std::time::{Duration, Instant};
22use tokio::sync::{mpsc, Mutex, RwLock};
23use tracing::{debug, error, info, warn};
24
25use crate::bluetooth::{
26 BluetoothConfig, BluetoothMesh, BluetoothPeerRegistrar, BluetoothRuntimeContext,
27};
28use crate::cashu::{CashuMintMetadataStore, CashuQuoteState, CashuRoutingConfig, NegotiatedQuote};
29use crate::local_bus::SharedLocalNostrBus;
30use crate::mesh_session::{
31 resolve_root_from_peer_sessions as resolve_root_via_peer_sessions, MeshSession,
32};
33use crate::mesh_store_core::{
34 run_hedged_waves, sync_selector_peers, HedgedWaveAction, RequestDispatchConfig,
35};
36use crate::multicast::{MulticastConfig, MulticastNostrBus};
37use crate::nostr::NostrRelayTransport;
38use crate::peer::{ContentStore, Peer, PendingRequest};
39use crate::peer_selector::PeerSelector;
40use crate::protocol::{DataQuoteRequest, DataRequest};
41use crate::relay_bridge::SharedMeshRelayClient;
42use crate::root_events::PeerRootEvent;
43use crate::runtime_control::{can_track_source_peer, PeerStateEvent};
44use crate::runtime_peer::{
45 MeshPeerEntry as SharedPeerEntry, PeerClassifier as SharedPeerClassifier,
46};
47use crate::runtime_state::MeshRuntimeState;
48use crate::session::MeshPeer;
49use crate::signaling::MeshRouter;
50use crate::transport::{
51 PeerLink as SharedPeerLink, PeerLinkFactory as SharedPeerLinkFactory,
52 SignalingTransport as SharedSignalingTransport, TransportError as SharedTransportError,
53};
54use crate::types::{
55 validate_mesh_frame, MeshNostrFrame, MeshNostrPayload, SignalingMessage, TimedSeenSet,
56};
57use crate::wifi_aware::{
58 mobile_wifi_aware_bridge, WifiAwareConfig, WifiAwareNostrBus, WIFI_AWARE_SOURCE,
59};
60use crate::{
61 ClassifyRequest as SharedClassifyRequest, IceCandidate as SharedIceCandidate, PeerDirection,
62 PeerId, PeerPool, MESH_SIGNALING_EVENT_KIND,
63};
64
65pub type PeerClassifier = SharedPeerClassifier;
67pub type PeerEntry = SharedPeerEntry<MeshPeer>;
68
69#[derive(Clone)]
70pub struct WebRTCConfig {
71 pub relays: Vec<String>,
72 pub signaling_enabled: bool,
73 pub hash_get_enabled: bool,
74 pub max_outbound: usize,
75 pub max_inbound: usize,
76 pub hello_interval_ms: u64,
77 pub message_timeout_ms: u64,
78 pub stun_servers: Vec<String>,
79 pub debug: bool,
80 pub multicast: MulticastConfig,
81 pub wifi_aware: WifiAwareConfig,
82 pub bluetooth: BluetoothConfig,
83 pub pools: crate::PoolSettings,
84 pub request_selection_strategy: crate::SelectionStrategy,
85 pub request_fairness_enabled: bool,
86 pub request_dispatch: RequestDispatchConfig,
87}
88
89impl Default for WebRTCConfig {
90 fn default() -> Self {
91 Self {
92 relays: vec![
93 "wss://relay.damus.io".to_string(),
94 "wss://relay.primal.net".to_string(),
95 "wss://temp.iris.to".to_string(),
96 "wss://relay.snort.social".to_string(),
97 ],
98 signaling_enabled: true,
99 hash_get_enabled: true,
100 max_outbound: 6,
101 max_inbound: 6,
102 hello_interval_ms: 3000,
103 message_timeout_ms: 15000,
104 stun_servers: vec![
105 "stun:stun.iris.to:3478".to_string(),
106 "stun:stun.l.google.com:19302".to_string(),
107 "stun:stun.cloudflare.com:3478".to_string(),
108 ],
109 debug: false,
110 multicast: MulticastConfig::default(),
111 wifi_aware: WifiAwareConfig::default(),
112 bluetooth: BluetoothConfig::default(),
113 pools: crate::PoolSettings::default(),
114 request_selection_strategy: crate::SelectionStrategy::Weighted,
115 request_fairness_enabled: true,
116 request_dispatch: RequestDispatchConfig {
117 initial_fanout: 2,
118 hedge_fanout: 1,
119 max_fanout: 8,
120 hedge_interval_ms: 120,
121 },
122 }
123 }
124}
125
126#[derive(Debug, Clone)]
127pub struct PeerStatus {
128 pub peer_id: String,
129 pub pubkey: String,
130 pub state: String,
131 pub direction: PeerDirection,
132 pub connected_at: Option<std::time::Instant>,
133 pub pool: PeerPool,
134}
135
/// Reports whether the `HTREE_BLUETOOTH_NOSTR_ONLY` environment variable holds a
/// truthy value.
///
/// Accepted values are `1`, `true` and `yes`, compared ASCII case-insensitively
/// after trimming surrounding whitespace (so `True`, `YES` and ` yes ` all count).
/// An unset variable, a non-Unicode value, or any other text yields `false`.
///
/// The variable is read on every call; callers that loop should read it once.
fn bluetooth_nostr_only_mode() -> bool {
    const TRUTHY: [&str; 3] = ["1", "true", "yes"];

    std::env::var("HTREE_BLUETOOTH_NOSTR_ONLY")
        .map(|raw| {
            let value = raw.trim();
            TRUTHY
                .iter()
                .any(|candidate| value.eq_ignore_ascii_case(candidate))
        })
        // Unset or non-Unicode values are treated as "disabled".
        .unwrap_or(false)
}
142
143pub struct WebRTCState {
145 pub runtime: MeshRuntimeState<MeshPeer>,
146 peer_selector: Arc<RwLock<PeerSelector>>,
148 request_dispatch: RequestDispatchConfig,
150 request_timeout: Duration,
152 cashu_quotes: Arc<CashuQuoteState>,
154}
/// Capacity of the seen-set that tracks mesh frame ids.
const SEEN_FRAME_CAP: usize = 1 << 12;
/// Time-to-live of entries in the frame-id seen-set (two minutes).
const SEEN_FRAME_TTL: Duration = Duration::from_secs(2 * 60);
/// Capacity of the seen-set that tracks event ids.
const SEEN_EVENT_CAP: usize = 1 << 13;
/// Time-to-live of entries in the event-id seen-set (ten minutes).
const SEEN_EVENT_TTL: Duration = Duration::from_secs(10 * 60);
159
160type PendingRequestsMap = Arc<Mutex<HashMap<String, PendingRequest>>>;
161type ConnectedPeer = (
162 String,
163 PendingRequestsMap,
164 Arc<webrtc::data_channel::RTCDataChannel>,
165);
166type ConnectedSession = (String, MeshPeer, PeerTransport);
167type SharedProductionRouter = MeshRouter<RouterSignalingBridge, SharedRouterPeerFactory>;
168
169#[derive(Clone)]
170struct RouterSignalingBridge {
171 peer_id: String,
172 signaling_tx: mpsc::Sender<SignalingMessage>,
173}
174
175impl RouterSignalingBridge {
176 fn new(peer_id: String, signaling_tx: mpsc::Sender<SignalingMessage>) -> Self {
177 Self {
178 peer_id,
179 signaling_tx,
180 }
181 }
182}
183
184#[async_trait]
185impl SharedSignalingTransport for RouterSignalingBridge {
186 async fn connect(&self, _relays: &[String]) -> Result<(), SharedTransportError> {
187 Ok(())
188 }
189
190 async fn disconnect(&self) {}
191
192 async fn publish(&self, msg: SignalingMessage) -> Result<(), SharedTransportError> {
193 self.signaling_tx
194 .send(msg)
195 .await
196 .map_err(|e| SharedTransportError::SendFailed(e.to_string()))
197 }
198
199 async fn recv(&self) -> Option<SignalingMessage> {
200 None
201 }
202
203 fn try_recv(&self) -> Option<SignalingMessage> {
204 None
205 }
206
207 fn peer_id(&self) -> &str {
208 &self.peer_id
209 }
210}
211
212struct SharedRouterPeerFactory {
213 my_peer_id: PeerId,
214 signaling_tx: mpsc::Sender<SignalingMessage>,
215 stun_servers: Vec<String>,
216 store: Option<Arc<dyn ContentStore>>,
217 state: Arc<WebRTCState>,
218 state_event_tx: mpsc::Sender<PeerStateEvent>,
219 nostr_relay: Option<SharedMeshRelayClient>,
220 mesh_frame_tx: mpsc::Sender<(PeerId, MeshNostrFrame)>,
221 peer_classifier: PeerClassifier,
222 peers: RwLock<HashMap<String, Arc<Peer>>>,
223}
224
225impl SharedRouterPeerFactory {
226 fn new(
227 my_peer_id: PeerId,
228 signaling_tx: mpsc::Sender<SignalingMessage>,
229 stun_servers: Vec<String>,
230 store: Option<Arc<dyn ContentStore>>,
231 state: Arc<WebRTCState>,
232 state_event_tx: mpsc::Sender<PeerStateEvent>,
233 nostr_relay: Option<SharedMeshRelayClient>,
234 mesh_frame_tx: mpsc::Sender<(PeerId, MeshNostrFrame)>,
235 peer_classifier: PeerClassifier,
236 ) -> Self {
237 Self {
238 my_peer_id,
239 signaling_tx,
240 stun_servers,
241 store,
242 state,
243 state_event_tx,
244 nostr_relay,
245 mesh_frame_tx,
246 peer_classifier,
247 peers: RwLock::new(HashMap::new()),
248 }
249 }
250
251 async fn register_peer(&self, peer_id: PeerId, direction: PeerDirection, peer: Arc<Peer>) {
252 let peer_key = peer_id.to_string();
253 let pool = (self.peer_classifier)(&peer_id.pubkey);
254 self.peers
255 .write()
256 .await
257 .insert(peer_key.clone(), peer.clone());
258
259 let mut peers = self.state.runtime.peers.write().await;
260 peers.insert(
261 peer_key,
262 PeerEntry {
263 peer_id,
264 direction,
265 state: ConnectionState::Connecting,
266 last_seen: Instant::now(),
267 peer: Some(MeshPeer::WebRtc(peer)),
268 pool,
269 transport: PeerTransport::WebRtc,
270 signal_paths: BTreeSet::from([PeerSignalPath::Relay]),
271 bytes_sent: 0,
272 bytes_received: 0,
273 },
274 );
275 }
276
277 async fn create_peer(
278 &self,
279 peer_id: PeerId,
280 direction: PeerDirection,
281 ) -> Result<Peer, SharedTransportError> {
282 Peer::new_with_store_and_events(
283 peer_id,
284 direction,
285 self.my_peer_id.clone(),
286 self.signaling_tx.clone(),
287 self.stun_servers.clone(),
288 self.store.clone(),
289 Some(self.state_event_tx.clone()),
290 self.nostr_relay.clone(),
291 Some(self.mesh_frame_tx.clone()),
292 Some(self.state.cashu_quotes.clone()),
293 )
294 .await
295 .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))
296 }
297}
298
299#[async_trait]
300impl SharedPeerLinkFactory for SharedRouterPeerFactory {
301 async fn create_offer(
302 &self,
303 target_peer_id: &str,
304 ) -> Result<(Arc<dyn SharedPeerLink>, String), SharedTransportError> {
305 let target_peer = PeerId::from_string(target_peer_id).ok_or_else(|| {
306 SharedTransportError::ConnectionFailed(format!("invalid peer id {target_peer_id}"))
307 })?;
308 let peer = Arc::new(
309 self.create_peer(target_peer.clone(), PeerDirection::Outbound)
310 .await?,
311 );
312 peer.setup_handlers()
313 .await
314 .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))?;
315 let offer = peer
316 .connect()
317 .await
318 .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))?;
319 let sdp = offer
320 .get("sdp")
321 .and_then(|value| value.as_str())
322 .ok_or_else(|| {
323 SharedTransportError::ConnectionFailed("missing SDP in CLI peer offer".to_string())
324 })?
325 .to_string();
326 self.register_peer(target_peer, PeerDirection::Outbound, peer.clone())
327 .await;
328 Ok((peer as Arc<dyn SharedPeerLink>, sdp))
329 }
330
331 async fn accept_offer(
332 &self,
333 from_peer_id: &str,
334 offer_sdp: &str,
335 ) -> Result<(Arc<dyn SharedPeerLink>, String), SharedTransportError> {
336 let from_peer = PeerId::from_string(from_peer_id).ok_or_else(|| {
337 SharedTransportError::ConnectionFailed(format!("invalid peer id {from_peer_id}"))
338 })?;
339 let peer = Arc::new(
340 self.create_peer(from_peer.clone(), PeerDirection::Inbound)
341 .await?,
342 );
343 peer.setup_handlers()
344 .await
345 .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))?;
346 let answer = peer
347 .handle_offer(serde_json::json!({ "type": "offer", "sdp": offer_sdp }))
348 .await
349 .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))?;
350 let sdp = answer
351 .get("sdp")
352 .and_then(|value| value.as_str())
353 .ok_or_else(|| {
354 SharedTransportError::ConnectionFailed("missing SDP in CLI peer answer".to_string())
355 })?
356 .to_string();
357 self.register_peer(from_peer, PeerDirection::Inbound, peer.clone())
358 .await;
359 Ok((peer as Arc<dyn SharedPeerLink>, sdp))
360 }
361
362 async fn handle_answer(
363 &self,
364 target_peer_id: &str,
365 answer_sdp: &str,
366 ) -> Result<Arc<dyn SharedPeerLink>, SharedTransportError> {
367 let peer = self
368 .peers
369 .read()
370 .await
371 .get(target_peer_id)
372 .cloned()
373 .ok_or_else(|| {
374 SharedTransportError::ConnectionFailed(format!(
375 "missing outbound peer for {target_peer_id}"
376 ))
377 })?;
378 peer.handle_answer(serde_json::json!({ "type": "answer", "sdp": answer_sdp }))
379 .await
380 .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))?;
381 Ok(peer as Arc<dyn SharedPeerLink>)
382 }
383
384 async fn handle_candidate(
385 &self,
386 peer_id: &str,
387 candidate: SharedIceCandidate,
388 ) -> Result<(), SharedTransportError> {
389 let peer = self.peers.read().await.get(peer_id).cloned();
390 if let Some(peer) = peer {
391 peer.handle_candidate(serde_json::json!({
392 "candidate": candidate.candidate,
393 "sdpMLineIndex": candidate.sdp_m_line_index,
394 "sdpMid": candidate.sdp_mid,
395 }))
396 .await
397 .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))?;
398 }
399 Ok(())
400 }
401
402 async fn remove_peer(&self, peer_id: &str) -> Result<(), SharedTransportError> {
403 self.peers.write().await.remove(peer_id);
404 Ok(())
405 }
406}
407
408impl WebRTCState {
409 pub fn new() -> Self {
410 let cfg = WebRTCConfig::default();
411 Self::new_with_routing_and_cashu(
412 cfg.request_selection_strategy,
413 cfg.request_fairness_enabled,
414 cfg.request_dispatch,
415 Duration::from_millis(cfg.message_timeout_ms),
416 CashuRoutingConfig::default(),
417 None,
418 None,
419 )
420 }
421
422 pub fn new_with_routing(
423 selection_strategy: crate::SelectionStrategy,
424 fairness_enabled: bool,
425 request_dispatch: RequestDispatchConfig,
426 ) -> Self {
427 let cfg = WebRTCConfig::default();
428 Self::new_with_routing_and_cashu(
429 selection_strategy,
430 fairness_enabled,
431 request_dispatch,
432 Duration::from_millis(cfg.message_timeout_ms),
433 CashuRoutingConfig::default(),
434 None,
435 None,
436 )
437 }
438
439 pub fn new_with_routing_and_cashu(
440 selection_strategy: crate::SelectionStrategy,
441 fairness_enabled: bool,
442 request_dispatch: RequestDispatchConfig,
443 request_timeout: Duration,
444 cashu_routing: CashuRoutingConfig,
445 payment_client: Option<Arc<dyn CashuPaymentClient>>,
446 mint_metadata: Option<Arc<CashuMintMetadataStore>>,
447 ) -> Self {
448 let mut selector = PeerSelector::with_strategy(selection_strategy);
449 selector.set_fairness(fairness_enabled);
450 let peer_selector = Arc::new(RwLock::new(selector));
451 let cashu_quotes = Arc::new(if let Some(mint_metadata) = mint_metadata {
452 CashuQuoteState::new_with_mint_metadata(
453 cashu_routing,
454 peer_selector.clone(),
455 payment_client,
456 mint_metadata,
457 )
458 } else {
459 CashuQuoteState::new(cashu_routing, peer_selector.clone(), payment_client)
460 });
461 Self {
462 runtime: MeshRuntimeState::new(),
463 peer_selector,
464 request_dispatch,
465 request_timeout,
466 cashu_quotes,
467 }
468 }
469
470 pub async fn set_local_buses(&self, buses: Vec<SharedLocalNostrBus>) {
471 self.runtime.set_local_buses(buses).await;
472 }
473
474 pub async fn add_local_bus(&self, bus: SharedLocalNostrBus) {
475 self.runtime.add_local_bus(bus).await;
476 }
477
478 pub async fn set_multicast_bus(&self, bus: Option<Arc<MulticastNostrBus>>) {
479 let buses = bus
480 .into_iter()
481 .map(|bus| bus as SharedLocalNostrBus)
482 .collect();
483 self.set_local_buses(buses).await;
484 }
485
486 pub async fn reset_runtime_state(&self) {
489 self.runtime.reset().await;
490 }
491
492 pub fn get_bandwidth(&self) -> (u64, u64) {
494 self.runtime.get_bandwidth()
495 }
496
497 pub fn get_mesh_stats(&self) -> (u64, u64, u64) {
498 self.runtime.get_mesh_stats()
499 }
500
501 pub fn record_mesh_received(&self) {
502 self.runtime.record_mesh_received();
503 }
504
505 pub fn record_mesh_forwarded(&self, count: u64) {
506 self.runtime.record_mesh_forwarded(count);
507 }
508
509 pub fn record_mesh_duplicate_drop(&self) {
510 self.runtime.record_mesh_duplicate_drop();
511 }
512
513 pub async fn record_sent(&self, peer_id: &str, bytes: u64) {
515 self.runtime.record_sent(peer_id, bytes).await;
516 }
517
518 pub async fn record_received(&self, peer_id: &str, bytes: u64) {
520 self.runtime.record_received(peer_id, bytes).await;
521 }
522
523 pub async fn request_from_peers(&self, hash_hex: &str) -> Option<Vec<u8>> {
527 self.request_from_peers_with_source(hash_hex)
528 .await
529 .map(|(data, _peer_id)| data)
530 }
531
532 pub async fn request_from_peers_with_source(
534 &self,
535 hash_hex: &str,
536 ) -> Option<(Vec<u8>, String)> {
537 use crate::BLOB_REQUEST_POLICY;
538
539 let peer_hash_get = self.runtime.peer_hash_get_snapshot().await;
540 let peers = self.runtime.peers.read().await;
541
542 let peer_refs: Vec<_> = peers
543 .values()
544 .filter(|p| p.state == ConnectionState::Connected && p.peer.is_some())
545 .filter_map(|p| {
546 if !peer_hash_get
547 .get(&p.peer_id.to_string())
548 .copied()
549 .unwrap_or(true)
550 {
551 return None;
552 }
553 p.peer
554 .clone()
555 .map(|peer| (p.peer_id.to_string(), peer, p.transport))
556 })
557 .collect();
558
559 drop(peers); let mut connected_peers: Vec<ConnectedPeer> = Vec::new();
562 let mut connected_sessions: Vec<ConnectedSession> = Vec::new();
563 for (peer_id, peer, transport) in peer_refs {
564 if !peer.is_ready() {
565 continue;
566 }
567 if bluetooth_nostr_only_mode() && transport == PeerTransport::Bluetooth {
568 continue;
569 }
570 if let Some(webrtc_peer) = peer.as_webrtc() {
571 let dc_guard = webrtc_peer.data_channel.lock().await;
572 if let Some(dc) = dc_guard.as_ref() {
573 connected_peers.push((
574 peer_id.clone(),
575 webrtc_peer.pending_requests.clone(),
576 dc.clone(),
577 ));
578 }
579 }
580 connected_sessions.push((peer_id, peer, transport));
581 }
582
583 if connected_sessions.is_empty() {
584 debug!(
585 "No connected peers to query for {}",
586 &hash_hex[..8.min(hash_hex.len())]
587 );
588 return None;
589 }
590
591 let hash_bytes = match hex::decode(hash_hex) {
593 Ok(b) => b,
594 Err(_) => return None,
595 };
596
597 let expected_hash: [u8; 32] = match hash_bytes.as_slice().try_into() {
598 Ok(h) => h,
599 Err(_) => {
600 debug!(
601 "Invalid hash length {}, expected 32 bytes",
602 hash_bytes.len()
603 );
604 return None;
605 }
606 };
607
608 let connected_peer_ids: Vec<String> = connected_sessions
609 .iter()
610 .map(|(peer_id, _, _)| peer_id.clone())
611 .collect();
612 sync_selector_peers(self.peer_selector.as_ref(), &connected_peer_ids).await;
613
614 let ordered_peer_ids = self.peer_selector.write().await.select_peers();
615 let mut quote_by_peer: HashMap<
616 String,
617 (
618 PendingRequestsMap,
619 Arc<webrtc::data_channel::RTCDataChannel>,
620 ),
621 > = connected_peers
622 .iter()
623 .cloned()
624 .map(|(peer_id, pending, dc)| (peer_id, (pending, dc)))
625 .collect();
626 let mut ordered_quote_peers: Vec<ConnectedPeer> = Vec::new();
627 for peer_id in &ordered_peer_ids {
628 if let Some((pending, dc)) = quote_by_peer.remove(peer_id) {
629 ordered_quote_peers.push((peer_id.clone(), pending, dc));
630 }
631 }
632 for (peer_id, (pending, dc)) in quote_by_peer {
633 ordered_quote_peers.push((peer_id, pending, dc));
634 }
635
636 let mut by_peer: HashMap<String, (MeshPeer, PeerTransport)> = connected_sessions
637 .into_iter()
638 .map(|(peer_id, peer, transport)| (peer_id, (peer, transport)))
639 .collect();
640
641 let mut ordered_peers: Vec<ConnectedSession> = Vec::new();
642 for peer_id in ordered_peer_ids {
643 if let Some((peer, transport)) = by_peer.remove(&peer_id) {
644 ordered_peers.push((peer_id, peer, transport));
645 }
646 }
647 for (peer_id, (peer, transport)) in by_peer {
648 ordered_peers.push((peer_id, peer, transport));
649 }
650
651 debug!(
652 "Querying {} peers for {} with shared hedged scheduler",
653 ordered_peers.len(),
654 &hash_hex[..8.min(hash_hex.len())],
655 );
656
657 if let Some((requested_mint, payment_sat, quote_ttl_ms)) =
658 self.cashu_quotes.requester_quote_terms().await
659 {
660 if let Some(quote) = self
661 .request_quote_from_peers(
662 &hash_bytes,
663 requested_mint,
664 payment_sat,
665 quote_ttl_ms,
666 &ordered_quote_peers,
667 )
668 .await
669 {
670 if let Some(data) = self
671 .request_from_single_peer(
672 hash_hex,
673 &hash_bytes,
674 expected_hash,
675 "e.peer_id,
676 Some("e),
677 &ordered_quote_peers,
678 )
679 .await
680 {
681 debug!(
682 "Got quoted response from peer {} for {}",
683 quote.peer_id,
684 &hash_hex[..8.min(hash_hex.len())]
685 );
686 return Some((data, quote.peer_id));
687 }
688 }
689 }
690
691 let request = DataRequest {
692 h: hash_bytes.clone(),
693 htl: BLOB_REQUEST_POLICY.max_htl,
694 q: None,
695 };
696 let wire = crate::encode_request(&request);
697 let wire_len = wire.len() as u64;
698 let current_result_rx = Arc::new(Mutex::new(None));
699 if let Some((data, peer_id)) = run_hedged_waves(
700 ordered_peers.len(),
701 self.request_dispatch,
702 self.request_timeout,
703 |range| {
704 let wave_peers = ordered_peers[range].to_vec();
705 let (result_tx, result_rx) =
706 mpsc::channel::<(String, Instant, Result<Option<Vec<u8>>>)>(wave_peers.len());
707 let current_result_rx = current_result_rx.clone();
708 let hash_hex = hash_hex.to_string();
709 async move {
710 *current_result_rx.lock().await = Some(result_rx);
711 let sent = wave_peers.len();
712 for (peer_id, peer, transport) in wave_peers {
713 if transport != PeerTransport::Bluetooth {
714 self.record_sent(&peer_id, wire_len).await;
715 }
716 self.peer_selector
717 .write()
718 .await
719 .record_request(&peer_id, wire_len);
720
721 let result_tx = result_tx.clone();
722 let peer_id_for_task = peer_id.clone();
723 let peer = peer.clone();
724 let hash_hex = hash_hex.clone();
725 let per_request_timeout = self.request_timeout;
726 tokio::spawn(async move {
727 let started = Instant::now();
728 let result = peer.request(&hash_hex, per_request_timeout).await;
729 let _ = result_tx.send((peer_id_for_task, started, result)).await;
730 });
731 }
732 drop(result_tx);
733 sent
734 }
735 },
736 |wait| {
737 let current_result_rx = current_result_rx.clone();
738 async move {
739 let mut current_result_rx = current_result_rx.lock().await;
740 let Some(result_rx) = current_result_rx.as_mut() else {
741 return HedgedWaveAction::Abort;
742 };
743 let deadline = Instant::now() + wait;
744 loop {
745 let now = Instant::now();
746 if now >= deadline {
747 return HedgedWaveAction::Continue;
748 }
749 let remaining = deadline.saturating_duration_since(now);
750 match tokio::time::timeout(remaining, result_rx.recv()).await {
751 Ok(Some((peer_id, started, Ok(Some(data))))) => {
752 let rtt_ms = started.elapsed().as_millis() as u64;
753 if hashtree_core::sha256(&data) == expected_hash {
754 let should_record = {
755 let peers = self.runtime.peers.read().await;
756 peers
757 .get(&peer_id)
758 .map(|entry| {
759 entry.transport != PeerTransport::Bluetooth
760 })
761 .unwrap_or(true)
762 };
763 if should_record {
764 self.record_received(&peer_id, data.len() as u64).await;
765 }
766 self.peer_selector.write().await.record_success(
767 &peer_id,
768 rtt_ms,
769 data.len() as u64,
770 );
771 return HedgedWaveAction::Success((data, peer_id));
772 }
773 self.peer_selector.write().await.record_failure(&peer_id);
774 }
775 Ok(Some((peer_id, _, Ok(None)))) | Ok(Some((peer_id, _, Err(_)))) => {
776 self.peer_selector.write().await.record_timeout(&peer_id);
777 }
778 Ok(None) | Err(_) => return HedgedWaveAction::Continue,
779 }
780 }
781 }
782 },
783 )
784 .await
785 {
786 debug!(
787 "Got response from peer {} for {}",
788 peer_id,
789 &hash_hex[..8.min(hash_hex.len())]
790 );
791 return Some((data, peer_id));
792 }
793
794 debug!(
795 "No peer had data for {}",
796 &hash_hex[..8.min(hash_hex.len())]
797 );
798 None
799 }
800
801 async fn request_quote_from_peers(
802 &self,
803 hash_bytes: &[u8],
804 requested_mint: String,
805 payment_sat: u64,
806 quote_ttl_ms: u32,
807 ordered_peers: &[ConnectedPeer],
808 ) -> Option<NegotiatedQuote> {
809 if ordered_peers.is_empty() || quote_ttl_ms == 0 {
810 return None;
811 }
812
813 let hash_hex = hex::encode(hash_bytes);
814 let rx = self
815 .cashu_quotes
816 .register_pending_quote(hash_hex.clone(), Some(requested_mint.clone()), payment_sat)
817 .await;
818 let quote_request = DataQuoteRequest {
819 h: hash_bytes.to_vec(),
820 p: payment_sat,
821 t: quote_ttl_ms,
822 m: Some(requested_mint),
823 };
824 let wire = crate::encode_quote_request("e_request);
825 let rx = Arc::new(Mutex::new(rx));
826 let result = run_hedged_waves(
827 ordered_peers.len(),
828 self.request_dispatch,
829 self.request_timeout,
830 |range| {
831 let wave_peers = ordered_peers[range].to_vec();
832 let wire = wire.clone();
833 async move {
834 let mut sent = 0usize;
835 for (_, _, dc) in wave_peers {
836 if dc.send(&bytes::Bytes::copy_from_slice(&wire)).await.is_ok() {
837 sent += 1;
838 }
839 }
840 sent
841 }
842 },
843 |wait| {
844 let rx = rx.clone();
845 async move {
846 let mut rx = rx.lock().await;
847 match tokio::time::timeout(wait, &mut *rx).await {
848 Ok(Ok(Some(quote))) => HedgedWaveAction::Success(quote),
849 Ok(Ok(None)) | Ok(Err(_)) => HedgedWaveAction::Abort,
850 Err(_) => HedgedWaveAction::Continue,
851 }
852 }
853 },
854 )
855 .await;
856
857 self.cashu_quotes.clear_pending_quote(&hash_hex).await;
858 result
859 }
860
861 async fn request_from_single_peer(
862 &self,
863 hash_hex: &str,
864 hash_bytes: &[u8],
865 expected_hash: [u8; 32],
866 target_peer_id: &str,
867 quote: Option<&NegotiatedQuote>,
868 ordered_peers: &[ConnectedPeer],
869 ) -> Option<Vec<u8>> {
870 use crate::BLOB_REQUEST_POLICY;
871
872 let (pending_requests, dc) = ordered_peers
873 .iter()
874 .find(|(peer_id, _, _)| peer_id == target_peer_id)
875 .map(|(_, pending_requests, dc)| (pending_requests.clone(), dc.clone()))?;
876
877 let request = DataRequest {
878 h: hash_bytes.to_vec(),
879 htl: BLOB_REQUEST_POLICY.max_htl,
880 q: quote.map(|quote| quote.quote_id),
881 };
882 let wire = crate::encode_request(&request);
883 let wire_len = wire.len() as u64;
884 let sent_at = Instant::now();
885 let (tx, mut rx) = tokio::sync::oneshot::channel();
886
887 {
888 let mut pending = pending_requests.lock().await;
889 pending.insert(
890 hash_hex.to_string(),
891 if let Some(quote) = quote {
892 PendingRequest::quoted(
893 hash_bytes.to_vec(),
894 tx,
895 quote.quote_id,
896 quote.mint_url.clone().unwrap_or_default(),
897 quote.payment_sat,
898 )
899 } else {
900 PendingRequest::standard(hash_bytes.to_vec(), tx)
901 },
902 );
903 }
904
905 if dc
906 .send(&bytes::Bytes::copy_from_slice(&wire))
907 .await
908 .is_err()
909 {
910 let mut pending = pending_requests.lock().await;
911 pending.remove(hash_hex);
912 self.peer_selector
913 .write()
914 .await
915 .record_failure(target_peer_id);
916 return None;
917 }
918
919 self.record_sent(target_peer_id, wire_len).await;
920 self.peer_selector
921 .write()
922 .await
923 .record_request(target_peer_id, wire_len);
924
925 let wait_timeout = if let Some(quote) = quote {
926 let multiplier = quote.payment_sat.clamp(1, 32) as u128;
927 let extra_ms = self
928 .cashu_quotes
929 .settlement_timeout()
930 .as_millis()
931 .saturating_mul(multiplier);
932 self.request_timeout + Duration::from_millis(extra_ms.min(u64::MAX as u128) as u64)
933 } else {
934 self.request_timeout
935 };
936
937 match tokio::time::timeout(wait_timeout, &mut rx).await {
938 Ok(Ok(Some(data))) if hashtree_core::sha256(&data) == expected_hash => {
939 let rtt_ms = sent_at.elapsed().as_millis() as u64;
940 self.record_received(target_peer_id, data.len() as u64)
941 .await;
942 self.peer_selector.write().await.record_success(
943 target_peer_id,
944 rtt_ms,
945 data.len() as u64,
946 );
947 Some(data)
948 }
949 Ok(Ok(Some(_))) => {
950 self.peer_selector
951 .write()
952 .await
953 .record_failure(target_peer_id);
954 let pending = pending_requests.lock().await.remove(hash_hex);
955 if let Some(pending) = pending {
956 if let Some(quoted) = pending.quoted {
957 if let Some(in_flight) = quoted.in_flight_payment {
958 let _ = self
959 .cashu_quotes
960 .revoke_payment_token(&in_flight.mint_url, &in_flight.operation_id)
961 .await;
962 }
963 }
964 }
965 None
966 }
967 Ok(Ok(None)) | Ok(Err(_)) | Err(_) => {
968 let pending = pending_requests.lock().await.remove(hash_hex);
969 if let Some(pending) = pending {
970 if let Some(quoted) = pending.quoted {
971 if let Some(in_flight) = quoted.in_flight_payment {
972 let _ = self
973 .cashu_quotes
974 .revoke_payment_token(&in_flight.mint_url, &in_flight.operation_id)
975 .await;
976 }
977 }
978 }
979 self.peer_selector
980 .write()
981 .await
982 .record_timeout(target_peer_id);
983 None
984 }
985 }
986 }
987
988 pub async fn resolve_root_from_peers(
990 &self,
991 owner_pubkey: &str,
992 tree_name: &str,
993 per_peer_timeout: Duration,
994 ) -> Option<PeerRootEvent> {
995 let peer_refs: Vec<(String, Arc<dyn MeshSession>)> = {
996 let peers = self.runtime.peers.read().await;
997 peers
998 .values()
999 .filter(|entry| entry.state == ConnectionState::Connected)
1000 .filter(|entry| {
1001 !bluetooth_nostr_only_mode() || entry.transport != PeerTransport::Bluetooth
1002 })
1003 .filter_map(|entry| {
1004 let peer = entry.peer.as_ref()?;
1005 Some((
1006 entry.peer_id.short(),
1007 Arc::new(peer.clone()) as Arc<dyn MeshSession>,
1008 ))
1009 })
1010 .collect()
1011 };
1012
1013 let resolved =
1014 resolve_root_via_peer_sessions(peer_refs, owner_pubkey, tree_name, per_peer_timeout)
1015 .await;
1016 if let Some(root) = &resolved {
1017 debug!(
1018 "Resolved {}/{} via peer {} event {}",
1019 owner_pubkey, tree_name, root.peer_id, root.event_id
1020 );
1021 }
1022 resolved
1023 }
1024
1025 pub async fn resolve_root_from_local_buses_with_source(
1026 &self,
1027 owner_pubkey: &str,
1028 tree_name: &str,
1029 timeout: Duration,
1030 ) -> Option<(&'static str, PeerRootEvent)> {
1031 self.runtime
1032 .resolve_root_from_local_buses_with_source(owner_pubkey, tree_name, timeout)
1033 .await
1034 }
1035
1036 pub async fn resolve_root_from_local_buses(
1037 &self,
1038 owner_pubkey: &str,
1039 tree_name: &str,
1040 timeout: Duration,
1041 ) -> Option<PeerRootEvent> {
1042 self.resolve_root_from_local_buses_with_source(owner_pubkey, tree_name, timeout)
1043 .await
1044 .map(|(_, root)| root)
1045 }
1046
1047 pub async fn resolve_root_from_multicast(
1048 &self,
1049 owner_pubkey: &str,
1050 tree_name: &str,
1051 timeout: Duration,
1052 ) -> Option<PeerRootEvent> {
1053 self.resolve_root_from_local_buses(owner_pubkey, tree_name, timeout)
1054 .await
1055 }
1056}
1057
1058impl Default for WebRTCState {
1059 fn default() -> Self {
1060 Self::new()
1061 }
1062}
1063
1064pub struct WebRTCManager {
1066 config: WebRTCConfig,
1067 my_peer_id: PeerId,
1068 keys: Keys,
1069 state: Arc<WebRTCState>,
1070 shutdown: Arc<tokio::sync::watch::Sender<bool>>,
1071 shutdown_rx: tokio::sync::watch::Receiver<bool>,
1072 signaling_tx: mpsc::Sender<SignalingMessage>,
1074 signaling_rx: Option<mpsc::Receiver<SignalingMessage>>,
1075 store: Option<Arc<dyn ContentStore>>,
1077 peer_classifier: PeerClassifier,
1079 nostr_relay: Option<SharedMeshRelayClient>,
1081 local_buses: Vec<SharedLocalNostrBus>,
1082 state_event_tx: mpsc::Sender<PeerStateEvent>,
1084 state_event_rx: Option<mpsc::Receiver<PeerStateEvent>>,
1085 mesh_frame_tx: mpsc::Sender<(PeerId, MeshNostrFrame)>,
1087 mesh_frame_rx: Option<mpsc::Receiver<(PeerId, MeshNostrFrame)>>,
1088 shared_router: Option<Arc<SharedProductionRouter>>,
1089 seen_frame_ids: Arc<Mutex<TimedSeenSet>>,
1090 seen_event_ids: Arc<Mutex<TimedSeenSet>>,
1091}
1092
1093impl WebRTCManager {
1094 pub fn new(keys: Keys, config: WebRTCConfig) -> Self {
1096 let pubkey = keys.public_key().to_hex();
1097 let my_peer_id = PeerId::new(pubkey);
1098 let (shutdown, shutdown_rx) = tokio::sync::watch::channel(false);
1099 let (signaling_tx, signaling_rx) = mpsc::channel(100);
1100 let (state_event_tx, state_event_rx) = mpsc::channel(100);
1101 let (mesh_frame_tx, mesh_frame_rx) = mpsc::channel(256);
1102 let state = Arc::new(WebRTCState::new_with_routing_and_cashu(
1103 config.request_selection_strategy,
1104 config.request_fairness_enabled,
1105 config.request_dispatch,
1106 Duration::from_millis(config.message_timeout_ms),
1107 CashuRoutingConfig::default(),
1108 None,
1109 None,
1110 ));
1111
1112 let peer_classifier: PeerClassifier = Arc::new(|_| PeerPool::Other);
1114
1115 Self {
1116 config,
1117 my_peer_id,
1118 keys,
1119 state,
1120 shutdown: Arc::new(shutdown),
1121 shutdown_rx,
1122 signaling_tx,
1123 signaling_rx: Some(signaling_rx),
1124 store: None,
1125 peer_classifier,
1126 nostr_relay: None,
1127 local_buses: Vec::new(),
1128 state_event_tx,
1129 state_event_rx: Some(state_event_rx),
1130 mesh_frame_tx,
1131 mesh_frame_rx: Some(mesh_frame_rx),
1132 shared_router: None,
1133 seen_frame_ids: Arc::new(Mutex::new(TimedSeenSet::new(
1134 SEEN_FRAME_CAP,
1135 SEEN_FRAME_TTL,
1136 ))),
1137 seen_event_ids: Arc::new(Mutex::new(TimedSeenSet::new(
1138 SEEN_EVENT_CAP,
1139 SEEN_EVENT_TTL,
1140 ))),
1141 }
1142 }
1143
1144 pub fn new_with_state(keys: Keys, config: WebRTCConfig, state: Arc<WebRTCState>) -> Self {
1146 let mut manager = Self::new(keys, config);
1147 manager.state = state;
1148 manager
1149 }
1150
1151 pub fn new_with_classifier(
1153 keys: Keys,
1154 config: WebRTCConfig,
1155 classifier: PeerClassifier,
1156 ) -> Self {
1157 let mut manager = Self::new(keys, config);
1158 manager.peer_classifier = classifier;
1159 manager
1160 }
1161
1162 pub fn new_with_store(keys: Keys, config: WebRTCConfig, store: Arc<dyn ContentStore>) -> Self {
1164 let mut manager = Self::new(keys, config);
1165 manager.store = Some(store);
1166 manager
1167 }
1168
1169 pub fn new_with_store_and_classifier(
1171 keys: Keys,
1172 config: WebRTCConfig,
1173 store: Arc<dyn ContentStore>,
1174 classifier: PeerClassifier,
1175 ) -> Self {
1176 Self::new_with_store_and_classifier_and_cashu(
1177 keys,
1178 config,
1179 store,
1180 classifier,
1181 CashuRoutingConfig::default(),
1182 None,
1183 None,
1184 )
1185 }
1186
1187 pub fn new_with_state_and_store_and_classifier(
1188 keys: Keys,
1189 config: WebRTCConfig,
1190 state: Arc<WebRTCState>,
1191 store: Arc<dyn ContentStore>,
1192 classifier: PeerClassifier,
1193 ) -> Self {
1194 let mut manager = Self::new_with_state(keys, config, state);
1195 manager.store = Some(store);
1196 manager.peer_classifier = classifier;
1197 manager
1198 }
1199
1200 pub fn new_with_store_and_classifier_and_cashu(
1201 keys: Keys,
1202 config: WebRTCConfig,
1203 store: Arc<dyn ContentStore>,
1204 classifier: PeerClassifier,
1205 cashu_routing: CashuRoutingConfig,
1206 payment_client: Option<Arc<dyn CashuPaymentClient>>,
1207 mint_metadata: Option<Arc<CashuMintMetadataStore>>,
1208 ) -> Self {
1209 let mut manager = Self::new(keys, config);
1210 manager.state = Arc::new(WebRTCState::new_with_routing_and_cashu(
1211 manager.config.request_selection_strategy,
1212 manager.config.request_fairness_enabled,
1213 manager.config.request_dispatch,
1214 Duration::from_millis(manager.config.message_timeout_ms),
1215 cashu_routing,
1216 payment_client,
1217 mint_metadata,
1218 ));
1219 manager.store = Some(store);
1220 manager.peer_classifier = classifier;
1221 manager
1222 }
1223
1224 pub fn set_store(&mut self, store: Arc<dyn ContentStore>) {
1226 self.store = Some(store);
1227 }
1228
1229 pub fn set_peer_classifier(&mut self, classifier: PeerClassifier) {
1231 self.peer_classifier = classifier;
1232 }
1233
1234 pub fn set_nostr_relay(&mut self, relay: SharedMeshRelayClient) {
1236 self.nostr_relay = Some(relay);
1237 }
1238
1239 pub fn my_peer_id(&self) -> &PeerId {
1241 &self.my_peer_id
1242 }
1243
1244 pub fn state(&self) -> Arc<WebRTCState> {
1246 self.state.clone()
1247 }
1248
1249 pub fn shutdown_signal(&self) -> Arc<tokio::sync::watch::Sender<bool>> {
1251 self.shutdown.clone()
1252 }
1253
1254 pub fn shutdown(&self) {
1256 let _ = self.shutdown.send(true);
1257 }
1258
1259 pub async fn connected_count(&self) -> usize {
1261 self.state
1262 .runtime
1263 .connected_count
1264 .load(std::sync::atomic::Ordering::Relaxed)
1265 }
1266
1267 pub async fn peer_statuses(&self) -> Vec<PeerStatus> {
1269 self.state
1270 .runtime
1271 .peers
1272 .read()
1273 .await
1274 .values()
1275 .map(|p| PeerStatus {
1276 peer_id: p.peer_id.to_string(),
1277 pubkey: p.peer_id.pubkey.clone(),
1278 state: p.state.to_string(),
1279 direction: p.direction,
1280 connected_at: Some(p.last_seen),
1281 pool: p.pool,
1282 })
1283 .collect()
1284 }
1285
1286 pub async fn get_pool_counts(&self) -> (usize, usize, usize, usize) {
1290 let peers = self.state.runtime.peers.read().await;
1291 let mut follows_connected = 0;
1292 let mut follows_active = 0;
1293 let mut other_connected = 0;
1294 let mut other_active = 0;
1295
1296 for entry in peers.values() {
1297 let is_active = entry.state == ConnectionState::Connected
1300 || entry.state == ConnectionState::Connecting;
1301
1302 match entry.pool {
1303 PeerPool::Follows => {
1304 if is_active {
1305 follows_active += 1;
1306 }
1307 if entry.state == ConnectionState::Connected {
1308 follows_connected += 1;
1309 }
1310 }
1311 PeerPool::Other => {
1312 if is_active {
1313 other_active += 1;
1314 }
1315 if entry.state == ConnectionState::Connected {
1316 other_connected += 1;
1317 }
1318 }
1319 }
1320 }
1321
1322 (
1323 follows_connected,
1324 follows_active,
1325 other_connected,
1326 other_active,
1327 )
1328 }
1329
1330 fn local_bus_max_peers(&self, source: &str) -> Option<usize> {
1331 match source {
1332 "multicast" => Some(self.config.multicast.max_peers),
1333 WIFI_AWARE_SOURCE => Some(self.config.wifi_aware.max_peers),
1334 _ => None,
1335 }
1336 }
1337
1338 #[cfg_attr(not(test), allow(dead_code))]
1339 fn can_track_local_bus_peer(
1340 &self,
1341 source: &str,
1342 peer_key: &str,
1343 peers: &HashMap<String, PeerEntry>,
1344 ) -> bool {
1345 can_track_source_peer(source, peer_key, peers, self.local_bus_max_peers(source))
1346 }
1347}
1348
1349mod runtime;