1use anyhow::Result;
13use async_trait::async_trait;
14use futures::{SinkExt, StreamExt};
15use hashtree_network::{
16 decode_signaling_event, encode_signaling_event, run_hedged_waves, sync_selector_peers,
17 ClassifyRequest as SharedClassifyRequest, HedgedWaveAction, IceCandidate as SharedIceCandidate,
18 MeshRouter, PeerLink as SharedPeerLink, PeerLinkFactory as SharedPeerLinkFactory, PeerSelector,
19 SignalingTransport as SharedSignalingTransport, TransportError as SharedTransportError,
20};
21use nostr::{ClientMessage, Filter, JsonUtil, Keys, Kind, RelayMessage};
22use std::collections::{BTreeSet, HashMap};
23use std::sync::Arc;
24use std::time::{Duration, Instant};
25use tokio::sync::{mpsc, Mutex, RwLock};
26use tokio_tungstenite::{connect_async, tungstenite::Message};
27use tracing::{debug, error, info, warn};
28
29use super::bluetooth::{BluetoothMesh, BluetoothPeerRegistrar, BluetoothRuntimeContext};
30use super::cashu::{CashuMintMetadataStore, CashuQuoteState, CashuRoutingConfig, NegotiatedQuote};
31use super::local_bus::SharedLocalNostrBus;
32use super::multicast::MulticastNostrBus;
33use super::peer::{ContentStore, Peer, PendingRequest};
34use super::root_events::{
35 build_root_filter, hashtree_event_identifier, is_hashtree_labeled_event, pick_latest_event,
36 root_event_from_peer, PeerRootEvent,
37};
38use super::session::MeshPeer;
39use super::types::{
40 decrement_htl_with_policy, encode_quote_request, encode_request, should_forward_htl,
41 validate_mesh_frame, DataQuoteRequest, DataRequest, MeshNostrFrame, MeshNostrPayload,
42 PeerDirection, PeerId, PeerPool, PeerStateEvent, PeerStatus, RequestDispatchConfig,
43 SignalingMessage, TimedSeenSet, WebRTCConfig, HELLO_TAG, MESH_DEFAULT_HTL, MESH_EVENT_POLICY,
44 WEBRTC_KIND,
45};
46use super::wifi_aware::{mobile_wifi_aware_bridge, WifiAwareNostrBus, WIFI_AWARE_SOURCE};
47use crate::cashu_helper::CashuPaymentClient;
48use crate::nostr_relay::NostrRelay;
49
/// Shared, thread-safe callback that maps a string key to the [`PeerPool`] a peer belongs to.
///
/// In this file it is invoked with a peer's `pubkey` when a peer is registered.
pub type PeerClassifier = Arc<dyn Fn(&str) -> PeerPool + Send + Sync>;
52
/// Kind of link used to exchange data with a mesh peer.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub enum PeerTransport {
    /// The peer is reached over WebRTC.
    WebRtc,
    /// The peer is reached over Bluetooth.
    Bluetooth,
}

impl PeerTransport {
    /// Returns the lowercase label of this transport (`"webrtc"` or `"bluetooth"`).
    pub const fn as_str(self) -> &'static str {
        match self {
            Self::WebRtc => "webrtc",
            Self::Bluetooth => "bluetooth",
        }
    }
}

impl std::fmt::Display for PeerTransport {
    /// Writes the same label that [`PeerTransport::as_str`] returns.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label: &'static str = self.as_str();
        f.write_str(label)
    }
}
74
/// Reports whether the `HTREE_BLUETOOTH_NOSTR_ONLY` environment variable switches
/// Bluetooth peers into "Nostr only" mode.
///
/// The flag is on when the variable is set to `1`, `true` or `yes`. Matching is ASCII
/// case-insensitive and ignores surrounding whitespace, so `True` and ` YES ` are accepted
/// as well. An unset variable, a non-Unicode value, or any other value yields `false`.
fn bluetooth_nostr_only_mode() -> bool {
    const ENABLED_VALUES: [&str; 3] = ["1", "true", "yes"];

    std::env::var("HTREE_BLUETOOTH_NOSTR_ONLY")
        .map(|raw| {
            let value = raw.trim();
            ENABLED_VALUES
                .iter()
                .any(|accepted| value.eq_ignore_ascii_case(accepted))
        })
        .unwrap_or(false)
}
81
82#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
84pub enum PeerSignalPath {
85 Relay,
86 Multicast,
87 WifiAware,
88 Bluetooth,
89}
90
91impl PeerSignalPath {
92 pub const fn as_str(self) -> &'static str {
93 match self {
94 PeerSignalPath::Relay => "relay",
95 PeerSignalPath::Multicast => "multicast",
96 PeerSignalPath::WifiAware => WIFI_AWARE_SOURCE,
97 PeerSignalPath::Bluetooth => "bluetooth",
98 }
99 }
100
101 pub fn from_source_name(source: &str) -> Self {
102 match source {
103 "multicast" => PeerSignalPath::Multicast,
104 WIFI_AWARE_SOURCE => PeerSignalPath::WifiAware,
105 "bluetooth" => PeerSignalPath::Bluetooth,
106 _ => PeerSignalPath::Relay,
107 }
108 }
109}
110
111impl std::fmt::Display for PeerSignalPath {
112 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
113 f.write_str((*self).as_str())
114 }
115}
116
/// Connection lifecycle state recorded for a peer entry.
#[derive(Debug, Clone, PartialEq)]
pub enum ConnectionState {
    /// Displayed as `"discovered"`.
    Discovered,
    /// Displayed as `"connecting"`; the state given to freshly registered peers in this file.
    Connecting,
    /// Displayed as `"connected"`; only peers in this state are queried for data.
    Connected,
    /// Displayed as `"failed"`.
    Failed,
}

impl std::fmt::Display for ConnectionState {
    /// Writes the lowercase name of the state.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Discovered => "discovered",
            Self::Connecting => "connecting",
            Self::Connected => "connected",
            Self::Failed => "failed",
        };
        f.write_str(label)
    }
}
136
/// Bookkeeping record kept for every known peer in [`WebRTCState::peers`].
pub struct PeerEntry {
    /// Identity of the remote peer.
    pub peer_id: PeerId,
    /// Whether the connection was initiated by us or by the remote side.
    pub direction: PeerDirection,
    /// Current connection lifecycle state.
    pub state: ConnectionState,
    /// Timestamp stored for the entry; set to `Instant::now()` when the peer is registered.
    pub last_seen: Instant,
    /// Live session handle, if one exists for this peer.
    pub peer: Option<MeshPeer>,
    /// Pool the peer was classified into.
    pub pool: PeerPool,
    /// Transport used to reach the peer.
    pub transport: PeerTransport,
    /// Signaling paths recorded for this peer.
    pub signal_paths: BTreeSet<PeerSignalPath>,
    /// Per-peer byte counter incremented by `WebRTCState::record_sent`.
    pub bytes_sent: u64,
    /// Per-peer byte counter incremented by `WebRTCState::record_received`.
    pub bytes_received: u64,
}
150
/// Shared runtime state of the peer router: known peers, traffic counters and the
/// request-routing helpers used when fetching data from peers.
pub struct WebRTCState {
    /// Known peers, keyed by the string form of their peer id.
    pub peers: RwLock<HashMap<String, PeerEntry>>,
    /// Counter of connected peers; reset to zero by `reset_runtime_state`.
    /// NOTE(review): increments/decrements are not visible in this part of the file.
    pub connected_count: std::sync::atomic::AtomicUsize,
    /// Total bytes counted by `record_sent`.
    pub bytes_sent: std::sync::atomic::AtomicU64,
    /// Total bytes counted by `record_received`.
    pub bytes_received: std::sync::atomic::AtomicU64,
    /// Counter bumped by `record_mesh_received`.
    pub mesh_received: std::sync::atomic::AtomicU64,
    /// Counter increased by `record_mesh_forwarded`.
    pub mesh_forwarded: std::sync::atomic::AtomicU64,
    /// Counter bumped by `record_mesh_duplicate_drop`.
    pub mesh_dropped_duplicate: std::sync::atomic::AtomicU64,
    /// Selector that orders peers for requests and records per-peer outcomes.
    peer_selector: Arc<RwLock<PeerSelector>>,
    /// Dispatch configuration handed to `run_hedged_waves`.
    request_dispatch: RequestDispatchConfig,
    /// Timeout applied to peer requests and passed to `run_hedged_waves`.
    request_timeout: Duration,
    /// Cashu quote/payment state shared with peers created by the router factory.
    cashu_quotes: Arc<CashuQuoteState>,
    /// Local Nostr buses consulted when resolving root events locally.
    local_buses: RwLock<Vec<SharedLocalNostrBus>>,
}
// Capacity / time-to-live limits for "seen frame" and "seen event" tracking.
// NOTE(review): none of these four constants is referenced in this part of the file —
// presumably they size `TimedSeenSet` instances used for de-duplication; confirm at the use site.
const SEEN_FRAME_CAP: usize = 4096;
const SEEN_FRAME_TTL: Duration = Duration::from_secs(120);
const SEEN_EVENT_CAP: usize = 8192;
const SEEN_EVENT_TTL: Duration = Duration::from_secs(600);

