1pub use crate::{ConnectionState, PeerSignalPath, PeerTransport};
13use anyhow::Result;
14use async_trait::async_trait;
15use cashu_service::CashuPaymentClient;
16use nostr_sdk::nostr::Keys;
17#[cfg(test)]
18use nostr_sdk::nostr::Kind;
19use std::collections::{BTreeSet, HashMap};
20use std::sync::Arc;
21use std::time::{Duration, Instant};
22use tokio::sync::{mpsc, Mutex, RwLock};
23use tracing::{debug, error, info, warn};
24
25use crate::bluetooth::{
26 BluetoothConfig, BluetoothMesh, BluetoothPeerRegistrar, BluetoothRuntimeContext,
27};
28use crate::cashu::{CashuMintMetadataStore, CashuQuoteState, CashuRoutingConfig, NegotiatedQuote};
29use crate::local_bus::SharedLocalNostrBus;
30use crate::mesh_session::{
31 resolve_root_from_peer_sessions as resolve_root_via_peer_sessions, MeshSession,
32};
33use crate::mesh_store_core::{
34 run_hedged_waves, sync_selector_peers, HedgedWaveAction, RequestDispatchConfig,
35};
36use crate::multicast::{MulticastConfig, MulticastNostrBus};
37use crate::nostr::NostrRelayTransport;
38use crate::peer::{ContentStore, Peer, PendingRequest};
39use crate::peer_selector::PeerSelector;
40use crate::protocol::{DataQuoteRequest, DataRequest};
41use crate::relay_bridge::SharedMeshRelayClient;
42use crate::root_events::PeerRootEvent;
43use crate::runtime_control::{can_track_source_peer, PeerStateEvent};
44use crate::runtime_peer::{
45 MeshPeerEntry as SharedPeerEntry, PeerClassifier as SharedPeerClassifier,
46};
47use crate::runtime_state::MeshRuntimeState;
48use crate::session::MeshPeer;
49use crate::signaling::MeshRouter;
50use crate::transport::{
51 PeerLink as SharedPeerLink, PeerLinkFactory as SharedPeerLinkFactory,
52 SignalingTransport as SharedSignalingTransport, TransportError as SharedTransportError,
53};
54use crate::types::{
55 validate_mesh_frame, MeshNostrFrame, MeshNostrPayload, SignalingMessage, TimedSeenSet,
56};
57use crate::wifi_aware::{
58 mobile_wifi_aware_bridge, WifiAwareConfig, WifiAwareNostrBus, WIFI_AWARE_SOURCE,
59};
60use crate::{
61 ClassifyRequest as SharedClassifyRequest, IceCandidate as SharedIceCandidate, PeerDirection,
62 PeerId, PeerPool, MESH_SIGNALING_EVENT_KIND,
63};
64
/// Peer classifier type, re-exported from `runtime_peer` under its local name.
pub type PeerClassifier = SharedPeerClassifier;
/// Shared mesh peer entry specialised to [`MeshPeer`] sessions.
pub type PeerEntry = SharedPeerEntry<MeshPeer>;
68
/// Configuration for the WebRTC mesh manager.
///
/// The `Default` implementation supplies the stock values.
#[derive(Clone)]
pub struct WebRTCConfig {
    /// Relay URLs (the defaults are `wss://` endpoints).
    pub relays: Vec<String>,
    /// Whether signaling is enabled (default `true`).
    pub signaling_enabled: bool,
    /// Presumably the cap on outbound peer connections — TODO confirm where it is enforced.
    pub max_outbound: usize,
    /// Presumably the cap on inbound peer connections — TODO confirm where it is enforced.
    pub max_inbound: usize,
    /// Hello interval in milliseconds (default 3000).
    pub hello_interval_ms: u64,
    /// Message timeout in milliseconds; `WebRTCManager::new` converts it into the
    /// per-request timeout handed to `WebRTCState`.
    pub message_timeout_ms: u64,
    /// STUN server URLs (`stun:host:port`).
    pub stun_servers: Vec<String>,
    /// Debug flag (default `false`).
    pub debug: bool,
    /// Multicast settings.
    pub multicast: MulticastConfig,
    /// Wi-Fi Aware settings.
    pub wifi_aware: WifiAwareConfig,
    /// Bluetooth settings.
    pub bluetooth: BluetoothConfig,
    /// Peer pool settings.
    pub pools: crate::PoolSettings,
    /// Strategy handed to the peer selector used for data requests.
    pub request_selection_strategy: crate::SelectionStrategy,
    /// Whether the peer selector's fairness option is switched on.
    pub request_fairness_enabled: bool,
    /// Fan-out / hedging parameters for request dispatch.
    pub request_dispatch: RequestDispatchConfig,
}
87
88impl Default for WebRTCConfig {
89 fn default() -> Self {
90 Self {
91 relays: vec![
92 "wss://relay.damus.io".to_string(),
93 "wss://relay.primal.net".to_string(),
94 "wss://temp.iris.to".to_string(),
95 "wss://relay.snort.social".to_string(),
96 ],
97 signaling_enabled: true,
98 max_outbound: 6,
99 max_inbound: 6,
100 hello_interval_ms: 3000,
101 message_timeout_ms: 15000,
102 stun_servers: vec![
103 "stun:stun.iris.to:3478".to_string(),
104 "stun:stun.l.google.com:19302".to_string(),
105 "stun:stun.cloudflare.com:3478".to_string(),
106 ],
107 debug: false,
108 multicast: MulticastConfig::default(),
109 wifi_aware: WifiAwareConfig::default(),
110 bluetooth: BluetoothConfig::default(),
111 pools: crate::PoolSettings::default(),
112 request_selection_strategy: crate::SelectionStrategy::TitForTat,
113 request_fairness_enabled: true,
114 request_dispatch: RequestDispatchConfig {
115 initial_fanout: 2,
116 hedge_fanout: 1,
117 max_fanout: 8,
118 hedge_interval_ms: 120,
119 },
120 }
121 }
122}
123
/// Snapshot of a single peer's status.
#[derive(Debug, Clone)]
pub struct PeerStatus {
    /// Peer identifier in string form.
    pub peer_id: String,
    /// The peer's public key (presumably hex-encoded — TODO confirm against the producer).
    pub pubkey: String,
    /// Connection state rendered as text.
    pub state: String,
    /// Direction of the connection.
    pub direction: PeerDirection,
    /// When the peer connected, if it has.
    pub connected_at: Option<std::time::Instant>,
    /// Pool the peer was classified into.
    pub pool: PeerPool,
}
133
/// Reports whether the `HTREE_BLUETOOTH_NOSTR_ONLY` environment variable is set to one of the
/// accepted truthy spellings (`1`, `true`, `TRUE`, `yes`, `YES`).
///
/// An unset or non-Unicode variable, or any other value, yields `false`.
fn bluetooth_nostr_only_mode() -> bool {
    match std::env::var("HTREE_BLUETOOTH_NOSTR_ONLY") {
        Ok(value) => ["1", "true", "TRUE", "yes", "YES"].contains(&value.as_str()),
        Err(_) => false,
    }
}
140
/// Shared state behind the WebRTC mesh: the runtime peer table plus the machinery used to
/// request content from peers.
pub struct WebRTCState {
    /// Mesh runtime state (peer table, local buses, bandwidth and mesh counters); most of the
    /// public methods on this type delegate to it.
    pub runtime: MeshRuntimeState<MeshPeer>,
    /// Selector that orders peers for requests and records per-peer outcomes; the same
    /// handle is also given to `cashu_quotes`.
    peer_selector: Arc<RwLock<PeerSelector>>,
    /// Dispatch settings passed to `run_hedged_waves`.
    request_dispatch: RequestDispatchConfig,
    /// Timeout applied to each peer request.
    request_timeout: Duration,
    /// Cashu quote negotiation and settlement state.
    cashu_quotes: Arc<CashuQuoteState>,
}
/// Capacity argument for the `TimedSeenSet` that tracks seen frame ids.
const SEEN_FRAME_CAP: usize = 4096;
/// TTL argument (120 s) for the `TimedSeenSet` that tracks seen frame ids.
const SEEN_FRAME_TTL: Duration = Duration::from_secs(120);
/// Capacity argument for the `TimedSeenSet` that tracks seen event ids.
const SEEN_EVENT_CAP: usize = 8192;
/// TTL argument (600 s) for the `TimedSeenSet` that tracks seen event ids.
const SEEN_EVENT_TTL: Duration = Duration::from_secs(600);
157
/// Shared map of in-flight requests, keyed by a `String` (the request's hex hash where this
/// file inserts into it).
type PendingRequestsMap = Arc<Mutex<HashMap<String, PendingRequest>>>;
/// A WebRTC peer usable for direct data-channel requests:
/// `(peer id, its pending-request map, its data channel)`.
type ConnectedPeer = (
    String,
    PendingRequestsMap,
    Arc<webrtc::data_channel::RTCDataChannel>,
);
/// A connected session of any transport: `(peer id, session, transport)`.
type ConnectedSession = (String, MeshPeer, PeerTransport);
/// The mesh router wired to this file's signaling bridge and peer factory.
type SharedProductionRouter = MeshRouter<RouterSignalingBridge, SharedRouterPeerFactory>;
166
/// Signaling transport that forwards outgoing messages into an mpsc channel.
#[derive(Clone)]
struct RouterSignalingBridge {
    /// Local peer id reported through `peer_id()`.
    peer_id: String,
    /// Channel that published signaling messages are sent into.
    signaling_tx: mpsc::Sender<SignalingMessage>,
}
172
173impl RouterSignalingBridge {
174 fn new(peer_id: String, signaling_tx: mpsc::Sender<SignalingMessage>) -> Self {
175 Self {
176 peer_id,
177 signaling_tx,
178 }
179 }
180}
181
182#[async_trait]
183impl SharedSignalingTransport for RouterSignalingBridge {
184 async fn connect(&self, _relays: &[String]) -> Result<(), SharedTransportError> {
185 Ok(())
186 }
187
188 async fn disconnect(&self) {}
189
190 async fn publish(&self, msg: SignalingMessage) -> Result<(), SharedTransportError> {
191 self.signaling_tx
192 .send(msg)
193 .await
194 .map_err(|e| SharedTransportError::SendFailed(e.to_string()))
195 }
196
197 async fn recv(&self) -> Option<SignalingMessage> {
198 None
199 }
200
201 fn try_recv(&self) -> Option<SignalingMessage> {
202 None
203 }
204
205 fn peer_id(&self) -> &str {
206 &self.peer_id
207 }
208}
209
/// Peer-link factory for the shared router: creates WebRTC peers, tracks them by id, and
/// mirrors them into the runtime peer table.
struct SharedRouterPeerFactory {
    /// Local peer id handed to every created peer.
    my_peer_id: PeerId,
    /// Signaling channel handed to every created peer.
    signaling_tx: mpsc::Sender<SignalingMessage>,
    /// STUN servers handed to every created peer.
    stun_servers: Vec<String>,
    /// Optional content store handed to every created peer.
    store: Option<Arc<dyn ContentStore>>,
    /// Shared WebRTC state; supplies the runtime peer table and the Cashu quote state.
    state: Arc<WebRTCState>,
    /// Channel on which created peers report state events.
    state_event_tx: mpsc::Sender<PeerStateEvent>,
    /// Optional relay client handed to every created peer.
    nostr_relay: Option<SharedMeshRelayClient>,
    /// Channel on which created peers deliver mesh frames.
    mesh_frame_tx: mpsc::Sender<(PeerId, MeshNostrFrame)>,
    /// Maps a peer's pubkey to its pool when the peer is registered.
    peer_classifier: PeerClassifier,
    /// Peers created by this factory, keyed by peer id string.
    peers: RwLock<HashMap<String, Arc<Peer>>>,
}
222
223impl SharedRouterPeerFactory {
224 fn new(
225 my_peer_id: PeerId,
226 signaling_tx: mpsc::Sender<SignalingMessage>,
227 stun_servers: Vec<String>,
228 store: Option<Arc<dyn ContentStore>>,
229 state: Arc<WebRTCState>,
230 state_event_tx: mpsc::Sender<PeerStateEvent>,
231 nostr_relay: Option<SharedMeshRelayClient>,
232 mesh_frame_tx: mpsc::Sender<(PeerId, MeshNostrFrame)>,
233 peer_classifier: PeerClassifier,
234 ) -> Self {
235 Self {
236 my_peer_id,
237 signaling_tx,
238 stun_servers,
239 store,
240 state,
241 state_event_tx,
242 nostr_relay,
243 mesh_frame_tx,
244 peer_classifier,
245 peers: RwLock::new(HashMap::new()),
246 }
247 }
248
249 async fn register_peer(&self, peer_id: PeerId, direction: PeerDirection, peer: Arc<Peer>) {
250 let peer_key = peer_id.to_string();
251 let pool = (self.peer_classifier)(&peer_id.pubkey);
252 self.peers
253 .write()
254 .await
255 .insert(peer_key.clone(), peer.clone());
256
257 let mut peers = self.state.runtime.peers.write().await;
258 peers.insert(
259 peer_key,
260 PeerEntry {
261 peer_id,
262 direction,
263 state: ConnectionState::Connecting,
264 last_seen: Instant::now(),
265 peer: Some(MeshPeer::WebRtc(peer)),
266 pool,
267 transport: PeerTransport::WebRtc,
268 signal_paths: BTreeSet::from([PeerSignalPath::Relay]),
269 bytes_sent: 0,
270 bytes_received: 0,
271 },
272 );
273 }
274
275 async fn create_peer(
276 &self,
277 peer_id: PeerId,
278 direction: PeerDirection,
279 ) -> Result<Peer, SharedTransportError> {
280 Peer::new_with_store_and_events(
281 peer_id,
282 direction,
283 self.my_peer_id.clone(),
284 self.signaling_tx.clone(),
285 self.stun_servers.clone(),
286 self.store.clone(),
287 Some(self.state_event_tx.clone()),
288 self.nostr_relay.clone(),
289 Some(self.mesh_frame_tx.clone()),
290 Some(self.state.cashu_quotes.clone()),
291 )
292 .await
293 .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))
294 }
295}
296
297#[async_trait]
298impl SharedPeerLinkFactory for SharedRouterPeerFactory {
299 async fn create_offer(
300 &self,
301 target_peer_id: &str,
302 ) -> Result<(Arc<dyn SharedPeerLink>, String), SharedTransportError> {
303 let target_peer = PeerId::from_string(target_peer_id).ok_or_else(|| {
304 SharedTransportError::ConnectionFailed(format!("invalid peer id {target_peer_id}"))
305 })?;
306 let peer = Arc::new(
307 self.create_peer(target_peer.clone(), PeerDirection::Outbound)
308 .await?,
309 );
310 peer.setup_handlers()
311 .await
312 .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))?;
313 let offer = peer
314 .connect()
315 .await
316 .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))?;
317 let sdp = offer
318 .get("sdp")
319 .and_then(|value| value.as_str())
320 .ok_or_else(|| {
321 SharedTransportError::ConnectionFailed("missing SDP in CLI peer offer".to_string())
322 })?
323 .to_string();
324 self.register_peer(target_peer, PeerDirection::Outbound, peer.clone())
325 .await;
326 Ok((peer as Arc<dyn SharedPeerLink>, sdp))
327 }
328
329 async fn accept_offer(
330 &self,
331 from_peer_id: &str,
332 offer_sdp: &str,
333 ) -> Result<(Arc<dyn SharedPeerLink>, String), SharedTransportError> {
334 let from_peer = PeerId::from_string(from_peer_id).ok_or_else(|| {
335 SharedTransportError::ConnectionFailed(format!("invalid peer id {from_peer_id}"))
336 })?;
337 let peer = Arc::new(
338 self.create_peer(from_peer.clone(), PeerDirection::Inbound)
339 .await?,
340 );
341 peer.setup_handlers()
342 .await
343 .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))?;
344 let answer = peer
345 .handle_offer(serde_json::json!({ "type": "offer", "sdp": offer_sdp }))
346 .await
347 .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))?;
348 let sdp = answer
349 .get("sdp")
350 .and_then(|value| value.as_str())
351 .ok_or_else(|| {
352 SharedTransportError::ConnectionFailed("missing SDP in CLI peer answer".to_string())
353 })?
354 .to_string();
355 self.register_peer(from_peer, PeerDirection::Inbound, peer.clone())
356 .await;
357 Ok((peer as Arc<dyn SharedPeerLink>, sdp))
358 }
359
360 async fn handle_answer(
361 &self,
362 target_peer_id: &str,
363 answer_sdp: &str,
364 ) -> Result<Arc<dyn SharedPeerLink>, SharedTransportError> {
365 let peer = self
366 .peers
367 .read()
368 .await
369 .get(target_peer_id)
370 .cloned()
371 .ok_or_else(|| {
372 SharedTransportError::ConnectionFailed(format!(
373 "missing outbound peer for {target_peer_id}"
374 ))
375 })?;
376 peer.handle_answer(serde_json::json!({ "type": "answer", "sdp": answer_sdp }))
377 .await
378 .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))?;
379 Ok(peer as Arc<dyn SharedPeerLink>)
380 }
381
382 async fn handle_candidate(
383 &self,
384 peer_id: &str,
385 candidate: SharedIceCandidate,
386 ) -> Result<(), SharedTransportError> {
387 let peer = self.peers.read().await.get(peer_id).cloned();
388 if let Some(peer) = peer {
389 peer.handle_candidate(serde_json::json!({
390 "candidate": candidate.candidate,
391 "sdpMLineIndex": candidate.sdp_m_line_index,
392 "sdpMid": candidate.sdp_mid,
393 }))
394 .await
395 .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))?;
396 }
397 Ok(())
398 }
399
400 async fn remove_peer(&self, peer_id: &str) -> Result<(), SharedTransportError> {
401 self.peers.write().await.remove(peer_id);
402 Ok(())
403 }
404}
405
406impl WebRTCState {
407 pub fn new() -> Self {
408 let cfg = WebRTCConfig::default();
409 Self::new_with_routing_and_cashu(
410 cfg.request_selection_strategy,
411 cfg.request_fairness_enabled,
412 cfg.request_dispatch,
413 Duration::from_millis(cfg.message_timeout_ms),
414 CashuRoutingConfig::default(),
415 None,
416 None,
417 )
418 }
419
420 pub fn new_with_routing(
421 selection_strategy: crate::SelectionStrategy,
422 fairness_enabled: bool,
423 request_dispatch: RequestDispatchConfig,
424 ) -> Self {
425 let cfg = WebRTCConfig::default();
426 Self::new_with_routing_and_cashu(
427 selection_strategy,
428 fairness_enabled,
429 request_dispatch,
430 Duration::from_millis(cfg.message_timeout_ms),
431 CashuRoutingConfig::default(),
432 None,
433 None,
434 )
435 }
436
437 pub fn new_with_routing_and_cashu(
438 selection_strategy: crate::SelectionStrategy,
439 fairness_enabled: bool,
440 request_dispatch: RequestDispatchConfig,
441 request_timeout: Duration,
442 cashu_routing: CashuRoutingConfig,
443 payment_client: Option<Arc<dyn CashuPaymentClient>>,
444 mint_metadata: Option<Arc<CashuMintMetadataStore>>,
445 ) -> Self {
446 let mut selector = PeerSelector::with_strategy(selection_strategy);
447 selector.set_fairness(fairness_enabled);
448 let peer_selector = Arc::new(RwLock::new(selector));
449 let cashu_quotes = Arc::new(if let Some(mint_metadata) = mint_metadata {
450 CashuQuoteState::new_with_mint_metadata(
451 cashu_routing,
452 peer_selector.clone(),
453 payment_client,
454 mint_metadata,
455 )
456 } else {
457 CashuQuoteState::new(cashu_routing, peer_selector.clone(), payment_client)
458 });
459 Self {
460 runtime: MeshRuntimeState::new(),
461 peer_selector,
462 request_dispatch,
463 request_timeout,
464 cashu_quotes,
465 }
466 }
467
    /// Hands `buses` to the runtime's `set_local_buses`.
    pub async fn set_local_buses(&self, buses: Vec<SharedLocalNostrBus>) {
        self.runtime.set_local_buses(buses).await;
    }
471
    /// Hands `bus` to the runtime's `add_local_bus`.
    pub async fn add_local_bus(&self, bus: SharedLocalNostrBus) {
        self.runtime.add_local_bus(bus).await;
    }
475
476 pub async fn set_multicast_bus(&self, bus: Option<Arc<MulticastNostrBus>>) {
477 let buses = bus
478 .into_iter()
479 .map(|bus| bus as SharedLocalNostrBus)
480 .collect();
481 self.set_local_buses(buses).await;
482 }
483
    /// Resets the runtime state by delegating to `MeshRuntimeState::reset`.
    pub async fn reset_runtime_state(&self) {
        self.runtime.reset().await;
    }
489
    /// Returns the runtime's bandwidth counters as a pair.
    ///
    /// Presumably `(bytes sent, bytes received)` — TODO confirm the order against
    /// `MeshRuntimeState::get_bandwidth`.
    pub fn get_bandwidth(&self) -> (u64, u64) {
        self.runtime.get_bandwidth()
    }
494
    /// Returns the runtime's three mesh counters.
    ///
    /// Presumably `(received, forwarded, duplicate drops)` — TODO confirm the order against
    /// `MeshRuntimeState::get_mesh_stats`.
    pub fn get_mesh_stats(&self) -> (u64, u64, u64) {
        self.runtime.get_mesh_stats()
    }
498
    /// Notes one received mesh frame on the runtime counters.
    pub fn record_mesh_received(&self) {
        self.runtime.record_mesh_received();
    }
502
    /// Passes `count` to the runtime's forwarded-frame counter.
    pub fn record_mesh_forwarded(&self, count: u64) {
        self.runtime.record_mesh_forwarded(count);
    }
506
    /// Notes one dropped duplicate mesh frame on the runtime counters.
    pub fn record_mesh_duplicate_drop(&self) {
        self.runtime.record_mesh_duplicate_drop();
    }
510
    /// Records `bytes` sent to `peer_id` via the runtime state.
    pub async fn record_sent(&self, peer_id: &str, bytes: u64) {
        self.runtime.record_sent(peer_id, bytes).await;
    }
515
    /// Records `bytes` received from `peer_id` via the runtime state.
    pub async fn record_received(&self, peer_id: &str, bytes: u64) {
        self.runtime.record_received(peer_id, bytes).await;
    }
520
521 pub async fn request_from_peers(&self, hash_hex: &str) -> Option<Vec<u8>> {
525 self.request_from_peers_with_source(hash_hex)
526 .await
527 .map(|(data, _peer_id)| data)
528 }
529
530 pub async fn request_from_peers_with_source(
532 &self,
533 hash_hex: &str,
534 ) -> Option<(Vec<u8>, String)> {
535 use crate::BLOB_REQUEST_POLICY;
536
537 let peers = self.runtime.peers.read().await;
538
539 let peer_refs: Vec<_> = peers
540 .values()
541 .filter(|p| p.state == ConnectionState::Connected && p.peer.is_some())
542 .filter_map(|p| {
543 p.peer
544 .clone()
545 .map(|peer| (p.peer_id.to_string(), peer, p.transport))
546 })
547 .collect();
548
549 drop(peers); let mut connected_peers: Vec<ConnectedPeer> = Vec::new();
552 let mut connected_sessions: Vec<ConnectedSession> = Vec::new();
553 for (peer_id, peer, transport) in peer_refs {
554 if !peer.is_ready() {
555 continue;
556 }
557 if bluetooth_nostr_only_mode() && transport == PeerTransport::Bluetooth {
558 continue;
559 }
560 if let Some(webrtc_peer) = peer.as_webrtc() {
561 let dc_guard = webrtc_peer.data_channel.lock().await;
562 if let Some(dc) = dc_guard.as_ref() {
563 connected_peers.push((
564 peer_id.clone(),
565 webrtc_peer.pending_requests.clone(),
566 dc.clone(),
567 ));
568 }
569 }
570 connected_sessions.push((peer_id, peer, transport));
571 }
572
573 if connected_sessions.is_empty() {
574 debug!(
575 "No connected peers to query for {}",
576 &hash_hex[..8.min(hash_hex.len())]
577 );
578 return None;
579 }
580
581 let hash_bytes = match hex::decode(hash_hex) {
583 Ok(b) => b,
584 Err(_) => return None,
585 };
586
587 let expected_hash: [u8; 32] = match hash_bytes.as_slice().try_into() {
588 Ok(h) => h,
589 Err(_) => {
590 debug!(
591 "Invalid hash length {}, expected 32 bytes",
592 hash_bytes.len()
593 );
594 return None;
595 }
596 };
597
598 let connected_peer_ids: Vec<String> = connected_sessions
599 .iter()
600 .map(|(peer_id, _, _)| peer_id.clone())
601 .collect();
602 sync_selector_peers(self.peer_selector.as_ref(), &connected_peer_ids).await;
603
604 let ordered_peer_ids = self.peer_selector.write().await.select_peers();
605 let mut quote_by_peer: HashMap<
606 String,
607 (
608 PendingRequestsMap,
609 Arc<webrtc::data_channel::RTCDataChannel>,
610 ),
611 > = connected_peers
612 .iter()
613 .cloned()
614 .map(|(peer_id, pending, dc)| (peer_id, (pending, dc)))
615 .collect();
616 let mut ordered_quote_peers: Vec<ConnectedPeer> = Vec::new();
617 for peer_id in &ordered_peer_ids {
618 if let Some((pending, dc)) = quote_by_peer.remove(peer_id) {
619 ordered_quote_peers.push((peer_id.clone(), pending, dc));
620 }
621 }
622 for (peer_id, (pending, dc)) in quote_by_peer {
623 ordered_quote_peers.push((peer_id, pending, dc));
624 }
625
626 let mut by_peer: HashMap<String, (MeshPeer, PeerTransport)> = connected_sessions
627 .into_iter()
628 .map(|(peer_id, peer, transport)| (peer_id, (peer, transport)))
629 .collect();
630
631 let mut ordered_peers: Vec<ConnectedSession> = Vec::new();
632 for peer_id in ordered_peer_ids {
633 if let Some((peer, transport)) = by_peer.remove(&peer_id) {
634 ordered_peers.push((peer_id, peer, transport));
635 }
636 }
637 for (peer_id, (peer, transport)) in by_peer {
638 ordered_peers.push((peer_id, peer, transport));
639 }
640
641 debug!(
642 "Querying {} peers for {} with shared hedged scheduler",
643 ordered_peers.len(),
644 &hash_hex[..8.min(hash_hex.len())],
645 );
646
647 if let Some((requested_mint, payment_sat, quote_ttl_ms)) =
648 self.cashu_quotes.requester_quote_terms().await
649 {
650 if let Some(quote) = self
651 .request_quote_from_peers(
652 &hash_bytes,
653 requested_mint,
654 payment_sat,
655 quote_ttl_ms,
656 &ordered_quote_peers,
657 )
658 .await
659 {
660 if let Some(data) = self
661 .request_from_single_peer(
662 hash_hex,
663 &hash_bytes,
664 expected_hash,
665 "e.peer_id,
666 Some("e),
667 &ordered_quote_peers,
668 )
669 .await
670 {
671 debug!(
672 "Got quoted response from peer {} for {}",
673 quote.peer_id,
674 &hash_hex[..8.min(hash_hex.len())]
675 );
676 return Some((data, quote.peer_id));
677 }
678 }
679 }
680
681 let request = DataRequest {
682 h: hash_bytes.clone(),
683 htl: BLOB_REQUEST_POLICY.max_htl,
684 q: None,
685 };
686 let wire = crate::encode_request(&request);
687 let wire_len = wire.len() as u64;
688 let current_result_rx = Arc::new(Mutex::new(None));
689 if let Some((data, peer_id)) = run_hedged_waves(
690 ordered_peers.len(),
691 self.request_dispatch,
692 self.request_timeout,
693 |range| {
694 let wave_peers = ordered_peers[range].to_vec();
695 let (result_tx, result_rx) =
696 mpsc::channel::<(String, Instant, Result<Option<Vec<u8>>>)>(wave_peers.len());
697 let current_result_rx = current_result_rx.clone();
698 let hash_hex = hash_hex.to_string();
699 async move {
700 *current_result_rx.lock().await = Some(result_rx);
701 let sent = wave_peers.len();
702 for (peer_id, peer, transport) in wave_peers {
703 if transport != PeerTransport::Bluetooth {
704 self.record_sent(&peer_id, wire_len).await;
705 }
706 self.peer_selector
707 .write()
708 .await
709 .record_request(&peer_id, wire_len);
710
711 let result_tx = result_tx.clone();
712 let peer_id_for_task = peer_id.clone();
713 let peer = peer.clone();
714 let hash_hex = hash_hex.clone();
715 let per_request_timeout = self.request_timeout;
716 tokio::spawn(async move {
717 let started = Instant::now();
718 let result = peer.request(&hash_hex, per_request_timeout).await;
719 let _ = result_tx.send((peer_id_for_task, started, result)).await;
720 });
721 }
722 drop(result_tx);
723 sent
724 }
725 },
726 |wait| {
727 let current_result_rx = current_result_rx.clone();
728 async move {
729 let mut current_result_rx = current_result_rx.lock().await;
730 let Some(result_rx) = current_result_rx.as_mut() else {
731 return HedgedWaveAction::Abort;
732 };
733 let deadline = Instant::now() + wait;
734 loop {
735 let now = Instant::now();
736 if now >= deadline {
737 return HedgedWaveAction::Continue;
738 }
739 let remaining = deadline.saturating_duration_since(now);
740 match tokio::time::timeout(remaining, result_rx.recv()).await {
741 Ok(Some((peer_id, started, Ok(Some(data))))) => {
742 let rtt_ms = started.elapsed().as_millis() as u64;
743 if hashtree_core::sha256(&data) == expected_hash {
744 let should_record = {
745 let peers = self.runtime.peers.read().await;
746 peers
747 .get(&peer_id)
748 .map(|entry| {
749 entry.transport != PeerTransport::Bluetooth
750 })
751 .unwrap_or(true)
752 };
753 if should_record {
754 self.record_received(&peer_id, data.len() as u64).await;
755 }
756 self.peer_selector.write().await.record_success(
757 &peer_id,
758 rtt_ms,
759 data.len() as u64,
760 );
761 return HedgedWaveAction::Success((data, peer_id));
762 }
763 self.peer_selector.write().await.record_failure(&peer_id);
764 }
765 Ok(Some((peer_id, _, Ok(None)))) | Ok(Some((peer_id, _, Err(_)))) => {
766 self.peer_selector.write().await.record_timeout(&peer_id);
767 }
768 Ok(None) | Err(_) => return HedgedWaveAction::Continue,
769 }
770 }
771 }
772 },
773 )
774 .await
775 {
776 debug!(
777 "Got response from peer {} for {}",
778 peer_id,
779 &hash_hex[..8.min(hash_hex.len())]
780 );
781 return Some((data, peer_id));
782 }
783
784 debug!(
785 "No peer had data for {}",
786 &hash_hex[..8.min(hash_hex.len())]
787 );
788 None
789 }
790
791 async fn request_quote_from_peers(
792 &self,
793 hash_bytes: &[u8],
794 requested_mint: String,
795 payment_sat: u64,
796 quote_ttl_ms: u32,
797 ordered_peers: &[ConnectedPeer],
798 ) -> Option<NegotiatedQuote> {
799 if ordered_peers.is_empty() || quote_ttl_ms == 0 {
800 return None;
801 }
802
803 let hash_hex = hex::encode(hash_bytes);
804 let rx = self
805 .cashu_quotes
806 .register_pending_quote(hash_hex.clone(), Some(requested_mint.clone()), payment_sat)
807 .await;
808 let quote_request = DataQuoteRequest {
809 h: hash_bytes.to_vec(),
810 p: payment_sat,
811 t: quote_ttl_ms,
812 m: Some(requested_mint),
813 };
814 let wire = crate::encode_quote_request("e_request);
815 let rx = Arc::new(Mutex::new(rx));
816 let result = run_hedged_waves(
817 ordered_peers.len(),
818 self.request_dispatch,
819 self.request_timeout,
820 |range| {
821 let wave_peers = ordered_peers[range].to_vec();
822 let wire = wire.clone();
823 async move {
824 let mut sent = 0usize;
825 for (_, _, dc) in wave_peers {
826 if dc.send(&bytes::Bytes::copy_from_slice(&wire)).await.is_ok() {
827 sent += 1;
828 }
829 }
830 sent
831 }
832 },
833 |wait| {
834 let rx = rx.clone();
835 async move {
836 let mut rx = rx.lock().await;
837 match tokio::time::timeout(wait, &mut *rx).await {
838 Ok(Ok(Some(quote))) => HedgedWaveAction::Success(quote),
839 Ok(Ok(None)) | Ok(Err(_)) => HedgedWaveAction::Abort,
840 Err(_) => HedgedWaveAction::Continue,
841 }
842 }
843 },
844 )
845 .await;
846
847 self.cashu_quotes.clear_pending_quote(&hash_hex).await;
848 result
849 }
850
851 async fn request_from_single_peer(
852 &self,
853 hash_hex: &str,
854 hash_bytes: &[u8],
855 expected_hash: [u8; 32],
856 target_peer_id: &str,
857 quote: Option<&NegotiatedQuote>,
858 ordered_peers: &[ConnectedPeer],
859 ) -> Option<Vec<u8>> {
860 use crate::BLOB_REQUEST_POLICY;
861
862 let (pending_requests, dc) = ordered_peers
863 .iter()
864 .find(|(peer_id, _, _)| peer_id == target_peer_id)
865 .map(|(_, pending_requests, dc)| (pending_requests.clone(), dc.clone()))?;
866
867 let request = DataRequest {
868 h: hash_bytes.to_vec(),
869 htl: BLOB_REQUEST_POLICY.max_htl,
870 q: quote.map(|quote| quote.quote_id),
871 };
872 let wire = crate::encode_request(&request);
873 let wire_len = wire.len() as u64;
874 let sent_at = Instant::now();
875 let (tx, mut rx) = tokio::sync::oneshot::channel();
876
877 {
878 let mut pending = pending_requests.lock().await;
879 pending.insert(
880 hash_hex.to_string(),
881 if let Some(quote) = quote {
882 PendingRequest::quoted(
883 hash_bytes.to_vec(),
884 tx,
885 quote.quote_id,
886 quote.mint_url.clone().unwrap_or_default(),
887 quote.payment_sat,
888 )
889 } else {
890 PendingRequest::standard(hash_bytes.to_vec(), tx)
891 },
892 );
893 }
894
895 if dc
896 .send(&bytes::Bytes::copy_from_slice(&wire))
897 .await
898 .is_err()
899 {
900 let mut pending = pending_requests.lock().await;
901 pending.remove(hash_hex);
902 self.peer_selector
903 .write()
904 .await
905 .record_failure(target_peer_id);
906 return None;
907 }
908
909 self.record_sent(target_peer_id, wire_len).await;
910 self.peer_selector
911 .write()
912 .await
913 .record_request(target_peer_id, wire_len);
914
915 let wait_timeout = if let Some(quote) = quote {
916 let multiplier = quote.payment_sat.clamp(1, 32) as u128;
917 let extra_ms = self
918 .cashu_quotes
919 .settlement_timeout()
920 .as_millis()
921 .saturating_mul(multiplier);
922 self.request_timeout + Duration::from_millis(extra_ms.min(u64::MAX as u128) as u64)
923 } else {
924 self.request_timeout
925 };
926
927 match tokio::time::timeout(wait_timeout, &mut rx).await {
928 Ok(Ok(Some(data))) if hashtree_core::sha256(&data) == expected_hash => {
929 let rtt_ms = sent_at.elapsed().as_millis() as u64;
930 self.record_received(target_peer_id, data.len() as u64)
931 .await;
932 self.peer_selector.write().await.record_success(
933 target_peer_id,
934 rtt_ms,
935 data.len() as u64,
936 );
937 Some(data)
938 }
939 Ok(Ok(Some(_))) => {
940 self.peer_selector
941 .write()
942 .await
943 .record_failure(target_peer_id);
944 let pending = pending_requests.lock().await.remove(hash_hex);
945 if let Some(pending) = pending {
946 if let Some(quoted) = pending.quoted {
947 if let Some(in_flight) = quoted.in_flight_payment {
948 let _ = self
949 .cashu_quotes
950 .revoke_payment_token(&in_flight.mint_url, &in_flight.operation_id)
951 .await;
952 }
953 }
954 }
955 None
956 }
957 Ok(Ok(None)) | Ok(Err(_)) | Err(_) => {
958 let pending = pending_requests.lock().await.remove(hash_hex);
959 if let Some(pending) = pending {
960 if let Some(quoted) = pending.quoted {
961 if let Some(in_flight) = quoted.in_flight_payment {
962 let _ = self
963 .cashu_quotes
964 .revoke_payment_token(&in_flight.mint_url, &in_flight.operation_id)
965 .await;
966 }
967 }
968 }
969 self.peer_selector
970 .write()
971 .await
972 .record_timeout(target_peer_id);
973 None
974 }
975 }
976 }
977
978 pub async fn resolve_root_from_peers(
980 &self,
981 owner_pubkey: &str,
982 tree_name: &str,
983 per_peer_timeout: Duration,
984 ) -> Option<PeerRootEvent> {
985 let peer_refs: Vec<(String, Arc<dyn MeshSession>)> = {
986 let peers = self.runtime.peers.read().await;
987 peers
988 .values()
989 .filter(|entry| entry.state == ConnectionState::Connected)
990 .filter(|entry| {
991 !bluetooth_nostr_only_mode() || entry.transport != PeerTransport::Bluetooth
992 })
993 .filter_map(|entry| {
994 let peer = entry.peer.as_ref()?;
995 Some((
996 entry.peer_id.short(),
997 Arc::new(peer.clone()) as Arc<dyn MeshSession>,
998 ))
999 })
1000 .collect()
1001 };
1002
1003 let resolved =
1004 resolve_root_via_peer_sessions(peer_refs, owner_pubkey, tree_name, per_peer_timeout)
1005 .await;
1006 if let Some(root) = &resolved {
1007 debug!(
1008 "Resolved {}/{} via peer {} event {}",
1009 owner_pubkey, tree_name, root.peer_id, root.event_id
1010 );
1011 }
1012 resolved
1013 }
1014
    /// Resolves the root event for `owner_pubkey`/`tree_name` through the runtime's local
    /// buses, returning a static source label together with the event.
    pub async fn resolve_root_from_local_buses_with_source(
        &self,
        owner_pubkey: &str,
        tree_name: &str,
        timeout: Duration,
    ) -> Option<(&'static str, PeerRootEvent)> {
        self.runtime
            .resolve_root_from_local_buses_with_source(owner_pubkey, tree_name, timeout)
            .await
    }
1025
1026 pub async fn resolve_root_from_local_buses(
1027 &self,
1028 owner_pubkey: &str,
1029 tree_name: &str,
1030 timeout: Duration,
1031 ) -> Option<PeerRootEvent> {
1032 self.resolve_root_from_local_buses_with_source(owner_pubkey, tree_name, timeout)
1033 .await
1034 .map(|(_, root)| root)
1035 }
1036
    /// Same as [`Self::resolve_root_from_local_buses`]; it queries all local buses rather
    /// than multicast specifically.
    pub async fn resolve_root_from_multicast(
        &self,
        owner_pubkey: &str,
        tree_name: &str,
        timeout: Duration,
    ) -> Option<PeerRootEvent> {
        self.resolve_root_from_local_buses(owner_pubkey, tree_name, timeout)
            .await
    }
1046}
1047
1048impl Default for WebRTCState {
1049 fn default() -> Self {
1050 Self::new()
1051 }
1052}
1053
/// Owns the configuration, identity, channels and shared state of the WebRTC mesh.
pub struct WebRTCManager {
    /// Manager configuration.
    config: WebRTCConfig,
    /// Local peer id, derived from the public key.
    my_peer_id: PeerId,
    /// Local Nostr keys.
    keys: Keys,
    /// State shared with the rest of the application.
    state: Arc<WebRTCState>,
    /// Sender half of the shutdown watch channel (initial value `false`).
    shutdown: Arc<tokio::sync::watch::Sender<bool>>,
    /// Receiver half of the shutdown watch channel.
    shutdown_rx: tokio::sync::watch::Receiver<bool>,
    /// Sender for outgoing signaling messages.
    signaling_tx: mpsc::Sender<SignalingMessage>,
    /// Receiver for signaling messages; held in an `Option`, presumably so a run loop can
    /// take ownership of it — TODO confirm.
    signaling_rx: Option<mpsc::Receiver<SignalingMessage>>,
    /// Optional content store.
    store: Option<Arc<dyn ContentStore>>,
    /// Classifier assigning peers to pools; `new` installs one that returns `PeerPool::Other`.
    peer_classifier: PeerClassifier,
    /// Optional relay client.
    nostr_relay: Option<SharedMeshRelayClient>,
    /// Local buses known to the manager.
    local_buses: Vec<SharedLocalNostrBus>,
    /// Sender for peer state events.
    state_event_tx: mpsc::Sender<PeerStateEvent>,
    /// Receiver for peer state events; held in an `Option` like `signaling_rx`.
    state_event_rx: Option<mpsc::Receiver<PeerStateEvent>>,
    /// Sender for mesh frames tagged with their origin peer.
    mesh_frame_tx: mpsc::Sender<(PeerId, MeshNostrFrame)>,
    /// Receiver for mesh frames; held in an `Option` like `signaling_rx`.
    mesh_frame_rx: Option<mpsc::Receiver<(PeerId, MeshNostrFrame)>>,
    /// Shared router, unset until one is installed.
    shared_router: Option<Arc<SharedProductionRouter>>,
    /// Seen-set for frame ids, built from `SEEN_FRAME_CAP` and `SEEN_FRAME_TTL`.
    seen_frame_ids: Arc<Mutex<TimedSeenSet>>,
    /// Seen-set for event ids, built from `SEEN_EVENT_CAP` and `SEEN_EVENT_TTL`.
    seen_event_ids: Arc<Mutex<TimedSeenSet>>,
}
1082
1083impl WebRTCManager {
1084 pub fn new(keys: Keys, config: WebRTCConfig) -> Self {
1086 let pubkey = keys.public_key().to_hex();
1087 let my_peer_id = PeerId::new(pubkey);
1088 let (shutdown, shutdown_rx) = tokio::sync::watch::channel(false);
1089 let (signaling_tx, signaling_rx) = mpsc::channel(100);
1090 let (state_event_tx, state_event_rx) = mpsc::channel(100);
1091 let (mesh_frame_tx, mesh_frame_rx) = mpsc::channel(256);
1092 let state = Arc::new(WebRTCState::new_with_routing_and_cashu(
1093 config.request_selection_strategy,
1094 config.request_fairness_enabled,
1095 config.request_dispatch,
1096 Duration::from_millis(config.message_timeout_ms),
1097 CashuRoutingConfig::default(),
1098 None,
1099 None,
1100 ));
1101
1102 let peer_classifier: PeerClassifier = Arc::new(|_| PeerPool::Other);
1104
1105 Self {
1106 config,
1107 my_peer_id,
1108 keys,
1109 state,
1110 shutdown: Arc::new(shutdown),
1111 shutdown_rx,
1112 signaling_tx,
1113 signaling_rx: Some(signaling_rx),
1114 store: None,
1115 peer_classifier,
1116 nostr_relay: None,
1117 local_buses: Vec::new(),
1118 state_event_tx,
1119 state_event_rx: Some(state_event_rx),
1120 mesh_frame_tx,
1121 mesh_frame_rx: Some(mesh_frame_rx),
1122 shared_router: None,
1123 seen_frame_ids: Arc::new(Mutex::new(TimedSeenSet::new(
1124 SEEN_FRAME_CAP,
1125 SEEN_FRAME_TTL,
1126 ))),
1127 seen_event_ids: Arc::new(Mutex::new(TimedSeenSet::new(
1128 SEEN_EVENT_CAP,
1129 SEEN_EVENT_TTL,
1130 ))),
1131 }
1132 }
1133
1134 pub fn new_with_state(keys: Keys, config: WebRTCConfig, state: Arc<WebRTCState>) -> Self {
1136 let mut manager = Self::new(keys, config);
1137 manager.state = state;
1138 manager
1139 }
1140
1141 pub fn new_with_classifier(
1143 keys: Keys,
1144 config: WebRTCConfig,
1145 classifier: PeerClassifier,
1146 ) -> Self {
1147 let mut manager = Self::new(keys, config);
1148 manager.peer_classifier = classifier;
1149 manager
1150 }
1151
1152 pub fn new_with_store(keys: Keys, config: WebRTCConfig, store: Arc<dyn ContentStore>) -> Self {
1154 let mut manager = Self::new(keys, config);
1155 manager.store = Some(store);
1156 manager
1157 }
1158
1159 pub fn new_with_store_and_classifier(
1161 keys: Keys,
1162 config: WebRTCConfig,
1163 store: Arc<dyn ContentStore>,
1164 classifier: PeerClassifier,
1165 ) -> Self {
1166 Self::new_with_store_and_classifier_and_cashu(
1167 keys,
1168 config,
1169 store,
1170 classifier,
1171 CashuRoutingConfig::default(),
1172 None,
1173 None,
1174 )
1175 }
1176
1177 pub fn new_with_state_and_store_and_classifier(
1178 keys: Keys,
1179 config: WebRTCConfig,
1180 state: Arc<WebRTCState>,
1181 store: Arc<dyn ContentStore>,
1182 classifier: PeerClassifier,
1183 ) -> Self {
1184 let mut manager = Self::new_with_state(keys, config, state);
1185 manager.store = Some(store);
1186 manager.peer_classifier = classifier;
1187 manager
1188 }
1189
1190 pub fn new_with_store_and_classifier_and_cashu(
1191 keys: Keys,
1192 config: WebRTCConfig,
1193 store: Arc<dyn ContentStore>,
1194 classifier: PeerClassifier,
1195 cashu_routing: CashuRoutingConfig,
1196 payment_client: Option<Arc<dyn CashuPaymentClient>>,
1197 mint_metadata: Option<Arc<CashuMintMetadataStore>>,
1198 ) -> Self {
1199 let mut manager = Self::new(keys, config);
1200 manager.state = Arc::new(WebRTCState::new_with_routing_and_cashu(
1201 manager.config.request_selection_strategy,
1202 manager.config.request_fairness_enabled,
1203 manager.config.request_dispatch,
1204 Duration::from_millis(manager.config.message_timeout_ms),
1205 cashu_routing,
1206 payment_client,
1207 mint_metadata,
1208 ));
1209 manager.store = Some(store);
1210 manager.peer_classifier = classifier;
1211 manager
1212 }
1213
1214 pub fn set_store(&mut self, store: Arc<dyn ContentStore>) {
1216 self.store = Some(store);
1217 }
1218
1219 pub fn set_peer_classifier(&mut self, classifier: PeerClassifier) {
1221 self.peer_classifier = classifier;
1222 }
1223
1224 pub fn set_nostr_relay(&mut self, relay: SharedMeshRelayClient) {
1226 self.nostr_relay = Some(relay);
1227 }
1228
1229 pub fn my_peer_id(&self) -> &PeerId {
1231 &self.my_peer_id
1232 }
1233
1234 pub fn state(&self) -> Arc<WebRTCState> {
1236 self.state.clone()
1237 }
1238
1239 pub fn shutdown_signal(&self) -> Arc<tokio::sync::watch::Sender<bool>> {
1241 self.shutdown.clone()
1242 }
1243
1244 pub fn shutdown(&self) {
1246 let _ = self.shutdown.send(true);
1247 }
1248
1249 pub async fn connected_count(&self) -> usize {
1251 self.state
1252 .runtime
1253 .connected_count
1254 .load(std::sync::atomic::Ordering::Relaxed)
1255 }
1256
1257 pub async fn peer_statuses(&self) -> Vec<PeerStatus> {
1259 self.state
1260 .runtime
1261 .peers
1262 .read()
1263 .await
1264 .values()
1265 .map(|p| PeerStatus {
1266 peer_id: p.peer_id.to_string(),
1267 pubkey: p.peer_id.pubkey.clone(),
1268 state: p.state.to_string(),
1269 direction: p.direction,
1270 connected_at: Some(p.last_seen),
1271 pool: p.pool,
1272 })
1273 .collect()
1274 }
1275
1276 pub async fn get_pool_counts(&self) -> (usize, usize, usize, usize) {
1280 let peers = self.state.runtime.peers.read().await;
1281 let mut follows_connected = 0;
1282 let mut follows_active = 0;
1283 let mut other_connected = 0;
1284 let mut other_active = 0;
1285
1286 for entry in peers.values() {
1287 let is_active = entry.state == ConnectionState::Connected
1290 || entry.state == ConnectionState::Connecting;
1291
1292 match entry.pool {
1293 PeerPool::Follows => {
1294 if is_active {
1295 follows_active += 1;
1296 }
1297 if entry.state == ConnectionState::Connected {
1298 follows_connected += 1;
1299 }
1300 }
1301 PeerPool::Other => {
1302 if is_active {
1303 other_active += 1;
1304 }
1305 if entry.state == ConnectionState::Connected {
1306 other_connected += 1;
1307 }
1308 }
1309 }
1310 }
1311
1312 (
1313 follows_connected,
1314 follows_active,
1315 other_connected,
1316 other_active,
1317 )
1318 }
1319
1320 fn local_bus_max_peers(&self, source: &str) -> Option<usize> {
1321 match source {
1322 "multicast" => Some(self.config.multicast.max_peers),
1323 WIFI_AWARE_SOURCE => Some(self.config.wifi_aware.max_peers),
1324 _ => None,
1325 }
1326 }
1327
1328 #[cfg_attr(not(test), allow(dead_code))]
1329 fn can_track_local_bus_peer(
1330 &self,
1331 source: &str,
1332 peer_key: &str,
1333 peers: &HashMap<String, PeerEntry>,
1334 ) -> bool {
1335 can_track_source_peer(source, peer_key, peers, self.local_bus_max_peers(source))
1336 }
1337}
1338
1339mod runtime;