1pub use crate::{ConnectionState, PeerSignalPath, PeerTransport};
13use anyhow::Result;
14use async_trait::async_trait;
15use cashu_service::CashuPaymentClient;
16#[cfg(test)]
17use nostr_sdk::nostr::Kind;
18use nostr_sdk::nostr::{Event, Keys};
19use std::collections::{BTreeSet, HashMap};
20use std::sync::Arc;
21use std::time::{Duration, Instant};
22use tokio::sync::{mpsc, Mutex, RwLock};
23use tracing::{debug, error, info, warn};
24
25use crate::bluetooth::{
26 BluetoothConfig, BluetoothMesh, BluetoothPeerRegistrar, BluetoothRuntimeContext,
27};
28use crate::cashu::{CashuMintMetadataStore, CashuQuoteState, CashuRoutingConfig, NegotiatedQuote};
29use crate::local_bus::SharedLocalNostrBus;
30use crate::mesh_session::{
31 resolve_root_from_peer_sessions as resolve_root_via_peer_sessions, MeshSession,
32};
33use crate::mesh_store_core::{
34 run_hedged_waves, sync_selector_peers, HedgedWaveAction, RequestDispatchConfig,
35};
36use crate::multicast::{MulticastConfig, MulticastNostrBus};
37use crate::nostr::NostrRelayTransport;
38use crate::peer::{ContentStore, Peer, PendingRequest};
39use crate::peer_selector::{PeerMetadataSnapshot, PeerSelector, SelectorSummary};
40use crate::protocol::{DataQuoteRequest, DataRequest};
41use crate::relay_bridge::SharedMeshRelayClient;
42use crate::root_events::PeerRootEvent;
43use crate::runtime_control::{can_track_source_peer, PeerStateEvent};
44use crate::runtime_peer::{
45 MeshPeerEntry as SharedPeerEntry, PeerClassifier as SharedPeerClassifier,
46};
47use crate::runtime_state::MeshRuntimeState;
48use crate::session::MeshPeer;
49use crate::signaling::MeshRouter;
50use crate::transport::{
51 PeerLink as SharedPeerLink, PeerLinkFactory as SharedPeerLinkFactory,
52 SignalingTransport as SharedSignalingTransport, TransportError as SharedTransportError,
53};
54use crate::types::{
55 validate_mesh_frame, KnownPeerRecord, KnownPeerSnapshot, MeshNostrFrame, MeshNostrPayload,
56 SignalingMessage, TimedSeenSet,
57};
58use crate::wifi_aware::{
59 mobile_wifi_aware_bridge, WifiAwareConfig, WifiAwareNostrBus, WIFI_AWARE_SOURCE,
60};
61use crate::{
62 ClassifyRequest as SharedClassifyRequest, IceCandidate as SharedIceCandidate, PeerDirection,
63 PeerId, PeerPool, MESH_SIGNALING_EVENT_KIND,
64};
65
/// Local name for the shared peer classifier type re-exported from `runtime_peer`.
///
/// It is invoked with a peer's pubkey and yields the [`PeerPool`] stored on the
/// peer's table entry (see `SharedRouterPeerFactory::register_peer`).
pub type PeerClassifier = SharedPeerClassifier;
/// Peer-table entry specialised to [`MeshPeer`] sessions.
pub type PeerEntry = SharedPeerEntry<MeshPeer>;
69
/// Configuration for the WebRTC mesh layer.
///
/// The values quoted on the fields are the ones produced by the `Default` impl.
#[derive(Clone)]
pub struct WebRTCConfig {
    /// Relay URLs; the default lists four `wss://` endpoints.
    pub relays: Vec<String>,
    /// Default `true`.
    pub signaling_enabled: bool,
    /// Default `true`. Presumably toggles serving/issuing hash-get requests — confirm
    /// against the consumer, which is not visible here.
    pub hash_get_enabled: bool,
    /// Empty by default.
    pub signal_urls: Vec<String>,
    /// Default `6`.
    pub max_outbound: usize,
    /// Default `6`.
    pub max_inbound: usize,
    /// Milliseconds; default `3000`.
    pub hello_interval_ms: u64,
    /// Milliseconds; default `15000`. `WebRTCManager::new` converts this into the
    /// request timeout handed to `WebRTCState`.
    pub message_timeout_ms: u64,
    /// STUN server URIs in `stun:host:port` form; three by default.
    pub stun_servers: Vec<String>,
    /// Default `false`.
    pub debug: bool,
    /// Multicast settings (`MulticastConfig::default()` by default).
    pub multicast: MulticastConfig,
    /// Wi-Fi Aware settings (`WifiAwareConfig::default()` by default).
    pub wifi_aware: WifiAwareConfig,
    /// Bluetooth settings (`BluetoothConfig::default()` by default).
    pub bluetooth: BluetoothConfig,
    /// Pool settings (`PoolSettings::default()` by default).
    pub pools: crate::PoolSettings,
    /// Strategy handed to the peer selector; default `SelectionStrategy::Weighted`.
    pub request_selection_strategy: crate::SelectionStrategy,
    /// Fairness flag handed to the peer selector; default `true`.
    pub request_fairness_enabled: bool,
    /// Hedged dispatch parameters; default is initial fan-out 2, hedge fan-out 1,
    /// max fan-out 8 and a 120 ms hedge interval.
    pub request_dispatch: RequestDispatchConfig,
}
90
91impl Default for WebRTCConfig {
92 fn default() -> Self {
93 Self {
94 relays: vec![
95 "wss://relay.damus.io".to_string(),
96 "wss://relay.primal.net".to_string(),
97 "wss://temp.iris.to".to_string(),
98 "wss://relay.snort.social".to_string(),
99 ],
100 signaling_enabled: true,
101 hash_get_enabled: true,
102 signal_urls: Vec::new(),
103 max_outbound: 6,
104 max_inbound: 6,
105 hello_interval_ms: 3000,
106 message_timeout_ms: 15000,
107 stun_servers: vec![
108 "stun:stun.iris.to:3478".to_string(),
109 "stun:stun.l.google.com:19302".to_string(),
110 "stun:stun.cloudflare.com:3478".to_string(),
111 ],
112 debug: false,
113 multicast: MulticastConfig::default(),
114 wifi_aware: WifiAwareConfig::default(),
115 bluetooth: BluetoothConfig::default(),
116 pools: crate::PoolSettings::default(),
117 request_selection_strategy: crate::SelectionStrategy::Weighted,
118 request_fairness_enabled: true,
119 request_dispatch: RequestDispatchConfig {
120 initial_fanout: 2,
121 hedge_fanout: 1,
122 max_fanout: 8,
123 hedge_interval_ms: 120,
124 },
125 }
126 }
127}
128
/// Flat, cloneable description of a single peer.
///
/// NOTE(review): nothing in the visible part of the file constructs this type, so
/// the exact formatting of the string fields should be confirmed against the producer.
#[derive(Debug, Clone)]
pub struct PeerStatus {
    /// Peer identifier as a string.
    pub peer_id: String,
    /// Peer public key as a string.
    pub pubkey: String,
    /// Connection state rendered as text.
    pub state: String,
    /// Direction of the connection.
    pub direction: PeerDirection,
    /// Instant recorded for the connection, if any.
    pub connected_at: Option<std::time::Instant>,
    /// Pool the peer was classified into.
    pub pool: PeerPool,
}
138
/// Reports whether the `HTREE_BLUETOOTH_NOSTR_ONLY` environment variable enables
/// Nostr-only mode for Bluetooth peers.
///
/// The value is trimmed and compared without regard to ASCII case; `1`, `true` and
/// `yes` enable the mode. An unset, non-Unicode or any other value yields `false`.
fn bluetooth_nostr_only_mode() -> bool {
    std::env::var("HTREE_BLUETOOTH_NOSTR_ONLY")
        .map(|raw| {
            let value = raw.trim();
            value == "1"
                || value.eq_ignore_ascii_case("true")
                || value.eq_ignore_ascii_case("yes")
        })
        .unwrap_or(false)
}
145
/// Shared state behind the WebRTC mesh: the runtime peer table plus the selector,
/// dispatch settings and quote state used when requesting data from peers.
pub struct WebRTCState {
    /// Mesh runtime state; this file uses its peer table, local buses and the
    /// bandwidth / mesh counters.
    pub runtime: MeshRuntimeState<MeshPeer>,
    /// Peer selector used to order peers and to record request outcomes.
    peer_selector: Arc<RwLock<PeerSelector>>,
    /// Optional channel that `submit_direct_signaling_event` forwards
    /// `(source, event)` pairs into; `None` until a sender is installed.
    direct_signaling_tx: RwLock<Option<mpsc::Sender<(String, Event)>>>,
    /// Parameters passed to `run_hedged_waves`.
    request_dispatch: RequestDispatchConfig,
    /// Timeout passed to `run_hedged_waves` and used for each per-peer request.
    request_timeout: Duration,
    /// Cashu quote state shared with every peer created by the router factory.
    cashu_quotes: Arc<CashuQuoteState>,
}
// NOTE(review): the consumers of these four constants are outside the visible part
// of the file; presumably they size and expire the `TimedSeenSet`s held by
// `WebRTCManager` (`seen_frame_ids` / `seen_event_ids`) — confirm.
const SEEN_FRAME_CAP: usize = 4096;
const SEEN_FRAME_TTL: Duration = Duration::from_secs(120);
const SEEN_EVENT_CAP: usize = 8192;
const SEEN_EVENT_TTL: Duration = Duration::from_secs(600);