/// Shared map of in-flight requests for one peer, keyed by the requested hash in hex.
type PendingRequestsMap = Arc<Mutex<HashMap<String, PendingRequest>>>;
/// A WebRTC peer usable for direct sends: (peer id, its pending-request map, its data channel).
type ConnectedPeer = (
    String,
    PendingRequestsMap,
    Arc<webrtc::data_channel::RTCDataChannel>,
);
/// A ready peer session of any transport: (peer id, session handle, transport kind).
type ConnectedSession = (String, MeshPeer, PeerTransport);
/// The mesh router instantiated with this file's signaling bridge and peer factory.
type SharedProductionRouter = MeshRouter<RouterSignalingBridge, SharedRouterPeerFactory>;
190
191async fn remember_peer_signal_path(state: &WebRTCState, peer_id: &str, source: &str) {
192 if let Some(entry) = state.peers.write().await.get_mut(peer_id) {
193 entry
194 .signal_paths
195 .insert(PeerSignalPath::from_source_name(source));
196 }
197}
198
199#[derive(Clone)]
200struct RouterSignalingBridge {
201 peer_id: String,
202 pubkey: String,
203 signaling_tx: mpsc::Sender<SignalingMessage>,
204}
205
206impl RouterSignalingBridge {
207 fn new(peer_id: String, pubkey: String, signaling_tx: mpsc::Sender<SignalingMessage>) -> Self {
208 Self {
209 peer_id,
210 pubkey,
211 signaling_tx,
212 }
213 }
214}
215
216#[async_trait]
217impl SharedSignalingTransport for RouterSignalingBridge {
218 async fn connect(&self, _relays: &[String]) -> Result<(), SharedTransportError> {
219 Ok(())
220 }
221
222 async fn disconnect(&self) {}
223
224 async fn publish(&self, msg: SignalingMessage) -> Result<(), SharedTransportError> {
225 self.signaling_tx
226 .send(msg)
227 .await
228 .map_err(|e| SharedTransportError::SendFailed(e.to_string()))
229 }
230
231 async fn recv(&self) -> Option<SignalingMessage> {
232 None
233 }
234
235 fn try_recv(&self) -> Option<SignalingMessage> {
236 None
237 }
238
239 fn peer_id(&self) -> &str {
240 &self.peer_id
241 }
242
243 fn pubkey(&self) -> &str {
244 &self.pubkey
245 }
246}
247
248struct SharedRouterPeerLink {
249 peer: Arc<Peer>,
250}
251
252#[async_trait]
253impl SharedPeerLink for SharedRouterPeerLink {
254 async fn send(&self, data: Vec<u8>) -> Result<(), SharedTransportError> {
255 let dc = self
256 .peer
257 .data_channel
258 .lock()
259 .await
260 .as_ref()
261 .cloned()
262 .ok_or(SharedTransportError::NotConnected)?;
263 dc.send(&bytes::Bytes::from(data))
264 .await
265 .map(|_| ())
266 .map_err(|e| SharedTransportError::SendFailed(e.to_string()))
267 }
268
269 async fn recv(&self) -> Option<Vec<u8>> {
270 None
271 }
272
273 fn try_recv(&self) -> Option<Vec<u8>> {
274 None
275 }
276
277 fn is_open(&self) -> bool {
278 self.peer.has_data_channel()
279 }
280
281 async fn close(&self) {
282 let _ = self.peer.close().await;
283 }
284}
285
/// Factory that creates WebRTC peers for the shared mesh router and mirrors them into
/// [`WebRTCState`].
struct SharedRouterPeerFactory {
    /// Identity of the local node, passed to every created peer.
    my_peer_id: PeerId,
    /// Outgoing signaling channel handed to every created peer.
    signaling_tx: mpsc::Sender<SignalingMessage>,
    /// STUN server list handed to every created peer.
    stun_servers: Vec<String>,
    /// Optional content store handed to every created peer.
    store: Option<Arc<dyn ContentStore>>,
    /// Shared router state that receives a `PeerEntry` for each registered peer.
    state: Arc<WebRTCState>,
    /// Channel for peer state events, handed to every created peer.
    state_event_tx: mpsc::Sender<PeerStateEvent>,
    /// Optional Nostr relay handed to every created peer.
    nostr_relay: Option<Arc<NostrRelay>>,
    /// Channel for mesh frames, handed to every created peer.
    mesh_frame_tx: mpsc::Sender<(PeerId, MeshNostrFrame)>,
    /// Callback used to pick the pool of a peer from its pubkey.
    peer_classifier: PeerClassifier,
    /// Peers created by this factory, keyed by the string form of their peer id.
    peers: RwLock<HashMap<String, Arc<Peer>>>,
}
298
299impl SharedRouterPeerFactory {
300 fn new(
301 my_peer_id: PeerId,
302 signaling_tx: mpsc::Sender<SignalingMessage>,
303 stun_servers: Vec<String>,
304 store: Option<Arc<dyn ContentStore>>,
305 state: Arc<WebRTCState>,
306 state_event_tx: mpsc::Sender<PeerStateEvent>,
307 nostr_relay: Option<Arc<NostrRelay>>,
308 mesh_frame_tx: mpsc::Sender<(PeerId, MeshNostrFrame)>,
309 peer_classifier: PeerClassifier,
310 ) -> Self {
311 Self {
312 my_peer_id,
313 signaling_tx,
314 stun_servers,
315 store,
316 state,
317 state_event_tx,
318 nostr_relay,
319 mesh_frame_tx,
320 peer_classifier,
321 peers: RwLock::new(HashMap::new()),
322 }
323 }
324
325 async fn register_peer(&self, peer_id: PeerId, direction: PeerDirection, peer: Arc<Peer>) {
326 let peer_key = peer_id.to_string();
327 let pool = (self.peer_classifier)(&peer_id.pubkey);
328 self.peers
329 .write()
330 .await
331 .insert(peer_key.clone(), peer.clone());
332
333 let mut peers = self.state.peers.write().await;
334 peers.insert(
335 peer_key,
336 PeerEntry {
337 peer_id,
338 direction,
339 state: ConnectionState::Connecting,
340 last_seen: Instant::now(),
341 peer: Some(MeshPeer::WebRtc(peer)),
342 pool,
343 transport: PeerTransport::WebRtc,
344 signal_paths: BTreeSet::from([PeerSignalPath::Relay]),
345 bytes_sent: 0,
346 bytes_received: 0,
347 },
348 );
349 }
350
351 async fn create_peer(
352 &self,
353 peer_id: PeerId,
354 direction: PeerDirection,
355 ) -> Result<Peer, SharedTransportError> {
356 Peer::new_with_store_and_events(
357 peer_id,
358 direction,
359 self.my_peer_id.clone(),
360 self.signaling_tx.clone(),
361 self.stun_servers.clone(),
362 self.store.clone(),
363 Some(self.state_event_tx.clone()),
364 self.nostr_relay.clone(),
365 Some(self.mesh_frame_tx.clone()),
366 Some(self.state.cashu_quotes.clone()),
367 )
368 .await
369 .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))
370 }
371}
372
373#[async_trait]
374impl SharedPeerLinkFactory for SharedRouterPeerFactory {
375 async fn create_offer(
376 &self,
377 target_peer_id: &str,
378 ) -> Result<(Arc<dyn SharedPeerLink>, String), SharedTransportError> {
379 let target_peer = PeerId::from_string(target_peer_id).ok_or_else(|| {
380 SharedTransportError::ConnectionFailed(format!("invalid peer id {target_peer_id}"))
381 })?;
382 let peer = Arc::new(
383 self.create_peer(target_peer.clone(), PeerDirection::Outbound)
384 .await?,
385 );
386 peer.setup_handlers()
387 .await
388 .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))?;
389 let offer = peer
390 .connect()
391 .await
392 .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))?;
393 let sdp = offer
394 .get("sdp")
395 .and_then(|value| value.as_str())
396 .ok_or_else(|| {
397 SharedTransportError::ConnectionFailed("missing SDP in CLI peer offer".to_string())
398 })?
399 .to_string();
400 self.register_peer(target_peer, PeerDirection::Outbound, peer.clone())
401 .await;
402 Ok((Arc::new(SharedRouterPeerLink { peer }), sdp))
403 }
404
405 async fn accept_offer(
406 &self,
407 from_peer_id: &str,
408 offer_sdp: &str,
409 ) -> Result<(Arc<dyn SharedPeerLink>, String), SharedTransportError> {
410 let from_peer = PeerId::from_string(from_peer_id).ok_or_else(|| {
411 SharedTransportError::ConnectionFailed(format!("invalid peer id {from_peer_id}"))
412 })?;
413 let peer = Arc::new(
414 self.create_peer(from_peer.clone(), PeerDirection::Inbound)
415 .await?,
416 );
417 peer.setup_handlers()
418 .await
419 .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))?;
420 let answer = peer
421 .handle_offer(serde_json::json!({ "type": "offer", "sdp": offer_sdp }))
422 .await
423 .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))?;
424 let sdp = answer
425 .get("sdp")
426 .and_then(|value| value.as_str())
427 .ok_or_else(|| {
428 SharedTransportError::ConnectionFailed("missing SDP in CLI peer answer".to_string())
429 })?
430 .to_string();
431 self.register_peer(from_peer, PeerDirection::Inbound, peer.clone())
432 .await;
433 Ok((Arc::new(SharedRouterPeerLink { peer }), sdp))
434 }
435
436 async fn handle_answer(
437 &self,
438 target_peer_id: &str,
439 answer_sdp: &str,
440 ) -> Result<Arc<dyn SharedPeerLink>, SharedTransportError> {
441 let peer = self
442 .peers
443 .read()
444 .await
445 .get(target_peer_id)
446 .cloned()
447 .ok_or_else(|| {
448 SharedTransportError::ConnectionFailed(format!(
449 "missing outbound peer for {target_peer_id}"
450 ))
451 })?;
452 peer.handle_answer(serde_json::json!({ "type": "answer", "sdp": answer_sdp }))
453 .await
454 .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))?;
455 Ok(Arc::new(SharedRouterPeerLink { peer }))
456 }
457
458 async fn handle_candidate(
459 &self,
460 peer_id: &str,
461 candidate: SharedIceCandidate,
462 ) -> Result<(), SharedTransportError> {
463 let peer = self.peers.read().await.get(peer_id).cloned();
464 if let Some(peer) = peer {
465 peer.handle_candidate(serde_json::json!({
466 "candidate": candidate.candidate,
467 "sdpMLineIndex": candidate.sdp_m_line_index,
468 "sdpMid": candidate.sdp_mid,
469 }))
470 .await
471 .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))?;
472 }
473 Ok(())
474 }
475
476 async fn remove_peer(&self, peer_id: &str) -> Result<(), SharedTransportError> {
477 self.peers.write().await.remove(peer_id);
478 Ok(())
479 }
480}
481
482impl WebRTCState {
483 pub fn new() -> Self {
484 let cfg = WebRTCConfig::default();
485 Self::new_with_routing_and_cashu(
486 cfg.request_selection_strategy,
487 cfg.request_fairness_enabled,
488 cfg.request_dispatch,
489 Duration::from_millis(cfg.message_timeout_ms),
490 CashuRoutingConfig::default(),
491 None,
492 None,
493 )
494 }
495
496 pub fn new_with_routing(
497 selection_strategy: super::types::SelectionStrategy,
498 fairness_enabled: bool,
499 request_dispatch: RequestDispatchConfig,
500 ) -> Self {
501 let cfg = WebRTCConfig::default();
502 Self::new_with_routing_and_cashu(
503 selection_strategy,
504 fairness_enabled,
505 request_dispatch,
506 Duration::from_millis(cfg.message_timeout_ms),
507 CashuRoutingConfig::default(),
508 None,
509 None,
510 )
511 }
512
513 pub fn new_with_routing_and_cashu(
514 selection_strategy: super::types::SelectionStrategy,
515 fairness_enabled: bool,
516 request_dispatch: RequestDispatchConfig,
517 request_timeout: Duration,
518 cashu_routing: CashuRoutingConfig,
519 payment_client: Option<Arc<dyn CashuPaymentClient>>,
520 mint_metadata: Option<Arc<CashuMintMetadataStore>>,
521 ) -> Self {
522 let mut selector = PeerSelector::with_strategy(selection_strategy);
523 selector.set_fairness(fairness_enabled);
524 let peer_selector = Arc::new(RwLock::new(selector));
525 let cashu_quotes = Arc::new(if let Some(mint_metadata) = mint_metadata {
526 CashuQuoteState::new_with_mint_metadata(
527 cashu_routing,
528 peer_selector.clone(),
529 payment_client,
530 mint_metadata,
531 )
532 } else {
533 CashuQuoteState::new(cashu_routing, peer_selector.clone(), payment_client)
534 });
535 Self {
536 peers: RwLock::new(HashMap::new()),
537 connected_count: std::sync::atomic::AtomicUsize::new(0),
538 bytes_sent: std::sync::atomic::AtomicU64::new(0),
539 bytes_received: std::sync::atomic::AtomicU64::new(0),
540 mesh_received: std::sync::atomic::AtomicU64::new(0),
541 mesh_forwarded: std::sync::atomic::AtomicU64::new(0),
542 mesh_dropped_duplicate: std::sync::atomic::AtomicU64::new(0),
543 peer_selector,
544 request_dispatch,
545 request_timeout,
546 cashu_quotes,
547 local_buses: RwLock::new(Vec::new()),
548 }
549 }
550
551 pub async fn set_local_buses(&self, buses: Vec<SharedLocalNostrBus>) {
552 *self.local_buses.write().await = buses;
553 }
554
555 pub async fn add_local_bus(&self, bus: SharedLocalNostrBus) {
556 self.local_buses.write().await.push(bus);
557 }
558
559 pub async fn set_multicast_bus(&self, bus: Option<Arc<MulticastNostrBus>>) {
560 let buses = bus
561 .into_iter()
562 .map(|bus| bus as SharedLocalNostrBus)
563 .collect();
564 self.set_local_buses(buses).await;
565 }
566
567 pub async fn reset_runtime_state(&self) {
570 self.set_local_buses(Vec::new()).await;
571 let peers = {
572 let mut peers = self.peers.write().await;
573 std::mem::take(&mut *peers)
574 };
575 self.connected_count
576 .store(0, std::sync::atomic::Ordering::Relaxed);
577 for entry in peers.into_values() {
578 if let Some(peer) = entry.peer {
579 let _ = peer.close().await;
580 }
581 }
582 }
583
584 pub fn get_bandwidth(&self) -> (u64, u64) {
586 (
587 self.bytes_sent.load(std::sync::atomic::Ordering::Relaxed),
588 self.bytes_received
589 .load(std::sync::atomic::Ordering::Relaxed),
590 )
591 }
592
593 pub fn get_mesh_stats(&self) -> (u64, u64, u64) {
594 (
595 self.mesh_received
596 .load(std::sync::atomic::Ordering::Relaxed),
597 self.mesh_forwarded
598 .load(std::sync::atomic::Ordering::Relaxed),
599 self.mesh_dropped_duplicate
600 .load(std::sync::atomic::Ordering::Relaxed),
601 )
602 }
603
604 pub fn record_mesh_received(&self) {
605 self.mesh_received
606 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
607 }
608
609 pub fn record_mesh_forwarded(&self, count: u64) {
610 self.mesh_forwarded
611 .fetch_add(count, std::sync::atomic::Ordering::Relaxed);
612 }
613
614 pub fn record_mesh_duplicate_drop(&self) {
615 self.mesh_dropped_duplicate
616 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
617 }
618
619 pub async fn record_sent(&self, peer_id: &str, bytes: u64) {
621 self.bytes_sent
622 .fetch_add(bytes, std::sync::atomic::Ordering::Relaxed);
623 if let Some(entry) = self.peers.write().await.get_mut(peer_id) {
624 entry.bytes_sent += bytes;
625 }
626 }
627
628 pub async fn record_received(&self, peer_id: &str, bytes: u64) {
630 self.bytes_received
631 .fetch_add(bytes, std::sync::atomic::Ordering::Relaxed);
632 if let Some(entry) = self.peers.write().await.get_mut(peer_id) {
633 entry.bytes_received += bytes;
634 }
635 }
636
637 pub async fn request_from_peers(&self, hash_hex: &str) -> Option<Vec<u8>> {
641 self.request_from_peers_with_source(hash_hex)
642 .await
643 .map(|(data, _peer_id)| data)
644 }
645
646 pub async fn request_from_peers_with_source(
648 &self,
649 hash_hex: &str,
650 ) -> Option<(Vec<u8>, String)> {
651 use super::types::BLOB_REQUEST_POLICY;
652
653 let peers = self.peers.read().await;
654
655 let peer_refs: Vec<_> = peers
656 .values()
657 .filter(|p| p.state == ConnectionState::Connected && p.peer.is_some())
658 .filter_map(|p| {
659 p.peer
660 .clone()
661 .map(|peer| (p.peer_id.to_string(), peer, p.transport))
662 })
663 .collect();
664
665 drop(peers); let mut connected_peers: Vec<ConnectedPeer> = Vec::new();
668 let mut connected_sessions: Vec<ConnectedSession> = Vec::new();
669 for (peer_id, peer, transport) in peer_refs {
670 if !peer.is_ready() {
671 continue;
672 }
673 if bluetooth_nostr_only_mode() && transport == PeerTransport::Bluetooth {
674 continue;
675 }
676 if let Some(webrtc_peer) = peer.as_webrtc() {
677 let dc_guard = webrtc_peer.data_channel.lock().await;
678 if let Some(dc) = dc_guard.as_ref() {
679 connected_peers.push((
680 peer_id.clone(),
681 webrtc_peer.pending_requests.clone(),
682 dc.clone(),
683 ));
684 }
685 }
686 connected_sessions.push((peer_id, peer, transport));
687 }
688
689 if connected_sessions.is_empty() {
690 debug!(
691 "No connected peers to query for {}",
692 &hash_hex[..8.min(hash_hex.len())]
693 );
694 return None;
695 }
696
697 let hash_bytes = match hex::decode(hash_hex) {
699 Ok(b) => b,
700 Err(_) => return None,
701 };
702
703 let expected_hash: [u8; 32] = match hash_bytes.as_slice().try_into() {
704 Ok(h) => h,
705 Err(_) => {
706 debug!(
707 "Invalid hash length {}, expected 32 bytes",
708 hash_bytes.len()
709 );
710 return None;
711 }
712 };
713
714 let connected_peer_ids: Vec<String> = connected_sessions
715 .iter()
716 .map(|(peer_id, _, _)| peer_id.clone())
717 .collect();
718 sync_selector_peers(self.peer_selector.as_ref(), &connected_peer_ids).await;
719
720 let ordered_peer_ids = self.peer_selector.write().await.select_peers();
721 let mut quote_by_peer: HashMap<
722 String,
723 (
724 PendingRequestsMap,
725 Arc<webrtc::data_channel::RTCDataChannel>,
726 ),
727 > = connected_peers
728 .iter()
729 .cloned()
730 .map(|(peer_id, pending, dc)| (peer_id, (pending, dc)))
731 .collect();
732 let mut ordered_quote_peers: Vec<ConnectedPeer> = Vec::new();
733 for peer_id in &ordered_peer_ids {
734 if let Some((pending, dc)) = quote_by_peer.remove(peer_id) {
735 ordered_quote_peers.push((peer_id.clone(), pending, dc));
736 }
737 }
738 for (peer_id, (pending, dc)) in quote_by_peer {
739 ordered_quote_peers.push((peer_id, pending, dc));
740 }
741
742 let mut by_peer: HashMap<String, (MeshPeer, PeerTransport)> = connected_sessions
743 .into_iter()
744 .map(|(peer_id, peer, transport)| (peer_id, (peer, transport)))
745 .collect();
746
747 let mut ordered_peers: Vec<ConnectedSession> = Vec::new();
748 for peer_id in ordered_peer_ids {
749 if let Some((peer, transport)) = by_peer.remove(&peer_id) {
750 ordered_peers.push((peer_id, peer, transport));
751 }
752 }
753 for (peer_id, (peer, transport)) in by_peer {
754 ordered_peers.push((peer_id, peer, transport));
755 }
756
757 debug!(
758 "Querying {} peers for {} with shared hedged scheduler",
759 ordered_peers.len(),
760 &hash_hex[..8.min(hash_hex.len())],
761 );
762
763 if let Some((requested_mint, payment_sat, quote_ttl_ms)) =
764 self.cashu_quotes.requester_quote_terms().await
765 {
766 if let Some(quote) = self
767 .request_quote_from_peers(
768 &hash_bytes,
769 requested_mint,
770 payment_sat,
771 quote_ttl_ms,
772 &ordered_quote_peers,
773 )
774 .await
775 {
776 if let Some(data) = self
777 .request_from_single_peer(
778 hash_hex,
779 &hash_bytes,
780 expected_hash,
781 "e.peer_id,
782 Some("e),
783 &ordered_quote_peers,
784 )
785 .await
786 {
787 debug!(
788 "Got quoted response from peer {} for {}",
789 quote.peer_id,
790 &hash_hex[..8.min(hash_hex.len())]
791 );
792 return Some((data, quote.peer_id));
793 }
794 }
795 }
796
797 let request = DataRequest {
798 h: hash_bytes.clone(),
799 htl: BLOB_REQUEST_POLICY.max_htl,
800 q: None,
801 };
802 let wire = match encode_request(&request) {
803 Ok(w) => w,
804 Err(_) => return None,
805 };
806 let wire_len = wire.len() as u64;
807 let current_result_rx = Arc::new(Mutex::new(None));
808 if let Some((data, peer_id)) = run_hedged_waves(
809 ordered_peers.len(),
810 self.request_dispatch,
811 self.request_timeout,
812 |range| {
813 let wave_peers = ordered_peers[range].to_vec();
814 let (result_tx, result_rx) =
815 mpsc::channel::<(String, Instant, Result<Option<Vec<u8>>>)>(wave_peers.len());
816 let current_result_rx = current_result_rx.clone();
817 let hash_hex = hash_hex.to_string();
818 async move {
819 *current_result_rx.lock().await = Some(result_rx);
820 let sent = wave_peers.len();
821 for (peer_id, peer, transport) in wave_peers {
822 if transport != PeerTransport::Bluetooth {
823 self.record_sent(&peer_id, wire_len).await;
824 }
825 self.peer_selector
826 .write()
827 .await
828 .record_request(&peer_id, wire_len);
829
830 let result_tx = result_tx.clone();
831 let peer_id_for_task = peer_id.clone();
832 let peer = peer.clone();
833 let hash_hex = hash_hex.clone();
834 let per_request_timeout = self.request_timeout;
835 tokio::spawn(async move {
836 let started = Instant::now();
837 let result = peer.request(&hash_hex, per_request_timeout).await;
838 let _ = result_tx.send((peer_id_for_task, started, result)).await;
839 });
840 }
841 drop(result_tx);
842 sent
843 }
844 },
845 |wait| {
846 let current_result_rx = current_result_rx.clone();
847 async move {
848 let mut current_result_rx = current_result_rx.lock().await;
849 let Some(result_rx) = current_result_rx.as_mut() else {
850 return HedgedWaveAction::Abort;
851 };
852 let deadline = Instant::now() + wait;
853 loop {
854 let now = Instant::now();
855 if now >= deadline {
856 return HedgedWaveAction::Continue;
857 }
858 let remaining = deadline.saturating_duration_since(now);
859 match tokio::time::timeout(remaining, result_rx.recv()).await {
860 Ok(Some((peer_id, started, Ok(Some(data))))) => {
861 let rtt_ms = started.elapsed().as_millis() as u64;
862 if hashtree_core::sha256(&data) == expected_hash {
863 let should_record = {
864 let peers = self.peers.read().await;
865 peers
866 .get(&peer_id)
867 .map(|entry| {
868 entry.transport != PeerTransport::Bluetooth
869 })
870 .unwrap_or(true)
871 };
872 if should_record {
873 self.record_received(&peer_id, data.len() as u64).await;
874 }
875 self.peer_selector.write().await.record_success(
876 &peer_id,
877 rtt_ms,
878 data.len() as u64,
879 );
880 return HedgedWaveAction::Success((data, peer_id));
881 }
882 self.peer_selector.write().await.record_failure(&peer_id);
883 }
884 Ok(Some((peer_id, _, Ok(None)))) | Ok(Some((peer_id, _, Err(_)))) => {
885 self.peer_selector.write().await.record_timeout(&peer_id);
886 }
887 Ok(None) | Err(_) => return HedgedWaveAction::Continue,
888 }
889 }
890 }
891 },
892 )
893 .await
894 {
895 debug!(
896 "Got response from peer {} for {}",
897 peer_id,
898 &hash_hex[..8.min(hash_hex.len())]
899 );
900 return Some((data, peer_id));
901 }
902
903 debug!(
904 "No peer had data for {}",
905 &hash_hex[..8.min(hash_hex.len())]
906 );
907 None
908 }
909
910 async fn request_quote_from_peers(
911 &self,
912 hash_bytes: &[u8],
913 requested_mint: String,
914 payment_sat: u64,
915 quote_ttl_ms: u32,
916 ordered_peers: &[ConnectedPeer],
917 ) -> Option<NegotiatedQuote> {
918 if ordered_peers.is_empty() || quote_ttl_ms == 0 {
919 return None;
920 }
921
922 let hash_hex = hex::encode(hash_bytes);
923 let rx = self
924 .cashu_quotes
925 .register_pending_quote(hash_hex.clone(), Some(requested_mint.clone()), payment_sat)
926 .await;
927 let quote_request = DataQuoteRequest {
928 h: hash_bytes.to_vec(),
929 p: payment_sat,
930 t: quote_ttl_ms,
931 m: Some(requested_mint),
932 };
933 let wire = match encode_quote_request("e_request) {
934 Ok(wire) => wire,
935 Err(_) => {
936 self.cashu_quotes.clear_pending_quote(&hash_hex).await;
937 return None;
938 }
939 };
940 let rx = Arc::new(Mutex::new(rx));
941 let result = run_hedged_waves(
942 ordered_peers.len(),
943 self.request_dispatch,
944 self.request_timeout,
945 |range| {
946 let wave_peers = ordered_peers[range].to_vec();
947 let wire = wire.clone();
948 async move {
949 let mut sent = 0usize;
950 for (_, _, dc) in wave_peers {
951 if dc.send(&bytes::Bytes::copy_from_slice(&wire)).await.is_ok() {
952 sent += 1;
953 }
954 }
955 sent
956 }
957 },
958 |wait| {
959 let rx = rx.clone();
960 async move {
961 let mut rx = rx.lock().await;
962 match tokio::time::timeout(wait, &mut *rx).await {
963 Ok(Ok(Some(quote))) => HedgedWaveAction::Success(quote),
964 Ok(Ok(None)) | Ok(Err(_)) => HedgedWaveAction::Abort,
965 Err(_) => HedgedWaveAction::Continue,
966 }
967 }
968 },
969 )
970 .await;
971
972 self.cashu_quotes.clear_pending_quote(&hash_hex).await;
973 result
974 }
975
976 async fn request_from_single_peer(
977 &self,
978 hash_hex: &str,
979 hash_bytes: &[u8],
980 expected_hash: [u8; 32],
981 target_peer_id: &str,
982 quote: Option<&NegotiatedQuote>,
983 ordered_peers: &[ConnectedPeer],
984 ) -> Option<Vec<u8>> {
985 use super::types::BLOB_REQUEST_POLICY;
986
987 let (pending_requests, dc) = ordered_peers
988 .iter()
989 .find(|(peer_id, _, _)| peer_id == target_peer_id)
990 .map(|(_, pending_requests, dc)| (pending_requests.clone(), dc.clone()))?;
991
992 let request = DataRequest {
993 h: hash_bytes.to_vec(),
994 htl: BLOB_REQUEST_POLICY.max_htl,
995 q: quote.map(|quote| quote.quote_id),
996 };
997 let wire = encode_request(&request).ok()?;
998 let wire_len = wire.len() as u64;
999 let sent_at = Instant::now();
1000 let (tx, mut rx) = tokio::sync::oneshot::channel();
1001
1002 {
1003 let mut pending = pending_requests.lock().await;
1004 pending.insert(
1005 hash_hex.to_string(),
1006 if let Some(quote) = quote {
1007 PendingRequest::quoted(
1008 hash_bytes.to_vec(),
1009 tx,
1010 quote.quote_id,
1011 quote.mint_url.clone().unwrap_or_default(),
1012 quote.payment_sat,
1013 )
1014 } else {
1015 PendingRequest::standard(hash_bytes.to_vec(), tx)
1016 },
1017 );
1018 }
1019
1020 if dc
1021 .send(&bytes::Bytes::copy_from_slice(&wire))
1022 .await
1023 .is_err()
1024 {
1025 let mut pending = pending_requests.lock().await;
1026 pending.remove(hash_hex);
1027 self.peer_selector
1028 .write()
1029 .await
1030 .record_failure(target_peer_id);
1031 return None;
1032 }
1033
1034 self.record_sent(target_peer_id, wire_len).await;
1035 self.peer_selector
1036 .write()
1037 .await
1038 .record_request(target_peer_id, wire_len);
1039
1040 let wait_timeout = if let Some(quote) = quote {
1041 let multiplier = quote.payment_sat.clamp(1, 32) as u128;
1042 let extra_ms = self
1043 .cashu_quotes
1044 .settlement_timeout()
1045 .as_millis()
1046 .saturating_mul(multiplier);
1047 self.request_timeout + Duration::from_millis(extra_ms.min(u64::MAX as u128) as u64)
1048 } else {
1049 self.request_timeout
1050 };
1051
1052 match tokio::time::timeout(wait_timeout, &mut rx).await {
1053 Ok(Ok(Some(data))) if hashtree_core::sha256(&data) == expected_hash => {
1054 let rtt_ms = sent_at.elapsed().as_millis() as u64;
1055 self.record_received(target_peer_id, data.len() as u64)
1056 .await;
1057 self.peer_selector.write().await.record_success(
1058 target_peer_id,
1059 rtt_ms,
1060 data.len() as u64,
1061 );
1062 Some(data)
1063 }
1064 Ok(Ok(Some(_))) => {
1065 self.peer_selector
1066 .write()
1067 .await
1068 .record_failure(target_peer_id);
1069 let pending = pending_requests.lock().await.remove(hash_hex);
1070 if let Some(pending) = pending {
1071 if let Some(quoted) = pending.quoted {
1072 if let Some(in_flight) = quoted.in_flight_payment {
1073 let _ = self
1074 .cashu_quotes
1075 .revoke_payment_token(&in_flight.mint_url, &in_flight.operation_id)
1076 .await;
1077 }
1078 }
1079 }
1080 None
1081 }
1082 Ok(Ok(None)) | Ok(Err(_)) | Err(_) => {
1083 let pending = pending_requests.lock().await.remove(hash_hex);
1084 if let Some(pending) = pending {
1085 if let Some(quoted) = pending.quoted {
1086 if let Some(in_flight) = quoted.in_flight_payment {
1087 let _ = self
1088 .cashu_quotes
1089 .revoke_payment_token(&in_flight.mint_url, &in_flight.operation_id)
1090 .await;
1091 }
1092 }
1093 }
1094 self.peer_selector
1095 .write()
1096 .await
1097 .record_timeout(target_peer_id);
1098 None
1099 }
1100 }
1101 }
1102
1103 pub async fn resolve_root_from_peers(
1105 &self,
1106 owner_pubkey: &str,
1107 tree_name: &str,
1108 per_peer_timeout: Duration,
1109 ) -> Option<PeerRootEvent> {
1110 let filter = build_root_filter(owner_pubkey, tree_name)?;
1111
1112 let peer_refs: Vec<_> = {
1113 let peers = self.peers.read().await;
1114 peers
1115 .values()
1116 .filter(|entry| entry.state == ConnectionState::Connected)
1117 .filter(|entry| {
1118 !bluetooth_nostr_only_mode() || entry.transport != PeerTransport::Bluetooth
1119 })
1120 .filter_map(|entry| {
1121 let peer = entry.peer.as_ref()?;
1122 if !peer.is_ready() {
1123 return None;
1124 }
1125 Some((entry.peer_id.short(), peer.clone()))
1126 })
1127 .collect()
1128 };
1129
1130 for (peer_short, peer) in peer_refs {
1131 debug!(
1132 "Querying peer {} for root event {}/{}",
1133 peer_short, owner_pubkey, tree_name
1134 );
1135 let events = match peer
1136 .query_nostr_events(vec![filter.clone()], per_peer_timeout)
1137 .await
1138 {
1139 Ok(events) => events,
1140 Err(e) => {
1141 debug!(
1142 "Peer {} Nostr query failed for {}/{}: {}",
1143 peer_short, owner_pubkey, tree_name, e
1144 );
1145 continue;
1146 }
1147 };
1148 debug!(
1149 "Peer {} returned {} Nostr event(s) for {}/{}",
1150 peer_short,
1151 events.len(),
1152 owner_pubkey,
1153 tree_name
1154 );
1155
1156 let latest = pick_latest_event(events.iter().filter(|event| {
1157 hashtree_event_identifier(event).as_deref() == Some(tree_name)
1158 && is_hashtree_labeled_event(event)
1159 }));
1160 if let Some(event) = latest {
1161 if let Some(root) = root_event_from_peer(event, &peer_short, tree_name) {
1162 debug!(
1163 "Resolved {}/{} via peer {} event {}",
1164 owner_pubkey,
1165 tree_name,
1166 peer_short,
1167 event.id.to_hex()
1168 );
1169 return Some(root);
1170 }
1171 }
1172 }
1173
1174 None
1175 }
1176
1177 pub async fn resolve_root_from_local_buses_with_source(
1178 &self,
1179 owner_pubkey: &str,
1180 tree_name: &str,
1181 timeout: Duration,
1182 ) -> Option<(&'static str, PeerRootEvent)> {
1183 let buses = self.local_buses.read().await.clone();
1184 for bus in buses {
1185 if let Some(root) = bus.query_root(owner_pubkey, tree_name, timeout).await {
1186 return Some((bus.source_name(), root));
1187 }
1188 }
1189 None
1190 }
1191
1192 pub async fn resolve_root_from_local_buses(
1193 &self,
1194 owner_pubkey: &str,
1195 tree_name: &str,
1196 timeout: Duration,
1197 ) -> Option<PeerRootEvent> {
1198 self.resolve_root_from_local_buses_with_source(owner_pubkey, tree_name, timeout)
1199 .await
1200 .map(|(_, root)| root)
1201 }
1202
1203 pub async fn resolve_root_from_multicast(
1204 &self,
1205 owner_pubkey: &str,
1206 tree_name: &str,
1207 timeout: Duration,
1208 ) -> Option<PeerRootEvent> {
1209 self.resolve_root_from_local_buses(owner_pubkey, tree_name, timeout)
1210 .await
1211 }
1212}
1213
1214impl Default for WebRTCState {
1215 fn default() -> Self {
1216 Self::new()
1217 }
1218}
1219
/// Alias for [`WebRTCState`] under the "peer router" naming.
pub type PeerRouterState = WebRTCState;
1221
/// Owns the signaling, mesh forwarding and peer bookkeeping loop (`run`).
pub struct WebRTCManager {
    /// Configuration supplied at construction time.
    config: WebRTCConfig,
    /// Local peer identity, built from the public key of `keys`.
    my_peer_id: PeerId,
    /// Nostr keys used when creating and decoding signaling events.
    keys: Keys,
    /// State shared with other components through `state()`.
    state: Arc<WebRTCState>,
    /// Sender half of the shutdown watch channel; `shutdown()` sends `true`.
    shutdown: Arc<tokio::sync::watch::Sender<bool>>,
    /// Receiver half of the shutdown channel, cloned into spawned tasks.
    shutdown_rx: tokio::sync::watch::Receiver<bool>,
    /// Sender for outgoing signaling messages, handed to the router bridge and peer factory.
    signaling_tx: mpsc::Sender<SignalingMessage>,
    /// Receiver for outgoing signaling messages; taken by `run`.
    signaling_rx: Option<mpsc::Receiver<SignalingMessage>>,
    /// Optional content store passed on to peers.
    store: Option<Arc<dyn ContentStore>>,
    /// Maps a pubkey to the pool the peer belongs to.
    peer_classifier: PeerClassifier,
    /// Optional Nostr relay; required by the multicast and Wi-Fi Aware buses.
    nostr_relay: Option<Arc<NostrRelay>>,
    /// Local buses started by `run`; signaling events are broadcast over each.
    local_buses: Vec<SharedLocalNostrBus>,
    /// Sender for peer state changes, handed to the peer factory.
    state_event_tx: mpsc::Sender<PeerStateEvent>,
    /// Receiver for peer state changes; taken by `run`.
    state_event_rx: Option<mpsc::Receiver<PeerStateEvent>>,
    /// Sender for mesh frames received from peers.
    mesh_frame_tx: mpsc::Sender<(PeerId, MeshNostrFrame)>,
    /// Receiver for mesh frames; taken by `run`.
    mesh_frame_rx: Option<mpsc::Receiver<(PeerId, MeshNostrFrame)>>,
    /// Shared mesh router, created by `run` when signaling is enabled.
    shared_router: Option<Arc<SharedProductionRouter>>,
    /// Frame ids already processed, used to drop duplicate mesh frames.
    seen_frame_ids: Arc<Mutex<TimedSeenSet>>,
    /// Event ids already processed, used to drop duplicate mesh events.
    seen_event_ids: Arc<Mutex<TimedSeenSet>>,
}
1250
1251impl WebRTCManager {
1252 pub fn new(keys: Keys, config: WebRTCConfig) -> Self {
1254 let pubkey = keys.public_key().to_hex();
1255 let my_peer_id = PeerId::new(pubkey, None);
1256 let (shutdown, shutdown_rx) = tokio::sync::watch::channel(false);
1257 let (signaling_tx, signaling_rx) = mpsc::channel(100);
1258 let (state_event_tx, state_event_rx) = mpsc::channel(100);
1259 let (mesh_frame_tx, mesh_frame_rx) = mpsc::channel(256);
1260 let state = Arc::new(WebRTCState::new_with_routing_and_cashu(
1261 config.request_selection_strategy,
1262 config.request_fairness_enabled,
1263 config.request_dispatch,
1264 Duration::from_millis(config.message_timeout_ms),
1265 CashuRoutingConfig::default(),
1266 None,
1267 None,
1268 ));
1269
1270 let peer_classifier: PeerClassifier = Arc::new(|_| PeerPool::Other);
1272
1273 Self {
1274 config,
1275 my_peer_id,
1276 keys,
1277 state,
1278 shutdown: Arc::new(shutdown),
1279 shutdown_rx,
1280 signaling_tx,
1281 signaling_rx: Some(signaling_rx),
1282 store: None,
1283 peer_classifier,
1284 nostr_relay: None,
1285 local_buses: Vec::new(),
1286 state_event_tx,
1287 state_event_rx: Some(state_event_rx),
1288 mesh_frame_tx,
1289 mesh_frame_rx: Some(mesh_frame_rx),
1290 shared_router: None,
1291 seen_frame_ids: Arc::new(Mutex::new(TimedSeenSet::new(
1292 SEEN_FRAME_CAP,
1293 SEEN_FRAME_TTL,
1294 ))),
1295 seen_event_ids: Arc::new(Mutex::new(TimedSeenSet::new(
1296 SEEN_EVENT_CAP,
1297 SEEN_EVENT_TTL,
1298 ))),
1299 }
1300 }
1301
1302 pub fn new_with_state(keys: Keys, config: WebRTCConfig, state: Arc<WebRTCState>) -> Self {
1304 let mut manager = Self::new(keys, config);
1305 manager.state = state;
1306 manager
1307 }
1308
1309 pub fn new_with_classifier(
1311 keys: Keys,
1312 config: WebRTCConfig,
1313 classifier: PeerClassifier,
1314 ) -> Self {
1315 let mut manager = Self::new(keys, config);
1316 manager.peer_classifier = classifier;
1317 manager
1318 }
1319
1320 pub fn new_with_store(keys: Keys, config: WebRTCConfig, store: Arc<dyn ContentStore>) -> Self {
1322 let mut manager = Self::new(keys, config);
1323 manager.store = Some(store);
1324 manager
1325 }
1326
1327 pub fn new_with_store_and_classifier(
1329 keys: Keys,
1330 config: WebRTCConfig,
1331 store: Arc<dyn ContentStore>,
1332 classifier: PeerClassifier,
1333 ) -> Self {
1334 Self::new_with_store_and_classifier_and_cashu(
1335 keys,
1336 config,
1337 store,
1338 classifier,
1339 CashuRoutingConfig::default(),
1340 None,
1341 None,
1342 )
1343 }
1344
1345 pub fn new_with_state_and_store_and_classifier(
1346 keys: Keys,
1347 config: WebRTCConfig,
1348 state: Arc<WebRTCState>,
1349 store: Arc<dyn ContentStore>,
1350 classifier: PeerClassifier,
1351 ) -> Self {
1352 let mut manager = Self::new_with_state(keys, config, state);
1353 manager.store = Some(store);
1354 manager.peer_classifier = classifier;
1355 manager
1356 }
1357
1358 pub fn new_with_store_and_classifier_and_cashu(
1359 keys: Keys,
1360 config: WebRTCConfig,
1361 store: Arc<dyn ContentStore>,
1362 classifier: PeerClassifier,
1363 cashu_routing: CashuRoutingConfig,
1364 payment_client: Option<Arc<dyn CashuPaymentClient>>,
1365 mint_metadata: Option<Arc<CashuMintMetadataStore>>,
1366 ) -> Self {
1367 let mut manager = Self::new(keys, config);
1368 manager.state = Arc::new(WebRTCState::new_with_routing_and_cashu(
1369 manager.config.request_selection_strategy,
1370 manager.config.request_fairness_enabled,
1371 manager.config.request_dispatch,
1372 Duration::from_millis(manager.config.message_timeout_ms),
1373 cashu_routing,
1374 payment_client,
1375 mint_metadata,
1376 ));
1377 manager.store = Some(store);
1378 manager.peer_classifier = classifier;
1379 manager
1380 }
1381
1382 pub fn set_store(&mut self, store: Arc<dyn ContentStore>) {
1384 self.store = Some(store);
1385 }
1386
1387 pub fn set_peer_classifier(&mut self, classifier: PeerClassifier) {
1389 self.peer_classifier = classifier;
1390 }
1391
1392 pub fn set_nostr_relay(&mut self, relay: Arc<NostrRelay>) {
1394 self.nostr_relay = Some(relay);
1395 }
1396
1397 pub fn my_peer_id(&self) -> &PeerId {
1399 &self.my_peer_id
1400 }
1401
1402 pub fn state(&self) -> Arc<WebRTCState> {
1404 self.state.clone()
1405 }
1406
1407 pub fn shutdown_signal(&self) -> Arc<tokio::sync::watch::Sender<bool>> {
1409 self.shutdown.clone()
1410 }
1411
1412 pub fn shutdown(&self) {
1414 let _ = self.shutdown.send(true);
1415 }
1416
1417 pub async fn connected_count(&self) -> usize {
1419 self.state
1420 .connected_count
1421 .load(std::sync::atomic::Ordering::Relaxed)
1422 }
1423
1424 pub async fn peer_statuses(&self) -> Vec<PeerStatus> {
1426 self.state
1427 .peers
1428 .read()
1429 .await
1430 .values()
1431 .map(|p| PeerStatus {
1432 peer_id: p.peer_id.to_string(),
1433 pubkey: p.peer_id.pubkey.clone(),
1434 state: p.state.to_string(),
1435 direction: p.direction,
1436 connected_at: Some(p.last_seen),
1437 pool: p.pool,
1438 })
1439 .collect()
1440 }
1441
1442 pub async fn get_pool_counts(&self) -> (usize, usize, usize, usize) {
1446 let peers = self.state.peers.read().await;
1447 let mut follows_connected = 0;
1448 let mut follows_active = 0;
1449 let mut other_connected = 0;
1450 let mut other_active = 0;
1451
1452 for entry in peers.values() {
1453 let is_active = entry.state == ConnectionState::Connected
1456 || entry.state == ConnectionState::Connecting;
1457
1458 match entry.pool {
1459 PeerPool::Follows => {
1460 if is_active {
1461 follows_active += 1;
1462 }
1463 if entry.state == ConnectionState::Connected {
1464 follows_connected += 1;
1465 }
1466 }
1467 PeerPool::Other => {
1468 if is_active {
1469 other_active += 1;
1470 }
1471 if entry.state == ConnectionState::Connected {
1472 other_connected += 1;
1473 }
1474 }
1475 }
1476 }
1477
1478 (
1479 follows_connected,
1480 follows_active,
1481 other_connected,
1482 other_active,
1483 )
1484 }
1485
1486 fn local_hello_message(&self) -> SignalingMessage {
1487 SignalingMessage::Hello {
1488 peer_id: self.my_peer_id.to_string(),
1489 roots: Vec::new(),
1490 }
1491 }
1492
1493 fn local_bus_max_peers(&self, source: &str) -> Option<usize> {
1494 match source {
1495 "multicast" => Some(self.config.multicast.max_peers),
1496 WIFI_AWARE_SOURCE => Some(self.config.wifi_aware.max_peers),
1497 _ => None,
1498 }
1499 }
1500
1501 fn can_track_local_bus_peer(
1502 &self,
1503 source: &str,
1504 peer_key: &str,
1505 peers: &HashMap<String, PeerEntry>,
1506 ) -> bool {
1507 let Some(max_peers) = self.local_bus_max_peers(source) else {
1508 return true;
1509 };
1510 if peers.contains_key(peer_key) {
1511 return true;
1512 }
1513 if max_peers == 0 {
1514 return false;
1515 }
1516 let signal_path = PeerSignalPath::from_source_name(source);
1517 peers
1518 .values()
1519 .filter(|entry| {
1520 entry.signal_paths.contains(&signal_path) && entry.state != ConnectionState::Failed
1521 })
1522 .count()
1523 < max_peers
1524 }
1525
1526 pub async fn run(&mut self) -> Result<()> {
1528 info!(
1529 "Starting peer router with peer ID: {}",
1530 self.my_peer_id.short()
1531 );
1532
1533 let (event_tx, mut event_rx) = mpsc::channel::<(String, nostr::Event)>(100);
1534
1535 let mut signaling_rx = self
1537 .signaling_rx
1538 .take()
1539 .expect("signaling_rx already taken");
1540
1541 let mut state_event_rx = self
1543 .state_event_rx
1544 .take()
1545 .expect("state_event_rx already taken");
1546 let mut mesh_frame_rx = self
1547 .mesh_frame_rx
1548 .take()
1549 .expect("mesh_frame_rx already taken");
1550
1551 if self.config.bluetooth.is_enabled() {
1552 let bluetooth = BluetoothMesh::new(self.config.bluetooth.clone());
1553 let context = BluetoothRuntimeContext {
1554 my_peer_id: self.my_peer_id.clone(),
1555 store: if bluetooth_nostr_only_mode() {
1556 None
1557 } else {
1558 self.store.clone()
1559 },
1560 nostr_relay: self.nostr_relay.clone(),
1561 mesh_frame_tx: self.mesh_frame_tx.clone(),
1562 registrar: BluetoothPeerRegistrar::new(
1563 self.state.clone(),
1564 self.peer_classifier.clone(),
1565 self.config.pools.clone(),
1566 self.config.bluetooth.max_peers,
1567 ),
1568 };
1569 let _ = bluetooth.start(context).await;
1570 }
1571
1572 let (relay_write_tx, _) = tokio::sync::broadcast::channel::<SignalingMessage>(100);
1574
1575 for relay_url in &self.config.relays {
1577 let url = relay_url.clone();
1578 let event_tx = event_tx.clone();
1579 let shutdown_rx = self.shutdown_rx.clone();
1580 let keys = self.keys.clone();
1581 let relay_write_rx = relay_write_tx.subscribe();
1582
1583 tokio::spawn(async move {
1584 if let Err(e) =
1585 Self::relay_task(url.clone(), event_tx, shutdown_rx, keys, relay_write_rx).await
1586 {
1587 error!("Relay {} error: {}", url, e);
1588 }
1589 });
1590 }
1591
1592 if self.config.multicast.is_enabled() {
1593 if let Some(relay) = self.nostr_relay.clone() {
1594 match MulticastNostrBus::bind(
1595 self.config.multicast.clone(),
1596 self.keys.clone(),
1597 relay,
1598 )
1599 .await
1600 {
1601 Ok(bus) => {
1602 let local_bus: SharedLocalNostrBus = bus.clone();
1603 self.state.add_local_bus(local_bus.clone()).await;
1604 self.local_buses.push(local_bus);
1605 let shutdown_rx = self.shutdown_rx.clone();
1606 let signaling_tx = event_tx.clone();
1607 tokio::spawn(async move {
1608 if let Err(err) = bus.run(shutdown_rx, signaling_tx).await {
1609 error!("Multicast bus error: {}", err);
1610 }
1611 });
1612 }
1613 Err(err) => {
1614 warn!("Failed to start multicast bus: {}", err);
1615 }
1616 }
1617 } else {
1618 warn!("Multicast enabled but Nostr relay is unavailable");
1619 }
1620 }
1621
1622 if self.config.wifi_aware.is_enabled() {
1623 if let Some(relay) = self.nostr_relay.clone() {
1624 if let Some(bridge) = mobile_wifi_aware_bridge() {
1625 let bus = WifiAwareNostrBus::new(
1626 self.config.wifi_aware.clone(),
1627 self.keys.clone(),
1628 relay,
1629 bridge,
1630 );
1631 let local_bus: SharedLocalNostrBus = bus.clone();
1632 self.state.add_local_bus(local_bus.clone()).await;
1633 self.local_buses.push(local_bus);
1634 let shutdown_rx = self.shutdown_rx.clone();
1635 let signaling_tx = event_tx.clone();
1636 let local_peer_id = self.my_peer_id.to_string();
1637 tokio::spawn(async move {
1638 if let Err(err) = bus.run(local_peer_id, shutdown_rx, signaling_tx).await {
1639 error!("Wi-Fi Aware bus error: {}", err);
1640 }
1641 });
1642 } else {
1643 warn!("Wi-Fi Aware enabled but no mobile bridge is installed");
1644 }
1645 } else {
1646 warn!("Wi-Fi Aware enabled but Nostr relay is unavailable");
1647 }
1648 }
1649
1650 if self.config.signaling_enabled {
1651 let transport = Arc::new(RouterSignalingBridge::new(
1652 self.my_peer_id.to_string(),
1653 self.my_peer_id.pubkey.clone(),
1654 self.signaling_tx.clone(),
1655 ));
1656 let factory = Arc::new(SharedRouterPeerFactory::new(
1657 self.my_peer_id.clone(),
1658 self.signaling_tx.clone(),
1659 self.config.stun_servers.clone(),
1660 self.store.clone(),
1661 self.state.clone(),
1662 self.state_event_tx.clone(),
1663 self.nostr_relay.clone(),
1664 self.mesh_frame_tx.clone(),
1665 self.peer_classifier.clone(),
1666 ));
1667 let (classifier_tx, mut classifier_rx) = mpsc::channel::<SharedClassifyRequest>(32);
1668 let classifier = self.peer_classifier.clone();
1669 tokio::spawn(async move {
1670 while let Some(request) = classifier_rx.recv().await {
1671 let _ = request.response.send(classifier(&request.pubkey));
1672 }
1673 });
1674
1675 let mut router = MeshRouter::new(
1676 self.my_peer_id.to_string(),
1677 self.my_peer_id.pubkey.clone(),
1678 transport,
1679 factory.clone(),
1680 self.config.pools.clone(),
1681 self.config.debug,
1682 );
1683 router.set_classifier(classifier_tx);
1684 self.shared_router = Some(Arc::new(router));
1685 }
1686
1687 let mut shutdown_rx = self.shutdown_rx.clone();
1689 let mut cleanup_interval = tokio::time::interval(Duration::from_secs(30));
1691 let mut hello_ticker =
1692 tokio::time::interval(Duration::from_millis(self.config.hello_interval_ms));
1693 if self.config.signaling_enabled {
1694 if let Some(shared_router) = self.shared_router.as_ref() {
1695 let _ = shared_router.send_hello(Vec::new()).await;
1696 } else {
1697 self.dispatch_signaling_message(self.local_hello_message(), &relay_write_tx)
1698 .await;
1699 }
1700 }
1701 loop {
1702 tokio::select! {
1703 _ = shutdown_rx.changed() => {
1704 if *shutdown_rx.borrow() {
1705 info!("WebRTC manager shutting down");
1706 break;
1707 }
1708 }
1709 Some((relay, event)) = event_rx.recv() => {
1710 if let Err(e) = self
1711 .handle_event(&relay, &event, self.shared_router.as_ref())
1712 .await
1713 {
1714 debug!("Error handling event from {}: {}", relay, e);
1715 }
1716 }
1717 Some(msg) = signaling_rx.recv() => {
1718 self.dispatch_signaling_message(msg, &relay_write_tx).await;
1719 }
1720 Some(event) = state_event_rx.recv() => {
1721 self.handle_peer_state_event(event, &relay_write_tx).await;
1723 }
1724 Some((from_peer_id, frame)) = mesh_frame_rx.recv() => {
1725 self.handle_mesh_frame(from_peer_id, frame).await;
1726 }
1727 _ = hello_ticker.tick(), if self.config.signaling_enabled => {
1728 if let Some(shared_router) = self.shared_router.as_ref() {
1729 let _ = shared_router.send_hello(Vec::new()).await;
1730 } else {
1731 self.dispatch_signaling_message(self.local_hello_message(), &relay_write_tx)
1732 .await;
1733 }
1734 }
1735 _ = cleanup_interval.tick() => {
1736 self.cleanup_stale_peers().await;
1738 }
1739 }
1740 }
1741
1742 Ok(())
1743 }
1744
1745 async fn relay_task(
1747 url: String,
1748 event_tx: mpsc::Sender<(String, nostr::Event)>,
1749 mut shutdown_rx: tokio::sync::watch::Receiver<bool>,
1750 keys: Keys,
1751 mut signaling_rx: tokio::sync::broadcast::Receiver<SignalingMessage>,
1752 ) -> Result<()> {
1753 info!("Connecting to relay: {}", url);
1754
1755 let (ws_stream, _) = connect_async(&url).await?;
1756 let (mut write, mut read) = ws_stream.split();
1757
1758 let hello_filter = Filter::new()
1762 .kind(Kind::Ephemeral(WEBRTC_KIND as u16))
1763 .custom_tag(
1764 nostr::SingleLetterTag::lowercase(nostr::Alphabet::L),
1765 vec![HELLO_TAG],
1766 )
1767 .since(nostr::Timestamp::now() - Duration::from_secs(60));
1768
1769 let directed_filter = Filter::new()
1770 .kind(Kind::Ephemeral(WEBRTC_KIND as u16))
1771 .custom_tag(
1772 nostr::SingleLetterTag::lowercase(nostr::Alphabet::P),
1773 vec![keys.public_key().to_hex()],
1774 )
1775 .since(nostr::Timestamp::now() - Duration::from_secs(60));
1776
1777 let sub_id = nostr::SubscriptionId::generate();
1778 let sub_msg = ClientMessage::req(sub_id.clone(), vec![hello_filter, directed_filter]);
1779 write.send(Message::Text(sub_msg.as_json())).await?;
1780
1781 info!(
1782 "Subscribed to {} for WebRTC events (kind {})",
1783 url, WEBRTC_KIND
1784 );
1785
1786 loop {
1787 tokio::select! {
1788 _ = shutdown_rx.changed() => {
1789 if *shutdown_rx.borrow() {
1790 break;
1791 }
1792 }
1793 Ok(signaling_msg) = signaling_rx.recv() => {
1795 info!("Sending {} via {}", signaling_msg.msg_type(), url);
1796 if let Ok(event) = Self::create_signaling_event(&keys, &signaling_msg).await {
1797 let event_id = event.id.to_string();
1798 let msg = ClientMessage::event(event);
1799 if write.send(Message::Text(msg.as_json())).await.is_ok() {
1800 info!("Sent {} to {} (event id: {})", signaling_msg.msg_type(), url, &event_id[..16]);
1801 }
1802 }
1803 }
1804 msg = read.next() => {
1805 match msg {
1806 Some(Ok(Message::Text(text))) => {
1807 if let Ok(RelayMessage::Event { event, .. }) =
1808 RelayMessage::from_json(&text)
1809 {
1810 let _ = event_tx.send((url.clone(), *event)).await;
1811 }
1812 }
1813 Some(Err(e)) => {
1814 error!("WebSocket error from {}: {}", url, e);
1815 break;
1816 }
1817 None => {
1818 warn!("WebSocket closed: {}", url);
1819 break;
1820 }
1821 _ => {}
1822 }
1823 }
1824 }
1825 }
1826
1827 Ok(())
1828 }
1829
1830 async fn mark_seen_frame_id(&self, frame_id: String) -> bool {
1831 let mut seen = self.seen_frame_ids.lock().await;
1832 seen.insert_if_new(frame_id)
1833 }
1834
1835 async fn mark_seen_event_id(&self, event_id: String) -> bool {
1836 let mut seen = self.seen_event_ids.lock().await;
1837 seen.insert_if_new(event_id)
1838 }
1839
1840 async fn dispatch_signaling_message(
1841 &self,
1842 msg: SignalingMessage,
1843 relay_write_tx: &tokio::sync::broadcast::Sender<SignalingMessage>,
1844 ) {
1845 if !self.config.signaling_enabled {
1846 debug!(
1847 "Skipping signaling message {} because WebRTC signaling is disabled",
1848 msg.msg_type()
1849 );
1850 return;
1851 }
1852
1853 if relay_write_tx.send(msg.clone()).is_err() {
1854 debug!(
1855 "No relay subscribers for signaling message {}",
1856 msg.msg_type()
1857 );
1858 }
1859
1860 let event = match Self::create_signaling_event(&self.keys, &msg).await {
1861 Ok(event) => event,
1862 Err(e) => {
1863 debug!("Failed to create signaling event for mesh dispatch: {}", e);
1864 return;
1865 }
1866 };
1867
1868 for bus in &self.local_buses {
1869 if let Err(err) = bus.broadcast_event(&event).await {
1870 debug!(
1871 "Failed to broadcast signaling event over {} ({}): {}",
1872 bus.source_name(),
1873 msg.msg_type(),
1874 err
1875 );
1876 }
1877 }
1878
1879 let mut frame =
1880 MeshNostrFrame::new_event(event, &self.my_peer_id.to_string(), MESH_DEFAULT_HTL);
1881 if !self.mark_seen_frame_id(frame.frame_id.clone()).await {
1882 self.state.record_mesh_duplicate_drop();
1883 return;
1884 }
1885 if !self.mark_seen_event_id(frame.event().id.to_hex()).await {
1886 self.state.record_mesh_duplicate_drop();
1887 return;
1888 }
1889
1890 frame.sender_peer_id = self.my_peer_id.to_string();
1892 let forwarded = self.forward_mesh_frame(&frame, None).await;
1893 if forwarded > 0 {
1894 self.state.record_mesh_forwarded(forwarded as u64);
1895 }
1896 }
1897
1898 async fn forward_mesh_frame(
1899 &self,
1900 frame: &MeshNostrFrame,
1901 exclude_peer_id: Option<&str>,
1902 ) -> usize {
1903 let peers = self.state.peers.read().await;
1904 let peer_refs: Vec<_> = peers
1905 .values()
1906 .filter(|entry| entry.state == ConnectionState::Connected)
1907 .filter(|entry| {
1908 entry
1909 .peer
1910 .as_ref()
1911 .map(|peer| peer.is_ready())
1912 .unwrap_or(false)
1913 })
1914 .filter(|entry| {
1915 exclude_peer_id
1916 .map(|exclude| exclude != entry.peer_id.to_string())
1917 .unwrap_or(true)
1918 })
1919 .filter_map(|entry| {
1920 entry.peer.as_ref().map(|peer| {
1921 (
1922 entry.peer_id.to_string(),
1923 entry.peer_id.short(),
1924 peer.clone(),
1925 peer.htl_config(),
1926 )
1927 })
1928 })
1929 .collect();
1930 drop(peers);
1931
1932 let mut forwarded = 0usize;
1933 for (_peer_key, peer_short, peer, htl_cfg) in peer_refs {
1934 let next_htl = decrement_htl_with_policy(frame.htl, &MESH_EVENT_POLICY, &htl_cfg);
1935 if !should_forward_htl(next_htl) {
1936 continue;
1937 }
1938
1939 let mut outbound = frame.clone();
1940 outbound.htl = next_htl;
1941 if peer.send_mesh_frame_text(&outbound).await.is_ok() {
1942 forwarded += 1;
1943 } else {
1944 debug!("Failed to forward mesh frame to {}", peer_short);
1945 }
1946 }
1947
1948 forwarded
1949 }
1950
1951 async fn handle_mesh_frame(&self, from_peer_id: PeerId, frame: MeshNostrFrame) {
1952 if let Err(reason) = validate_mesh_frame(&frame) {
1953 debug!(
1954 "Ignoring mesh frame from {} (invalid: {})",
1955 from_peer_id.short(),
1956 reason
1957 );
1958 return;
1959 }
1960
1961 if !self.mark_seen_frame_id(frame.frame_id.clone()).await {
1962 self.state.record_mesh_duplicate_drop();
1963 return;
1964 }
1965
1966 let event = match &frame.payload {
1967 MeshNostrPayload::Event { event } => event.clone(),
1968 };
1969
1970 if !self.mark_seen_event_id(event.id.to_hex()).await {
1971 self.state.record_mesh_duplicate_drop();
1972 return;
1973 }
1974
1975 if event.verify().is_err() {
1976 debug!(
1977 "Ignoring mesh event from {} due to invalid signature",
1978 from_peer_id.short()
1979 );
1980 return;
1981 }
1982
1983 self.state.record_mesh_received();
1984
1985 if let Err(e) = self
1986 .handle_event("mesh", &event, self.shared_router.as_ref())
1987 .await
1988 {
1989 debug!(
1990 "Error handling mesh event from {}: {}",
1991 from_peer_id.short(),
1992 e
1993 );
1994 }
1995
1996 let forwarded = self
1997 .forward_mesh_frame(&frame, Some(&from_peer_id.to_string()))
1998 .await;
1999 if forwarded > 0 {
2000 self.state.record_mesh_forwarded(forwarded as u64);
2001 }
2002 }
2003
2004 async fn create_signaling_event(keys: &Keys, msg: &SignalingMessage) -> Result<nostr::Event> {
2010 encode_signaling_event(
2011 keys,
2012 msg.peer_id(),
2013 msg,
2014 Kind::Ephemeral(WEBRTC_KIND as u16),
2015 )
2016 .map_err(|e| anyhow::anyhow!(e.to_string()))
2017 }
2018
2019 async fn handle_event(
2025 &self,
2026 relay: &str,
2027 event: &nostr::Event,
2028 shared_router: Option<&Arc<SharedProductionRouter>>,
2029 ) -> Result<()> {
2030 if !self.config.signaling_enabled {
2031 return Ok(());
2032 }
2033
2034 let Some(shared_router) = shared_router else {
2035 return Ok(());
2036 };
2037
2038 let Some(msg) = decode_signaling_event(
2039 event,
2040 &self.my_peer_id.to_string(),
2041 &self.keys.public_key().to_hex(),
2042 &self.keys,
2043 ) else {
2044 return Ok(());
2045 };
2046
2047 if matches!(
2048 msg,
2049 SignalingMessage::Hello { .. } | SignalingMessage::Offer { .. }
2050 ) {
2051 let peers = self.state.peers.read().await;
2052 if !self.can_track_local_bus_peer(relay, msg.peer_id(), &peers) {
2053 return Ok(());
2054 }
2055 }
2056
2057 debug!(
2058 "Received {} from {} via {}",
2059 msg.msg_type(),
2060 msg.peer_id(),
2061 relay
2062 );
2063 let peer_id = msg.peer_id().to_string();
2064 shared_router
2065 .handle_message(msg)
2066 .await
2067 .map_err(|e| anyhow::anyhow!(e.to_string()))?;
2068 remember_peer_signal_path(self.state.as_ref(), &peer_id, relay).await;
2069
2070 Ok(())
2071 }
2072
2073 async fn handle_peer_state_event(
2075 &self,
2076 event: PeerStateEvent,
2077 relay_write_tx: &tokio::sync::broadcast::Sender<SignalingMessage>,
2078 ) {
2079 match event {
2080 PeerStateEvent::Connected(peer_id) => {
2081 let peer_key = peer_id.to_string();
2082 let mut emit_hello = false;
2083 let mut peers = self.state.peers.write().await;
2084 if let Some(entry) = peers.get_mut(&peer_key) {
2085 if entry.state != ConnectionState::Connected {
2086 info!("Peer {} connected (via state event)", peer_id.short());
2087 entry.state = ConnectionState::Connected;
2088 emit_hello = true;
2089 self.state
2091 .connected_count
2092 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
2093 }
2094 }
2095 drop(peers);
2096 if emit_hello {
2097 if let Some(shared_router) = self.shared_router.as_ref() {
2098 let _ = shared_router.send_hello(Vec::new()).await;
2099 } else {
2100 self.dispatch_signaling_message(self.local_hello_message(), relay_write_tx)
2101 .await;
2102 }
2103 }
2104 }
2105 PeerStateEvent::Failed(peer_id) => {
2106 let peer_key = peer_id.to_string();
2107 info!(
2108 "Peer {} connection failed - removing from pool",
2109 peer_id.short()
2110 );
2111 let removed = {
2112 let mut peers = self.state.peers.write().await;
2113 peers.remove(&peer_key)
2114 };
2115 if let Some(entry) = removed {
2116 if entry.state == ConnectionState::Connected {
2118 self.state
2119 .connected_count
2120 .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
2121 }
2122 if let Some(peer) = entry.peer {
2124 let _ = peer.close().await;
2125 }
2126 }
2127 if let Some(shared_router) = self.shared_router.as_ref() {
2128 if let Some(channel) = shared_router.remove_peer(&peer_key).await {
2129 channel.close().await;
2130 }
2131 }
2132 }
2133 PeerStateEvent::Disconnected(peer_id) => {
2134 let peer_key = peer_id.to_string();
2135 info!("Peer {} disconnected - removing from pool", peer_id.short());
2136 let removed = {
2137 let mut peers = self.state.peers.write().await;
2138 peers.remove(&peer_key)
2139 };
2140 if let Some(entry) = removed {
2141 if entry.state == ConnectionState::Connected {
2143 self.state
2144 .connected_count
2145 .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
2146 }
2147 if let Some(peer) = entry.peer {
2149 let _ = peer.close().await;
2150 }
2151 }
2152 if let Some(shared_router) = self.shared_router.as_ref() {
2153 if let Some(channel) = shared_router.remove_peer(&peer_key).await {
2154 channel.close().await;
2155 }
2156 }
2157 }
2158 }
2159 }
2160
2161 async fn cleanup_stale_peers(&self) {
2163 let mut peers = self.state.peers.write().await;
2164 let mut connected_count = 0;
2165 let mut to_remove = Vec::new();
2166 let stale_timeout = Duration::from_secs(60); for (key, entry) in peers.iter_mut() {
2169 if let Some(ref peer) = entry.peer {
2170 if peer.is_connected() {
2172 if entry.state != ConnectionState::Connected {
2173 info!(
2174 "Peer {} is now connected (sync fallback)",
2175 entry.peer_id.short()
2176 );
2177 entry.state = ConnectionState::Connected;
2178 }
2179 connected_count += 1;
2180 } else if entry.state == ConnectionState::Connected {
2181 info!(
2182 "Removing disconnected peer {} after transport closed",
2183 entry.peer_id.short()
2184 );
2185 to_remove.push(key.clone());
2186 } else if entry.state == ConnectionState::Connecting
2187 && entry.last_seen.elapsed() > stale_timeout
2188 {
2189 info!(
2191 "Removing stale peer {} (stuck in Connecting for {:?})",
2192 entry.peer_id.short(),
2193 entry.last_seen.elapsed()
2194 );
2195 to_remove.push(key.clone());
2196 }
2197 } else if entry.state == ConnectionState::Discovered
2198 && entry.last_seen.elapsed() > stale_timeout
2199 {
2200 debug!("Removing stale discovered peer {}", entry.peer_id.short());
2202 to_remove.push(key.clone());
2203 }
2204 }
2205
2206 let mut removed_peers = Vec::new();
2208 for key in to_remove {
2209 if let Some(entry) = peers.remove(&key) {
2210 removed_peers.push(entry);
2211 }
2212 }
2213 drop(peers);
2214
2215 for entry in removed_peers {
2216 if let Some(peer) = entry.peer {
2217 let _ = peer.close().await;
2218 }
2219 }
2220
2221 self.state
2222 .connected_count
2223 .store(connected_count, std::sync::atomic::Ordering::Relaxed);
2224 }
2225}
2226
/// Alias for [`WebRTCManager`] under the "peer router" naming.
pub type PeerRouter = WebRTCManager;
2228
// Not referenced elsewhere in the visible part of this file, hence the
// dead-code allowance.
#[allow(dead_code)]
/// Plain snapshot of a peer: identity, direction, state label and the time
/// it was last seen.
#[derive(Debug, Clone)]
pub struct PeerState {
    /// Identity of the peer.
    pub peer_id: PeerId,
    /// Direction of the connection.
    pub direction: PeerDirection,
    /// Connection state as a string.
    pub state: String,
    /// When the peer was last seen.
    pub last_seen: Instant,
}
2238
2239#[cfg(test)]
2240mod tests {
2241 use super::*;
2242 use crate::webrtc::root_events::PeerRootEvent;
2243 use crate::webrtc::session::TestMeshPeer;
2244 use crate::webrtc::SelectionStrategy;
2245 use anyhow::Result as AnyResult;
2246 use async_trait::async_trait;
2247 use hashtree_network::{build_hedged_wave_plan, normalize_dispatch_config};
2248 use nostr::{EventBuilder, Keys, Tag};
2249 use std::time::Duration;
2250
/// Local bus stub that reports a fixed source name and answers every root
/// query with a preset value.
struct TestLocalBus {
    /// Name returned by `source_name`.
    source: &'static str,
    /// Value returned by `query_root`, whatever the arguments.
    root: Option<PeerRootEvent>,
}

#[async_trait]
impl super::super::LocalNostrBus for TestLocalBus {
    fn source_name(&self) -> &'static str {
        self.source
    }

    // Broadcasting is a no-op that always succeeds.
    async fn broadcast_event(&self, _event: &nostr::Event) -> AnyResult<()> {
        Ok(())
    }

    // Ignores the query and returns the preset root.
    async fn query_root(
        &self,
        _owner_pubkey: &str,
        _tree_name: &str,
        _timeout: Duration,
    ) -> Option<PeerRootEvent> {
        self.root.clone()
    }
}
2275
// Checks that the hash, encrypted key and peer id are read from a labeled
// root event, and that no plain key is reported when only `encryptedKey`
// is present.
#[test]
fn root_event_from_peer_extracts_tags() {
    let keys = Keys::generate();
    let hash = "ab".repeat(32);
    let event = EventBuilder::new(
        Kind::Custom(super::super::root_events::HASHTREE_KIND),
        "",
        [
            Tag::parse(&["d", "repo"]).unwrap(),
            Tag::parse(&["l", super::super::root_events::HASHTREE_LABEL]).unwrap(),
            Tag::parse(&["hash", &hash]).unwrap(),
            Tag::parse(&["encryptedKey", &"11".repeat(32)]).unwrap(),
        ],
    )
    .to_event(&keys)
    .unwrap();

    let parsed = root_event_from_peer(&event, "peer-a", "repo").unwrap();
    let expected_encrypted = "11".repeat(32);
    assert_eq!(parsed.hash, hash);
    assert_eq!(parsed.peer_id, "peer-a");
    assert_eq!(
        parsed.encrypted_key.as_deref(),
        Some(expected_encrypted.as_str())
    );
    assert!(parsed.key.is_none());
}
2303
// Two events with the same `created_at`: the one with the greater event id
// must be picked.
#[test]
fn pick_latest_event_prefers_higher_event_id_on_timestamp_tie() {
    let keys = Keys::generate();
    let created_at = nostr::Timestamp::from_secs(1_700_000_000);
    let event_a = EventBuilder::new(
        Kind::Custom(super::super::root_events::HASHTREE_KIND),
        "",
        [],
    )
    .custom_created_at(created_at)
    .to_event(&keys)
    .unwrap();
    let event_b = EventBuilder::new(
        Kind::Custom(super::super::root_events::HASHTREE_KIND),
        "",
        [],
    )
    .custom_created_at(created_at)
    .to_event(&keys)
    .unwrap();

    // The expected winner is whichever id compares greater.
    let expected = if event_a.id > event_b.id {
        event_a.id
    } else {
        event_b.id
    };
    let picked = pick_latest_event([&event_a, &event_b]).unwrap();
    assert_eq!(picked.id, expected);
}
2333
// With an empty bus followed by a bus holding a root, resolution must skip
// the empty bus and report the second bus's source name and root.
#[tokio::test]
async fn resolve_root_from_local_buses_returns_source_and_first_match() {
    let state = WebRTCState::new();
    let root = PeerRootEvent {
        hash: "ab".repeat(32),
        key: None,
        encrypted_key: None,
        self_encrypted_key: None,
        event_id: "event-1".to_string(),
        created_at: 1,
        peer_id: "bus-peer".to_string(),
    };

    state
        .set_local_buses(vec![
            Arc::new(TestLocalBus {
                source: "empty",
                root: None,
            }),
            Arc::new(TestLocalBus {
                source: "mock-bus",
                root: Some(root.clone()),
            }),
        ])
        .await;

    let resolved = state
        .resolve_root_from_local_buses_with_source("owner", "tree", Duration::from_millis(10))
        .await
        .expect("expected root from local bus");

    assert_eq!(resolved.0, "mock-bus");
    assert_eq!(resolved.1.hash, root.hash);
    assert_eq!(resolved.1.peer_id, root.peer_id);
}
2369
2370 #[tokio::test]
2371 async fn can_track_local_bus_peer_enforces_wifi_aware_limit() {
2372 let keys = Keys::generate();
2373 let mut config = WebRTCConfig::default();
2374 config.wifi_aware.enabled = true;
2375 config.wifi_aware.max_peers = 1;
2376 let manager = WebRTCManager::new(keys, config);
2377 let existing_peer = PeerId::new("peer-a".to_string(), Some("sess-a".to_string()));
2378 let existing_key = existing_peer.to_string();
2379 let mut peers = HashMap::new();
2380 peers.insert(
2381 existing_key.clone(),
2382 PeerEntry {
2383 peer_id: existing_peer,
2384 direction: PeerDirection::Outbound,
2385 state: ConnectionState::Discovered,
2386 last_seen: Instant::now(),
2387 peer: None,
2388 pool: PeerPool::Other,
2389 transport: PeerTransport::WebRtc,
2390 signal_paths: BTreeSet::from([PeerSignalPath::WifiAware]),
2391 bytes_sent: 0,
2392 bytes_received: 0,
2393 },
2394 );
2395
2396 assert!(manager.can_track_local_bus_peer(WIFI_AWARE_SOURCE, &existing_key, &peers,));
2397 assert!(!manager.can_track_local_bus_peer(WIFI_AWARE_SOURCE, "peer-b:sess-b", &peers,));
2398 assert!(manager.can_track_local_bus_peer("relay", "peer-c:sess-c", &peers));
2399 }
2400
2401 #[tokio::test]
2402 async fn request_from_peers_with_source_accepts_generic_mesh_peers() {
2403 let state = WebRTCState::new();
2404 let data = b"offline-over-ble".to_vec();
2405 let hash_hex = hex::encode(hashtree_core::sha256(&data));
2406
2407 state.peers.write().await.insert(
2408 "peer-a".to_string(),
2409 PeerEntry {
2410 peer_id: PeerId::new("peer-a-pub".to_string(), Some("session-a".to_string())),
2411 direction: PeerDirection::Outbound,
2412 state: ConnectionState::Connected,
2413 last_seen: Instant::now(),
2414 peer: Some(MeshPeer::mock_for_tests(TestMeshPeer::with_response(Some(
2415 data.clone(),
2416 )))),
2417 pool: PeerPool::Other,
2418 transport: PeerTransport::Bluetooth,
2419 signal_paths: BTreeSet::from([PeerSignalPath::Bluetooth]),
2420 bytes_sent: 0,
2421 bytes_received: 0,
2422 },
2423 );
2424
2425 let resolved = state
2426 .request_from_peers_with_source(&hash_hex)
2427 .await
2428 .expect("expected mock mesh peer response");
2429
2430 assert_eq!(resolved.0, data);
2431 assert_eq!(resolved.1, "peer-a-pub:session-a");
2432 }
2433
2434 #[tokio::test]
2435 async fn request_from_peers_with_source_waits_full_timeout_for_last_generic_peer() {
2436 let state = WebRTCState::new_with_routing_and_cashu(
2437 SelectionStrategy::TitForTat,
2438 true,
2439 RequestDispatchConfig {
2440 initial_fanout: 1,
2441 hedge_fanout: 1,
2442 max_fanout: 1,
2443 hedge_interval_ms: 50,
2444 },
2445 Duration::from_millis(400),
2446 CashuRoutingConfig::default(),
2447 None,
2448 None,
2449 );
2450 let data = b"slow-offline-over-ble".to_vec();
2451 let hash_hex = hex::encode(hashtree_core::sha256(&data));
2452
2453 state.peers.write().await.insert(
2454 "peer-a".to_string(),
2455 PeerEntry {
2456 peer_id: PeerId::new("peer-a-pub".to_string(), Some("session-a".to_string())),
2457 direction: PeerDirection::Outbound,
2458 state: ConnectionState::Connected,
2459 last_seen: Instant::now(),
2460 peer: Some(MeshPeer::mock_for_tests(
2461 TestMeshPeer::with_delayed_response(
2462 Some(data.clone()),
2463 Duration::from_millis(200),
2464 ),
2465 )),
2466 pool: PeerPool::Other,
2467 transport: PeerTransport::Bluetooth,
2468 signal_paths: BTreeSet::from([PeerSignalPath::Bluetooth]),
2469 bytes_sent: 0,
2470 bytes_received: 0,
2471 },
2472 );
2473
2474 let resolved = state
2475 .request_from_peers_with_source(&hash_hex)
2476 .await
2477 .expect("expected delayed mock mesh peer response");
2478
2479 assert_eq!(resolved.0, data);
2480 assert_eq!(resolved.1, "peer-a-pub:session-a");
2481 }
2482
2483 #[tokio::test]
2484 async fn dispatch_signaling_message_is_noop_when_signaling_disabled() {
2485 let keys = Keys::generate();
2486 let mut config = WebRTCConfig::default();
2487 config.signaling_enabled = false;
2488 let manager = WebRTCManager::new(keys, config);
2489 let peer_id = PeerId::new("peer-a-pub".to_string(), Some("session-a".to_string()));
2490 let peer_key = peer_id.to_string();
2491 let peer = MeshPeer::mock_for_tests(TestMeshPeer::with_response(None));
2492 let peer_ref = peer.mock_ref().expect("mock peer").clone();
2493
2494 manager.state.peers.write().await.insert(
2495 peer_key,
2496 PeerEntry {
2497 peer_id,
2498 direction: PeerDirection::Outbound,
2499 state: ConnectionState::Connected,
2500 last_seen: Instant::now(),
2501 peer: Some(peer),
2502 pool: PeerPool::Other,
2503 transport: PeerTransport::Bluetooth,
2504 signal_paths: BTreeSet::from([PeerSignalPath::Bluetooth]),
2505 bytes_sent: 0,
2506 bytes_received: 0,
2507 },
2508 );
2509
2510 let (relay_tx, _) = tokio::sync::broadcast::channel(4);
2511 manager
2512 .dispatch_signaling_message(
2513 SignalingMessage::Hello {
2514 peer_id: manager.my_peer_id.to_string(),
2515 roots: Vec::new(),
2516 },
2517 &relay_tx,
2518 )
2519 .await;
2520
2521 assert_eq!(peer_ref.sent_frame_count().await, 0);
2522 }
2523
2524 #[tokio::test]
2525 async fn failed_peer_cleanup_does_not_hold_peer_map_lock_while_closing() {
2526 let keys = Keys::generate();
2527 let manager = Arc::new(WebRTCManager::new(keys, WebRTCConfig::default()));
2528 let peer_id = PeerId::new("peer-a-pub".to_string(), Some("session-a".to_string()));
2529 let peer_key = peer_id.to_string();
2530
2531 manager.state.peers.write().await.insert(
2532 peer_key.clone(),
2533 PeerEntry {
2534 peer_id: peer_id.clone(),
2535 direction: PeerDirection::Outbound,
2536 state: ConnectionState::Connected,
2537 last_seen: Instant::now(),
2538 peer: Some(MeshPeer::mock_for_tests(TestMeshPeer::with_delayed_close(
2539 Duration::from_millis(200),
2540 ))),
2541 pool: PeerPool::Other,
2542 transport: PeerTransport::Bluetooth,
2543 signal_paths: BTreeSet::from([PeerSignalPath::Bluetooth]),
2544 bytes_sent: 0,
2545 bytes_received: 0,
2546 },
2547 );
2548
2549 let (relay_tx, _) = tokio::sync::broadcast::channel(4);
2550 let manager_for_task = manager.clone();
2551 let peer_id_for_task = peer_id.clone();
2552 let cleanup_task = tokio::spawn(async move {
2553 manager_for_task
2554 .handle_peer_state_event(PeerStateEvent::Failed(peer_id_for_task), &relay_tx)
2555 .await;
2556 });
2557
2558 tokio::time::sleep(Duration::from_millis(20)).await;
2559
2560 let remaining = tokio::time::timeout(Duration::from_millis(50), async {
2561 manager.state.peers.read().await.len()
2562 })
2563 .await
2564 .expect("peer map read should not block on close");
2565
2566 assert_eq!(remaining, 0);
2567 cleanup_task.await.expect("cleanup task");
2568 }
2569
2570 #[tokio::test]
2571 async fn resolve_root_from_peers_does_not_hold_peer_map_lock_while_querying() {
2572 let keys = Keys::generate();
2573 let manager = Arc::new(WebRTCManager::new(keys.clone(), WebRTCConfig::default()));
2574 let owner_keys = Keys::generate();
2575 let owner_pubkey = owner_keys.public_key().to_hex();
2576 let tree_name = "video";
2577 let hash = "ab".repeat(32);
2578 let event = EventBuilder::new(
2579 Kind::Custom(super::super::root_events::HASHTREE_KIND),
2580 "",
2581 [
2582 Tag::parse(&["d", tree_name]).unwrap(),
2583 Tag::parse(&["l", super::super::root_events::HASHTREE_LABEL]).unwrap(),
2584 Tag::parse(&["hash", &hash]).unwrap(),
2585 ],
2586 )
2587 .to_event(&owner_keys)
2588 .unwrap();
2589
2590 let peer_id = PeerId::new("peer-a-pub".to_string(), Some("session-a".to_string()));
2591 let peer_key = peer_id.to_string();
2592
2593 manager.state.peers.write().await.insert(
2594 peer_key.clone(),
2595 PeerEntry {
2596 peer_id,
2597 direction: PeerDirection::Outbound,
2598 state: ConnectionState::Connected,
2599 last_seen: Instant::now(),
2600 peer: Some(MeshPeer::mock_for_tests(TestMeshPeer::with_delayed_events(
2601 vec![event],
2602 Duration::from_millis(200),
2603 ))),
2604 pool: PeerPool::Other,
2605 transport: PeerTransport::Bluetooth,
2606 signal_paths: BTreeSet::from([PeerSignalPath::Bluetooth]),
2607 bytes_sent: 0,
2608 bytes_received: 0,
2609 },
2610 );
2611
2612 let manager_for_task = manager.clone();
2613 let owner_pubkey_for_task = owner_pubkey.clone();
2614 let resolve_task = tokio::spawn(async move {
2615 manager_for_task
2616 .state
2617 .resolve_root_from_peers(
2618 &owner_pubkey_for_task,
2619 tree_name,
2620 Duration::from_millis(500),
2621 )
2622 .await
2623 });
2624
2625 tokio::time::sleep(Duration::from_millis(20)).await;
2626
2627 let manager_for_writer = manager.clone();
2628 let peer_key_for_writer = peer_key.clone();
2629 let writer_task = tokio::spawn(async move {
2630 let mut peers = manager_for_writer.state.peers.write().await;
2631 if let Some(entry) = peers.get_mut(&peer_key_for_writer) {
2632 entry.bytes_received += 1;
2633 }
2634 });
2635
2636 tokio::time::sleep(Duration::from_millis(20)).await;
2637
2638 let status_count = tokio::time::timeout(Duration::from_millis(50), async {
2639 manager.state.peers.read().await.len()
2640 })
2641 .await
2642 .expect("peer map read should not block on root query");
2643
2644 assert_eq!(status_count, 1);
2645 assert!(resolve_task.await.expect("resolve task").is_some());
2646 writer_task.await.expect("writer task");
2647 }
2648
2649 #[test]
2650 fn test_formal_timed_seen_set_rejects_duplicates() {
2651 let mut seen = TimedSeenSet::new(4, Duration::from_secs(60));
2652 assert!(seen.insert_if_new("frame-1".to_string()));
2653 assert!(!seen.insert_if_new("frame-1".to_string()));
2654 assert!(seen.insert_if_new("frame-2".to_string()));
2655 }
2656
2657 #[test]
2658 fn test_formal_timed_seen_set_evicts_oldest_when_capacity_exceeded() {
2659 let mut seen = TimedSeenSet::new(2, Duration::from_secs(60));
2660 assert!(seen.insert_if_new("a".to_string()));
2661 assert!(seen.insert_if_new("b".to_string()));
2662 assert!(seen.insert_if_new("c".to_string()));
2663
2664 assert!(seen.insert_if_new("a".to_string()));
2666 assert!(!seen.insert_if_new("a".to_string()));
2667 }
2668
2669 #[test]
2670 fn test_request_dispatch_normalization_caps_to_available_peers() {
2671 let normalized = normalize_dispatch_config(
2672 RequestDispatchConfig {
2673 initial_fanout: 8,
2674 hedge_fanout: 6,
2675 max_fanout: 5,
2676 hedge_interval_ms: 120,
2677 },
2678 3,
2679 );
2680 assert_eq!(normalized.max_fanout, 3);
2681 assert_eq!(normalized.initial_fanout, 3);
2682 assert_eq!(normalized.hedge_fanout, 3);
2683 }
2684
2685 #[test]
2686 fn test_hedged_wave_plan_matches_dispatch_policy() {
2687 let plan = build_hedged_wave_plan(
2688 7,
2689 RequestDispatchConfig {
2690 initial_fanout: 2,
2691 hedge_fanout: 3,
2692 max_fanout: 6,
2693 hedge_interval_ms: 120,
2694 },
2695 );
2696 assert_eq!(plan, vec![2, 3, 1]);
2697 }
2698}