1pub use crate::{ConnectionState, PeerSignalPath, PeerTransport};
13use anyhow::Result;
14use async_trait::async_trait;
15use cashu_service::CashuPaymentClient;
16#[cfg(test)]
17use nostr_sdk::nostr::Kind;
18use nostr_sdk::nostr::{Event, Keys};
19use std::collections::{BTreeSet, HashMap};
20use std::sync::Arc;
21use std::time::{Duration, Instant};
22use tokio::sync::{mpsc, Mutex, RwLock};
23use tracing::{debug, error, info, warn};
24
25use crate::bluetooth::{
26 BluetoothConfig, BluetoothMesh, BluetoothPeerRegistrar, BluetoothRuntimeContext,
27};
28use crate::cashu::{CashuMintMetadataStore, CashuQuoteState, CashuRoutingConfig, NegotiatedQuote};
29use crate::local_bus::SharedLocalNostrBus;
30use crate::mesh_session::{
31 resolve_root_from_peer_sessions as resolve_root_via_peer_sessions, MeshSession,
32};
33use crate::mesh_store_core::{
34 run_hedged_waves, sync_selector_peers, HedgedWaveAction, RequestDispatchConfig,
35};
36use crate::multicast::{MulticastConfig, MulticastNostrBus};
37use crate::nostr::NostrRelayTransport;
38use crate::peer::{ContentStore, Peer, PendingRequest};
39use crate::peer_selector::{PeerMetadataSnapshot, PeerSelector, SelectorSummary};
40use crate::protocol::{DataQuoteRequest, DataRequest};
41use crate::relay_bridge::SharedMeshRelayClient;
42use crate::root_events::PeerRootEvent;
43use crate::runtime_control::{can_track_source_peer, PeerStateEvent};
44use crate::runtime_peer::{
45 MeshPeerEntry as SharedPeerEntry, PeerClassifier as SharedPeerClassifier,
46};
47use crate::runtime_state::MeshRuntimeState;
48use crate::session::MeshPeer;
49use crate::signaling::MeshRouter;
50use crate::transport::{
51 PeerLink as SharedPeerLink, PeerLinkFactory as SharedPeerLinkFactory,
52 SignalingTransport as SharedSignalingTransport, TransportError as SharedTransportError,
53};
54use crate::types::{
55 validate_mesh_frame, KnownPeerRecord, KnownPeerSnapshot, MeshNostrFrame, MeshNostrPayload,
56 SignalingMessage, TimedSeenSet,
57};
58use crate::wifi_aware::{
59 mobile_wifi_aware_bridge, WifiAwareConfig, WifiAwareNostrBus, WIFI_AWARE_SOURCE,
60};
61use crate::{
62 ClassifyRequest as SharedClassifyRequest, IceCandidate as SharedIceCandidate, PeerDirection,
63 PeerId, PeerPool, MESH_SIGNALING_EVENT_KIND,
64};
65
66pub type PeerClassifier = SharedPeerClassifier;
68pub type PeerEntry = SharedPeerEntry<MeshPeer>;
69
70#[derive(Clone)]
71pub struct WebRTCConfig {
72 pub relays: Vec<String>,
73 pub signaling_enabled: bool,
74 pub hash_get_enabled: bool,
75 pub signal_urls: Vec<String>,
76 pub max_outbound: usize,
77 pub max_inbound: usize,
78 pub hello_interval_ms: u64,
79 pub message_timeout_ms: u64,
80 pub stun_servers: Vec<String>,
81 pub debug: bool,
82 pub multicast: MulticastConfig,
83 pub wifi_aware: WifiAwareConfig,
84 pub bluetooth: BluetoothConfig,
85 pub pools: crate::PoolSettings,
86 pub request_selection_strategy: crate::SelectionStrategy,
87 pub request_fairness_enabled: bool,
88 pub request_dispatch: RequestDispatchConfig,
89}
90
91impl Default for WebRTCConfig {
92 fn default() -> Self {
93 Self {
94 relays: vec![
95 "wss://relay.damus.io".to_string(),
96 "wss://relay.primal.net".to_string(),
97 "wss://temp.iris.to".to_string(),
98 "wss://relay.snort.social".to_string(),
99 ],
100 signaling_enabled: true,
101 hash_get_enabled: true,
102 signal_urls: Vec::new(),
103 max_outbound: 6,
104 max_inbound: 6,
105 hello_interval_ms: 3000,
106 message_timeout_ms: 15000,
107 stun_servers: vec![
108 "stun:stun.iris.to:3478".to_string(),
109 "stun:stun.l.google.com:19302".to_string(),
110 "stun:stun.cloudflare.com:3478".to_string(),
111 ],
112 debug: false,
113 multicast: MulticastConfig::default(),
114 wifi_aware: WifiAwareConfig::default(),
115 bluetooth: BluetoothConfig::default(),
116 pools: crate::PoolSettings::default(),
117 request_selection_strategy: crate::SelectionStrategy::Weighted,
118 request_fairness_enabled: true,
119 request_dispatch: RequestDispatchConfig {
120 initial_fanout: 2,
121 hedge_fanout: 1,
122 max_fanout: 8,
123 hedge_interval_ms: 120,
124 },
125 }
126 }
127}
128
129#[derive(Debug, Clone)]
130pub struct PeerStatus {
131 pub peer_id: String,
132 pub pubkey: String,
133 pub state: String,
134 pub direction: PeerDirection,
135 pub connected_at: Option<std::time::Instant>,
136 pub pool: PeerPool,
137}
138
139fn bluetooth_nostr_only_mode() -> bool {
140 matches!(
141 std::env::var("HTREE_BLUETOOTH_NOSTR_ONLY").ok().as_deref(),
142 Some("1" | "true" | "TRUE" | "yes" | "YES")
143 )
144}
145
146pub struct WebRTCState {
148 pub runtime: MeshRuntimeState<MeshPeer>,
149 peer_selector: Arc<RwLock<PeerSelector>>,
151 direct_signaling_tx: RwLock<Option<mpsc::Sender<(String, Event)>>>,
152 request_dispatch: RequestDispatchConfig,
154 request_timeout: Duration,
156 cashu_quotes: Arc<CashuQuoteState>,
158}
159const SEEN_FRAME_CAP: usize = 4096;
160const SEEN_FRAME_TTL: Duration = Duration::from_secs(120);
161const SEEN_EVENT_CAP: usize = 8192;
162const SEEN_EVENT_TTL: Duration = Duration::from_secs(600);
163
164type PendingRequestsMap = Arc<Mutex<HashMap<String, PendingRequest>>>;
165type ConnectedPeer = (
166 String,
167 PendingRequestsMap,
168 Arc<webrtc::data_channel::RTCDataChannel>,
169);
170type ConnectedSession = (String, MeshPeer, PeerTransport);
171type SharedProductionRouter = MeshRouter<RouterSignalingBridge, SharedRouterPeerFactory>;
172
173#[derive(Clone)]
174struct RouterSignalingBridge {
175 peer_id: String,
176 signaling_tx: mpsc::Sender<SignalingMessage>,
177}
178
179impl RouterSignalingBridge {
180 fn new(peer_id: String, signaling_tx: mpsc::Sender<SignalingMessage>) -> Self {
181 Self {
182 peer_id,
183 signaling_tx,
184 }
185 }
186}
187
188#[async_trait]
189impl SharedSignalingTransport for RouterSignalingBridge {
190 async fn connect(&self, _relays: &[String]) -> Result<(), SharedTransportError> {
191 Ok(())
192 }
193
194 async fn disconnect(&self) {}
195
196 async fn publish(&self, msg: SignalingMessage) -> Result<(), SharedTransportError> {
197 self.signaling_tx
198 .send(msg)
199 .await
200 .map_err(|e| SharedTransportError::SendFailed(e.to_string()))
201 }
202
203 async fn recv(&self) -> Option<SignalingMessage> {
204 None
205 }
206
207 fn try_recv(&self) -> Option<SignalingMessage> {
208 None
209 }
210
211 fn peer_id(&self) -> &str {
212 &self.peer_id
213 }
214}
215
216struct SharedRouterPeerFactory {
217 my_peer_id: PeerId,
218 signaling_tx: mpsc::Sender<SignalingMessage>,
219 stun_servers: Vec<String>,
220 store: Option<Arc<dyn ContentStore>>,
221 state: Arc<WebRTCState>,
222 state_event_tx: mpsc::Sender<PeerStateEvent>,
223 nostr_relay: Option<SharedMeshRelayClient>,
224 mesh_frame_tx: mpsc::Sender<(PeerId, MeshNostrFrame)>,
225 signal_urls: Vec<String>,
226 peer_classifier: PeerClassifier,
227 peers: RwLock<HashMap<String, Arc<Peer>>>,
228}
229
230impl SharedRouterPeerFactory {
231 fn new(
232 my_peer_id: PeerId,
233 signaling_tx: mpsc::Sender<SignalingMessage>,
234 stun_servers: Vec<String>,
235 store: Option<Arc<dyn ContentStore>>,
236 state: Arc<WebRTCState>,
237 state_event_tx: mpsc::Sender<PeerStateEvent>,
238 nostr_relay: Option<SharedMeshRelayClient>,
239 mesh_frame_tx: mpsc::Sender<(PeerId, MeshNostrFrame)>,
240 signal_urls: Vec<String>,
241 peer_classifier: PeerClassifier,
242 ) -> Self {
243 Self {
244 my_peer_id,
245 signaling_tx,
246 stun_servers,
247 store,
248 state,
249 state_event_tx,
250 nostr_relay,
251 mesh_frame_tx,
252 signal_urls,
253 peer_classifier,
254 peers: RwLock::new(HashMap::new()),
255 }
256 }
257
258 async fn register_peer(&self, peer_id: PeerId, direction: PeerDirection, peer: Arc<Peer>) {
259 let peer_key = peer_id.to_string();
260 let pool = (self.peer_classifier)(&peer_id.pubkey);
261 self.peers
262 .write()
263 .await
264 .insert(peer_key.clone(), peer.clone());
265
266 let mut peers = self.state.runtime.peers.write().await;
267 peers.insert(
268 peer_key,
269 PeerEntry {
270 peer_id,
271 direction,
272 state: ConnectionState::Connecting,
273 last_seen: Instant::now(),
274 peer: Some(MeshPeer::WebRtc(peer)),
275 pool,
276 transport: PeerTransport::WebRtc,
277 signal_paths: BTreeSet::from([PeerSignalPath::Relay]),
278 bytes_sent: 0,
279 bytes_received: 0,
280 },
281 );
282 }
283
284 async fn create_peer(
285 &self,
286 peer_id: PeerId,
287 direction: PeerDirection,
288 ) -> Result<Peer, SharedTransportError> {
289 let mut peer = Peer::new_with_store_and_events(
290 peer_id,
291 direction,
292 self.my_peer_id.clone(),
293 self.signaling_tx.clone(),
294 self.stun_servers.clone(),
295 self.store.clone(),
296 Some(self.state_event_tx.clone()),
297 self.nostr_relay.clone(),
298 Some(self.mesh_frame_tx.clone()),
299 Some(self.state.cashu_quotes.clone()),
300 )
301 .await
302 .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))?;
303 peer.set_signal_urls(self.signal_urls.clone());
304 Ok(peer)
305 }
306}
307
308#[async_trait]
309impl SharedPeerLinkFactory for SharedRouterPeerFactory {
310 async fn create_offer(
311 &self,
312 target_peer_id: &str,
313 ) -> Result<(Arc<dyn SharedPeerLink>, String), SharedTransportError> {
314 let target_peer = PeerId::from_string(target_peer_id).ok_or_else(|| {
315 SharedTransportError::ConnectionFailed(format!("invalid peer id {target_peer_id}"))
316 })?;
317 let peer = Arc::new(
318 self.create_peer(target_peer.clone(), PeerDirection::Outbound)
319 .await?,
320 );
321 peer.setup_handlers()
322 .await
323 .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))?;
324 let offer = peer
325 .connect()
326 .await
327 .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))?;
328 let sdp = offer
329 .get("sdp")
330 .and_then(|value| value.as_str())
331 .ok_or_else(|| {
332 SharedTransportError::ConnectionFailed("missing SDP in CLI peer offer".to_string())
333 })?
334 .to_string();
335 self.register_peer(target_peer, PeerDirection::Outbound, peer.clone())
336 .await;
337 Ok((peer as Arc<dyn SharedPeerLink>, sdp))
338 }
339
340 async fn accept_offer(
341 &self,
342 from_peer_id: &str,
343 offer_sdp: &str,
344 ) -> Result<(Arc<dyn SharedPeerLink>, String), SharedTransportError> {
345 let from_peer = PeerId::from_string(from_peer_id).ok_or_else(|| {
346 SharedTransportError::ConnectionFailed(format!("invalid peer id {from_peer_id}"))
347 })?;
348 let peer = Arc::new(
349 self.create_peer(from_peer.clone(), PeerDirection::Inbound)
350 .await?,
351 );
352 peer.setup_handlers()
353 .await
354 .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))?;
355 let answer = peer
356 .handle_offer(serde_json::json!({ "type": "offer", "sdp": offer_sdp }))
357 .await
358 .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))?;
359 let sdp = answer
360 .get("sdp")
361 .and_then(|value| value.as_str())
362 .ok_or_else(|| {
363 SharedTransportError::ConnectionFailed("missing SDP in CLI peer answer".to_string())
364 })?
365 .to_string();
366 self.register_peer(from_peer, PeerDirection::Inbound, peer.clone())
367 .await;
368 Ok((peer as Arc<dyn SharedPeerLink>, sdp))
369 }
370
371 async fn handle_answer(
372 &self,
373 target_peer_id: &str,
374 answer_sdp: &str,
375 ) -> Result<Arc<dyn SharedPeerLink>, SharedTransportError> {
376 let peer = self
377 .peers
378 .read()
379 .await
380 .get(target_peer_id)
381 .cloned()
382 .ok_or_else(|| {
383 SharedTransportError::ConnectionFailed(format!(
384 "missing outbound peer for {target_peer_id}"
385 ))
386 })?;
387 peer.handle_answer(serde_json::json!({ "type": "answer", "sdp": answer_sdp }))
388 .await
389 .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))?;
390 Ok(peer as Arc<dyn SharedPeerLink>)
391 }
392
393 async fn handle_candidate(
394 &self,
395 peer_id: &str,
396 candidate: SharedIceCandidate,
397 ) -> Result<(), SharedTransportError> {
398 let peer = self.peers.read().await.get(peer_id).cloned();
399 if let Some(peer) = peer {
400 peer.handle_candidate(serde_json::json!({
401 "candidate": candidate.candidate,
402 "sdpMLineIndex": candidate.sdp_m_line_index,
403 "sdpMid": candidate.sdp_mid,
404 }))
405 .await
406 .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))?;
407 }
408 Ok(())
409 }
410
411 async fn remove_peer(&self, peer_id: &str) -> Result<(), SharedTransportError> {
412 self.peers.write().await.remove(peer_id);
413 Ok(())
414 }
415}
416
417impl WebRTCState {
418 pub fn new() -> Self {
419 let cfg = WebRTCConfig::default();
420 Self::new_with_routing_and_cashu(
421 cfg.request_selection_strategy,
422 cfg.request_fairness_enabled,
423 cfg.request_dispatch,
424 Duration::from_millis(cfg.message_timeout_ms),
425 CashuRoutingConfig::default(),
426 None,
427 None,
428 )
429 }
430
431 pub fn new_with_routing(
432 selection_strategy: crate::SelectionStrategy,
433 fairness_enabled: bool,
434 request_dispatch: RequestDispatchConfig,
435 ) -> Self {
436 let cfg = WebRTCConfig::default();
437 Self::new_with_routing_and_cashu(
438 selection_strategy,
439 fairness_enabled,
440 request_dispatch,
441 Duration::from_millis(cfg.message_timeout_ms),
442 CashuRoutingConfig::default(),
443 None,
444 None,
445 )
446 }
447
448 pub fn new_with_routing_and_cashu(
449 selection_strategy: crate::SelectionStrategy,
450 fairness_enabled: bool,
451 request_dispatch: RequestDispatchConfig,
452 request_timeout: Duration,
453 cashu_routing: CashuRoutingConfig,
454 payment_client: Option<Arc<dyn CashuPaymentClient>>,
455 mint_metadata: Option<Arc<CashuMintMetadataStore>>,
456 ) -> Self {
457 let mut selector = PeerSelector::with_strategy(selection_strategy);
458 selector.set_fairness(fairness_enabled);
459 let peer_selector = Arc::new(RwLock::new(selector));
460 let cashu_quotes = Arc::new(if let Some(mint_metadata) = mint_metadata {
461 CashuQuoteState::new_with_mint_metadata(
462 cashu_routing,
463 peer_selector.clone(),
464 payment_client,
465 mint_metadata,
466 )
467 } else {
468 CashuQuoteState::new(cashu_routing, peer_selector.clone(), payment_client)
469 });
470 Self {
471 runtime: MeshRuntimeState::new(),
472 peer_selector,
473 direct_signaling_tx: RwLock::new(None),
474 request_dispatch,
475 request_timeout,
476 cashu_quotes,
477 }
478 }
479
480 pub async fn set_local_buses(&self, buses: Vec<SharedLocalNostrBus>) {
481 self.runtime.set_local_buses(buses).await;
482 }
483
484 pub async fn add_local_bus(&self, bus: SharedLocalNostrBus) {
485 self.runtime.add_local_bus(bus).await;
486 }
487
488 pub async fn set_multicast_bus(&self, bus: Option<Arc<MulticastNostrBus>>) {
489 let buses = bus
490 .into_iter()
491 .map(|bus| bus as SharedLocalNostrBus)
492 .collect();
493 self.set_local_buses(buses).await;
494 }
495
496 pub async fn reset_runtime_state(&self) {
499 self.runtime.reset().await;
500 }
501
502 pub async fn peer_metadata_snapshot(&self) -> PeerMetadataSnapshot {
503 self.peer_selector
504 .read()
505 .await
506 .export_peer_metadata_snapshot()
507 }
508
509 pub async fn import_peer_metadata_snapshot(&self, snapshot: &PeerMetadataSnapshot) {
510 self.peer_selector
511 .write()
512 .await
513 .import_peer_metadata_snapshot(snapshot);
514 }
515
516 pub async fn selector_summary(&self) -> SelectorSummary {
517 self.peer_selector.read().await.summary()
518 }
519
520 pub async fn known_peer_snapshot(&self) -> KnownPeerSnapshot {
521 self.runtime.known_peer_snapshot().await
522 }
523
524 pub async fn import_known_peer_snapshot(&self, snapshot: &KnownPeerSnapshot) {
525 self.runtime.import_known_peer_snapshot(snapshot).await;
526 }
527
528 pub async fn ordered_known_peers(&self) -> Vec<KnownPeerRecord> {
529 let snapshot = self.runtime.known_peer_snapshot().await;
530 let mut by_peer = snapshot
531 .peers
532 .into_iter()
533 .map(|peer| (peer.peer_id.clone(), peer))
534 .collect::<HashMap<_, _>>();
535
536 let ordered_ids = {
537 let mut selector = self.peer_selector.write().await;
538 for peer_id in by_peer.keys() {
539 selector.add_peer(peer_id);
540 }
541 selector.select_peers()
542 };
543
544 let mut ordered = Vec::new();
545 for peer_id in ordered_ids {
546 if let Some(peer) = by_peer.remove(&peer_id) {
547 ordered.push(peer);
548 }
549 }
550 let mut remaining = by_peer.into_values().collect::<Vec<_>>();
551 remaining.sort_by(|a, b| a.peer_id.cmp(&b.peer_id));
552 ordered.extend(remaining);
553 ordered
554 }
555
556 pub async fn set_direct_signaling_sender(&self, tx: Option<mpsc::Sender<(String, Event)>>) {
557 *self.direct_signaling_tx.write().await = tx;
558 }
559
560 pub async fn submit_direct_signaling_event(&self, source: String, event: Event) -> bool {
561 let tx = self.direct_signaling_tx.read().await.clone();
562 let Some(tx) = tx else {
563 return false;
564 };
565 tx.send((source, event)).await.is_ok()
566 }
567
568 pub fn get_bandwidth(&self) -> (u64, u64) {
570 self.runtime.get_bandwidth()
571 }
572
573 pub fn get_mesh_stats(&self) -> (u64, u64, u64) {
574 self.runtime.get_mesh_stats()
575 }
576
577 pub fn record_mesh_received(&self) {
578 self.runtime.record_mesh_received();
579 }
580
581 pub fn record_mesh_forwarded(&self, count: u64) {
582 self.runtime.record_mesh_forwarded(count);
583 }
584
585 pub fn record_mesh_duplicate_drop(&self) {
586 self.runtime.record_mesh_duplicate_drop();
587 }
588
589 pub async fn record_sent(&self, peer_id: &str, bytes: u64) {
591 self.runtime.record_sent(peer_id, bytes).await;
592 }
593
594 pub async fn record_received(&self, peer_id: &str, bytes: u64) {
596 self.runtime.record_received(peer_id, bytes).await;
597 }
598
599 pub async fn request_from_peers(&self, hash_hex: &str) -> Option<Vec<u8>> {
603 self.request_from_peers_with_source(hash_hex)
604 .await
605 .map(|(data, _peer_id)| data)
606 }
607
608 pub async fn request_from_peers_with_source(
610 &self,
611 hash_hex: &str,
612 ) -> Option<(Vec<u8>, String)> {
613 use crate::BLOB_REQUEST_POLICY;
614
615 let peer_hash_get = self.runtime.peer_hash_get_snapshot().await;
616 let peers = self.runtime.peers.read().await;
617
618 let peer_refs: Vec<_> = peers
619 .values()
620 .filter(|p| p.state == ConnectionState::Connected && p.peer.is_some())
621 .filter_map(|p| {
622 if !peer_hash_get
623 .get(&p.peer_id.to_string())
624 .copied()
625 .unwrap_or(true)
626 {
627 return None;
628 }
629 p.peer
630 .clone()
631 .map(|peer| (p.peer_id.to_string(), peer, p.transport))
632 })
633 .collect();
634
635 drop(peers); let mut connected_peers: Vec<ConnectedPeer> = Vec::new();
638 let mut connected_sessions: Vec<ConnectedSession> = Vec::new();
639 for (peer_id, peer, transport) in peer_refs {
640 if !peer.is_ready() {
641 continue;
642 }
643 if bluetooth_nostr_only_mode() && transport == PeerTransport::Bluetooth {
644 continue;
645 }
646 if let Some(webrtc_peer) = peer.as_webrtc() {
647 let dc_guard = webrtc_peer.data_channel.lock().await;
648 if let Some(dc) = dc_guard.as_ref() {
649 connected_peers.push((
650 peer_id.clone(),
651 webrtc_peer.pending_requests.clone(),
652 dc.clone(),
653 ));
654 }
655 }
656 connected_sessions.push((peer_id, peer, transport));
657 }
658
659 if connected_sessions.is_empty() {
660 debug!(
661 "No connected peers to query for {}",
662 &hash_hex[..8.min(hash_hex.len())]
663 );
664 return None;
665 }
666
667 let hash_bytes = match hex::decode(hash_hex) {
669 Ok(b) => b,
670 Err(_) => return None,
671 };
672
673 let expected_hash: [u8; 32] = match hash_bytes.as_slice().try_into() {
674 Ok(h) => h,
675 Err(_) => {
676 debug!(
677 "Invalid hash length {}, expected 32 bytes",
678 hash_bytes.len()
679 );
680 return None;
681 }
682 };
683
684 let connected_peer_ids: Vec<String> = connected_sessions
685 .iter()
686 .map(|(peer_id, _, _)| peer_id.clone())
687 .collect();
688 sync_selector_peers(self.peer_selector.as_ref(), &connected_peer_ids).await;
689
690 let ordered_peer_ids = self.peer_selector.write().await.select_peers();
691 let mut quote_by_peer: HashMap<
692 String,
693 (
694 PendingRequestsMap,
695 Arc<webrtc::data_channel::RTCDataChannel>,
696 ),
697 > = connected_peers
698 .iter()
699 .cloned()
700 .map(|(peer_id, pending, dc)| (peer_id, (pending, dc)))
701 .collect();
702 let mut ordered_quote_peers: Vec<ConnectedPeer> = Vec::new();
703 for peer_id in &ordered_peer_ids {
704 if let Some((pending, dc)) = quote_by_peer.remove(peer_id) {
705 ordered_quote_peers.push((peer_id.clone(), pending, dc));
706 }
707 }
708 for (peer_id, (pending, dc)) in quote_by_peer {
709 ordered_quote_peers.push((peer_id, pending, dc));
710 }
711
712 let mut by_peer: HashMap<String, (MeshPeer, PeerTransport)> = connected_sessions
713 .into_iter()
714 .map(|(peer_id, peer, transport)| (peer_id, (peer, transport)))
715 .collect();
716
717 let mut ordered_peers: Vec<ConnectedSession> = Vec::new();
718 for peer_id in ordered_peer_ids {
719 if let Some((peer, transport)) = by_peer.remove(&peer_id) {
720 ordered_peers.push((peer_id, peer, transport));
721 }
722 }
723 for (peer_id, (peer, transport)) in by_peer {
724 ordered_peers.push((peer_id, peer, transport));
725 }
726
727 debug!(
728 "Querying {} peers for {} with shared hedged scheduler",
729 ordered_peers.len(),
730 &hash_hex[..8.min(hash_hex.len())],
731 );
732
733 if let Some((requested_mint, payment_sat, quote_ttl_ms)) =
734 self.cashu_quotes.requester_quote_terms().await
735 {
736 if let Some(quote) = self
737 .request_quote_from_peers(
738 &hash_bytes,
739 requested_mint,
740 payment_sat,
741 quote_ttl_ms,
742 &ordered_quote_peers,
743 )
744 .await
745 {
746 if let Some(data) = self
747 .request_from_single_peer(
748 hash_hex,
749 &hash_bytes,
750 expected_hash,
751 "e.peer_id,
752 Some("e),
753 &ordered_quote_peers,
754 )
755 .await
756 {
757 debug!(
758 "Got quoted response from peer {} for {}",
759 quote.peer_id,
760 &hash_hex[..8.min(hash_hex.len())]
761 );
762 return Some((data, quote.peer_id));
763 }
764 }
765 }
766
767 let request = DataRequest {
768 h: hash_bytes.clone(),
769 htl: BLOB_REQUEST_POLICY.max_htl,
770 q: None,
771 };
772 let wire = crate::encode_request(&request);
773 let wire_len = wire.len() as u64;
774 let current_result_rx = Arc::new(Mutex::new(None));
775 if let Some((data, peer_id)) = run_hedged_waves(
776 ordered_peers.len(),
777 self.request_dispatch,
778 self.request_timeout,
779 |range| {
780 let wave_peers = ordered_peers[range].to_vec();
781 let (result_tx, result_rx) =
782 mpsc::channel::<(String, Instant, Result<Option<Vec<u8>>>)>(wave_peers.len());
783 let current_result_rx = current_result_rx.clone();
784 let hash_hex = hash_hex.to_string();
785 async move {
786 *current_result_rx.lock().await = Some(result_rx);
787 let sent = wave_peers.len();
788 for (peer_id, peer, transport) in wave_peers {
789 if transport != PeerTransport::Bluetooth {
790 self.record_sent(&peer_id, wire_len).await;
791 }
792 self.peer_selector
793 .write()
794 .await
795 .record_request(&peer_id, wire_len);
796
797 let result_tx = result_tx.clone();
798 let peer_id_for_task = peer_id.clone();
799 let peer = peer.clone();
800 let hash_hex = hash_hex.clone();
801 let per_request_timeout = self.request_timeout;
802 tokio::spawn(async move {
803 let started = Instant::now();
804 let result = peer.request(&hash_hex, per_request_timeout).await;
805 let _ = result_tx.send((peer_id_for_task, started, result)).await;
806 });
807 }
808 drop(result_tx);
809 sent
810 }
811 },
812 |wait| {
813 let current_result_rx = current_result_rx.clone();
814 async move {
815 let mut current_result_rx = current_result_rx.lock().await;
816 let Some(result_rx) = current_result_rx.as_mut() else {
817 return HedgedWaveAction::Abort;
818 };
819 let deadline = Instant::now() + wait;
820 loop {
821 let now = Instant::now();
822 if now >= deadline {
823 return HedgedWaveAction::Continue;
824 }
825 let remaining = deadline.saturating_duration_since(now);
826 match tokio::time::timeout(remaining, result_rx.recv()).await {
827 Ok(Some((peer_id, started, Ok(Some(data))))) => {
828 let rtt_ms = started.elapsed().as_millis() as u64;
829 if hashtree_core::sha256(&data) == expected_hash {
830 let should_record = {
831 let peers = self.runtime.peers.read().await;
832 peers
833 .get(&peer_id)
834 .map(|entry| {
835 entry.transport != PeerTransport::Bluetooth
836 })
837 .unwrap_or(true)
838 };
839 if should_record {
840 self.record_received(&peer_id, data.len() as u64).await;
841 }
842 self.peer_selector.write().await.record_success(
843 &peer_id,
844 rtt_ms,
845 data.len() as u64,
846 );
847 return HedgedWaveAction::Success((data, peer_id));
848 }
849 self.peer_selector.write().await.record_failure(&peer_id);
850 }
851 Ok(Some((peer_id, _, Ok(None)))) | Ok(Some((peer_id, _, Err(_)))) => {
852 self.peer_selector.write().await.record_timeout(&peer_id);
853 }
854 Ok(None) | Err(_) => return HedgedWaveAction::Continue,
855 }
856 }
857 }
858 },
859 )
860 .await
861 {
862 debug!(
863 "Got response from peer {} for {}",
864 peer_id,
865 &hash_hex[..8.min(hash_hex.len())]
866 );
867 return Some((data, peer_id));
868 }
869
870 debug!(
871 "No peer had data for {}",
872 &hash_hex[..8.min(hash_hex.len())]
873 );
874 None
875 }
876
877 async fn request_quote_from_peers(
878 &self,
879 hash_bytes: &[u8],
880 requested_mint: String,
881 payment_sat: u64,
882 quote_ttl_ms: u32,
883 ordered_peers: &[ConnectedPeer],
884 ) -> Option<NegotiatedQuote> {
885 if ordered_peers.is_empty() || quote_ttl_ms == 0 {
886 return None;
887 }
888
889 let hash_hex = hex::encode(hash_bytes);
890 let rx = self
891 .cashu_quotes
892 .register_pending_quote(hash_hex.clone(), Some(requested_mint.clone()), payment_sat)
893 .await;
894 let quote_request = DataQuoteRequest {
895 h: hash_bytes.to_vec(),
896 p: payment_sat,
897 t: quote_ttl_ms,
898 m: Some(requested_mint),
899 };
900 let wire = crate::encode_quote_request("e_request);
901 let rx = Arc::new(Mutex::new(rx));
902 let result = run_hedged_waves(
903 ordered_peers.len(),
904 self.request_dispatch,
905 self.request_timeout,
906 |range| {
907 let wave_peers = ordered_peers[range].to_vec();
908 let wire = wire.clone();
909 async move {
910 let mut sent = 0usize;
911 for (_, _, dc) in wave_peers {
912 if dc.send(&bytes::Bytes::copy_from_slice(&wire)).await.is_ok() {
913 sent += 1;
914 }
915 }
916 sent
917 }
918 },
919 |wait| {
920 let rx = rx.clone();
921 async move {
922 let mut rx = rx.lock().await;
923 match tokio::time::timeout(wait, &mut *rx).await {
924 Ok(Ok(Some(quote))) => HedgedWaveAction::Success(quote),
925 Ok(Ok(None)) | Ok(Err(_)) => HedgedWaveAction::Abort,
926 Err(_) => HedgedWaveAction::Continue,
927 }
928 }
929 },
930 )
931 .await;
932
933 self.cashu_quotes.clear_pending_quote(&hash_hex).await;
934 result
935 }
936
937 async fn request_from_single_peer(
938 &self,
939 hash_hex: &str,
940 hash_bytes: &[u8],
941 expected_hash: [u8; 32],
942 target_peer_id: &str,
943 quote: Option<&NegotiatedQuote>,
944 ordered_peers: &[ConnectedPeer],
945 ) -> Option<Vec<u8>> {
946 use crate::BLOB_REQUEST_POLICY;
947
948 let (pending_requests, dc) = ordered_peers
949 .iter()
950 .find(|(peer_id, _, _)| peer_id == target_peer_id)
951 .map(|(_, pending_requests, dc)| (pending_requests.clone(), dc.clone()))?;
952
953 let request = DataRequest {
954 h: hash_bytes.to_vec(),
955 htl: BLOB_REQUEST_POLICY.max_htl,
956 q: quote.map(|quote| quote.quote_id),
957 };
958 let wire = crate::encode_request(&request);
959 let wire_len = wire.len() as u64;
960 let sent_at = Instant::now();
961 let (tx, mut rx) = tokio::sync::oneshot::channel();
962
963 {
964 let mut pending = pending_requests.lock().await;
965 pending.insert(
966 hash_hex.to_string(),
967 if let Some(quote) = quote {
968 PendingRequest::quoted(
969 hash_bytes.to_vec(),
970 tx,
971 quote.quote_id,
972 quote.mint_url.clone().unwrap_or_default(),
973 quote.payment_sat,
974 )
975 } else {
976 PendingRequest::standard(hash_bytes.to_vec(), tx)
977 },
978 );
979 }
980
981 if dc
982 .send(&bytes::Bytes::copy_from_slice(&wire))
983 .await
984 .is_err()
985 {
986 let mut pending = pending_requests.lock().await;
987 pending.remove(hash_hex);
988 self.peer_selector
989 .write()
990 .await
991 .record_failure(target_peer_id);
992 return None;
993 }
994
995 self.record_sent(target_peer_id, wire_len).await;
996 self.peer_selector
997 .write()
998 .await
999 .record_request(target_peer_id, wire_len);
1000
1001 let wait_timeout = if let Some(quote) = quote {
1002 let multiplier = quote.payment_sat.clamp(1, 32) as u128;
1003 let extra_ms = self
1004 .cashu_quotes
1005 .settlement_timeout()
1006 .as_millis()
1007 .saturating_mul(multiplier);
1008 self.request_timeout + Duration::from_millis(extra_ms.min(u64::MAX as u128) as u64)
1009 } else {
1010 self.request_timeout
1011 };
1012
1013 match tokio::time::timeout(wait_timeout, &mut rx).await {
1014 Ok(Ok(Some(data))) if hashtree_core::sha256(&data) == expected_hash => {
1015 let rtt_ms = sent_at.elapsed().as_millis() as u64;
1016 self.record_received(target_peer_id, data.len() as u64)
1017 .await;
1018 self.peer_selector.write().await.record_success(
1019 target_peer_id,
1020 rtt_ms,
1021 data.len() as u64,
1022 );
1023 Some(data)
1024 }
1025 Ok(Ok(Some(_))) => {
1026 self.peer_selector
1027 .write()
1028 .await
1029 .record_failure(target_peer_id);
1030 let pending = pending_requests.lock().await.remove(hash_hex);
1031 if let Some(pending) = pending {
1032 if let Some(quoted) = pending.quoted {
1033 if let Some(in_flight) = quoted.in_flight_payment {
1034 let _ = self
1035 .cashu_quotes
1036 .revoke_payment_token(&in_flight.mint_url, &in_flight.operation_id)
1037 .await;
1038 }
1039 }
1040 }
1041 None
1042 }
1043 Ok(Ok(None)) | Ok(Err(_)) | Err(_) => {
1044 let pending = pending_requests.lock().await.remove(hash_hex);
1045 if let Some(pending) = pending {
1046 if let Some(quoted) = pending.quoted {
1047 if let Some(in_flight) = quoted.in_flight_payment {
1048 let _ = self
1049 .cashu_quotes
1050 .revoke_payment_token(&in_flight.mint_url, &in_flight.operation_id)
1051 .await;
1052 }
1053 }
1054 }
1055 self.peer_selector
1056 .write()
1057 .await
1058 .record_timeout(target_peer_id);
1059 None
1060 }
1061 }
1062 }
1063
1064 pub async fn resolve_root_from_peers(
1066 &self,
1067 owner_pubkey: &str,
1068 tree_name: &str,
1069 per_peer_timeout: Duration,
1070 ) -> Option<PeerRootEvent> {
1071 let peer_refs: Vec<(String, Arc<dyn MeshSession>)> = {
1072 let peers = self.runtime.peers.read().await;
1073 peers
1074 .values()
1075 .filter(|entry| entry.state == ConnectionState::Connected)
1076 .filter(|entry| {
1077 !bluetooth_nostr_only_mode() || entry.transport != PeerTransport::Bluetooth
1078 })
1079 .filter_map(|entry| {
1080 let peer = entry.peer.as_ref()?;
1081 Some((
1082 entry.peer_id.short(),
1083 Arc::new(peer.clone()) as Arc<dyn MeshSession>,
1084 ))
1085 })
1086 .collect()
1087 };
1088
1089 let resolved =
1090 resolve_root_via_peer_sessions(peer_refs, owner_pubkey, tree_name, per_peer_timeout)
1091 .await;
1092 if let Some(root) = &resolved {
1093 debug!(
1094 "Resolved {}/{} via peer {} event {}",
1095 owner_pubkey, tree_name, root.peer_id, root.event_id
1096 );
1097 }
1098 resolved
1099 }
1100
1101 pub async fn resolve_root_from_local_buses_with_source(
1102 &self,
1103 owner_pubkey: &str,
1104 tree_name: &str,
1105 timeout: Duration,
1106 ) -> Option<(&'static str, PeerRootEvent)> {
1107 self.runtime
1108 .resolve_root_from_local_buses_with_source(owner_pubkey, tree_name, timeout)
1109 .await
1110 }
1111
1112 pub async fn resolve_root_from_local_buses(
1113 &self,
1114 owner_pubkey: &str,
1115 tree_name: &str,
1116 timeout: Duration,
1117 ) -> Option<PeerRootEvent> {
1118 self.resolve_root_from_local_buses_with_source(owner_pubkey, tree_name, timeout)
1119 .await
1120 .map(|(_, root)| root)
1121 }
1122
1123 pub async fn resolve_root_from_multicast(
1124 &self,
1125 owner_pubkey: &str,
1126 tree_name: &str,
1127 timeout: Duration,
1128 ) -> Option<PeerRootEvent> {
1129 self.resolve_root_from_local_buses(owner_pubkey, tree_name, timeout)
1130 .await
1131 }
1132}
1133
1134impl Default for WebRTCState {
1135 fn default() -> Self {
1136 Self::new()
1137 }
1138}
1139
1140pub struct WebRTCManager {
1142 config: WebRTCConfig,
1143 my_peer_id: PeerId,
1144 keys: Keys,
1145 state: Arc<WebRTCState>,
1146 shutdown: Arc<tokio::sync::watch::Sender<bool>>,
1147 shutdown_rx: tokio::sync::watch::Receiver<bool>,
1148 signaling_tx: mpsc::Sender<SignalingMessage>,
1150 signaling_rx: Option<mpsc::Receiver<SignalingMessage>>,
1151 store: Option<Arc<dyn ContentStore>>,
1153 peer_classifier: PeerClassifier,
1155 nostr_relay: Option<SharedMeshRelayClient>,
1157 local_buses: Vec<SharedLocalNostrBus>,
1158 state_event_tx: mpsc::Sender<PeerStateEvent>,
1160 state_event_rx: Option<mpsc::Receiver<PeerStateEvent>>,
1161 mesh_frame_tx: mpsc::Sender<(PeerId, MeshNostrFrame)>,
1163 mesh_frame_rx: Option<mpsc::Receiver<(PeerId, MeshNostrFrame)>>,
1164 shared_router: Option<Arc<SharedProductionRouter>>,
1165 seen_frame_ids: Arc<Mutex<TimedSeenSet>>,
1166 seen_event_ids: Arc<Mutex<TimedSeenSet>>,
1167}
1168
1169impl WebRTCManager {
1170 pub fn new(keys: Keys, config: WebRTCConfig) -> Self {
1172 let pubkey = keys.public_key().to_hex();
1173 let my_peer_id = PeerId::new(pubkey);
1174 let (shutdown, shutdown_rx) = tokio::sync::watch::channel(false);
1175 let (signaling_tx, signaling_rx) = mpsc::channel(100);
1176 let (state_event_tx, state_event_rx) = mpsc::channel(100);
1177 let (mesh_frame_tx, mesh_frame_rx) = mpsc::channel(256);
1178 let state = Arc::new(WebRTCState::new_with_routing_and_cashu(
1179 config.request_selection_strategy,
1180 config.request_fairness_enabled,
1181 config.request_dispatch,
1182 Duration::from_millis(config.message_timeout_ms),
1183 CashuRoutingConfig::default(),
1184 None,
1185 None,
1186 ));
1187
1188 let peer_classifier: PeerClassifier = Arc::new(|_| PeerPool::Other);
1190
1191 Self {
1192 config,
1193 my_peer_id,
1194 keys,
1195 state,
1196 shutdown: Arc::new(shutdown),
1197 shutdown_rx,
1198 signaling_tx,
1199 signaling_rx: Some(signaling_rx),
1200 store: None,
1201 peer_classifier,
1202 nostr_relay: None,
1203 local_buses: Vec::new(),
1204 state_event_tx,
1205 state_event_rx: Some(state_event_rx),
1206 mesh_frame_tx,
1207 mesh_frame_rx: Some(mesh_frame_rx),
1208 shared_router: None,
1209 seen_frame_ids: Arc::new(Mutex::new(TimedSeenSet::new(
1210 SEEN_FRAME_CAP,
1211 SEEN_FRAME_TTL,
1212 ))),
1213 seen_event_ids: Arc::new(Mutex::new(TimedSeenSet::new(
1214 SEEN_EVENT_CAP,
1215 SEEN_EVENT_TTL,
1216 ))),
1217 }
1218 }
1219
1220 pub fn new_with_state(keys: Keys, config: WebRTCConfig, state: Arc<WebRTCState>) -> Self {
1222 let mut manager = Self::new(keys, config);
1223 manager.state = state;
1224 manager
1225 }
1226
1227 pub fn new_with_classifier(
1229 keys: Keys,
1230 config: WebRTCConfig,
1231 classifier: PeerClassifier,
1232 ) -> Self {
1233 let mut manager = Self::new(keys, config);
1234 manager.peer_classifier = classifier;
1235 manager
1236 }
1237
1238 pub fn new_with_store(keys: Keys, config: WebRTCConfig, store: Arc<dyn ContentStore>) -> Self {
1240 let mut manager = Self::new(keys, config);
1241 manager.store = Some(store);
1242 manager
1243 }
1244
1245 pub fn new_with_store_and_classifier(
1247 keys: Keys,
1248 config: WebRTCConfig,
1249 store: Arc<dyn ContentStore>,
1250 classifier: PeerClassifier,
1251 ) -> Self {
1252 Self::new_with_store_and_classifier_and_cashu(
1253 keys,
1254 config,
1255 store,
1256 classifier,
1257 CashuRoutingConfig::default(),
1258 None,
1259 None,
1260 )
1261 }
1262
1263 pub fn new_with_state_and_store_and_classifier(
1264 keys: Keys,
1265 config: WebRTCConfig,
1266 state: Arc<WebRTCState>,
1267 store: Arc<dyn ContentStore>,
1268 classifier: PeerClassifier,
1269 ) -> Self {
1270 let mut manager = Self::new_with_state(keys, config, state);
1271 manager.store = Some(store);
1272 manager.peer_classifier = classifier;
1273 manager
1274 }
1275
1276 pub fn new_with_store_and_classifier_and_cashu(
1277 keys: Keys,
1278 config: WebRTCConfig,
1279 store: Arc<dyn ContentStore>,
1280 classifier: PeerClassifier,
1281 cashu_routing: CashuRoutingConfig,
1282 payment_client: Option<Arc<dyn CashuPaymentClient>>,
1283 mint_metadata: Option<Arc<CashuMintMetadataStore>>,
1284 ) -> Self {
1285 let mut manager = Self::new(keys, config);
1286 manager.state = Arc::new(WebRTCState::new_with_routing_and_cashu(
1287 manager.config.request_selection_strategy,
1288 manager.config.request_fairness_enabled,
1289 manager.config.request_dispatch,
1290 Duration::from_millis(manager.config.message_timeout_ms),
1291 cashu_routing,
1292 payment_client,
1293 mint_metadata,
1294 ));
1295 manager.store = Some(store);
1296 manager.peer_classifier = classifier;
1297 manager
1298 }
1299
1300 pub fn set_store(&mut self, store: Arc<dyn ContentStore>) {
1302 self.store = Some(store);
1303 }
1304
1305 pub fn set_peer_classifier(&mut self, classifier: PeerClassifier) {
1307 self.peer_classifier = classifier;
1308 }
1309
1310 pub fn set_nostr_relay(&mut self, relay: SharedMeshRelayClient) {
1312 self.nostr_relay = Some(relay);
1313 }
1314
1315 pub fn my_peer_id(&self) -> &PeerId {
1317 &self.my_peer_id
1318 }
1319
1320 pub fn state(&self) -> Arc<WebRTCState> {
1322 self.state.clone()
1323 }
1324
1325 pub fn shutdown_signal(&self) -> Arc<tokio::sync::watch::Sender<bool>> {
1327 self.shutdown.clone()
1328 }
1329
1330 pub fn shutdown(&self) {
1332 let _ = self.shutdown.send(true);
1333 }
1334
1335 pub async fn connected_count(&self) -> usize {
1337 self.state
1338 .runtime
1339 .connected_count
1340 .load(std::sync::atomic::Ordering::Relaxed)
1341 }
1342
1343 pub async fn peer_statuses(&self) -> Vec<PeerStatus> {
1345 self.state
1346 .runtime
1347 .peers
1348 .read()
1349 .await
1350 .values()
1351 .map(|p| PeerStatus {
1352 peer_id: p.peer_id.to_string(),
1353 pubkey: p.peer_id.pubkey.clone(),
1354 state: p.state.to_string(),
1355 direction: p.direction,
1356 connected_at: Some(p.last_seen),
1357 pool: p.pool,
1358 })
1359 .collect()
1360 }
1361
1362 pub async fn get_pool_counts(&self) -> (usize, usize, usize, usize) {
1366 let peers = self.state.runtime.peers.read().await;
1367 let mut follows_connected = 0;
1368 let mut follows_active = 0;
1369 let mut other_connected = 0;
1370 let mut other_active = 0;
1371
1372 for entry in peers.values() {
1373 let is_active = entry.state == ConnectionState::Connected
1376 || entry.state == ConnectionState::Connecting;
1377
1378 match entry.pool {
1379 PeerPool::Follows => {
1380 if is_active {
1381 follows_active += 1;
1382 }
1383 if entry.state == ConnectionState::Connected {
1384 follows_connected += 1;
1385 }
1386 }
1387 PeerPool::Other => {
1388 if is_active {
1389 other_active += 1;
1390 }
1391 if entry.state == ConnectionState::Connected {
1392 other_connected += 1;
1393 }
1394 }
1395 }
1396 }
1397
1398 (
1399 follows_connected,
1400 follows_active,
1401 other_connected,
1402 other_active,
1403 )
1404 }
1405
1406 fn local_bus_max_peers(&self, source: &str) -> Option<usize> {
1407 match source {
1408 "multicast" => Some(self.config.multicast.max_peers),
1409 WIFI_AWARE_SOURCE => Some(self.config.wifi_aware.max_peers),
1410 _ => None,
1411 }
1412 }
1413
1414 #[cfg_attr(not(test), allow(dead_code))]
1415 fn can_track_local_bus_peer(
1416 &self,
1417 source: &str,
1418 peer_key: &str,
1419 peers: &HashMap<String, PeerEntry>,
1420 ) -> bool {
1421 can_track_source_peer(source, peer_key, peers, self.local_bus_max_peers(source))
1422 }
1423}
1424
1425mod runtime;