1use anyhow::Result;
13use async_trait::async_trait;
14use futures::{SinkExt, StreamExt};
15use hashtree_network::{
16 decode_signaling_event, encode_signaling_event, run_hedged_waves, sync_selector_peers,
17 ClassifyRequest as SharedClassifyRequest, HedgedWaveAction, IceCandidate as SharedIceCandidate,
18 MeshRouter, PeerLink as SharedPeerLink, PeerLinkFactory as SharedPeerLinkFactory, PeerSelector,
19 SignalingTransport as SharedSignalingTransport, TransportError as SharedTransportError,
20};
21use nostr::{ClientMessage, Filter, JsonUtil, Keys, Kind, RelayMessage};
22use std::collections::{BTreeSet, HashMap};
23use std::sync::Arc;
24use std::time::{Duration, Instant};
25use tokio::sync::{mpsc, Mutex, RwLock};
26use tokio_tungstenite::{connect_async, tungstenite::Message};
27use tracing::{debug, error, info, warn};
28
29use super::bluetooth::{BluetoothMesh, BluetoothPeerRegistrar, BluetoothRuntimeContext};
30use super::cashu::{CashuMintMetadataStore, CashuQuoteState, CashuRoutingConfig, NegotiatedQuote};
31use super::local_bus::SharedLocalNostrBus;
32use super::multicast::MulticastNostrBus;
33use super::peer::{ContentStore, Peer, PendingRequest};
34use super::root_events::{
35 build_root_filter, hashtree_event_identifier, is_hashtree_labeled_event, pick_latest_event,
36 root_event_from_peer, PeerRootEvent,
37};
38use super::session::MeshPeer;
39use super::types::{
40 decrement_htl_with_policy, encode_quote_request, encode_request, should_forward_htl,
41 validate_mesh_frame, DataQuoteRequest, DataRequest, MeshNostrFrame, MeshNostrPayload,
42 PeerDirection, PeerId, PeerPool, PeerStateEvent, PeerStatus, RequestDispatchConfig,
43 SignalingMessage, TimedSeenSet, WebRTCConfig, HELLO_TAG, MESH_DEFAULT_HTL, MESH_EVENT_POLICY,
44 WEBRTC_KIND,
45};
46use super::wifi_aware::{mobile_wifi_aware_bridge, WifiAwareNostrBus, WIFI_AWARE_SOURCE};
47use crate::cashu_helper::CashuPaymentClient;
48use crate::nostr_relay::NostrRelay;
49
/// Shared, thread-safe callback that assigns a peer to a [`PeerPool`].
///
/// `SharedRouterPeerFactory::register_peer` calls it with the peer's pubkey string.
pub type PeerClassifier = Arc<dyn Fn(&str) -> PeerPool + Send + Sync>;
52
/// Transport over which a peer session is carried.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub enum PeerTransport {
    /// Session runs over a WebRTC connection.
    WebRtc,
    /// Session runs over Bluetooth.
    Bluetooth,
}

impl PeerTransport {
    /// Returns the lowercase label for this transport (`"webrtc"` or `"bluetooth"`).
    pub const fn as_str(self) -> &'static str {
        match self {
            Self::WebRtc => "webrtc",
            Self::Bluetooth => "bluetooth",
        }
    }
}

impl std::fmt::Display for PeerTransport {
    /// Writes the same label that [`PeerTransport::as_str`] returns.
    fn fmt(&self, out: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = self.as_str();
        out.write_str(label)
    }
}
74
/// Reports whether the `HTREE_BLUETOOTH_NOSTR_ONLY` environment variable is set to
/// one of the accepted "on" spellings (`1`, `true`, `TRUE`, `yes`, `YES`).
///
/// An unset or non-Unicode variable, or any other value, yields `false`.
fn bluetooth_nostr_only_mode() -> bool {
    const ENABLED_VALUES: [&str; 5] = ["1", "true", "TRUE", "yes", "YES"];

    match std::env::var("HTREE_BLUETOOTH_NOSTR_ONLY") {
        Ok(value) => ENABLED_VALUES.contains(&value.as_str()),
        Err(_) => false,
    }
}
81
82#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
84pub enum PeerSignalPath {
85 Relay,
86 Multicast,
87 WifiAware,
88 Bluetooth,
89}
90
91impl PeerSignalPath {
92 pub const fn as_str(self) -> &'static str {
93 match self {
94 PeerSignalPath::Relay => "relay",
95 PeerSignalPath::Multicast => "multicast",
96 PeerSignalPath::WifiAware => WIFI_AWARE_SOURCE,
97 PeerSignalPath::Bluetooth => "bluetooth",
98 }
99 }
100
101 pub fn from_source_name(source: &str) -> Self {
102 match source {
103 "multicast" => PeerSignalPath::Multicast,
104 WIFI_AWARE_SOURCE => PeerSignalPath::WifiAware,
105 "bluetooth" => PeerSignalPath::Bluetooth,
106 _ => PeerSignalPath::Relay,
107 }
108 }
109}
110
111impl std::fmt::Display for PeerSignalPath {
112 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
113 f.write_str((*self).as_str())
114 }
115}
116
/// Lifecycle state recorded for a peer entry.
#[derive(Debug, Clone, PartialEq)]
pub enum ConnectionState {
    Discovered,
    Connecting,
    Connected,
    Failed,
}

impl std::fmt::Display for ConnectionState {
    /// Writes the lowercase name of the state.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Discovered => "discovered",
            Self::Connecting => "connecting",
            Self::Connected => "connected",
            Self::Failed => "failed",
        };
        f.write_str(label)
    }
}
136
/// Bookkeeping record for one peer, stored in `WebRTCState::peers` under the
/// peer id's string form.
pub struct PeerEntry {
    /// Identity of the remote peer.
    pub peer_id: PeerId,
    /// Whether the session was initiated by us or by the remote side.
    pub direction: PeerDirection,
    /// Current lifecycle state; request paths only use `Connected` entries.
    pub state: ConnectionState,
    /// Set to `Instant::now()` when the entry is registered.
    // NOTE(review): other update sites are not visible in this chunk.
    pub last_seen: Instant,
    /// Session handle, if one exists; entries without one are skipped by requests.
    pub peer: Option<MeshPeer>,
    /// Pool assigned by the peer classifier.
    pub pool: PeerPool,
    /// Transport carrying the session.
    pub transport: PeerTransport,
    /// Signaling paths recorded for this peer (see `remember_peer_signal_path`).
    pub signal_paths: BTreeSet<PeerSignalPath>,
    /// Per-peer byte counter incremented by `WebRTCState::record_sent`.
    pub bytes_sent: u64,
    /// Per-peer byte counter incremented by `WebRTCState::record_received`.
    pub bytes_received: u64,
}
150
/// Shared runtime state: the peer table, traffic counters, and request routing
/// configuration.
pub struct WebRTCState {
    /// Peer table keyed by the peer id's string form.
    pub peers: RwLock<HashMap<String, PeerEntry>>,
    /// Connected-peer counter; reset to 0 by `reset_runtime_state`.
    // NOTE(review): increment/decrement sites are not visible in this chunk.
    pub connected_count: std::sync::atomic::AtomicUsize,
    /// Total bytes sent, accumulated by `record_sent`.
    pub bytes_sent: std::sync::atomic::AtomicU64,
    /// Total bytes received, accumulated by `record_received`.
    pub bytes_received: std::sync::atomic::AtomicU64,
    /// Counter bumped by `record_mesh_received`.
    pub mesh_received: std::sync::atomic::AtomicU64,
    /// Counter increased by `record_mesh_forwarded`.
    pub mesh_forwarded: std::sync::atomic::AtomicU64,
    /// Counter bumped by `record_mesh_duplicate_drop`.
    pub mesh_dropped_duplicate: std::sync::atomic::AtomicU64,
    /// Peer selector used to order peers for requests; also handed to `cashu_quotes`.
    peer_selector: Arc<RwLock<PeerSelector>>,
    /// Dispatch configuration passed to `run_hedged_waves`.
    request_dispatch: RequestDispatchConfig,
    /// Timeout passed to `run_hedged_waves` and used for individual peer requests.
    request_timeout: Duration,
    /// Cashu quote state used for quote negotiation and payment revocation.
    cashu_quotes: Arc<CashuQuoteState>,
    /// Local Nostr buses queried by `resolve_root_from_local_buses*`.
    local_buses: RwLock<Vec<SharedLocalNostrBus>>,
}
// NOTE(review): these four constants are not used in the visible part of the file;
// presumably they size and expire the `TimedSeenSet`s held by `WebRTCManager`
// (`seen_frame_ids` / `seen_event_ids`) — confirm against the constructor.
const SEEN_FRAME_CAP: usize = 4096;
const SEEN_FRAME_TTL: Duration = Duration::from_secs(120);
const SEEN_EVENT_CAP: usize = 8192;
const SEEN_EVENT_TTL: Duration = Duration::from_secs(600);
181
/// Shared map of in-flight requests; `request_from_single_peer` keys it by hash hex.
type PendingRequestsMap = Arc<Mutex<HashMap<String, PendingRequest>>>;
/// A WebRTC peer usable for direct data-channel sends:
/// (peer id string, its pending-request map, its data channel).
type ConnectedPeer = (
    String,
    PendingRequestsMap,
    Arc<webrtc::data_channel::RTCDataChannel>,
);
/// A ready peer session of any transport: (peer id string, session, transport).
type ConnectedSession = (String, MeshPeer, PeerTransport);
/// Mesh router specialised with this file's signaling bridge and peer factory.
type SharedProductionRouter = MeshRouter<RouterSignalingBridge, SharedRouterPeerFactory>;
190
191async fn remember_peer_signal_path(state: &WebRTCState, peer_id: &str, source: &str) {
192 if let Some(entry) = state.peers.write().await.get_mut(peer_id) {
193 entry
194 .signal_paths
195 .insert(PeerSignalPath::from_source_name(source));
196 }
197}
198
/// Adapter that lets the shared mesh router publish signaling messages into this
/// module's signaling channel.
#[derive(Clone)]
struct RouterSignalingBridge {
    /// Local peer id string returned by `SharedSignalingTransport::peer_id`.
    peer_id: String,
    /// Channel that published signaling messages are forwarded to.
    signaling_tx: mpsc::Sender<SignalingMessage>,
}