/// Pending requests of one peer, keyed by the hex-encoded hash being requested.
type PendingRequestsMap = Arc<Mutex<HashMap<String, PendingRequest>>>;
/// `(peer id, pending-request map, data channel)` of a WebRTC peer with an open
/// data channel.
type ConnectedPeer = (
    String,
    PendingRequestsMap,
    Arc<webrtc::data_channel::RTCDataChannel>,
);
/// `(peer id, session, transport)` of a peer that is ready to receive requests.
type ConnectedSession = (String, MeshPeer, PeerTransport);
/// Mesh router wired to the in-process signaling bridge and peer factory below.
type SharedProductionRouter = MeshRouter<RouterSignalingBridge, SharedRouterPeerFactory>;
172
173#[derive(Clone)]
174struct RouterSignalingBridge {
175 peer_id: String,
176 signaling_tx: mpsc::Sender<SignalingMessage>,
177}
178
179impl RouterSignalingBridge {
180 fn new(peer_id: String, signaling_tx: mpsc::Sender<SignalingMessage>) -> Self {
181 Self {
182 peer_id,
183 signaling_tx,
184 }
185 }
186}
187
188#[async_trait]
189impl SharedSignalingTransport for RouterSignalingBridge {
190 async fn connect(&self, _relays: &[String]) -> Result<(), SharedTransportError> {
191 Ok(())
192 }
193
194 async fn disconnect(&self) {}
195
196 async fn publish(&self, msg: SignalingMessage) -> Result<(), SharedTransportError> {
197 self.signaling_tx
198 .send(msg)
199 .await
200 .map_err(|e| SharedTransportError::SendFailed(e.to_string()))
201 }
202
203 async fn recv(&self) -> Option<SignalingMessage> {
204 None
205 }
206
207 fn try_recv(&self) -> Option<SignalingMessage> {
208 None
209 }
210
211 fn peer_id(&self) -> &str {
212 &self.peer_id
213 }
214}
215
/// Peer-link factory for the shared router: builds `Peer` instances, tracks them by
/// peer id and mirrors them into the runtime peer table.
struct SharedRouterPeerFactory {
    /// Local peer id passed to every created peer.
    my_peer_id: PeerId,
    /// Signaling channel cloned into every created peer.
    signaling_tx: mpsc::Sender<SignalingMessage>,
    /// STUN servers cloned into every created peer.
    stun_servers: Vec<String>,
    /// Optional content store cloned into every created peer.
    store: Option<Arc<dyn ContentStore>>,
    /// Shared state; supplies the runtime peer table and the Cashu quote state.
    state: Arc<WebRTCState>,
    /// Peer state event channel cloned into every created peer.
    state_event_tx: mpsc::Sender<PeerStateEvent>,
    /// Optional relay client cloned into every created peer.
    nostr_relay: Option<SharedMeshRelayClient>,
    /// Mesh frame channel cloned into every created peer.
    mesh_frame_tx: mpsc::Sender<(PeerId, MeshNostrFrame)>,
    /// Signal URLs applied to every created peer via `set_signal_urls`.
    signal_urls: Vec<String>,
    /// Maps a peer's pubkey to the pool stored on its table entry.
    peer_classifier: PeerClassifier,
    /// Peers created by this factory, keyed by the peer id string.
    peers: RwLock<HashMap<String, Arc<Peer>>>,
}
229
230impl SharedRouterPeerFactory {
231 #[allow(clippy::too_many_arguments)]
232 fn new(
233 my_peer_id: PeerId,
234 signaling_tx: mpsc::Sender<SignalingMessage>,
235 stun_servers: Vec<String>,
236 store: Option<Arc<dyn ContentStore>>,
237 state: Arc<WebRTCState>,
238 state_event_tx: mpsc::Sender<PeerStateEvent>,
239 nostr_relay: Option<SharedMeshRelayClient>,
240 mesh_frame_tx: mpsc::Sender<(PeerId, MeshNostrFrame)>,
241 signal_urls: Vec<String>,
242 peer_classifier: PeerClassifier,
243 ) -> Self {
244 Self {
245 my_peer_id,
246 signaling_tx,
247 stun_servers,
248 store,
249 state,
250 state_event_tx,
251 nostr_relay,
252 mesh_frame_tx,
253 signal_urls,
254 peer_classifier,
255 peers: RwLock::new(HashMap::new()),
256 }
257 }
258
259 async fn register_peer(&self, peer_id: PeerId, direction: PeerDirection, peer: Arc<Peer>) {
260 let peer_key = peer_id.to_string();
261 let pool = (self.peer_classifier)(&peer_id.pubkey);
262 self.peers
263 .write()
264 .await
265 .insert(peer_key.clone(), peer.clone());
266
267 let mut peers = self.state.runtime.peers.write().await;
268 peers.insert(
269 peer_key,
270 PeerEntry {
271 peer_id,
272 direction,
273 state: ConnectionState::Connecting,
274 last_seen: Instant::now(),
275 peer: Some(MeshPeer::WebRtc(peer)),
276 pool,
277 transport: PeerTransport::WebRtc,
278 signal_paths: BTreeSet::from([PeerSignalPath::Relay]),
279 bytes_sent: 0,
280 bytes_received: 0,
281 },
282 );
283 }
284
285 async fn create_peer(
286 &self,
287 peer_id: PeerId,
288 direction: PeerDirection,
289 ) -> Result<Peer, SharedTransportError> {
290 let mut peer = Peer::new_with_store_and_events(
291 peer_id,
292 direction,
293 self.my_peer_id.clone(),
294 self.signaling_tx.clone(),
295 self.stun_servers.clone(),
296 self.store.clone(),
297 Some(self.state_event_tx.clone()),
298 self.nostr_relay.clone(),
299 Some(self.mesh_frame_tx.clone()),
300 Some(self.state.cashu_quotes.clone()),
301 )
302 .await
303 .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))?;
304 peer.set_signal_urls(self.signal_urls.clone());
305 Ok(peer)
306 }
307}
308
309#[async_trait]
310impl SharedPeerLinkFactory for SharedRouterPeerFactory {
311 async fn create_offer(
312 &self,
313 target_peer_id: &str,
314 ) -> Result<(Arc<dyn SharedPeerLink>, String), SharedTransportError> {
315 let target_peer = PeerId::from_string(target_peer_id).ok_or_else(|| {
316 SharedTransportError::ConnectionFailed(format!("invalid peer id {target_peer_id}"))
317 })?;
318 let peer = Arc::new(
319 self.create_peer(target_peer.clone(), PeerDirection::Outbound)
320 .await?,
321 );
322 peer.setup_handlers()
323 .await
324 .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))?;
325 let offer = peer
326 .connect()
327 .await
328 .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))?;
329 let sdp = offer
330 .get("sdp")
331 .and_then(|value| value.as_str())
332 .ok_or_else(|| {
333 SharedTransportError::ConnectionFailed("missing SDP in CLI peer offer".to_string())
334 })?
335 .to_string();
336 self.register_peer(target_peer, PeerDirection::Outbound, peer.clone())
337 .await;
338 Ok((peer as Arc<dyn SharedPeerLink>, sdp))
339 }
340
341 async fn accept_offer(
342 &self,
343 from_peer_id: &str,
344 offer_sdp: &str,
345 ) -> Result<(Arc<dyn SharedPeerLink>, String), SharedTransportError> {
346 let from_peer = PeerId::from_string(from_peer_id).ok_or_else(|| {
347 SharedTransportError::ConnectionFailed(format!("invalid peer id {from_peer_id}"))
348 })?;
349 let peer = Arc::new(
350 self.create_peer(from_peer.clone(), PeerDirection::Inbound)
351 .await?,
352 );
353 peer.setup_handlers()
354 .await
355 .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))?;
356 let answer = peer
357 .handle_offer(serde_json::json!({ "type": "offer", "sdp": offer_sdp }))
358 .await
359 .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))?;
360 let sdp = answer
361 .get("sdp")
362 .and_then(|value| value.as_str())
363 .ok_or_else(|| {
364 SharedTransportError::ConnectionFailed("missing SDP in CLI peer answer".to_string())
365 })?
366 .to_string();
367 self.register_peer(from_peer, PeerDirection::Inbound, peer.clone())
368 .await;
369 Ok((peer as Arc<dyn SharedPeerLink>, sdp))
370 }
371
372 async fn handle_answer(
373 &self,
374 target_peer_id: &str,
375 answer_sdp: &str,
376 ) -> Result<Arc<dyn SharedPeerLink>, SharedTransportError> {
377 let peer = self
378 .peers
379 .read()
380 .await
381 .get(target_peer_id)
382 .cloned()
383 .ok_or_else(|| {
384 SharedTransportError::ConnectionFailed(format!(
385 "missing outbound peer for {target_peer_id}"
386 ))
387 })?;
388 peer.handle_answer(serde_json::json!({ "type": "answer", "sdp": answer_sdp }))
389 .await
390 .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))?;
391 Ok(peer as Arc<dyn SharedPeerLink>)
392 }
393
394 async fn handle_candidate(
395 &self,
396 peer_id: &str,
397 candidate: SharedIceCandidate,
398 ) -> Result<(), SharedTransportError> {
399 let peer = self.peers.read().await.get(peer_id).cloned();
400 if let Some(peer) = peer {
401 peer.handle_candidate(serde_json::json!({
402 "candidate": candidate.candidate,
403 "sdpMLineIndex": candidate.sdp_m_line_index,
404 "sdpMid": candidate.sdp_mid,
405 }))
406 .await
407 .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))?;
408 }
409 Ok(())
410 }
411
412 async fn remove_peer(&self, peer_id: &str) -> Result<(), SharedTransportError> {
413 self.peers.write().await.remove(peer_id);
414 Ok(())
415 }
416}
417
418impl WebRTCState {
419 pub fn new() -> Self {
420 let cfg = WebRTCConfig::default();
421 Self::new_with_routing_and_cashu(
422 cfg.request_selection_strategy,
423 cfg.request_fairness_enabled,
424 cfg.request_dispatch,
425 Duration::from_millis(cfg.message_timeout_ms),
426 CashuRoutingConfig::default(),
427 None,
428 None,
429 )
430 }
431
432 pub fn new_with_routing(
433 selection_strategy: crate::SelectionStrategy,
434 fairness_enabled: bool,
435 request_dispatch: RequestDispatchConfig,
436 ) -> Self {
437 let cfg = WebRTCConfig::default();
438 Self::new_with_routing_and_cashu(
439 selection_strategy,
440 fairness_enabled,
441 request_dispatch,
442 Duration::from_millis(cfg.message_timeout_ms),
443 CashuRoutingConfig::default(),
444 None,
445 None,
446 )
447 }
448
449 pub fn new_with_routing_and_cashu(
450 selection_strategy: crate::SelectionStrategy,
451 fairness_enabled: bool,
452 request_dispatch: RequestDispatchConfig,
453 request_timeout: Duration,
454 cashu_routing: CashuRoutingConfig,
455 payment_client: Option<Arc<dyn CashuPaymentClient>>,
456 mint_metadata: Option<Arc<CashuMintMetadataStore>>,
457 ) -> Self {
458 let mut selector = PeerSelector::with_strategy(selection_strategy);
459 selector.set_fairness(fairness_enabled);
460 let peer_selector = Arc::new(RwLock::new(selector));
461 let cashu_quotes = Arc::new(if let Some(mint_metadata) = mint_metadata {
462 CashuQuoteState::new_with_mint_metadata(
463 cashu_routing,
464 peer_selector.clone(),
465 payment_client,
466 mint_metadata,
467 )
468 } else {
469 CashuQuoteState::new(cashu_routing, peer_selector.clone(), payment_client)
470 });
471 Self {
472 runtime: MeshRuntimeState::new(),
473 peer_selector,
474 direct_signaling_tx: RwLock::new(None),
475 request_dispatch,
476 request_timeout,
477 cashu_quotes,
478 }
479 }
480
481 pub async fn set_local_buses(&self, buses: Vec<SharedLocalNostrBus>) {
482 self.runtime.set_local_buses(buses).await;
483 }
484
485 pub async fn add_local_bus(&self, bus: SharedLocalNostrBus) {
486 self.runtime.add_local_bus(bus).await;
487 }
488
489 pub async fn set_multicast_bus(&self, bus: Option<Arc<MulticastNostrBus>>) {
490 let buses = bus
491 .into_iter()
492 .map(|bus| bus as SharedLocalNostrBus)
493 .collect();
494 self.set_local_buses(buses).await;
495 }
496
497 pub async fn reset_runtime_state(&self) {
500 self.runtime.reset().await;
501 }
502
503 pub async fn peer_metadata_snapshot(&self) -> PeerMetadataSnapshot {
504 self.peer_selector
505 .read()
506 .await
507 .export_peer_metadata_snapshot()
508 }
509
510 pub async fn import_peer_metadata_snapshot(&self, snapshot: &PeerMetadataSnapshot) {
511 self.peer_selector
512 .write()
513 .await
514 .import_peer_metadata_snapshot(snapshot);
515 }
516
517 pub async fn selector_summary(&self) -> SelectorSummary {
518 self.peer_selector.read().await.summary()
519 }
520
521 pub async fn known_peer_snapshot(&self) -> KnownPeerSnapshot {
522 self.runtime.known_peer_snapshot().await
523 }
524
525 pub async fn import_known_peer_snapshot(&self, snapshot: &KnownPeerSnapshot) {
526 self.runtime.import_known_peer_snapshot(snapshot).await;
527 }
528
529 pub async fn ordered_known_peers(&self) -> Vec<KnownPeerRecord> {
530 let snapshot = self.runtime.known_peer_snapshot().await;
531 let mut by_peer = snapshot
532 .peers
533 .into_iter()
534 .map(|peer| (peer.peer_id.clone(), peer))
535 .collect::<HashMap<_, _>>();
536
537 let ordered_ids = {
538 let mut selector = self.peer_selector.write().await;
539 for peer_id in by_peer.keys() {
540 selector.add_peer(peer_id);
541 }
542 selector.select_peers()
543 };
544
545 let mut ordered = Vec::new();
546 for peer_id in ordered_ids {
547 if let Some(peer) = by_peer.remove(&peer_id) {
548 ordered.push(peer);
549 }
550 }
551 let mut remaining = by_peer.into_values().collect::<Vec<_>>();
552 remaining.sort_by(|a, b| a.peer_id.cmp(&b.peer_id));
553 ordered.extend(remaining);
554 ordered
555 }
556
557 pub async fn set_direct_signaling_sender(&self, tx: Option<mpsc::Sender<(String, Event)>>) {
558 *self.direct_signaling_tx.write().await = tx;
559 }
560
561 pub async fn submit_direct_signaling_event(&self, source: String, event: Event) -> bool {
562 let tx = self.direct_signaling_tx.read().await.clone();
563 let Some(tx) = tx else {
564 return false;
565 };
566 tx.send((source, event)).await.is_ok()
567 }
568
    /// Returns the runtime's bandwidth counters.
    ///
    /// NOTE(review): presumably `(bytes sent, bytes received)` — confirm against
    /// `MeshRuntimeState::get_bandwidth`.
    pub fn get_bandwidth(&self) -> (u64, u64) {
        self.runtime.get_bandwidth()
    }

    /// Returns the runtime's three mesh counters.
    ///
    /// NOTE(review): the order of the tuple is defined by
    /// `MeshRuntimeState::get_mesh_stats`, which is not visible here.
    pub fn get_mesh_stats(&self) -> (u64, u64, u64) {
        self.runtime.get_mesh_stats()
    }

    /// Delegates to the runtime's `record_mesh_received`.
    pub fn record_mesh_received(&self) {
        self.runtime.record_mesh_received();
    }

    /// Delegates to the runtime's `record_mesh_forwarded` with `count`.
    pub fn record_mesh_forwarded(&self, count: u64) {
        self.runtime.record_mesh_forwarded(count);
    }

    /// Delegates to the runtime's `record_mesh_duplicate_drop`.
    pub fn record_mesh_duplicate_drop(&self) {
        self.runtime.record_mesh_duplicate_drop();
    }

    /// Records `bytes` as sent to `peer_id` in the runtime.
    pub async fn record_sent(&self, peer_id: &str, bytes: u64) {
        self.runtime.record_sent(peer_id, bytes).await;
    }

    /// Records `bytes` as received from `peer_id` in the runtime.
    pub async fn record_received(&self, peer_id: &str, bytes: u64) {
        self.runtime.record_received(peer_id, bytes).await;
    }
599
600 pub async fn request_from_peers(&self, hash_hex: &str) -> Option<Vec<u8>> {
604 self.request_from_peers_with_source(hash_hex)
605 .await
606 .map(|(data, _peer_id)| data)
607 }
608
609 pub async fn request_from_peers_with_source(
611 &self,
612 hash_hex: &str,
613 ) -> Option<(Vec<u8>, String)> {
614 use crate::BLOB_REQUEST_POLICY;
615
616 let peer_hash_get = self.runtime.peer_hash_get_snapshot().await;
617 let peers = self.runtime.peers.read().await;
618
619 let peer_refs: Vec<_> = peers
620 .values()
621 .filter(|p| p.state == ConnectionState::Connected && p.peer.is_some())
622 .filter_map(|p| {
623 if !peer_hash_get
624 .get(&p.peer_id.to_string())
625 .copied()
626 .unwrap_or(true)
627 {
628 return None;
629 }
630 p.peer
631 .clone()
632 .map(|peer| (p.peer_id.to_string(), peer, p.transport))
633 })
634 .collect();
635
636 drop(peers); let mut connected_peers: Vec<ConnectedPeer> = Vec::new();
639 let mut connected_sessions: Vec<ConnectedSession> = Vec::new();
640 for (peer_id, peer, transport) in peer_refs {
641 if !peer.is_ready() {
642 continue;
643 }
644 if bluetooth_nostr_only_mode() && transport == PeerTransport::Bluetooth {
645 continue;
646 }
647 if let Some(webrtc_peer) = peer.as_webrtc() {
648 let dc_guard = webrtc_peer.data_channel.lock().await;
649 if let Some(dc) = dc_guard.as_ref() {
650 connected_peers.push((
651 peer_id.clone(),
652 webrtc_peer.pending_requests.clone(),
653 dc.clone(),
654 ));
655 }
656 }
657 connected_sessions.push((peer_id, peer, transport));
658 }
659
660 if connected_sessions.is_empty() {
661 debug!(
662 "No connected peers to query for {}",
663 &hash_hex[..8.min(hash_hex.len())]
664 );
665 return None;
666 }
667
668 let hash_bytes = match hex::decode(hash_hex) {
670 Ok(b) => b,
671 Err(_) => return None,
672 };
673
674 let expected_hash: [u8; 32] = match hash_bytes.as_slice().try_into() {
675 Ok(h) => h,
676 Err(_) => {
677 debug!(
678 "Invalid hash length {}, expected 32 bytes",
679 hash_bytes.len()
680 );
681 return None;
682 }
683 };
684
685 let connected_peer_ids: Vec<String> = connected_sessions
686 .iter()
687 .map(|(peer_id, _, _)| peer_id.clone())
688 .collect();
689 sync_selector_peers(self.peer_selector.as_ref(), &connected_peer_ids).await;
690
691 let ordered_peer_ids = self.peer_selector.write().await.select_peers();
692 let mut quote_by_peer: HashMap<
693 String,
694 (
695 PendingRequestsMap,
696 Arc<webrtc::data_channel::RTCDataChannel>,
697 ),
698 > = connected_peers
699 .iter()
700 .cloned()
701 .map(|(peer_id, pending, dc)| (peer_id, (pending, dc)))
702 .collect();
703 let mut ordered_quote_peers: Vec<ConnectedPeer> = Vec::new();
704 for peer_id in &ordered_peer_ids {
705 if let Some((pending, dc)) = quote_by_peer.remove(peer_id) {
706 ordered_quote_peers.push((peer_id.clone(), pending, dc));
707 }
708 }
709 for (peer_id, (pending, dc)) in quote_by_peer {
710 ordered_quote_peers.push((peer_id, pending, dc));
711 }
712
713 let mut by_peer: HashMap<String, (MeshPeer, PeerTransport)> = connected_sessions
714 .into_iter()
715 .map(|(peer_id, peer, transport)| (peer_id, (peer, transport)))
716 .collect();
717
718 let mut ordered_peers: Vec<ConnectedSession> = Vec::new();
719 for peer_id in ordered_peer_ids {
720 if let Some((peer, transport)) = by_peer.remove(&peer_id) {
721 ordered_peers.push((peer_id, peer, transport));
722 }
723 }
724 for (peer_id, (peer, transport)) in by_peer {
725 ordered_peers.push((peer_id, peer, transport));
726 }
727
728 debug!(
729 "Querying {} peers for {} with shared hedged scheduler",
730 ordered_peers.len(),
731 &hash_hex[..8.min(hash_hex.len())],
732 );
733
734 if let Some((requested_mint, payment_sat, quote_ttl_ms)) =
735 self.cashu_quotes.requester_quote_terms().await
736 {
737 if let Some(quote) = self
738 .request_quote_from_peers(
739 &hash_bytes,
740 requested_mint,
741 payment_sat,
742 quote_ttl_ms,
743 &ordered_quote_peers,
744 )
745 .await
746 {
747 if let Some(data) = self
748 .request_from_single_peer(
749 hash_hex,
750 &hash_bytes,
751 expected_hash,
752 "e.peer_id,
753 Some("e),
754 &ordered_quote_peers,
755 )
756 .await
757 {
758 debug!(
759 "Got quoted response from peer {} for {}",
760 quote.peer_id,
761 &hash_hex[..8.min(hash_hex.len())]
762 );
763 return Some((data, quote.peer_id));
764 }
765 }
766 }
767
768 let request = DataRequest {
769 h: hash_bytes.clone(),
770 htl: BLOB_REQUEST_POLICY.max_htl,
771 q: None,
772 };
773 let wire = crate::encode_request(&request);
774 let wire_len = wire.len() as u64;
775 let current_result_rx = Arc::new(Mutex::new(None));
776 if let Some((data, peer_id)) = run_hedged_waves(
777 ordered_peers.len(),
778 self.request_dispatch,
779 self.request_timeout,
780 |range| {
781 let wave_peers = ordered_peers[range].to_vec();
782 let (result_tx, result_rx) =
783 mpsc::channel::<(String, Instant, Result<Option<Vec<u8>>>)>(wave_peers.len());
784 let current_result_rx = current_result_rx.clone();
785 let hash_hex = hash_hex.to_string();
786 async move {
787 *current_result_rx.lock().await = Some(result_rx);
788 let sent = wave_peers.len();
789 for (peer_id, peer, transport) in wave_peers {
790 if transport != PeerTransport::Bluetooth {
791 self.record_sent(&peer_id, wire_len).await;
792 }
793 self.peer_selector
794 .write()
795 .await
796 .record_request(&peer_id, wire_len);
797
798 let result_tx = result_tx.clone();
799 let peer_id_for_task = peer_id.clone();
800 let peer = peer.clone();
801 let hash_hex = hash_hex.clone();
802 let per_request_timeout = self.request_timeout;
803 tokio::spawn(async move {
804 let started = Instant::now();
805 let result = peer.request(&hash_hex, per_request_timeout).await;
806 let _ = result_tx.send((peer_id_for_task, started, result)).await;
807 });
808 }
809 drop(result_tx);
810 sent
811 }
812 },
813 |wait| {
814 let current_result_rx = current_result_rx.clone();
815 async move {
816 let mut current_result_rx = current_result_rx.lock().await;
817 let Some(result_rx) = current_result_rx.as_mut() else {
818 return HedgedWaveAction::Abort;
819 };
820 let deadline = Instant::now() + wait;
821 loop {
822 let now = Instant::now();
823 if now >= deadline {
824 return HedgedWaveAction::Continue;
825 }
826 let remaining = deadline.saturating_duration_since(now);
827 match tokio::time::timeout(remaining, result_rx.recv()).await {
828 Ok(Some((peer_id, started, Ok(Some(data))))) => {
829 let rtt_ms = started.elapsed().as_millis() as u64;
830 if hashtree_core::sha256(&data) == expected_hash {
831 let should_record = {
832 let peers = self.runtime.peers.read().await;
833 peers
834 .get(&peer_id)
835 .map(|entry| {
836 entry.transport != PeerTransport::Bluetooth
837 })
838 .unwrap_or(true)
839 };
840 if should_record {
841 self.record_received(&peer_id, data.len() as u64).await;
842 }
843 self.peer_selector.write().await.record_success(
844 &peer_id,
845 rtt_ms,
846 data.len() as u64,
847 );
848 return HedgedWaveAction::Success((data, peer_id));
849 }
850 self.peer_selector.write().await.record_failure(&peer_id);
851 }
852 Ok(Some((peer_id, _, Ok(None)))) | Ok(Some((peer_id, _, Err(_)))) => {
853 self.peer_selector.write().await.record_timeout(&peer_id);
854 }
855 Ok(None) | Err(_) => return HedgedWaveAction::Continue,
856 }
857 }
858 }
859 },
860 )
861 .await
862 {
863 debug!(
864 "Got response from peer {} for {}",
865 peer_id,
866 &hash_hex[..8.min(hash_hex.len())]
867 );
868 return Some((data, peer_id));
869 }
870
871 debug!(
872 "No peer had data for {}",
873 &hash_hex[..8.min(hash_hex.len())]
874 );
875 None
876 }
877
878 async fn request_quote_from_peers(
879 &self,
880 hash_bytes: &[u8],
881 requested_mint: String,
882 payment_sat: u64,
883 quote_ttl_ms: u32,
884 ordered_peers: &[ConnectedPeer],
885 ) -> Option<NegotiatedQuote> {
886 if ordered_peers.is_empty() || quote_ttl_ms == 0 {
887 return None;
888 }
889
890 let hash_hex = hex::encode(hash_bytes);
891 let rx = self
892 .cashu_quotes
893 .register_pending_quote(hash_hex.clone(), Some(requested_mint.clone()), payment_sat)
894 .await;
895 let quote_request = DataQuoteRequest {
896 h: hash_bytes.to_vec(),
897 p: payment_sat,
898 t: quote_ttl_ms,
899 m: Some(requested_mint),
900 };
901 let wire = crate::encode_quote_request("e_request);
902 let rx = Arc::new(Mutex::new(rx));
903 let result = run_hedged_waves(
904 ordered_peers.len(),
905 self.request_dispatch,
906 self.request_timeout,
907 |range| {
908 let wave_peers = ordered_peers[range].to_vec();
909 let wire = wire.clone();
910 async move {
911 let mut sent = 0usize;
912 for (_, _, dc) in wave_peers {
913 if dc.send(&bytes::Bytes::copy_from_slice(&wire)).await.is_ok() {
914 sent += 1;
915 }
916 }
917 sent
918 }
919 },
920 |wait| {
921 let rx = rx.clone();
922 async move {
923 let mut rx = rx.lock().await;
924 match tokio::time::timeout(wait, &mut *rx).await {
925 Ok(Ok(Some(quote))) => HedgedWaveAction::Success(quote),
926 Ok(Ok(None)) | Ok(Err(_)) => HedgedWaveAction::Abort,
927 Err(_) => HedgedWaveAction::Continue,
928 }
929 }
930 },
931 )
932 .await;
933
934 self.cashu_quotes.clear_pending_quote(&hash_hex).await;
935 result
936 }
937
938 async fn request_from_single_peer(
939 &self,
940 hash_hex: &str,
941 hash_bytes: &[u8],
942 expected_hash: [u8; 32],
943 target_peer_id: &str,
944 quote: Option<&NegotiatedQuote>,
945 ordered_peers: &[ConnectedPeer],
946 ) -> Option<Vec<u8>> {
947 use crate::BLOB_REQUEST_POLICY;
948
949 let (pending_requests, dc) = ordered_peers
950 .iter()
951 .find(|(peer_id, _, _)| peer_id == target_peer_id)
952 .map(|(_, pending_requests, dc)| (pending_requests.clone(), dc.clone()))?;
953
954 let request = DataRequest {
955 h: hash_bytes.to_vec(),
956 htl: BLOB_REQUEST_POLICY.max_htl,
957 q: quote.map(|quote| quote.quote_id),
958 };
959 let wire = crate::encode_request(&request);
960 let wire_len = wire.len() as u64;
961 let sent_at = Instant::now();
962 let (tx, mut rx) = tokio::sync::oneshot::channel();
963
964 {
965 let mut pending = pending_requests.lock().await;
966 pending.insert(
967 hash_hex.to_string(),
968 if let Some(quote) = quote {
969 PendingRequest::quoted(
970 hash_bytes.to_vec(),
971 tx,
972 quote.quote_id,
973 quote.mint_url.clone().unwrap_or_default(),
974 quote.payment_sat,
975 )
976 } else {
977 PendingRequest::standard(hash_bytes.to_vec(), tx)
978 },
979 );
980 }
981
982 if dc
983 .send(&bytes::Bytes::copy_from_slice(&wire))
984 .await
985 .is_err()
986 {
987 let mut pending = pending_requests.lock().await;
988 pending.remove(hash_hex);
989 self.peer_selector
990 .write()
991 .await
992 .record_failure(target_peer_id);
993 return None;
994 }
995
996 self.record_sent(target_peer_id, wire_len).await;
997 self.peer_selector
998 .write()
999 .await
1000 .record_request(target_peer_id, wire_len);
1001
1002 let wait_timeout = if let Some(quote) = quote {
1003 let multiplier = quote.payment_sat.clamp(1, 32) as u128;
1004 let extra_ms = self
1005 .cashu_quotes
1006 .settlement_timeout()
1007 .as_millis()
1008 .saturating_mul(multiplier);
1009 self.request_timeout + Duration::from_millis(extra_ms.min(u64::MAX as u128) as u64)
1010 } else {
1011 self.request_timeout
1012 };
1013
1014 match tokio::time::timeout(wait_timeout, &mut rx).await {
1015 Ok(Ok(Some(data))) if hashtree_core::sha256(&data) == expected_hash => {
1016 let rtt_ms = sent_at.elapsed().as_millis() as u64;
1017 self.record_received(target_peer_id, data.len() as u64)
1018 .await;
1019 self.peer_selector.write().await.record_success(
1020 target_peer_id,
1021 rtt_ms,
1022 data.len() as u64,
1023 );
1024 Some(data)
1025 }
1026 Ok(Ok(Some(_))) => {
1027 self.peer_selector
1028 .write()
1029 .await
1030 .record_failure(target_peer_id);
1031 let pending = pending_requests.lock().await.remove(hash_hex);
1032 if let Some(pending) = pending {
1033 if let Some(quoted) = pending.quoted {
1034 if let Some(in_flight) = quoted.in_flight_payment {
1035 let _ = self
1036 .cashu_quotes
1037 .revoke_payment_token(&in_flight.mint_url, &in_flight.operation_id)
1038 .await;
1039 }
1040 }
1041 }
1042 None
1043 }
1044 Ok(Ok(None)) | Ok(Err(_)) | Err(_) => {
1045 let pending = pending_requests.lock().await.remove(hash_hex);
1046 if let Some(pending) = pending {
1047 if let Some(quoted) = pending.quoted {
1048 if let Some(in_flight) = quoted.in_flight_payment {
1049 let _ = self
1050 .cashu_quotes
1051 .revoke_payment_token(&in_flight.mint_url, &in_flight.operation_id)
1052 .await;
1053 }
1054 }
1055 }
1056 self.peer_selector
1057 .write()
1058 .await
1059 .record_timeout(target_peer_id);
1060 None
1061 }
1062 }
1063 }
1064
1065 pub async fn resolve_root_from_peers(
1067 &self,
1068 owner_pubkey: &str,
1069 tree_name: &str,
1070 per_peer_timeout: Duration,
1071 ) -> Option<PeerRootEvent> {
1072 let peer_refs: Vec<(String, Arc<dyn MeshSession>)> = {
1073 let peers = self.runtime.peers.read().await;
1074 peers
1075 .values()
1076 .filter(|entry| entry.state == ConnectionState::Connected)
1077 .filter(|entry| {
1078 !bluetooth_nostr_only_mode() || entry.transport != PeerTransport::Bluetooth
1079 })
1080 .filter_map(|entry| {
1081 let peer = entry.peer.as_ref()?;
1082 Some((
1083 entry.peer_id.short(),
1084 Arc::new(peer.clone()) as Arc<dyn MeshSession>,
1085 ))
1086 })
1087 .collect()
1088 };
1089
1090 let resolved =
1091 resolve_root_via_peer_sessions(peer_refs, owner_pubkey, tree_name, per_peer_timeout)
1092 .await;
1093 if let Some(root) = &resolved {
1094 debug!(
1095 "Resolved {}/{} via peer {} event {}",
1096 owner_pubkey, tree_name, root.peer_id, root.event_id
1097 );
1098 }
1099 resolved
1100 }
1101
1102 pub async fn resolve_root_from_local_buses_with_source(
1103 &self,
1104 owner_pubkey: &str,
1105 tree_name: &str,
1106 timeout: Duration,
1107 ) -> Option<(&'static str, PeerRootEvent)> {
1108 self.runtime
1109 .resolve_root_from_local_buses_with_source(owner_pubkey, tree_name, timeout)
1110 .await
1111 }
1112
1113 pub async fn resolve_root_from_local_buses(
1114 &self,
1115 owner_pubkey: &str,
1116 tree_name: &str,
1117 timeout: Duration,
1118 ) -> Option<PeerRootEvent> {
1119 self.resolve_root_from_local_buses_with_source(owner_pubkey, tree_name, timeout)
1120 .await
1121 .map(|(_, root)| root)
1122 }
1123
1124 pub async fn resolve_root_from_multicast(
1125 &self,
1126 owner_pubkey: &str,
1127 tree_name: &str,
1128 timeout: Duration,
1129 ) -> Option<PeerRootEvent> {
1130 self.resolve_root_from_local_buses(owner_pubkey, tree_name, timeout)
1131 .await
1132 }
1133}
1134
1135impl Default for WebRTCState {
1136 fn default() -> Self {
1137 Self::new()
1138 }
1139}
1140
/// Owner of the WebRTC mesh: configuration, identity, shared state and the
/// channels that connect peers to the manager.
///
/// NOTE(review): only the start of `WebRTCManager::new` is visible in this part of
/// the file, so the field notes below describe how `new` initialises each field,
/// not how it is later used.
pub struct WebRTCManager {
    /// Configuration supplied to `new`.
    config: WebRTCConfig,
    /// Local peer id, built in `new` from the hex form of the public key.
    my_peer_id: PeerId,
    /// Key pair supplied to `new`.
    keys: Keys,
    /// Shared state, created in `new` from the routing fields of `config`.
    state: Arc<WebRTCState>,
    /// Sender half of the shutdown watch channel (initial value `false`).
    shutdown: Arc<tokio::sync::watch::Sender<bool>>,
    /// Receiver half of the shutdown watch channel.
    shutdown_rx: tokio::sync::watch::Receiver<bool>,
    /// Sender for signaling messages (channel capacity 100).
    signaling_tx: mpsc::Sender<SignalingMessage>,
    /// Receiver for signaling messages; `Some` after `new`.
    signaling_rx: Option<mpsc::Receiver<SignalingMessage>>,
    /// Optional content store; `None` after `new`.
    store: Option<Arc<dyn ContentStore>>,
    /// Peer classifier; `new` installs one that maps every peer to `PeerPool::Other`.
    peer_classifier: PeerClassifier,
    /// Optional relay client; `None` after `new`.
    nostr_relay: Option<SharedMeshRelayClient>,
    /// Local buses; empty after `new`.
    local_buses: Vec<SharedLocalNostrBus>,
    /// Sender for peer state events (channel capacity 100).
    state_event_tx: mpsc::Sender<PeerStateEvent>,
    /// Receiver for peer state events; `Some` after `new`.
    state_event_rx: Option<mpsc::Receiver<PeerStateEvent>>,
    /// Sender for mesh frames tagged with the originating peer (channel capacity 256).
    mesh_frame_tx: mpsc::Sender<(PeerId, MeshNostrFrame)>,
    /// Receiver for mesh frames.
    mesh_frame_rx: Option<mpsc::Receiver<(PeerId, MeshNostrFrame)>>,
    /// Shared production router, when one has been set up.
    shared_router: Option<Arc<SharedProductionRouter>>,
    /// Seen-set for frame ids — presumably used for de-duplication; confirm.
    seen_frame_ids: Arc<Mutex<TimedSeenSet>>,
    /// Seen-set for event ids — presumably used for de-duplication; confirm.
    seen_event_ids: Arc<Mutex<TimedSeenSet>>,
}
1169
1170impl WebRTCManager {
1171 pub fn new(keys: Keys, config: WebRTCConfig) -> Self {
1173 let pubkey = keys.public_key().to_hex();
1174 let my_peer_id = PeerId::new(pubkey);
1175 let (shutdown, shutdown_rx) = tokio::sync::watch::channel(false);
1176 let (signaling_tx, signaling_rx) = mpsc::channel(100);
1177 let (state_event_tx, state_event_rx) = mpsc::channel(100);
1178 let (mesh_frame_tx, mesh_frame_rx) = mpsc::channel(256);
1179 let state = Arc::new(WebRTCState::new_with_routing_and_cashu(
1180 config.request_selection_strategy,
1181 config.request_fairness_enabled,
1182 config.request_dispatch,
1183 Duration::from_millis(config.message_timeout_ms),
1184 CashuRoutingConfig::default(),
1185 None,
1186 None,
1187 ));
1188
1189 let peer_classifier: PeerClassifier = Arc::new(|_| PeerPool::Other);
1191
1192 Self {
1193 config,
1194 my_peer_id,
1195 keys,
1196 state,
1197 shutdown: Arc::new(shutdown),
1198 shutdown_rx,
1199 signaling_tx,
1200 signaling_rx: Some(signaling_rx),
1201 store: None,
1202 peer_classifier,
1203 nostr_relay: None,
1204 local_buses: Vec::new(),
1205 state_event_tx,
1206 state_event_rx: Some(state_event_rx),
1207 mesh_frame_tx,
1208 mesh_frame_rx: Some(mesh_frame_rx),
1209 shared_router: None,
1210 seen_frame_ids: Arc::new(Mutex::new(TimedSeenSet::new(
1211 SEEN_FRAME_CAP,
1212 SEEN_FRAME_TTL,
1213 ))),
1214 seen_event_ids: Arc::new(Mutex::new(TimedSeenSet::new(
1215 SEEN_EVENT_CAP,
1216 SEEN_EVENT_TTL,
1217 ))),
1218 }
1219 }
1220
1221 pub fn new_with_state(keys: Keys, config: WebRTCConfig, state: Arc<WebRTCState>) -> Self {
1223 let mut manager = Self::new(keys, config);
1224 manager.state = state;
1225 manager
1226 }
1227
1228 pub fn new_with_classifier(
1230 keys: Keys,
1231 config: WebRTCConfig,
1232 classifier: PeerClassifier,
1233 ) -> Self {
1234 let mut manager = Self::new(keys, config);
1235 manager.peer_classifier = classifier;
1236 manager
1237 }
1238
1239 pub fn new_with_store(keys: Keys, config: WebRTCConfig, store: Arc<dyn ContentStore>) -> Self {
1241 let mut manager = Self::new(keys, config);
1242 manager.store = Some(store);
1243 manager
1244 }
1245
1246 pub fn new_with_store_and_classifier(
1248 keys: Keys,
1249 config: WebRTCConfig,
1250 store: Arc<dyn ContentStore>,
1251 classifier: PeerClassifier,
1252 ) -> Self {
1253 Self::new_with_store_and_classifier_and_cashu(
1254 keys,
1255 config,
1256 store,
1257 classifier,
1258 CashuRoutingConfig::default(),
1259 None,
1260 None,
1261 )
1262 }
1263
1264 pub fn new_with_state_and_store_and_classifier(
1265 keys: Keys,
1266 config: WebRTCConfig,
1267 state: Arc<WebRTCState>,
1268 store: Arc<dyn ContentStore>,
1269 classifier: PeerClassifier,
1270 ) -> Self {
1271 let mut manager = Self::new_with_state(keys, config, state);
1272 manager.store = Some(store);
1273 manager.peer_classifier = classifier;
1274 manager
1275 }
1276
1277 pub fn new_with_store_and_classifier_and_cashu(
1278 keys: Keys,
1279 config: WebRTCConfig,
1280 store: Arc<dyn ContentStore>,
1281 classifier: PeerClassifier,
1282 cashu_routing: CashuRoutingConfig,
1283 payment_client: Option<Arc<dyn CashuPaymentClient>>,
1284 mint_metadata: Option<Arc<CashuMintMetadataStore>>,
1285 ) -> Self {
1286 let mut manager = Self::new(keys, config);
1287 manager.state = Arc::new(WebRTCState::new_with_routing_and_cashu(
1288 manager.config.request_selection_strategy,
1289 manager.config.request_fairness_enabled,
1290 manager.config.request_dispatch,
1291 Duration::from_millis(manager.config.message_timeout_ms),
1292 cashu_routing,
1293 payment_client,
1294 mint_metadata,
1295 ));
1296 manager.store = Some(store);
1297 manager.peer_classifier = classifier;
1298 manager
1299 }
1300
1301 pub fn set_store(&mut self, store: Arc<dyn ContentStore>) {
1303 self.store = Some(store);
1304 }
1305
1306 pub fn set_peer_classifier(&mut self, classifier: PeerClassifier) {
1308 self.peer_classifier = classifier;
1309 }
1310
1311 pub fn set_nostr_relay(&mut self, relay: SharedMeshRelayClient) {
1313 self.nostr_relay = Some(relay);
1314 }
1315
1316 pub fn my_peer_id(&self) -> &PeerId {
1318 &self.my_peer_id
1319 }
1320
1321 pub fn state(&self) -> Arc<WebRTCState> {
1323 self.state.clone()
1324 }
1325
1326 pub fn shutdown_signal(&self) -> Arc<tokio::sync::watch::Sender<bool>> {
1328 self.shutdown.clone()
1329 }
1330
1331 pub fn shutdown(&self) {
1333 let _ = self.shutdown.send(true);
1334 }
1335
1336 pub async fn connected_count(&self) -> usize {
1338 self.state
1339 .runtime
1340 .connected_count
1341 .load(std::sync::atomic::Ordering::Relaxed)
1342 }
1343
1344 pub async fn peer_statuses(&self) -> Vec<PeerStatus> {
1346 self.state
1347 .runtime
1348 .peers
1349 .read()
1350 .await
1351 .values()
1352 .map(|p| PeerStatus {
1353 peer_id: p.peer_id.to_string(),
1354 pubkey: p.peer_id.pubkey.clone(),
1355 state: p.state.to_string(),
1356 direction: p.direction,
1357 connected_at: Some(p.last_seen),
1358 pool: p.pool,
1359 })
1360 .collect()
1361 }
1362
1363 pub async fn get_pool_counts(&self) -> (usize, usize, usize, usize) {
1367 let peers = self.state.runtime.peers.read().await;
1368 let mut follows_connected = 0;
1369 let mut follows_active = 0;
1370 let mut other_connected = 0;
1371 let mut other_active = 0;
1372
1373 for entry in peers.values() {
1374 let is_active = entry.state == ConnectionState::Connected
1377 || entry.state == ConnectionState::Connecting;
1378
1379 match entry.pool {
1380 PeerPool::Follows => {
1381 if is_active {
1382 follows_active += 1;
1383 }
1384 if entry.state == ConnectionState::Connected {
1385 follows_connected += 1;
1386 }
1387 }
1388 PeerPool::Other => {
1389 if is_active {
1390 other_active += 1;
1391 }
1392 if entry.state == ConnectionState::Connected {
1393 other_connected += 1;
1394 }
1395 }
1396 }
1397 }
1398
1399 (
1400 follows_connected,
1401 follows_active,
1402 other_connected,
1403 other_active,
1404 )
1405 }
1406
1407 fn local_bus_max_peers(&self, source: &str) -> Option<usize> {
1408 match source {
1409 "multicast" => Some(self.config.multicast.max_peers),
1410 WIFI_AWARE_SOURCE => Some(self.config.wifi_aware.max_peers),
1411 _ => None,
1412 }
1413 }
1414
1415 #[cfg_attr(not(test), allow(dead_code))]
1416 fn can_track_local_bus_peer(
1417 &self,
1418 source: &str,
1419 peer_key: &str,
1420 peers: &HashMap<String, PeerEntry>,
1421 ) -> bool {
1422 can_track_source_peer(source, peer_key, peers, self.local_bus_max_peers(source))
1423 }
1424}
1425
1426mod runtime;