impl RouterSignalingBridge {
    /// Creates a bridge for `peer_id` that forwards published messages to `signaling_tx`.
    fn new(peer_id: String, signaling_tx: mpsc::Sender<SignalingMessage>) -> Self {
        Self {
            peer_id,
            signaling_tx,
        }
    }
}
213
214#[async_trait]
215impl SharedSignalingTransport for RouterSignalingBridge {
216 async fn connect(&self, _relays: &[String]) -> Result<(), SharedTransportError> {
217 Ok(())
218 }
219
220 async fn disconnect(&self) {}
221
222 async fn publish(&self, msg: SignalingMessage) -> Result<(), SharedTransportError> {
223 self.signaling_tx
224 .send(msg)
225 .await
226 .map_err(|e| SharedTransportError::SendFailed(e.to_string()))
227 }
228
229 async fn recv(&self) -> Option<SignalingMessage> {
230 None
231 }
232
233 fn try_recv(&self) -> Option<SignalingMessage> {
234 None
235 }
236
237 fn peer_id(&self) -> &str {
238 &self.peer_id
239 }
240}
241
/// Peer-link factory for the shared mesh router: builds WebRTC [`Peer`]s and
/// registers them in both its own link table and the shared [`WebRTCState`].
struct SharedRouterPeerFactory {
    /// Our own peer id, passed to every `Peer` that is created.
    my_peer_id: PeerId,
    /// Signaling channel handed to every created `Peer`.
    signaling_tx: mpsc::Sender<SignalingMessage>,
    /// STUN server list handed to every created `Peer`.
    stun_servers: Vec<String>,
    /// Optional content store handed to every created `Peer`.
    store: Option<Arc<dyn ContentStore>>,
    /// Shared state whose peer table receives a `PeerEntry` on registration.
    state: Arc<WebRTCState>,
    /// Peer state event channel handed to every created `Peer`.
    state_event_tx: mpsc::Sender<PeerStateEvent>,
    /// Optional Nostr relay handed to every created `Peer`.
    nostr_relay: Option<Arc<NostrRelay>>,
    /// Mesh frame channel handed to every created `Peer`.
    mesh_frame_tx: mpsc::Sender<(PeerId, MeshNostrFrame)>,
    /// Classifier used to assign a pool when a peer is registered.
    peer_classifier: PeerClassifier,
    /// Factory-local link table keyed by peer id string; used to route answers
    /// and ICE candidates to the right `Peer`.
    peers: RwLock<HashMap<String, Arc<Peer>>>,
}
254
255impl SharedRouterPeerFactory {
256 fn new(
257 my_peer_id: PeerId,
258 signaling_tx: mpsc::Sender<SignalingMessage>,
259 stun_servers: Vec<String>,
260 store: Option<Arc<dyn ContentStore>>,
261 state: Arc<WebRTCState>,
262 state_event_tx: mpsc::Sender<PeerStateEvent>,
263 nostr_relay: Option<Arc<NostrRelay>>,
264 mesh_frame_tx: mpsc::Sender<(PeerId, MeshNostrFrame)>,
265 peer_classifier: PeerClassifier,
266 ) -> Self {
267 Self {
268 my_peer_id,
269 signaling_tx,
270 stun_servers,
271 store,
272 state,
273 state_event_tx,
274 nostr_relay,
275 mesh_frame_tx,
276 peer_classifier,
277 peers: RwLock::new(HashMap::new()),
278 }
279 }
280
281 async fn register_peer(&self, peer_id: PeerId, direction: PeerDirection, peer: Arc<Peer>) {
282 let peer_key = peer_id.to_string();
283 let pool = (self.peer_classifier)(&peer_id.pubkey);
284 self.peers
285 .write()
286 .await
287 .insert(peer_key.clone(), peer.clone());
288
289 let mut peers = self.state.peers.write().await;
290 peers.insert(
291 peer_key,
292 PeerEntry {
293 peer_id,
294 direction,
295 state: ConnectionState::Connecting,
296 last_seen: Instant::now(),
297 peer: Some(MeshPeer::WebRtc(peer)),
298 pool,
299 transport: PeerTransport::WebRtc,
300 signal_paths: BTreeSet::from([PeerSignalPath::Relay]),
301 bytes_sent: 0,
302 bytes_received: 0,
303 },
304 );
305 }
306
307 async fn create_peer(
308 &self,
309 peer_id: PeerId,
310 direction: PeerDirection,
311 ) -> Result<Peer, SharedTransportError> {
312 Peer::new_with_store_and_events(
313 peer_id,
314 direction,
315 self.my_peer_id.clone(),
316 self.signaling_tx.clone(),
317 self.stun_servers.clone(),
318 self.store.clone(),
319 Some(self.state_event_tx.clone()),
320 self.nostr_relay.clone(),
321 Some(self.mesh_frame_tx.clone()),
322 Some(self.state.cashu_quotes.clone()),
323 )
324 .await
325 .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))
326 }
327}
328
329#[async_trait]
330impl SharedPeerLinkFactory for SharedRouterPeerFactory {
331 async fn create_offer(
332 &self,
333 target_peer_id: &str,
334 ) -> Result<(Arc<dyn SharedPeerLink>, String), SharedTransportError> {
335 let target_peer = PeerId::from_string(target_peer_id).ok_or_else(|| {
336 SharedTransportError::ConnectionFailed(format!("invalid peer id {target_peer_id}"))
337 })?;
338 let peer = Arc::new(
339 self.create_peer(target_peer.clone(), PeerDirection::Outbound)
340 .await?,
341 );
342 peer.setup_handlers()
343 .await
344 .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))?;
345 let offer = peer
346 .connect()
347 .await
348 .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))?;
349 let sdp = offer
350 .get("sdp")
351 .and_then(|value| value.as_str())
352 .ok_or_else(|| {
353 SharedTransportError::ConnectionFailed("missing SDP in CLI peer offer".to_string())
354 })?
355 .to_string();
356 self.register_peer(target_peer, PeerDirection::Outbound, peer.clone())
357 .await;
358 Ok((peer as Arc<dyn SharedPeerLink>, sdp))
359 }
360
361 async fn accept_offer(
362 &self,
363 from_peer_id: &str,
364 offer_sdp: &str,
365 ) -> Result<(Arc<dyn SharedPeerLink>, String), SharedTransportError> {
366 let from_peer = PeerId::from_string(from_peer_id).ok_or_else(|| {
367 SharedTransportError::ConnectionFailed(format!("invalid peer id {from_peer_id}"))
368 })?;
369 let peer = Arc::new(
370 self.create_peer(from_peer.clone(), PeerDirection::Inbound)
371 .await?,
372 );
373 peer.setup_handlers()
374 .await
375 .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))?;
376 let answer = peer
377 .handle_offer(serde_json::json!({ "type": "offer", "sdp": offer_sdp }))
378 .await
379 .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))?;
380 let sdp = answer
381 .get("sdp")
382 .and_then(|value| value.as_str())
383 .ok_or_else(|| {
384 SharedTransportError::ConnectionFailed("missing SDP in CLI peer answer".to_string())
385 })?
386 .to_string();
387 self.register_peer(from_peer, PeerDirection::Inbound, peer.clone())
388 .await;
389 Ok((peer as Arc<dyn SharedPeerLink>, sdp))
390 }
391
392 async fn handle_answer(
393 &self,
394 target_peer_id: &str,
395 answer_sdp: &str,
396 ) -> Result<Arc<dyn SharedPeerLink>, SharedTransportError> {
397 let peer = self
398 .peers
399 .read()
400 .await
401 .get(target_peer_id)
402 .cloned()
403 .ok_or_else(|| {
404 SharedTransportError::ConnectionFailed(format!(
405 "missing outbound peer for {target_peer_id}"
406 ))
407 })?;
408 peer.handle_answer(serde_json::json!({ "type": "answer", "sdp": answer_sdp }))
409 .await
410 .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))?;
411 Ok(peer as Arc<dyn SharedPeerLink>)
412 }
413
414 async fn handle_candidate(
415 &self,
416 peer_id: &str,
417 candidate: SharedIceCandidate,
418 ) -> Result<(), SharedTransportError> {
419 let peer = self.peers.read().await.get(peer_id).cloned();
420 if let Some(peer) = peer {
421 peer.handle_candidate(serde_json::json!({
422 "candidate": candidate.candidate,
423 "sdpMLineIndex": candidate.sdp_m_line_index,
424 "sdpMid": candidate.sdp_mid,
425 }))
426 .await
427 .map_err(|e| SharedTransportError::ConnectionFailed(e.to_string()))?;
428 }
429 Ok(())
430 }
431
432 async fn remove_peer(&self, peer_id: &str) -> Result<(), SharedTransportError> {
433 self.peers.write().await.remove(peer_id);
434 Ok(())
435 }
436}
437
438impl WebRTCState {
439 pub fn new() -> Self {
440 let cfg = WebRTCConfig::default();
441 Self::new_with_routing_and_cashu(
442 cfg.request_selection_strategy,
443 cfg.request_fairness_enabled,
444 cfg.request_dispatch,
445 Duration::from_millis(cfg.message_timeout_ms),
446 CashuRoutingConfig::default(),
447 None,
448 None,
449 )
450 }
451
452 pub fn new_with_routing(
453 selection_strategy: super::types::SelectionStrategy,
454 fairness_enabled: bool,
455 request_dispatch: RequestDispatchConfig,
456 ) -> Self {
457 let cfg = WebRTCConfig::default();
458 Self::new_with_routing_and_cashu(
459 selection_strategy,
460 fairness_enabled,
461 request_dispatch,
462 Duration::from_millis(cfg.message_timeout_ms),
463 CashuRoutingConfig::default(),
464 None,
465 None,
466 )
467 }
468
469 pub fn new_with_routing_and_cashu(
470 selection_strategy: super::types::SelectionStrategy,
471 fairness_enabled: bool,
472 request_dispatch: RequestDispatchConfig,
473 request_timeout: Duration,
474 cashu_routing: CashuRoutingConfig,
475 payment_client: Option<Arc<dyn CashuPaymentClient>>,
476 mint_metadata: Option<Arc<CashuMintMetadataStore>>,
477 ) -> Self {
478 let mut selector = PeerSelector::with_strategy(selection_strategy);
479 selector.set_fairness(fairness_enabled);
480 let peer_selector = Arc::new(RwLock::new(selector));
481 let cashu_quotes = Arc::new(if let Some(mint_metadata) = mint_metadata {
482 CashuQuoteState::new_with_mint_metadata(
483 cashu_routing,
484 peer_selector.clone(),
485 payment_client,
486 mint_metadata,
487 )
488 } else {
489 CashuQuoteState::new(cashu_routing, peer_selector.clone(), payment_client)
490 });
491 Self {
492 peers: RwLock::new(HashMap::new()),
493 connected_count: std::sync::atomic::AtomicUsize::new(0),
494 bytes_sent: std::sync::atomic::AtomicU64::new(0),
495 bytes_received: std::sync::atomic::AtomicU64::new(0),
496 mesh_received: std::sync::atomic::AtomicU64::new(0),
497 mesh_forwarded: std::sync::atomic::AtomicU64::new(0),
498 mesh_dropped_duplicate: std::sync::atomic::AtomicU64::new(0),
499 peer_selector,
500 request_dispatch,
501 request_timeout,
502 cashu_quotes,
503 local_buses: RwLock::new(Vec::new()),
504 }
505 }
506
507 pub async fn set_local_buses(&self, buses: Vec<SharedLocalNostrBus>) {
508 *self.local_buses.write().await = buses;
509 }
510
511 pub async fn add_local_bus(&self, bus: SharedLocalNostrBus) {
512 self.local_buses.write().await.push(bus);
513 }
514
515 pub async fn set_multicast_bus(&self, bus: Option<Arc<MulticastNostrBus>>) {
516 let buses = bus
517 .into_iter()
518 .map(|bus| bus as SharedLocalNostrBus)
519 .collect();
520 self.set_local_buses(buses).await;
521 }
522
523 pub async fn reset_runtime_state(&self) {
526 self.set_local_buses(Vec::new()).await;
527 let peers = {
528 let mut peers = self.peers.write().await;
529 std::mem::take(&mut *peers)
530 };
531 self.connected_count
532 .store(0, std::sync::atomic::Ordering::Relaxed);
533 for entry in peers.into_values() {
534 if let Some(peer) = entry.peer {
535 let _ = peer.close().await;
536 }
537 }
538 }
539
540 pub fn get_bandwidth(&self) -> (u64, u64) {
542 (
543 self.bytes_sent.load(std::sync::atomic::Ordering::Relaxed),
544 self.bytes_received
545 .load(std::sync::atomic::Ordering::Relaxed),
546 )
547 }
548
549 pub fn get_mesh_stats(&self) -> (u64, u64, u64) {
550 (
551 self.mesh_received
552 .load(std::sync::atomic::Ordering::Relaxed),
553 self.mesh_forwarded
554 .load(std::sync::atomic::Ordering::Relaxed),
555 self.mesh_dropped_duplicate
556 .load(std::sync::atomic::Ordering::Relaxed),
557 )
558 }
559
560 pub fn record_mesh_received(&self) {
561 self.mesh_received
562 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
563 }
564
565 pub fn record_mesh_forwarded(&self, count: u64) {
566 self.mesh_forwarded
567 .fetch_add(count, std::sync::atomic::Ordering::Relaxed);
568 }
569
570 pub fn record_mesh_duplicate_drop(&self) {
571 self.mesh_dropped_duplicate
572 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
573 }
574
575 pub async fn record_sent(&self, peer_id: &str, bytes: u64) {
577 self.bytes_sent
578 .fetch_add(bytes, std::sync::atomic::Ordering::Relaxed);
579 if let Some(entry) = self.peers.write().await.get_mut(peer_id) {
580 entry.bytes_sent += bytes;
581 }
582 }
583
584 pub async fn record_received(&self, peer_id: &str, bytes: u64) {
586 self.bytes_received
587 .fetch_add(bytes, std::sync::atomic::Ordering::Relaxed);
588 if let Some(entry) = self.peers.write().await.get_mut(peer_id) {
589 entry.bytes_received += bytes;
590 }
591 }
592
593 pub async fn request_from_peers(&self, hash_hex: &str) -> Option<Vec<u8>> {
597 self.request_from_peers_with_source(hash_hex)
598 .await
599 .map(|(data, _peer_id)| data)
600 }
601
602 pub async fn request_from_peers_with_source(
604 &self,
605 hash_hex: &str,
606 ) -> Option<(Vec<u8>, String)> {
607 use super::types::BLOB_REQUEST_POLICY;
608
609 let peers = self.peers.read().await;
610
611 let peer_refs: Vec<_> = peers
612 .values()
613 .filter(|p| p.state == ConnectionState::Connected && p.peer.is_some())
614 .filter_map(|p| {
615 p.peer
616 .clone()
617 .map(|peer| (p.peer_id.to_string(), peer, p.transport))
618 })
619 .collect();
620
621 drop(peers); let mut connected_peers: Vec<ConnectedPeer> = Vec::new();
624 let mut connected_sessions: Vec<ConnectedSession> = Vec::new();
625 for (peer_id, peer, transport) in peer_refs {
626 if !peer.is_ready() {
627 continue;
628 }
629 if bluetooth_nostr_only_mode() && transport == PeerTransport::Bluetooth {
630 continue;
631 }
632 if let Some(webrtc_peer) = peer.as_webrtc() {
633 let dc_guard = webrtc_peer.data_channel.lock().await;
634 if let Some(dc) = dc_guard.as_ref() {
635 connected_peers.push((
636 peer_id.clone(),
637 webrtc_peer.pending_requests.clone(),
638 dc.clone(),
639 ));
640 }
641 }
642 connected_sessions.push((peer_id, peer, transport));
643 }
644
645 if connected_sessions.is_empty() {
646 debug!(
647 "No connected peers to query for {}",
648 &hash_hex[..8.min(hash_hex.len())]
649 );
650 return None;
651 }
652
653 let hash_bytes = match hex::decode(hash_hex) {
655 Ok(b) => b,
656 Err(_) => return None,
657 };
658
659 let expected_hash: [u8; 32] = match hash_bytes.as_slice().try_into() {
660 Ok(h) => h,
661 Err(_) => {
662 debug!(
663 "Invalid hash length {}, expected 32 bytes",
664 hash_bytes.len()
665 );
666 return None;
667 }
668 };
669
670 let connected_peer_ids: Vec<String> = connected_sessions
671 .iter()
672 .map(|(peer_id, _, _)| peer_id.clone())
673 .collect();
674 sync_selector_peers(self.peer_selector.as_ref(), &connected_peer_ids).await;
675
676 let ordered_peer_ids = self.peer_selector.write().await.select_peers();
677 let mut quote_by_peer: HashMap<
678 String,
679 (
680 PendingRequestsMap,
681 Arc<webrtc::data_channel::RTCDataChannel>,
682 ),
683 > = connected_peers
684 .iter()
685 .cloned()
686 .map(|(peer_id, pending, dc)| (peer_id, (pending, dc)))
687 .collect();
688 let mut ordered_quote_peers: Vec<ConnectedPeer> = Vec::new();
689 for peer_id in &ordered_peer_ids {
690 if let Some((pending, dc)) = quote_by_peer.remove(peer_id) {
691 ordered_quote_peers.push((peer_id.clone(), pending, dc));
692 }
693 }
694 for (peer_id, (pending, dc)) in quote_by_peer {
695 ordered_quote_peers.push((peer_id, pending, dc));
696 }
697
698 let mut by_peer: HashMap<String, (MeshPeer, PeerTransport)> = connected_sessions
699 .into_iter()
700 .map(|(peer_id, peer, transport)| (peer_id, (peer, transport)))
701 .collect();
702
703 let mut ordered_peers: Vec<ConnectedSession> = Vec::new();
704 for peer_id in ordered_peer_ids {
705 if let Some((peer, transport)) = by_peer.remove(&peer_id) {
706 ordered_peers.push((peer_id, peer, transport));
707 }
708 }
709 for (peer_id, (peer, transport)) in by_peer {
710 ordered_peers.push((peer_id, peer, transport));
711 }
712
713 debug!(
714 "Querying {} peers for {} with shared hedged scheduler",
715 ordered_peers.len(),
716 &hash_hex[..8.min(hash_hex.len())],
717 );
718
719 if let Some((requested_mint, payment_sat, quote_ttl_ms)) =
720 self.cashu_quotes.requester_quote_terms().await
721 {
722 if let Some(quote) = self
723 .request_quote_from_peers(
724 &hash_bytes,
725 requested_mint,
726 payment_sat,
727 quote_ttl_ms,
728 &ordered_quote_peers,
729 )
730 .await
731 {
732 if let Some(data) = self
733 .request_from_single_peer(
734 hash_hex,
735 &hash_bytes,
736 expected_hash,
737 "e.peer_id,
738 Some("e),
739 &ordered_quote_peers,
740 )
741 .await
742 {
743 debug!(
744 "Got quoted response from peer {} for {}",
745 quote.peer_id,
746 &hash_hex[..8.min(hash_hex.len())]
747 );
748 return Some((data, quote.peer_id));
749 }
750 }
751 }
752
753 let request = DataRequest {
754 h: hash_bytes.clone(),
755 htl: BLOB_REQUEST_POLICY.max_htl,
756 q: None,
757 };
758 let wire = match encode_request(&request) {
759 Ok(w) => w,
760 Err(_) => return None,
761 };
762 let wire_len = wire.len() as u64;
763 let current_result_rx = Arc::new(Mutex::new(None));
764 if let Some((data, peer_id)) = run_hedged_waves(
765 ordered_peers.len(),
766 self.request_dispatch,
767 self.request_timeout,
768 |range| {
769 let wave_peers = ordered_peers[range].to_vec();
770 let (result_tx, result_rx) =
771 mpsc::channel::<(String, Instant, Result<Option<Vec<u8>>>)>(wave_peers.len());
772 let current_result_rx = current_result_rx.clone();
773 let hash_hex = hash_hex.to_string();
774 async move {
775 *current_result_rx.lock().await = Some(result_rx);
776 let sent = wave_peers.len();
777 for (peer_id, peer, transport) in wave_peers {
778 if transport != PeerTransport::Bluetooth {
779 self.record_sent(&peer_id, wire_len).await;
780 }
781 self.peer_selector
782 .write()
783 .await
784 .record_request(&peer_id, wire_len);
785
786 let result_tx = result_tx.clone();
787 let peer_id_for_task = peer_id.clone();
788 let peer = peer.clone();
789 let hash_hex = hash_hex.clone();
790 let per_request_timeout = self.request_timeout;
791 tokio::spawn(async move {
792 let started = Instant::now();
793 let result = peer.request(&hash_hex, per_request_timeout).await;
794 let _ = result_tx.send((peer_id_for_task, started, result)).await;
795 });
796 }
797 drop(result_tx);
798 sent
799 }
800 },
801 |wait| {
802 let current_result_rx = current_result_rx.clone();
803 async move {
804 let mut current_result_rx = current_result_rx.lock().await;
805 let Some(result_rx) = current_result_rx.as_mut() else {
806 return HedgedWaveAction::Abort;
807 };
808 let deadline = Instant::now() + wait;
809 loop {
810 let now = Instant::now();
811 if now >= deadline {
812 return HedgedWaveAction::Continue;
813 }
814 let remaining = deadline.saturating_duration_since(now);
815 match tokio::time::timeout(remaining, result_rx.recv()).await {
816 Ok(Some((peer_id, started, Ok(Some(data))))) => {
817 let rtt_ms = started.elapsed().as_millis() as u64;
818 if hashtree_core::sha256(&data) == expected_hash {
819 let should_record = {
820 let peers = self.peers.read().await;
821 peers
822 .get(&peer_id)
823 .map(|entry| {
824 entry.transport != PeerTransport::Bluetooth
825 })
826 .unwrap_or(true)
827 };
828 if should_record {
829 self.record_received(&peer_id, data.len() as u64).await;
830 }
831 self.peer_selector.write().await.record_success(
832 &peer_id,
833 rtt_ms,
834 data.len() as u64,
835 );
836 return HedgedWaveAction::Success((data, peer_id));
837 }
838 self.peer_selector.write().await.record_failure(&peer_id);
839 }
840 Ok(Some((peer_id, _, Ok(None)))) | Ok(Some((peer_id, _, Err(_)))) => {
841 self.peer_selector.write().await.record_timeout(&peer_id);
842 }
843 Ok(None) | Err(_) => return HedgedWaveAction::Continue,
844 }
845 }
846 }
847 },
848 )
849 .await
850 {
851 debug!(
852 "Got response from peer {} for {}",
853 peer_id,
854 &hash_hex[..8.min(hash_hex.len())]
855 );
856 return Some((data, peer_id));
857 }
858
859 debug!(
860 "No peer had data for {}",
861 &hash_hex[..8.min(hash_hex.len())]
862 );
863 None
864 }
865
866 async fn request_quote_from_peers(
867 &self,
868 hash_bytes: &[u8],
869 requested_mint: String,
870 payment_sat: u64,
871 quote_ttl_ms: u32,
872 ordered_peers: &[ConnectedPeer],
873 ) -> Option<NegotiatedQuote> {
874 if ordered_peers.is_empty() || quote_ttl_ms == 0 {
875 return None;
876 }
877
878 let hash_hex = hex::encode(hash_bytes);
879 let rx = self
880 .cashu_quotes
881 .register_pending_quote(hash_hex.clone(), Some(requested_mint.clone()), payment_sat)
882 .await;
883 let quote_request = DataQuoteRequest {
884 h: hash_bytes.to_vec(),
885 p: payment_sat,
886 t: quote_ttl_ms,
887 m: Some(requested_mint),
888 };
889 let wire = match encode_quote_request("e_request) {
890 Ok(wire) => wire,
891 Err(_) => {
892 self.cashu_quotes.clear_pending_quote(&hash_hex).await;
893 return None;
894 }
895 };
896 let rx = Arc::new(Mutex::new(rx));
897 let result = run_hedged_waves(
898 ordered_peers.len(),
899 self.request_dispatch,
900 self.request_timeout,
901 |range| {
902 let wave_peers = ordered_peers[range].to_vec();
903 let wire = wire.clone();
904 async move {
905 let mut sent = 0usize;
906 for (_, _, dc) in wave_peers {
907 if dc.send(&bytes::Bytes::copy_from_slice(&wire)).await.is_ok() {
908 sent += 1;
909 }
910 }
911 sent
912 }
913 },
914 |wait| {
915 let rx = rx.clone();
916 async move {
917 let mut rx = rx.lock().await;
918 match tokio::time::timeout(wait, &mut *rx).await {
919 Ok(Ok(Some(quote))) => HedgedWaveAction::Success(quote),
920 Ok(Ok(None)) | Ok(Err(_)) => HedgedWaveAction::Abort,
921 Err(_) => HedgedWaveAction::Continue,
922 }
923 }
924 },
925 )
926 .await;
927
928 self.cashu_quotes.clear_pending_quote(&hash_hex).await;
929 result
930 }
931
932 async fn request_from_single_peer(
933 &self,
934 hash_hex: &str,
935 hash_bytes: &[u8],
936 expected_hash: [u8; 32],
937 target_peer_id: &str,
938 quote: Option<&NegotiatedQuote>,
939 ordered_peers: &[ConnectedPeer],
940 ) -> Option<Vec<u8>> {
941 use super::types::BLOB_REQUEST_POLICY;
942
943 let (pending_requests, dc) = ordered_peers
944 .iter()
945 .find(|(peer_id, _, _)| peer_id == target_peer_id)
946 .map(|(_, pending_requests, dc)| (pending_requests.clone(), dc.clone()))?;
947
948 let request = DataRequest {
949 h: hash_bytes.to_vec(),
950 htl: BLOB_REQUEST_POLICY.max_htl,
951 q: quote.map(|quote| quote.quote_id),
952 };
953 let wire = encode_request(&request).ok()?;
954 let wire_len = wire.len() as u64;
955 let sent_at = Instant::now();
956 let (tx, mut rx) = tokio::sync::oneshot::channel();
957
958 {
959 let mut pending = pending_requests.lock().await;
960 pending.insert(
961 hash_hex.to_string(),
962 if let Some(quote) = quote {
963 PendingRequest::quoted(
964 hash_bytes.to_vec(),
965 tx,
966 quote.quote_id,
967 quote.mint_url.clone().unwrap_or_default(),
968 quote.payment_sat,
969 )
970 } else {
971 PendingRequest::standard(hash_bytes.to_vec(), tx)
972 },
973 );
974 }
975
976 if dc
977 .send(&bytes::Bytes::copy_from_slice(&wire))
978 .await
979 .is_err()
980 {
981 let mut pending = pending_requests.lock().await;
982 pending.remove(hash_hex);
983 self.peer_selector
984 .write()
985 .await
986 .record_failure(target_peer_id);
987 return None;
988 }
989
990 self.record_sent(target_peer_id, wire_len).await;
991 self.peer_selector
992 .write()
993 .await
994 .record_request(target_peer_id, wire_len);
995
996 let wait_timeout = if let Some(quote) = quote {
997 let multiplier = quote.payment_sat.clamp(1, 32) as u128;
998 let extra_ms = self
999 .cashu_quotes
1000 .settlement_timeout()
1001 .as_millis()
1002 .saturating_mul(multiplier);
1003 self.request_timeout + Duration::from_millis(extra_ms.min(u64::MAX as u128) as u64)
1004 } else {
1005 self.request_timeout
1006 };
1007
1008 match tokio::time::timeout(wait_timeout, &mut rx).await {
1009 Ok(Ok(Some(data))) if hashtree_core::sha256(&data) == expected_hash => {
1010 let rtt_ms = sent_at.elapsed().as_millis() as u64;
1011 self.record_received(target_peer_id, data.len() as u64)
1012 .await;
1013 self.peer_selector.write().await.record_success(
1014 target_peer_id,
1015 rtt_ms,
1016 data.len() as u64,
1017 );
1018 Some(data)
1019 }
1020 Ok(Ok(Some(_))) => {
1021 self.peer_selector
1022 .write()
1023 .await
1024 .record_failure(target_peer_id);
1025 let pending = pending_requests.lock().await.remove(hash_hex);
1026 if let Some(pending) = pending {
1027 if let Some(quoted) = pending.quoted {
1028 if let Some(in_flight) = quoted.in_flight_payment {
1029 let _ = self
1030 .cashu_quotes
1031 .revoke_payment_token(&in_flight.mint_url, &in_flight.operation_id)
1032 .await;
1033 }
1034 }
1035 }
1036 None
1037 }
1038 Ok(Ok(None)) | Ok(Err(_)) | Err(_) => {
1039 let pending = pending_requests.lock().await.remove(hash_hex);
1040 if let Some(pending) = pending {
1041 if let Some(quoted) = pending.quoted {
1042 if let Some(in_flight) = quoted.in_flight_payment {
1043 let _ = self
1044 .cashu_quotes
1045 .revoke_payment_token(&in_flight.mint_url, &in_flight.operation_id)
1046 .await;
1047 }
1048 }
1049 }
1050 self.peer_selector
1051 .write()
1052 .await
1053 .record_timeout(target_peer_id);
1054 None
1055 }
1056 }
1057 }
1058
1059 pub async fn resolve_root_from_peers(
1061 &self,
1062 owner_pubkey: &str,
1063 tree_name: &str,
1064 per_peer_timeout: Duration,
1065 ) -> Option<PeerRootEvent> {
1066 let filter = build_root_filter(owner_pubkey, tree_name)?;
1067
1068 let peer_refs: Vec<_> = {
1069 let peers = self.peers.read().await;
1070 peers
1071 .values()
1072 .filter(|entry| entry.state == ConnectionState::Connected)
1073 .filter(|entry| {
1074 !bluetooth_nostr_only_mode() || entry.transport != PeerTransport::Bluetooth
1075 })
1076 .filter_map(|entry| {
1077 let peer = entry.peer.as_ref()?;
1078 if !peer.is_ready() {
1079 return None;
1080 }
1081 Some((entry.peer_id.short(), peer.clone()))
1082 })
1083 .collect()
1084 };
1085
1086 for (peer_short, peer) in peer_refs {
1087 debug!(
1088 "Querying peer {} for root event {}/{}",
1089 peer_short, owner_pubkey, tree_name
1090 );
1091 let events = match peer
1092 .query_nostr_events(vec![filter.clone()], per_peer_timeout)
1093 .await
1094 {
1095 Ok(events) => events,
1096 Err(e) => {
1097 debug!(
1098 "Peer {} Nostr query failed for {}/{}: {}",
1099 peer_short, owner_pubkey, tree_name, e
1100 );
1101 continue;
1102 }
1103 };
1104 debug!(
1105 "Peer {} returned {} Nostr event(s) for {}/{}",
1106 peer_short,
1107 events.len(),
1108 owner_pubkey,
1109 tree_name
1110 );
1111
1112 let latest = pick_latest_event(events.iter().filter(|event| {
1113 hashtree_event_identifier(event).as_deref() == Some(tree_name)
1114 && is_hashtree_labeled_event(event)
1115 }));
1116 if let Some(event) = latest {
1117 if let Some(root) = root_event_from_peer(event, &peer_short, tree_name) {
1118 debug!(
1119 "Resolved {}/{} via peer {} event {}",
1120 owner_pubkey,
1121 tree_name,
1122 peer_short,
1123 event.id.to_hex()
1124 );
1125 return Some(root);
1126 }
1127 }
1128 }
1129
1130 None
1131 }
1132
1133 pub async fn resolve_root_from_local_buses_with_source(
1134 &self,
1135 owner_pubkey: &str,
1136 tree_name: &str,
1137 timeout: Duration,
1138 ) -> Option<(&'static str, PeerRootEvent)> {
1139 let buses = self.local_buses.read().await.clone();
1140 for bus in buses {
1141 if let Some(root) = bus.query_root(owner_pubkey, tree_name, timeout).await {
1142 return Some((bus.source_name(), root));
1143 }
1144 }
1145 None
1146 }
1147
1148 pub async fn resolve_root_from_local_buses(
1149 &self,
1150 owner_pubkey: &str,
1151 tree_name: &str,
1152 timeout: Duration,
1153 ) -> Option<PeerRootEvent> {
1154 self.resolve_root_from_local_buses_with_source(owner_pubkey, tree_name, timeout)
1155 .await
1156 .map(|(_, root)| root)
1157 }
1158
1159 pub async fn resolve_root_from_multicast(
1160 &self,
1161 owner_pubkey: &str,
1162 tree_name: &str,
1163 timeout: Duration,
1164 ) -> Option<PeerRootEvent> {
1165 self.resolve_root_from_local_buses(owner_pubkey, tree_name, timeout)
1166 .await
1167 }
1168}
1169
1170impl Default for WebRTCState {
1171 fn default() -> Self {
1172 Self::new()
1173 }
1174}
1175
1176pub struct WebRTCManager {
1178 config: WebRTCConfig,
1179 my_peer_id: PeerId,
1180 keys: Keys,
1181 state: Arc<WebRTCState>,
1182 shutdown: Arc<tokio::sync::watch::Sender<bool>>,
1183 shutdown_rx: tokio::sync::watch::Receiver<bool>,
1184 signaling_tx: mpsc::Sender<SignalingMessage>,
1186 signaling_rx: Option<mpsc::Receiver<SignalingMessage>>,
1187 store: Option<Arc<dyn ContentStore>>,
1189 peer_classifier: PeerClassifier,
1191 nostr_relay: Option<Arc<NostrRelay>>,
1193 local_buses: Vec<SharedLocalNostrBus>,
1194 state_event_tx: mpsc::Sender<PeerStateEvent>,
1196 state_event_rx: Option<mpsc::Receiver<PeerStateEvent>>,
1197 mesh_frame_tx: mpsc::Sender<(PeerId, MeshNostrFrame)>,
1199 mesh_frame_rx: Option<mpsc::Receiver<(PeerId, MeshNostrFrame)>>,
1200 shared_router: Option<Arc<SharedProductionRouter>>,
1201 seen_frame_ids: Arc<Mutex<TimedSeenSet>>,
1202 seen_event_ids: Arc<Mutex<TimedSeenSet>>,
1203}
1204
1205impl WebRTCManager {
1206 pub fn new(keys: Keys, config: WebRTCConfig) -> Self {
1208 let pubkey = keys.public_key().to_hex();
1209 let my_peer_id = PeerId::new(pubkey);
1210 let (shutdown, shutdown_rx) = tokio::sync::watch::channel(false);
1211 let (signaling_tx, signaling_rx) = mpsc::channel(100);
1212 let (state_event_tx, state_event_rx) = mpsc::channel(100);
1213 let (mesh_frame_tx, mesh_frame_rx) = mpsc::channel(256);
1214 let state = Arc::new(WebRTCState::new_with_routing_and_cashu(
1215 config.request_selection_strategy,
1216 config.request_fairness_enabled,
1217 config.request_dispatch,
1218 Duration::from_millis(config.message_timeout_ms),
1219 CashuRoutingConfig::default(),
1220 None,
1221 None,
1222 ));
1223
1224 let peer_classifier: PeerClassifier = Arc::new(|_| PeerPool::Other);
1226
1227 Self {
1228 config,
1229 my_peer_id,
1230 keys,
1231 state,
1232 shutdown: Arc::new(shutdown),
1233 shutdown_rx,
1234 signaling_tx,
1235 signaling_rx: Some(signaling_rx),
1236 store: None,
1237 peer_classifier,
1238 nostr_relay: None,
1239 local_buses: Vec::new(),
1240 state_event_tx,
1241 state_event_rx: Some(state_event_rx),
1242 mesh_frame_tx,
1243 mesh_frame_rx: Some(mesh_frame_rx),
1244 shared_router: None,
1245 seen_frame_ids: Arc::new(Mutex::new(TimedSeenSet::new(
1246 SEEN_FRAME_CAP,
1247 SEEN_FRAME_TTL,
1248 ))),
1249 seen_event_ids: Arc::new(Mutex::new(TimedSeenSet::new(
1250 SEEN_EVENT_CAP,
1251 SEEN_EVENT_TTL,
1252 ))),
1253 }
1254 }
1255
1256 pub fn new_with_state(keys: Keys, config: WebRTCConfig, state: Arc<WebRTCState>) -> Self {
1258 let mut manager = Self::new(keys, config);
1259 manager.state = state;
1260 manager
1261 }
1262
1263 pub fn new_with_classifier(
1265 keys: Keys,
1266 config: WebRTCConfig,
1267 classifier: PeerClassifier,
1268 ) -> Self {
1269 let mut manager = Self::new(keys, config);
1270 manager.peer_classifier = classifier;
1271 manager
1272 }
1273
1274 pub fn new_with_store(keys: Keys, config: WebRTCConfig, store: Arc<dyn ContentStore>) -> Self {
1276 let mut manager = Self::new(keys, config);
1277 manager.store = Some(store);
1278 manager
1279 }
1280
1281 pub fn new_with_store_and_classifier(
1283 keys: Keys,
1284 config: WebRTCConfig,
1285 store: Arc<dyn ContentStore>,
1286 classifier: PeerClassifier,
1287 ) -> Self {
1288 Self::new_with_store_and_classifier_and_cashu(
1289 keys,
1290 config,
1291 store,
1292 classifier,
1293 CashuRoutingConfig::default(),
1294 None,
1295 None,
1296 )
1297 }
1298
1299 pub fn new_with_state_and_store_and_classifier(
1300 keys: Keys,
1301 config: WebRTCConfig,
1302 state: Arc<WebRTCState>,
1303 store: Arc<dyn ContentStore>,
1304 classifier: PeerClassifier,
1305 ) -> Self {
1306 let mut manager = Self::new_with_state(keys, config, state);
1307 manager.store = Some(store);
1308 manager.peer_classifier = classifier;
1309 manager
1310 }
1311
1312 pub fn new_with_store_and_classifier_and_cashu(
1313 keys: Keys,
1314 config: WebRTCConfig,
1315 store: Arc<dyn ContentStore>,
1316 classifier: PeerClassifier,
1317 cashu_routing: CashuRoutingConfig,
1318 payment_client: Option<Arc<dyn CashuPaymentClient>>,
1319 mint_metadata: Option<Arc<CashuMintMetadataStore>>,
1320 ) -> Self {
1321 let mut manager = Self::new(keys, config);
1322 manager.state = Arc::new(WebRTCState::new_with_routing_and_cashu(
1323 manager.config.request_selection_strategy,
1324 manager.config.request_fairness_enabled,
1325 manager.config.request_dispatch,
1326 Duration::from_millis(manager.config.message_timeout_ms),
1327 cashu_routing,
1328 payment_client,
1329 mint_metadata,
1330 ));
1331 manager.store = Some(store);
1332 manager.peer_classifier = classifier;
1333 manager
1334 }
1335
1336 pub fn set_store(&mut self, store: Arc<dyn ContentStore>) {
1338 self.store = Some(store);
1339 }
1340
1341 pub fn set_peer_classifier(&mut self, classifier: PeerClassifier) {
1343 self.peer_classifier = classifier;
1344 }
1345
1346 pub fn set_nostr_relay(&mut self, relay: Arc<NostrRelay>) {
1348 self.nostr_relay = Some(relay);
1349 }
1350
1351 pub fn my_peer_id(&self) -> &PeerId {
1353 &self.my_peer_id
1354 }
1355
1356 pub fn state(&self) -> Arc<WebRTCState> {
1358 self.state.clone()
1359 }
1360
1361 pub fn shutdown_signal(&self) -> Arc<tokio::sync::watch::Sender<bool>> {
1363 self.shutdown.clone()
1364 }
1365
1366 pub fn shutdown(&self) {
1368 let _ = self.shutdown.send(true);
1369 }
1370
1371 pub async fn connected_count(&self) -> usize {
1373 self.state
1374 .connected_count
1375 .load(std::sync::atomic::Ordering::Relaxed)
1376 }
1377
1378 pub async fn peer_statuses(&self) -> Vec<PeerStatus> {
1380 self.state
1381 .peers
1382 .read()
1383 .await
1384 .values()
1385 .map(|p| PeerStatus {
1386 peer_id: p.peer_id.to_string(),
1387 pubkey: p.peer_id.pubkey.clone(),
1388 state: p.state.to_string(),
1389 direction: p.direction,
1390 connected_at: Some(p.last_seen),
1391 pool: p.pool,
1392 })
1393 .collect()
1394 }
1395
1396 pub async fn get_pool_counts(&self) -> (usize, usize, usize, usize) {
1400 let peers = self.state.peers.read().await;
1401 let mut follows_connected = 0;
1402 let mut follows_active = 0;
1403 let mut other_connected = 0;
1404 let mut other_active = 0;
1405
1406 for entry in peers.values() {
1407 let is_active = entry.state == ConnectionState::Connected
1410 || entry.state == ConnectionState::Connecting;
1411
1412 match entry.pool {
1413 PeerPool::Follows => {
1414 if is_active {
1415 follows_active += 1;
1416 }
1417 if entry.state == ConnectionState::Connected {
1418 follows_connected += 1;
1419 }
1420 }
1421 PeerPool::Other => {
1422 if is_active {
1423 other_active += 1;
1424 }
1425 if entry.state == ConnectionState::Connected {
1426 other_connected += 1;
1427 }
1428 }
1429 }
1430 }
1431
1432 (
1433 follows_connected,
1434 follows_active,
1435 other_connected,
1436 other_active,
1437 )
1438 }
1439
1440 fn local_hello_message(&self) -> SignalingMessage {
1441 SignalingMessage::Hello {
1442 peer_id: self.my_peer_id.to_string(),
1443 roots: Vec::new(),
1444 }
1445 }
1446
1447 fn local_bus_max_peers(&self, source: &str) -> Option<usize> {
1448 match source {
1449 "multicast" => Some(self.config.multicast.max_peers),
1450 WIFI_AWARE_SOURCE => Some(self.config.wifi_aware.max_peers),
1451 _ => None,
1452 }
1453 }
1454
1455 fn can_track_local_bus_peer(
1456 &self,
1457 source: &str,
1458 peer_key: &str,
1459 peers: &HashMap<String, PeerEntry>,
1460 ) -> bool {
1461 let Some(max_peers) = self.local_bus_max_peers(source) else {
1462 return true;
1463 };
1464 if peers.contains_key(peer_key) {
1465 return true;
1466 }
1467 if max_peers == 0 {
1468 return false;
1469 }
1470 let signal_path = PeerSignalPath::from_source_name(source);
1471 peers
1472 .values()
1473 .filter(|entry| {
1474 entry.signal_paths.contains(&signal_path) && entry.state != ConnectionState::Failed
1475 })
1476 .count()
1477 < max_peers
1478 }
1479
1480 pub async fn run(&mut self) -> Result<()> {
1482 info!(
1483 "Starting peer router with peer ID: {}",
1484 self.my_peer_id.short()
1485 );
1486
1487 let (event_tx, mut event_rx) = mpsc::channel::<(String, nostr::Event)>(100);
1488
1489 let mut signaling_rx = self
1491 .signaling_rx
1492 .take()
1493 .expect("signaling_rx already taken");
1494
1495 let mut state_event_rx = self
1497 .state_event_rx
1498 .take()
1499 .expect("state_event_rx already taken");
1500 let mut mesh_frame_rx = self
1501 .mesh_frame_rx
1502 .take()
1503 .expect("mesh_frame_rx already taken");
1504
1505 if self.config.bluetooth.is_enabled() {
1506 let bluetooth = BluetoothMesh::new(self.config.bluetooth.clone());
1507 let context = BluetoothRuntimeContext {
1508 my_peer_id: self.my_peer_id.clone(),
1509 store: if bluetooth_nostr_only_mode() {
1510 None
1511 } else {
1512 self.store.clone()
1513 },
1514 nostr_relay: self.nostr_relay.clone(),
1515 mesh_frame_tx: self.mesh_frame_tx.clone(),
1516 registrar: BluetoothPeerRegistrar::new(
1517 self.state.clone(),
1518 self.peer_classifier.clone(),
1519 self.config.pools.clone(),
1520 self.config.bluetooth.max_peers,
1521 ),
1522 };
1523 let _ = bluetooth.start(context).await;
1524 }
1525
1526 let (relay_write_tx, _) = tokio::sync::broadcast::channel::<SignalingMessage>(100);
1528
1529 for relay_url in &self.config.relays {
1531 let url = relay_url.clone();
1532 let event_tx = event_tx.clone();
1533 let shutdown_rx = self.shutdown_rx.clone();
1534 let keys = self.keys.clone();
1535 let relay_write_rx = relay_write_tx.subscribe();
1536
1537 tokio::spawn(async move {
1538 if let Err(e) =
1539 Self::relay_task(url.clone(), event_tx, shutdown_rx, keys, relay_write_rx).await
1540 {
1541 error!("Relay {} error: {}", url, e);
1542 }
1543 });
1544 }
1545
1546 if self.config.multicast.is_enabled() {
1547 if let Some(relay) = self.nostr_relay.clone() {
1548 match MulticastNostrBus::bind(
1549 self.config.multicast.clone(),
1550 self.keys.clone(),
1551 relay,
1552 )
1553 .await
1554 {
1555 Ok(bus) => {
1556 let local_bus: SharedLocalNostrBus = bus.clone();
1557 self.state.add_local_bus(local_bus.clone()).await;
1558 self.local_buses.push(local_bus);
1559 let shutdown_rx = self.shutdown_rx.clone();
1560 let signaling_tx = event_tx.clone();
1561 tokio::spawn(async move {
1562 if let Err(err) = bus.run(shutdown_rx, signaling_tx).await {
1563 error!("Multicast bus error: {}", err);
1564 }
1565 });
1566 }
1567 Err(err) => {
1568 warn!("Failed to start multicast bus: {}", err);
1569 }
1570 }
1571 } else {
1572 warn!("Multicast enabled but Nostr relay is unavailable");
1573 }
1574 }
1575
1576 if self.config.wifi_aware.is_enabled() {
1577 if let Some(relay) = self.nostr_relay.clone() {
1578 if let Some(bridge) = mobile_wifi_aware_bridge() {
1579 let bus = WifiAwareNostrBus::new(
1580 self.config.wifi_aware.clone(),
1581 self.keys.clone(),
1582 relay,
1583 bridge,
1584 );
1585 let local_bus: SharedLocalNostrBus = bus.clone();
1586 self.state.add_local_bus(local_bus.clone()).await;
1587 self.local_buses.push(local_bus);
1588 let shutdown_rx = self.shutdown_rx.clone();
1589 let signaling_tx = event_tx.clone();
1590 let local_peer_id = self.my_peer_id.to_string();
1591 tokio::spawn(async move {
1592 if let Err(err) = bus.run(local_peer_id, shutdown_rx, signaling_tx).await {
1593 error!("Wi-Fi Aware bus error: {}", err);
1594 }
1595 });
1596 } else {
1597 warn!("Wi-Fi Aware enabled but no mobile bridge is installed");
1598 }
1599 } else {
1600 warn!("Wi-Fi Aware enabled but Nostr relay is unavailable");
1601 }
1602 }
1603
1604 if self.config.signaling_enabled {
1605 let transport = Arc::new(RouterSignalingBridge::new(
1606 self.my_peer_id.to_string(),
1607 self.signaling_tx.clone(),
1608 ));
1609 let factory = Arc::new(SharedRouterPeerFactory::new(
1610 self.my_peer_id.clone(),
1611 self.signaling_tx.clone(),
1612 self.config.stun_servers.clone(),
1613 self.store.clone(),
1614 self.state.clone(),
1615 self.state_event_tx.clone(),
1616 self.nostr_relay.clone(),
1617 self.mesh_frame_tx.clone(),
1618 self.peer_classifier.clone(),
1619 ));
1620 let (classifier_tx, mut classifier_rx) = mpsc::channel::<SharedClassifyRequest>(32);
1621 let classifier = self.peer_classifier.clone();
1622 tokio::spawn(async move {
1623 while let Some(request) = classifier_rx.recv().await {
1624 let _ = request.response.send(classifier(&request.pubkey));
1625 }
1626 });
1627
1628 let mut router = MeshRouter::new(
1629 self.my_peer_id.to_string(),
1630 transport,
1631 factory.clone(),
1632 self.config.pools.clone(),
1633 self.config.debug,
1634 );
1635 router.set_classifier(classifier_tx);
1636 self.shared_router = Some(Arc::new(router));
1637 }
1638
1639 let mut shutdown_rx = self.shutdown_rx.clone();
1641 let mut cleanup_interval = tokio::time::interval(Duration::from_secs(30));
1643 let mut hello_ticker =
1644 tokio::time::interval(Duration::from_millis(self.config.hello_interval_ms));
1645 if self.config.signaling_enabled {
1646 if let Some(shared_router) = self.shared_router.as_ref() {
1647 let _ = shared_router.send_hello(Vec::new()).await;
1648 } else {
1649 self.dispatch_signaling_message(self.local_hello_message(), &relay_write_tx)
1650 .await;
1651 }
1652 }
1653 loop {
1654 tokio::select! {
1655 _ = shutdown_rx.changed() => {
1656 if *shutdown_rx.borrow() {
1657 info!("WebRTC manager shutting down");
1658 break;
1659 }
1660 }
1661 Some((relay, event)) = event_rx.recv() => {
1662 if let Err(e) = self
1663 .handle_event(&relay, &event, self.shared_router.as_ref())
1664 .await
1665 {
1666 debug!("Error handling event from {}: {}", relay, e);
1667 }
1668 }
1669 Some(msg) = signaling_rx.recv() => {
1670 self.dispatch_signaling_message(msg, &relay_write_tx).await;
1671 }
1672 Some(event) = state_event_rx.recv() => {
1673 self.handle_peer_state_event(event, &relay_write_tx).await;
1675 }
1676 Some((from_peer_id, frame)) = mesh_frame_rx.recv() => {
1677 self.handle_mesh_frame(from_peer_id, frame).await;
1678 }
1679 _ = hello_ticker.tick(), if self.config.signaling_enabled => {
1680 if let Some(shared_router) = self.shared_router.as_ref() {
1681 let _ = shared_router.send_hello(Vec::new()).await;
1682 } else {
1683 self.dispatch_signaling_message(self.local_hello_message(), &relay_write_tx)
1684 .await;
1685 }
1686 }
1687 _ = cleanup_interval.tick() => {
1688 self.cleanup_stale_peers().await;
1690 }
1691 }
1692 }
1693
1694 Ok(())
1695 }
1696
1697 async fn relay_task(
1699 url: String,
1700 event_tx: mpsc::Sender<(String, nostr::Event)>,
1701 mut shutdown_rx: tokio::sync::watch::Receiver<bool>,
1702 keys: Keys,
1703 mut signaling_rx: tokio::sync::broadcast::Receiver<SignalingMessage>,
1704 ) -> Result<()> {
1705 info!("Connecting to relay: {}", url);
1706
1707 let (ws_stream, _) = connect_async(&url).await?;
1708 let (mut write, mut read) = ws_stream.split();
1709
1710 let hello_filter = Filter::new()
1714 .kind(Kind::Ephemeral(WEBRTC_KIND as u16))
1715 .custom_tag(
1716 nostr::SingleLetterTag::lowercase(nostr::Alphabet::L),
1717 vec![HELLO_TAG],
1718 )
1719 .since(nostr::Timestamp::now() - Duration::from_secs(60));
1720
1721 let directed_filter = Filter::new()
1722 .kind(Kind::Ephemeral(WEBRTC_KIND as u16))
1723 .custom_tag(
1724 nostr::SingleLetterTag::lowercase(nostr::Alphabet::P),
1725 vec![keys.public_key().to_hex()],
1726 )
1727 .since(nostr::Timestamp::now() - Duration::from_secs(60));
1728
1729 let sub_id = nostr::SubscriptionId::generate();
1730 let sub_msg = ClientMessage::req(sub_id.clone(), vec![hello_filter, directed_filter]);
1731 write.send(Message::Text(sub_msg.as_json())).await?;
1732
1733 info!(
1734 "Subscribed to {} for WebRTC events (kind {})",
1735 url, WEBRTC_KIND
1736 );
1737
1738 loop {
1739 tokio::select! {
1740 _ = shutdown_rx.changed() => {
1741 if *shutdown_rx.borrow() {
1742 break;
1743 }
1744 }
1745 Ok(signaling_msg) = signaling_rx.recv() => {
1747 info!("Sending {} via {}", signaling_msg.msg_type(), url);
1748 if let Ok(event) = Self::create_signaling_event(&keys, &signaling_msg).await {
1749 let event_id = event.id.to_string();
1750 let msg = ClientMessage::event(event);
1751 if write.send(Message::Text(msg.as_json())).await.is_ok() {
1752 info!("Sent {} to {} (event id: {})", signaling_msg.msg_type(), url, &event_id[..16]);
1753 }
1754 }
1755 }
1756 msg = read.next() => {
1757 match msg {
1758 Some(Ok(Message::Text(text))) => {
1759 if let Ok(RelayMessage::Event { event, .. }) =
1760 RelayMessage::from_json(&text)
1761 {
1762 let _ = event_tx.send((url.clone(), *event)).await;
1763 }
1764 }
1765 Some(Err(e)) => {
1766 error!("WebSocket error from {}: {}", url, e);
1767 break;
1768 }
1769 None => {
1770 warn!("WebSocket closed: {}", url);
1771 break;
1772 }
1773 _ => {}
1774 }
1775 }
1776 }
1777 }
1778
1779 Ok(())
1780 }
1781
1782 async fn mark_seen_frame_id(&self, frame_id: String) -> bool {
1783 let mut seen = self.seen_frame_ids.lock().await;
1784 seen.insert_if_new(frame_id)
1785 }
1786
1787 async fn mark_seen_event_id(&self, event_id: String) -> bool {
1788 let mut seen = self.seen_event_ids.lock().await;
1789 seen.insert_if_new(event_id)
1790 }
1791
1792 async fn dispatch_signaling_message(
1793 &self,
1794 msg: SignalingMessage,
1795 relay_write_tx: &tokio::sync::broadcast::Sender<SignalingMessage>,
1796 ) {
1797 if !self.config.signaling_enabled {
1798 debug!(
1799 "Skipping signaling message {} because WebRTC signaling is disabled",
1800 msg.msg_type()
1801 );
1802 return;
1803 }
1804
1805 if relay_write_tx.send(msg.clone()).is_err() {
1806 debug!(
1807 "No relay subscribers for signaling message {}",
1808 msg.msg_type()
1809 );
1810 }
1811
1812 let event = match Self::create_signaling_event(&self.keys, &msg).await {
1813 Ok(event) => event,
1814 Err(e) => {
1815 debug!("Failed to create signaling event for mesh dispatch: {}", e);
1816 return;
1817 }
1818 };
1819
1820 for bus in &self.local_buses {
1821 if let Err(err) = bus.broadcast_event(&event).await {
1822 debug!(
1823 "Failed to broadcast signaling event over {} ({}): {}",
1824 bus.source_name(),
1825 msg.msg_type(),
1826 err
1827 );
1828 }
1829 }
1830
1831 let mut frame =
1832 MeshNostrFrame::new_event(event, &self.my_peer_id.to_string(), MESH_DEFAULT_HTL);
1833 if !self.mark_seen_frame_id(frame.frame_id.clone()).await {
1834 self.state.record_mesh_duplicate_drop();
1835 return;
1836 }
1837 if !self.mark_seen_event_id(frame.event().id.to_hex()).await {
1838 self.state.record_mesh_duplicate_drop();
1839 return;
1840 }
1841
1842 frame.sender_peer_id = self.my_peer_id.to_string();
1844 let forwarded = self.forward_mesh_frame(&frame, None).await;
1845 if forwarded > 0 {
1846 self.state.record_mesh_forwarded(forwarded as u64);
1847 }
1848 }
1849
1850 async fn forward_mesh_frame(
1851 &self,
1852 frame: &MeshNostrFrame,
1853 exclude_peer_id: Option<&str>,
1854 ) -> usize {
1855 let peers = self.state.peers.read().await;
1856 let peer_refs: Vec<_> = peers
1857 .values()
1858 .filter(|entry| entry.state == ConnectionState::Connected)
1859 .filter(|entry| {
1860 entry
1861 .peer
1862 .as_ref()
1863 .map(|peer| peer.is_ready())
1864 .unwrap_or(false)
1865 })
1866 .filter(|entry| {
1867 exclude_peer_id
1868 .map(|exclude| exclude != entry.peer_id.to_string())
1869 .unwrap_or(true)
1870 })
1871 .filter_map(|entry| {
1872 entry.peer.as_ref().map(|peer| {
1873 (
1874 entry.peer_id.to_string(),
1875 entry.peer_id.short(),
1876 peer.clone(),
1877 peer.htl_config(),
1878 )
1879 })
1880 })
1881 .collect();
1882 drop(peers);
1883
1884 let mut forwarded = 0usize;
1885 for (_peer_key, peer_short, peer, htl_cfg) in peer_refs {
1886 let next_htl = decrement_htl_with_policy(frame.htl, &MESH_EVENT_POLICY, &htl_cfg);
1887 if !should_forward_htl(next_htl) {
1888 continue;
1889 }
1890
1891 let mut outbound = frame.clone();
1892 outbound.htl = next_htl;
1893 if peer.send_mesh_frame_text(&outbound).await.is_ok() {
1894 forwarded += 1;
1895 } else {
1896 debug!("Failed to forward mesh frame to {}", peer_short);
1897 }
1898 }
1899
1900 forwarded
1901 }
1902
1903 async fn handle_mesh_frame(&self, from_peer_id: PeerId, frame: MeshNostrFrame) {
1904 if let Err(reason) = validate_mesh_frame(&frame) {
1905 debug!(
1906 "Ignoring mesh frame from {} (invalid: {})",
1907 from_peer_id.short(),
1908 reason
1909 );
1910 return;
1911 }
1912
1913 if !self.mark_seen_frame_id(frame.frame_id.clone()).await {
1914 self.state.record_mesh_duplicate_drop();
1915 return;
1916 }
1917
1918 let event = match &frame.payload {
1919 MeshNostrPayload::Event { event } => event.clone(),
1920 };
1921
1922 if !self.mark_seen_event_id(event.id.to_hex()).await {
1923 self.state.record_mesh_duplicate_drop();
1924 return;
1925 }
1926
1927 if event.verify().is_err() {
1928 debug!(
1929 "Ignoring mesh event from {} due to invalid signature",
1930 from_peer_id.short()
1931 );
1932 return;
1933 }
1934
1935 self.state.record_mesh_received();
1936
1937 if let Err(e) = self
1938 .handle_event("mesh", &event, self.shared_router.as_ref())
1939 .await
1940 {
1941 debug!(
1942 "Error handling mesh event from {}: {}",
1943 from_peer_id.short(),
1944 e
1945 );
1946 }
1947
1948 let forwarded = self
1949 .forward_mesh_frame(&frame, Some(&from_peer_id.to_string()))
1950 .await;
1951 if forwarded > 0 {
1952 self.state.record_mesh_forwarded(forwarded as u64);
1953 }
1954 }
1955
1956 async fn create_signaling_event(keys: &Keys, msg: &SignalingMessage) -> Result<nostr::Event> {
1962 encode_signaling_event(
1963 keys,
1964 msg.peer_id(),
1965 msg,
1966 Kind::Ephemeral(WEBRTC_KIND as u16),
1967 )
1968 .map_err(|e| anyhow::anyhow!(e.to_string()))
1969 }
1970
1971 async fn handle_event(
1977 &self,
1978 relay: &str,
1979 event: &nostr::Event,
1980 shared_router: Option<&Arc<SharedProductionRouter>>,
1981 ) -> Result<()> {
1982 if !self.config.signaling_enabled {
1983 return Ok(());
1984 }
1985
1986 let Some(shared_router) = shared_router else {
1987 return Ok(());
1988 };
1989
1990 let Some(msg) = decode_signaling_event(
1991 event,
1992 &self.my_peer_id.to_string(),
1993 &self.keys.public_key().to_hex(),
1994 &self.keys,
1995 ) else {
1996 return Ok(());
1997 };
1998
1999 if matches!(
2000 msg,
2001 SignalingMessage::Hello { .. } | SignalingMessage::Offer { .. }
2002 ) {
2003 let peers = self.state.peers.read().await;
2004 if !self.can_track_local_bus_peer(relay, msg.peer_id(), &peers) {
2005 return Ok(());
2006 }
2007 }
2008
2009 debug!(
2010 "Received {} from {} via {}",
2011 msg.msg_type(),
2012 msg.peer_id(),
2013 relay
2014 );
2015 let peer_id = msg.peer_id().to_string();
2016 shared_router
2017 .handle_message(msg)
2018 .await
2019 .map_err(|e| anyhow::anyhow!(e.to_string()))?;
2020 remember_peer_signal_path(self.state.as_ref(), &peer_id, relay).await;
2021
2022 Ok(())
2023 }
2024
2025 async fn handle_peer_state_event(
2027 &self,
2028 event: PeerStateEvent,
2029 relay_write_tx: &tokio::sync::broadcast::Sender<SignalingMessage>,
2030 ) {
2031 match event {
2032 PeerStateEvent::Connected(peer_id) => {
2033 let peer_key = peer_id.to_string();
2034 let mut emit_hello = false;
2035 let mut peers = self.state.peers.write().await;
2036 if let Some(entry) = peers.get_mut(&peer_key) {
2037 if entry.state != ConnectionState::Connected {
2038 info!("Peer {} connected (via state event)", peer_id.short());
2039 entry.state = ConnectionState::Connected;
2040 emit_hello = true;
2041 self.state
2043 .connected_count
2044 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
2045 }
2046 }
2047 drop(peers);
2048 if emit_hello {
2049 if let Some(shared_router) = self.shared_router.as_ref() {
2050 let _ = shared_router.send_hello(Vec::new()).await;
2051 } else {
2052 self.dispatch_signaling_message(self.local_hello_message(), relay_write_tx)
2053 .await;
2054 }
2055 }
2056 }
2057 PeerStateEvent::Failed(peer_id) => {
2058 let peer_key = peer_id.to_string();
2059 info!(
2060 "Peer {} connection failed - removing from pool",
2061 peer_id.short()
2062 );
2063 let removed = {
2064 let mut peers = self.state.peers.write().await;
2065 peers.remove(&peer_key)
2066 };
2067 if let Some(entry) = removed {
2068 if entry.state == ConnectionState::Connected {
2070 self.state
2071 .connected_count
2072 .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
2073 }
2074 if let Some(peer) = entry.peer {
2076 let _ = peer.close().await;
2077 }
2078 }
2079 if let Some(shared_router) = self.shared_router.as_ref() {
2080 if let Some(channel) = shared_router.remove_peer(&peer_key).await {
2081 channel.close().await;
2082 }
2083 }
2084 }
2085 PeerStateEvent::Disconnected(peer_id) => {
2086 let peer_key = peer_id.to_string();
2087 info!("Peer {} disconnected - removing from pool", peer_id.short());
2088 let removed = {
2089 let mut peers = self.state.peers.write().await;
2090 peers.remove(&peer_key)
2091 };
2092 if let Some(entry) = removed {
2093 if entry.state == ConnectionState::Connected {
2095 self.state
2096 .connected_count
2097 .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
2098 }
2099 if let Some(peer) = entry.peer {
2101 let _ = peer.close().await;
2102 }
2103 }
2104 if let Some(shared_router) = self.shared_router.as_ref() {
2105 if let Some(channel) = shared_router.remove_peer(&peer_key).await {
2106 channel.close().await;
2107 }
2108 }
2109 }
2110 }
2111 }
2112
2113 async fn cleanup_stale_peers(&self) {
2115 let mut peers = self.state.peers.write().await;
2116 let mut connected_count = 0;
2117 let mut to_remove = Vec::new();
2118 let stale_timeout = Duration::from_secs(60); for (key, entry) in peers.iter_mut() {
2121 if let Some(ref peer) = entry.peer {
2122 if peer.is_connected() {
2124 if entry.state != ConnectionState::Connected {
2125 info!(
2126 "Peer {} is now connected (sync fallback)",
2127 entry.peer_id.short()
2128 );
2129 entry.state = ConnectionState::Connected;
2130 }
2131 connected_count += 1;
2132 } else if entry.state == ConnectionState::Connected {
2133 info!(
2134 "Removing disconnected peer {} after transport closed",
2135 entry.peer_id.short()
2136 );
2137 to_remove.push(key.clone());
2138 } else if entry.state == ConnectionState::Connecting
2139 && entry.last_seen.elapsed() > stale_timeout
2140 {
2141 info!(
2143 "Removing stale peer {} (stuck in Connecting for {:?})",
2144 entry.peer_id.short(),
2145 entry.last_seen.elapsed()
2146 );
2147 to_remove.push(key.clone());
2148 }
2149 } else if entry.state == ConnectionState::Discovered
2150 && entry.last_seen.elapsed() > stale_timeout
2151 {
2152 debug!("Removing stale discovered peer {}", entry.peer_id.short());
2154 to_remove.push(key.clone());
2155 }
2156 }
2157
2158 let mut removed_peers = Vec::new();
2160 for key in to_remove {
2161 if let Some(entry) = peers.remove(&key) {
2162 removed_peers.push(entry);
2163 }
2164 }
2165 drop(peers);
2166
2167 for entry in removed_peers {
2168 if let Some(peer) = entry.peer {
2169 let _ = peer.close().await;
2170 }
2171 }
2172
2173 self.state
2174 .connected_count
2175 .store(connected_count, std::sync::atomic::Ordering::Relaxed);
2176 }
2177}
2178
2179#[allow(dead_code)]
2181#[derive(Debug, Clone)]
2182pub struct PeerState {
2183 pub peer_id: PeerId,
2184 pub direction: PeerDirection,
2185 pub state: String,
2186 pub last_seen: Instant,
2187}
2188
2189#[cfg(test)]
2190mod tests {
2191 use super::*;
2192 use crate::webrtc::root_events::PeerRootEvent;
2193 use crate::webrtc::session::TestMeshPeer;
2194 use crate::webrtc::SelectionStrategy;
2195 use anyhow::Result as AnyResult;
2196 use async_trait::async_trait;
2197 use hashtree_network::{build_hedged_wave_plan, normalize_dispatch_config};
2198 use nostr::{EventBuilder, Keys, Tag};
2199 use std::time::Duration;
2200
2201 struct TestLocalBus {
2202 source: &'static str,
2203 root: Option<PeerRootEvent>,
2204 }
2205
2206 #[async_trait]
2207 impl super::super::LocalNostrBus for TestLocalBus {
2208 fn source_name(&self) -> &'static str {
2209 self.source
2210 }
2211
2212 async fn broadcast_event(&self, _event: &nostr::Event) -> AnyResult<()> {
2213 Ok(())
2214 }
2215
2216 async fn query_root(
2217 &self,
2218 _owner_pubkey: &str,
2219 _tree_name: &str,
2220 _timeout: Duration,
2221 ) -> Option<PeerRootEvent> {
2222 self.root.clone()
2223 }
2224 }
2225
/// A labeled hashtree event with `hash` and `encryptedKey` tags is parsed
/// into a root carrying those values and the given peer id, with no
/// plain key.
#[test]
fn root_event_from_peer_extracts_tags() {
    let signer = Keys::generate();
    let root_hash = "ab".repeat(32);
    let encrypted_key = "11".repeat(32);

    let tags = [
        Tag::parse(&["d", "repo"]).unwrap(),
        Tag::parse(&["l", super::super::root_events::HASHTREE_LABEL]).unwrap(),
        Tag::parse(&["hash", &root_hash]).unwrap(),
        Tag::parse(&["encryptedKey", &encrypted_key]).unwrap(),
    ];
    let event = EventBuilder::new(
        Kind::Custom(super::super::root_events::HASHTREE_KIND),
        "",
        tags,
    )
    .to_event(&signer)
    .unwrap();

    let parsed = root_event_from_peer(&event, "peer-a", "repo").unwrap();

    assert_eq!(parsed.hash, root_hash);
    assert_eq!(parsed.peer_id, "peer-a");
    assert_eq!(
        parsed.encrypted_key.as_deref(),
        Some(encrypted_key.as_str())
    );
    assert!(parsed.key.is_none());
}
2253
/// When two events share a timestamp, the one with the higher event id
/// must be picked.
///
/// The two events get different contents on purpose. A Nostr event id is
/// a hash over the author, timestamp, kind, tags and content, so with
/// identical contents both events would have the same id and the
/// tie-break would never be exercised.
#[test]
fn pick_latest_event_prefers_higher_event_id_on_timestamp_tie() {
    let keys = Keys::generate();
    let created_at = nostr::Timestamp::from_secs(1_700_000_000);
    let event_a = EventBuilder::new(
        Kind::Custom(super::super::root_events::HASHTREE_KIND),
        "a",
        [],
    )
    .custom_created_at(created_at)
    .to_event(&keys)
    .unwrap();
    let event_b = EventBuilder::new(
        Kind::Custom(super::super::root_events::HASHTREE_KIND),
        "b",
        [],
    )
    .custom_created_at(created_at)
    .to_event(&keys)
    .unwrap();

    // Guard against the test becoming vacuous again.
    assert_ne!(event_a.id, event_b.id);

    let expected = if event_a.id > event_b.id {
        event_a.id
    } else {
        event_b.id
    };
    let picked = pick_latest_event([&event_a, &event_b]).unwrap();
    assert_eq!(picked.id, expected);
}
2283
2284 #[tokio::test]
2285 async fn resolve_root_from_local_buses_returns_source_and_first_match() {
2286 let state = WebRTCState::new();
2287 let root = PeerRootEvent {
2288 hash: "ab".repeat(32),
2289 key: None,
2290 encrypted_key: None,
2291 self_encrypted_key: None,
2292 event_id: "event-1".to_string(),
2293 created_at: 1,
2294 peer_id: "bus-peer".to_string(),
2295 };
2296
2297 state
2298 .set_local_buses(vec![
2299 Arc::new(TestLocalBus {
2300 source: "empty",
2301 root: None,
2302 }),
2303 Arc::new(TestLocalBus {
2304 source: "mock-bus",
2305 root: Some(root.clone()),
2306 }),
2307 ])
2308 .await;
2309
2310 let resolved = state
2311 .resolve_root_from_local_buses_with_source("owner", "tree", Duration::from_millis(10))
2312 .await
2313 .expect("expected root from local bus");
2314
2315 assert_eq!(resolved.0, "mock-bus");
2316 assert_eq!(resolved.1.hash, root.hash);
2317 assert_eq!(resolved.1.peer_id, root.peer_id);
2318 }
2319
2320 #[tokio::test]
2321 async fn can_track_local_bus_peer_enforces_wifi_aware_limit() {
2322 let keys = Keys::generate();
2323 let mut config = WebRTCConfig::default();
2324 config.wifi_aware.enabled = true;
2325 config.wifi_aware.max_peers = 1;
2326 let manager = WebRTCManager::new(keys, config);
2327 let existing_peer = PeerId::new("peer-a".to_string());
2328 let existing_key = existing_peer.to_string();
2329 let mut peers = HashMap::new();
2330 peers.insert(
2331 existing_key.clone(),
2332 PeerEntry {
2333 peer_id: existing_peer,
2334 direction: PeerDirection::Outbound,
2335 state: ConnectionState::Discovered,
2336 last_seen: Instant::now(),
2337 peer: None,
2338 pool: PeerPool::Other,
2339 transport: PeerTransport::WebRtc,
2340 signal_paths: BTreeSet::from([PeerSignalPath::WifiAware]),
2341 bytes_sent: 0,
2342 bytes_received: 0,
2343 },
2344 );
2345
2346 assert!(manager.can_track_local_bus_peer(WIFI_AWARE_SOURCE, &existing_key, &peers,));
2347 assert!(!manager.can_track_local_bus_peer(WIFI_AWARE_SOURCE, "peer-b:sess-b", &peers,));
2348 assert!(manager.can_track_local_bus_peer("relay", "peer-c:sess-c", &peers));
2349 }
2350
2351 #[tokio::test]
2352 async fn request_from_peers_with_source_accepts_generic_mesh_peers() {
2353 let state = WebRTCState::new();
2354 let data = b"offline-over-ble".to_vec();
2355 let hash_hex = hex::encode(hashtree_core::sha256(&data));
2356
2357 state.peers.write().await.insert(
2358 "peer-a".to_string(),
2359 PeerEntry {
2360 peer_id: PeerId::new("peer-a-pub".to_string()),
2361 direction: PeerDirection::Outbound,
2362 state: ConnectionState::Connected,
2363 last_seen: Instant::now(),
2364 peer: Some(MeshPeer::mock_for_tests(TestMeshPeer::with_response(Some(
2365 data.clone(),
2366 )))),
2367 pool: PeerPool::Other,
2368 transport: PeerTransport::Bluetooth,
2369 signal_paths: BTreeSet::from([PeerSignalPath::Bluetooth]),
2370 bytes_sent: 0,
2371 bytes_received: 0,
2372 },
2373 );
2374
2375 let resolved = state
2376 .request_from_peers_with_source(&hash_hex)
2377 .await
2378 .expect("expected mock mesh peer response");
2379
2380 assert_eq!(resolved.0, data);
2381 assert_eq!(resolved.1, "peer-a-pub");
2382 }
2383
2384 #[tokio::test]
2385 async fn request_from_peers_with_source_waits_full_timeout_for_last_generic_peer() {
2386 let state = WebRTCState::new_with_routing_and_cashu(
2387 SelectionStrategy::TitForTat,
2388 true,
2389 RequestDispatchConfig {
2390 initial_fanout: 1,
2391 hedge_fanout: 1,
2392 max_fanout: 1,
2393 hedge_interval_ms: 50,
2394 },
2395 Duration::from_millis(400),
2396 CashuRoutingConfig::default(),
2397 None,
2398 None,
2399 );
2400 let data = b"slow-offline-over-ble".to_vec();
2401 let hash_hex = hex::encode(hashtree_core::sha256(&data));
2402
2403 state.peers.write().await.insert(
2404 "peer-a".to_string(),
2405 PeerEntry {
2406 peer_id: PeerId::new("peer-a-pub".to_string()),
2407 direction: PeerDirection::Outbound,
2408 state: ConnectionState::Connected,
2409 last_seen: Instant::now(),
2410 peer: Some(MeshPeer::mock_for_tests(
2411 TestMeshPeer::with_delayed_response(
2412 Some(data.clone()),
2413 Duration::from_millis(200),
2414 ),
2415 )),
2416 pool: PeerPool::Other,
2417 transport: PeerTransport::Bluetooth,
2418 signal_paths: BTreeSet::from([PeerSignalPath::Bluetooth]),
2419 bytes_sent: 0,
2420 bytes_received: 0,
2421 },
2422 );
2423
2424 let resolved = state
2425 .request_from_peers_with_source(&hash_hex)
2426 .await
2427 .expect("expected delayed mock mesh peer response");
2428
2429 assert_eq!(resolved.0, data);
2430 assert_eq!(resolved.1, "peer-a-pub");
2431 }
2432
2433 #[tokio::test]
2434 async fn dispatch_signaling_message_is_noop_when_signaling_disabled() {
2435 let keys = Keys::generate();
2436 let mut config = WebRTCConfig::default();
2437 config.signaling_enabled = false;
2438 let manager = WebRTCManager::new(keys, config);
2439 let peer_id = PeerId::new("peer-a-pub".to_string());
2440 let peer_key = peer_id.to_string();
2441 let peer = MeshPeer::mock_for_tests(TestMeshPeer::with_response(None));
2442 let peer_ref = peer.mock_ref().expect("mock peer").clone();
2443
2444 manager.state.peers.write().await.insert(
2445 peer_key,
2446 PeerEntry {
2447 peer_id,
2448 direction: PeerDirection::Outbound,
2449 state: ConnectionState::Connected,
2450 last_seen: Instant::now(),
2451 peer: Some(peer),
2452 pool: PeerPool::Other,
2453 transport: PeerTransport::Bluetooth,
2454 signal_paths: BTreeSet::from([PeerSignalPath::Bluetooth]),
2455 bytes_sent: 0,
2456 bytes_received: 0,
2457 },
2458 );
2459
2460 let (relay_tx, _) = tokio::sync::broadcast::channel(4);
2461 manager
2462 .dispatch_signaling_message(
2463 SignalingMessage::Hello {
2464 peer_id: manager.my_peer_id.to_string(),
2465 roots: Vec::new(),
2466 },
2467 &relay_tx,
2468 )
2469 .await;
2470
2471 assert_eq!(peer_ref.sent_frame_count().await, 0);
2472 }
2473
2474 #[tokio::test]
2475 async fn failed_peer_cleanup_does_not_hold_peer_map_lock_while_closing() {
2476 let keys = Keys::generate();
2477 let manager = Arc::new(WebRTCManager::new(keys, WebRTCConfig::default()));
2478 let peer_id = PeerId::new("peer-a-pub".to_string());
2479 let peer_key = peer_id.to_string();
2480
2481 manager.state.peers.write().await.insert(
2482 peer_key.clone(),
2483 PeerEntry {
2484 peer_id: peer_id.clone(),
2485 direction: PeerDirection::Outbound,
2486 state: ConnectionState::Connected,
2487 last_seen: Instant::now(),
2488 peer: Some(MeshPeer::mock_for_tests(TestMeshPeer::with_delayed_close(
2489 Duration::from_millis(200),
2490 ))),
2491 pool: PeerPool::Other,
2492 transport: PeerTransport::Bluetooth,
2493 signal_paths: BTreeSet::from([PeerSignalPath::Bluetooth]),
2494 bytes_sent: 0,
2495 bytes_received: 0,
2496 },
2497 );
2498
2499 let (relay_tx, _) = tokio::sync::broadcast::channel(4);
2500 let manager_for_task = manager.clone();
2501 let peer_id_for_task = peer_id.clone();
2502 let cleanup_task = tokio::spawn(async move {
2503 manager_for_task
2504 .handle_peer_state_event(PeerStateEvent::Failed(peer_id_for_task), &relay_tx)
2505 .await;
2506 });
2507
2508 tokio::time::sleep(Duration::from_millis(20)).await;
2509
2510 let remaining = tokio::time::timeout(Duration::from_millis(50), async {
2511 manager.state.peers.read().await.len()
2512 })
2513 .await
2514 .expect("peer map read should not block on close");
2515
2516 assert_eq!(remaining, 0);
2517 cleanup_task.await.expect("cleanup task");
2518 }
2519
2520 #[tokio::test]
2521 async fn resolve_root_from_peers_does_not_hold_peer_map_lock_while_querying() {
2522 let keys = Keys::generate();
2523 let manager = Arc::new(WebRTCManager::new(keys.clone(), WebRTCConfig::default()));
2524 let owner_keys = Keys::generate();
2525 let owner_pubkey = owner_keys.public_key().to_hex();
2526 let tree_name = "video";
2527 let hash = "ab".repeat(32);
2528 let event = EventBuilder::new(
2529 Kind::Custom(super::super::root_events::HASHTREE_KIND),
2530 "",
2531 [
2532 Tag::parse(&["d", tree_name]).unwrap(),
2533 Tag::parse(&["l", super::super::root_events::HASHTREE_LABEL]).unwrap(),
2534 Tag::parse(&["hash", &hash]).unwrap(),
2535 ],
2536 )
2537 .to_event(&owner_keys)
2538 .unwrap();
2539
2540 let peer_id = PeerId::new("peer-a-pub".to_string());
2541 let peer_key = peer_id.to_string();
2542
2543 manager.state.peers.write().await.insert(
2544 peer_key.clone(),
2545 PeerEntry {
2546 peer_id,
2547 direction: PeerDirection::Outbound,
2548 state: ConnectionState::Connected,
2549 last_seen: Instant::now(),
2550 peer: Some(MeshPeer::mock_for_tests(TestMeshPeer::with_delayed_events(
2551 vec![event],
2552 Duration::from_millis(200),
2553 ))),
2554 pool: PeerPool::Other,
2555 transport: PeerTransport::Bluetooth,
2556 signal_paths: BTreeSet::from([PeerSignalPath::Bluetooth]),
2557 bytes_sent: 0,
2558 bytes_received: 0,
2559 },
2560 );
2561
2562 let manager_for_task = manager.clone();
2563 let owner_pubkey_for_task = owner_pubkey.clone();
2564 let resolve_task = tokio::spawn(async move {
2565 manager_for_task
2566 .state
2567 .resolve_root_from_peers(
2568 &owner_pubkey_for_task,
2569 tree_name,
2570 Duration::from_millis(500),
2571 )
2572 .await
2573 });
2574
2575 tokio::time::sleep(Duration::from_millis(20)).await;
2576
2577 let manager_for_writer = manager.clone();
2578 let peer_key_for_writer = peer_key.clone();
2579 let writer_task = tokio::spawn(async move {
2580 let mut peers = manager_for_writer.state.peers.write().await;
2581 if let Some(entry) = peers.get_mut(&peer_key_for_writer) {
2582 entry.bytes_received += 1;
2583 }
2584 });
2585
2586 tokio::time::sleep(Duration::from_millis(20)).await;
2587
2588 let status_count = tokio::time::timeout(Duration::from_millis(50), async {
2589 manager.state.peers.read().await.len()
2590 })
2591 .await
2592 .expect("peer map read should not block on root query");
2593
2594 assert_eq!(status_count, 1);
2595 assert!(resolve_task.await.expect("resolve task").is_some());
2596 writer_task.await.expect("writer task");
2597 }
2598
/// `insert_if_new` must report `true` the first time a key is seen and `false`
/// when the same key is offered again, without affecting other keys.
#[test]
fn test_formal_timed_seen_set_rejects_duplicates() {
    let mut seen = TimedSeenSet::new(4, Duration::from_secs(60));

    // (key, whether the insert is expected to be reported as new)
    let steps = [("frame-1", true), ("frame-1", false), ("frame-2", true)];
    for (key, expect_new) in steps {
        assert_eq!(seen.insert_if_new(key.to_string()), expect_new);
    }
}
2606
/// Inserting three distinct keys into a set constructed with `2` as its first
/// argument must leave the first key forgotten: re-inserting it is reported as
/// new once, and as a duplicate immediately afterwards.
#[test]
fn test_formal_timed_seen_set_evicts_oldest_when_capacity_exceeded() {
    let mut seen = TimedSeenSet::new(2, Duration::from_secs(60));
    for key in ["a", "b", "c"] {
        assert!(seen.insert_if_new(key.to_string()));
    }

    // "a" is expected to have been dropped when "c" arrived, so it counts as new again...
    assert!(seen.insert_if_new("a".to_string()));
    // ...and is remembered from then on.
    assert!(!seen.insert_if_new("a".to_string()));
}
2618
/// Every fanout value in the request exceeds the second argument (3), so after
/// normalization all three fanout fields are expected to equal 3.
#[test]
fn test_request_dispatch_normalization_caps_to_available_peers() {
    let oversized = RequestDispatchConfig {
        initial_fanout: 8,
        hedge_fanout: 6,
        max_fanout: 5,
        hedge_interval_ms: 120,
    };

    let normalized = normalize_dispatch_config(oversized, 3);

    assert_eq!(normalized.max_fanout, 3);
    assert_eq!(normalized.initial_fanout, 3);
    assert_eq!(normalized.hedge_fanout, 3);
}
2634
/// For a first argument of 7 and the policy below, the plan is expected to be
/// `[2, 3, 1]`: the initial fanout, one full hedge wave, then a final wave of 1
/// so that the total (6) equals `max_fanout`.
///
/// NOTE(review): the first argument is presumably the number of candidate
/// peers — confirm against `build_hedged_wave_plan`.
#[test]
fn test_hedged_wave_plan_matches_dispatch_policy() {
    let policy = RequestDispatchConfig {
        initial_fanout: 2,
        hedge_fanout: 3,
        max_fanout: 6,
        hedge_interval_ms: 120,
    };

    let plan = build_hedged_wave_plan(7, policy);

    assert_eq!(plan, vec![2, 3, 1]);
}
2648}