1use anyhow::Result;
13use futures::{SinkExt, StreamExt};
14use hashtree_webrtc::{
15 build_hedged_wave_plan, normalize_dispatch_config, sync_selector_peers, PeerSelector,
16};
17use nostr::{
18 nips::nip44, ClientMessage, EventBuilder, Filter, JsonUtil, Keys, Kind, PublicKey,
19 RelayMessage, Tag,
20};
21use std::collections::{BTreeSet, HashMap};
22use std::sync::Arc;
23use std::time::{Duration, Instant};
24use tokio::sync::{mpsc, Mutex, RwLock};
25use tokio_tungstenite::{connect_async, tungstenite::Message};
26use tracing::{debug, error, info, warn};
27
28use super::bluetooth::{BluetoothMesh, BluetoothPeerRegistrar, BluetoothRuntimeContext};
29use super::cashu::{CashuMintMetadataStore, CashuQuoteState, CashuRoutingConfig, NegotiatedQuote};
30use super::local_bus::SharedLocalNostrBus;
31use super::multicast::MulticastNostrBus;
32use super::peer::{ContentStore, Peer, PendingRequest};
33use super::root_events::{
34 build_root_filter, hashtree_event_identifier, is_hashtree_labeled_event, pick_latest_event,
35 root_event_from_peer, PeerRootEvent,
36};
37use super::session::MeshPeer;
38use super::types::{
39 decrement_htl_with_policy, encode_quote_request, encode_request, should_forward_htl,
40 validate_mesh_frame, DataQuoteRequest, DataRequest, MeshNostrFrame, MeshNostrPayload,
41 PeerDirection, PeerId, PeerPool, PeerStateEvent, PeerStatus, RequestDispatchConfig,
42 SignalingMessage, TimedSeenSet, WebRTCConfig, HELLO_TAG, MESH_DEFAULT_HTL, MESH_EVENT_POLICY,
43 WEBRTC_KIND,
44};
45use super::wifi_aware::{mobile_wifi_aware_bridge, WifiAwareNostrBus, WIFI_AWARE_SOURCE};
46use crate::cashu_helper::CashuPaymentClient;
47use crate::nostr_relay::NostrRelay;
48
/// Shared, thread-safe callback that maps a `&str` key to the [`PeerPool`] a peer belongs to.
// NOTE(review): the key is presumably the peer's pubkey — confirm against callers.
pub type PeerClassifier = Arc<dyn Fn(&str) -> PeerPool + Send + Sync>;
51
/// Transport that carries a peer session.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub enum PeerTransport {
    /// Session over WebRTC; `as_str` yields `"webrtc"`.
    WebRtc,
    /// Session over Bluetooth; `as_str` yields `"bluetooth"`.
    Bluetooth,
}
58
59impl PeerTransport {
60 pub const fn as_str(self) -> &'static str {
61 match self {
62 PeerTransport::WebRtc => "webrtc",
63 PeerTransport::Bluetooth => "bluetooth",
64 }
65 }
66}
67
68impl std::fmt::Display for PeerTransport {
69 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
70 f.write_str((*self).as_str())
71 }
72}
73
/// Reports whether the `HTREE_BLUETOOTH_NOSTR_ONLY` environment variable is switched on.
///
/// The value is trimmed and compared case-insensitively, so `1`, `true`, `TRUE`, `True`,
/// `yes`, `YES` and `Yes` all enable the mode. An unset variable, a non-UTF-8 value, or any
/// other value leaves it disabled.
fn bluetooth_nostr_only_mode() -> bool {
    std::env::var("HTREE_BLUETOOTH_NOSTR_ONLY")
        .map(|raw| {
            let value = raw.trim();
            value == "1"
                || value.eq_ignore_ascii_case("true")
                || value.eq_ignore_ascii_case("yes")
        })
        .unwrap_or(false)
}
80
/// Signaling path associated with a peer (see `PeerEntry::signal_paths`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub enum PeerSignalPath {
    /// Relay path; also the fallback for unrecognized source names in `from_source_name`.
    Relay,
    /// Path named `"multicast"`.
    Multicast,
    /// Path named by `WIFI_AWARE_SOURCE`.
    WifiAware,
    /// Path named `"bluetooth"`.
    Bluetooth,
}
89
90impl PeerSignalPath {
91 pub const fn as_str(self) -> &'static str {
92 match self {
93 PeerSignalPath::Relay => "relay",
94 PeerSignalPath::Multicast => "multicast",
95 PeerSignalPath::WifiAware => WIFI_AWARE_SOURCE,
96 PeerSignalPath::Bluetooth => "bluetooth",
97 }
98 }
99
100 pub fn from_source_name(source: &str) -> Self {
101 match source {
102 "multicast" => PeerSignalPath::Multicast,
103 WIFI_AWARE_SOURCE => PeerSignalPath::WifiAware,
104 "bluetooth" => PeerSignalPath::Bluetooth,
105 _ => PeerSignalPath::Relay,
106 }
107 }
108}
109
110impl std::fmt::Display for PeerSignalPath {
111 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
112 f.write_str((*self).as_str())
113 }
114}
115
/// Lifecycle state of a tracked peer connection.
///
/// The enum is fieldless, so it derives `Copy` and `Eq` like the sibling enums
/// (`PeerTransport`, `PeerSignalPath`); existing `.clone()` and `==` call sites keep working.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectionState {
    /// Displayed as `"discovered"`.
    Discovered,
    /// Displayed as `"connecting"`.
    Connecting,
    /// Displayed as `"connected"`.
    Connected,
    /// Displayed as `"failed"`.
    Failed,
}
124
125impl std::fmt::Display for ConnectionState {
126 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
127 match self {
128 ConnectionState::Discovered => write!(f, "discovered"),
129 ConnectionState::Connecting => write!(f, "connecting"),
130 ConnectionState::Connected => write!(f, "connected"),
131 ConnectionState::Failed => write!(f, "failed"),
132 }
133 }
134}
135
/// Bookkeeping record for one peer, stored in `WebRTCState::peers`.
pub struct PeerEntry {
    /// Identity of the peer.
    pub peer_id: PeerId,
    /// Direction of the connection.
    pub direction: PeerDirection,
    /// Current connection state; only `Connected` entries are used for requests.
    pub state: ConnectionState,
    /// Reported as `connected_at` by `WebRTCManager::peer_statuses`.
    // NOTE(review): name suggests "last activity" rather than "connected at" — confirm intent.
    pub last_seen: Instant,
    /// Live session handle, if any; closed by `WebRTCState::reset_runtime_state`.
    pub peer: Option<MeshPeer>,
    /// Pool the peer is counted against (see `WebRTCManager::get_pool_counts`).
    pub pool: PeerPool,
    /// Transport carrying the session.
    pub transport: PeerTransport,
    /// Signaling paths recorded for this peer.
    pub signal_paths: BTreeSet<PeerSignalPath>,
    /// Bytes sent to this peer, accumulated by `WebRTCState::record_sent`.
    pub bytes_sent: u64,
    /// Bytes received from this peer, accumulated by `WebRTCState::record_received`.
    pub bytes_received: u64,
}
149
/// Shared runtime state: the peer table, traffic counters and request-routing configuration.
pub struct WebRTCState {
    /// Peer table keyed by a `String` peer key.
    pub peers: RwLock<HashMap<String, PeerEntry>>,
    /// Connected-peer counter; reset to 0 by `reset_runtime_state`.
    pub connected_count: std::sync::atomic::AtomicUsize,
    /// Total bytes sent, incremented by `record_sent`.
    pub bytes_sent: std::sync::atomic::AtomicU64,
    /// Total bytes received, incremented by `record_received`.
    pub bytes_received: std::sync::atomic::AtomicU64,
    /// Incremented by one per `record_mesh_received` call.
    pub mesh_received: std::sync::atomic::AtomicU64,
    /// Incremented by `record_mesh_forwarded` with the given count.
    pub mesh_forwarded: std::sync::atomic::AtomicU64,
    /// Incremented by one per `record_mesh_duplicate_drop` call.
    pub mesh_dropped_duplicate: std::sync::atomic::AtomicU64,
    /// Selector that orders peers for requests and records request outcomes.
    peer_selector: Arc<RwLock<PeerSelector>>,
    /// Dispatch configuration normalized per request into a hedged wave plan.
    request_dispatch: RequestDispatchConfig,
    /// Per-request timeout; also the wait budget of the final request wave.
    request_timeout: Duration,
    /// Cashu quote negotiation state shared with the peer selector.
    cashu_quotes: Arc<CashuQuoteState>,
    /// Local buses queried by `resolve_root_from_local_buses_with_source`.
    local_buses: RwLock<Vec<SharedLocalNostrBus>>,
}
/// Capacity passed to the `TimedSeenSet` behind `WebRTCManager::seen_frame_ids`.
const SEEN_FRAME_CAP: usize = 4096;
/// TTL (120 seconds) passed to the `TimedSeenSet` behind `WebRTCManager::seen_frame_ids`.
const SEEN_FRAME_TTL: Duration = Duration::from_secs(120);
/// Capacity passed to the `TimedSeenSet` behind `WebRTCManager::seen_event_ids`.
const SEEN_EVENT_CAP: usize = 8192;
/// TTL (600 seconds) passed to the `TimedSeenSet` behind `WebRTCManager::seen_event_ids`.
const SEEN_EVENT_TTL: Duration = Duration::from_secs(600);
180
/// Shared map of in-flight `PendingRequest`s; `request_from_single_peer` keys it by hex hash.
type PendingRequestsMap = Arc<Mutex<HashMap<String, PendingRequest>>>;
/// `(peer id, pending-request map, data channel)` for a peer with an open WebRTC data channel.
type ConnectedPeer = (
    String,
    PendingRequestsMap,
    Arc<webrtc::data_channel::RTCDataChannel>,
);
/// `(peer id, session handle, transport)` for a ready peer session.
type ConnectedSession = (String, MeshPeer, PeerTransport);
188
189impl WebRTCState {
190 pub fn new() -> Self {
191 let cfg = WebRTCConfig::default();
192 Self::new_with_routing_and_cashu(
193 cfg.request_selection_strategy,
194 cfg.request_fairness_enabled,
195 cfg.request_dispatch,
196 Duration::from_millis(cfg.message_timeout_ms),
197 CashuRoutingConfig::default(),
198 None,
199 None,
200 )
201 }
202
203 pub fn new_with_routing(
204 selection_strategy: super::types::SelectionStrategy,
205 fairness_enabled: bool,
206 request_dispatch: RequestDispatchConfig,
207 ) -> Self {
208 let cfg = WebRTCConfig::default();
209 Self::new_with_routing_and_cashu(
210 selection_strategy,
211 fairness_enabled,
212 request_dispatch,
213 Duration::from_millis(cfg.message_timeout_ms),
214 CashuRoutingConfig::default(),
215 None,
216 None,
217 )
218 }
219
220 pub fn new_with_routing_and_cashu(
221 selection_strategy: super::types::SelectionStrategy,
222 fairness_enabled: bool,
223 request_dispatch: RequestDispatchConfig,
224 request_timeout: Duration,
225 cashu_routing: CashuRoutingConfig,
226 payment_client: Option<Arc<dyn CashuPaymentClient>>,
227 mint_metadata: Option<Arc<CashuMintMetadataStore>>,
228 ) -> Self {
229 let mut selector = PeerSelector::with_strategy(selection_strategy);
230 selector.set_fairness(fairness_enabled);
231 let peer_selector = Arc::new(RwLock::new(selector));
232 let cashu_quotes = Arc::new(if let Some(mint_metadata) = mint_metadata {
233 CashuQuoteState::new_with_mint_metadata(
234 cashu_routing,
235 peer_selector.clone(),
236 payment_client,
237 mint_metadata,
238 )
239 } else {
240 CashuQuoteState::new(cashu_routing, peer_selector.clone(), payment_client)
241 });
242 Self {
243 peers: RwLock::new(HashMap::new()),
244 connected_count: std::sync::atomic::AtomicUsize::new(0),
245 bytes_sent: std::sync::atomic::AtomicU64::new(0),
246 bytes_received: std::sync::atomic::AtomicU64::new(0),
247 mesh_received: std::sync::atomic::AtomicU64::new(0),
248 mesh_forwarded: std::sync::atomic::AtomicU64::new(0),
249 mesh_dropped_duplicate: std::sync::atomic::AtomicU64::new(0),
250 peer_selector,
251 request_dispatch,
252 request_timeout,
253 cashu_quotes,
254 local_buses: RwLock::new(Vec::new()),
255 }
256 }
257
258 pub async fn set_local_buses(&self, buses: Vec<SharedLocalNostrBus>) {
259 *self.local_buses.write().await = buses;
260 }
261
262 pub async fn add_local_bus(&self, bus: SharedLocalNostrBus) {
263 self.local_buses.write().await.push(bus);
264 }
265
266 pub async fn set_multicast_bus(&self, bus: Option<Arc<MulticastNostrBus>>) {
267 let buses = bus
268 .into_iter()
269 .map(|bus| bus as SharedLocalNostrBus)
270 .collect();
271 self.set_local_buses(buses).await;
272 }
273
274 pub async fn reset_runtime_state(&self) {
277 self.set_local_buses(Vec::new()).await;
278 let peers = {
279 let mut peers = self.peers.write().await;
280 std::mem::take(&mut *peers)
281 };
282 self.connected_count
283 .store(0, std::sync::atomic::Ordering::Relaxed);
284 for entry in peers.into_values() {
285 if let Some(peer) = entry.peer {
286 let _ = peer.close().await;
287 }
288 }
289 }
290
291 pub fn get_bandwidth(&self) -> (u64, u64) {
293 (
294 self.bytes_sent.load(std::sync::atomic::Ordering::Relaxed),
295 self.bytes_received
296 .load(std::sync::atomic::Ordering::Relaxed),
297 )
298 }
299
300 pub fn get_mesh_stats(&self) -> (u64, u64, u64) {
301 (
302 self.mesh_received
303 .load(std::sync::atomic::Ordering::Relaxed),
304 self.mesh_forwarded
305 .load(std::sync::atomic::Ordering::Relaxed),
306 self.mesh_dropped_duplicate
307 .load(std::sync::atomic::Ordering::Relaxed),
308 )
309 }
310
311 pub fn record_mesh_received(&self) {
312 self.mesh_received
313 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
314 }
315
316 pub fn record_mesh_forwarded(&self, count: u64) {
317 self.mesh_forwarded
318 .fetch_add(count, std::sync::atomic::Ordering::Relaxed);
319 }
320
321 pub fn record_mesh_duplicate_drop(&self) {
322 self.mesh_dropped_duplicate
323 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
324 }
325
326 pub async fn record_sent(&self, peer_id: &str, bytes: u64) {
328 self.bytes_sent
329 .fetch_add(bytes, std::sync::atomic::Ordering::Relaxed);
330 if let Some(entry) = self.peers.write().await.get_mut(peer_id) {
331 entry.bytes_sent += bytes;
332 }
333 }
334
335 pub async fn record_received(&self, peer_id: &str, bytes: u64) {
337 self.bytes_received
338 .fetch_add(bytes, std::sync::atomic::Ordering::Relaxed);
339 if let Some(entry) = self.peers.write().await.get_mut(peer_id) {
340 entry.bytes_received += bytes;
341 }
342 }
343
344 pub async fn request_from_peers(&self, hash_hex: &str) -> Option<Vec<u8>> {
348 self.request_from_peers_with_source(hash_hex)
349 .await
350 .map(|(data, _peer_id)| data)
351 }
352
353 pub async fn request_from_peers_with_source(
355 &self,
356 hash_hex: &str,
357 ) -> Option<(Vec<u8>, String)> {
358 use super::types::BLOB_REQUEST_POLICY;
359
360 let peers = self.peers.read().await;
361
362 let peer_refs: Vec<_> = peers
363 .values()
364 .filter(|p| p.state == ConnectionState::Connected && p.peer.is_some())
365 .filter_map(|p| {
366 p.peer
367 .clone()
368 .map(|peer| (p.peer_id.to_string(), peer, p.transport))
369 })
370 .collect();
371
372 drop(peers); let mut connected_peers: Vec<ConnectedPeer> = Vec::new();
375 let mut connected_sessions: Vec<ConnectedSession> = Vec::new();
376 for (peer_id, peer, transport) in peer_refs {
377 if !peer.is_ready() {
378 continue;
379 }
380 if bluetooth_nostr_only_mode() && transport == PeerTransport::Bluetooth {
381 continue;
382 }
383 if let Some(webrtc_peer) = peer.as_webrtc() {
384 let dc_guard = webrtc_peer.data_channel.lock().await;
385 if let Some(dc) = dc_guard.as_ref() {
386 connected_peers.push((
387 peer_id.clone(),
388 webrtc_peer.pending_requests.clone(),
389 dc.clone(),
390 ));
391 }
392 }
393 connected_sessions.push((peer_id, peer, transport));
394 }
395
396 if connected_sessions.is_empty() {
397 debug!(
398 "No connected peers to query for {}",
399 &hash_hex[..8.min(hash_hex.len())]
400 );
401 return None;
402 }
403
404 let hash_bytes = match hex::decode(hash_hex) {
406 Ok(b) => b,
407 Err(_) => return None,
408 };
409
410 let expected_hash: [u8; 32] = match hash_bytes.as_slice().try_into() {
411 Ok(h) => h,
412 Err(_) => {
413 debug!(
414 "Invalid hash length {}, expected 32 bytes",
415 hash_bytes.len()
416 );
417 return None;
418 }
419 };
420
421 let connected_peer_ids: Vec<String> = connected_sessions
422 .iter()
423 .map(|(peer_id, _, _)| peer_id.clone())
424 .collect();
425 sync_selector_peers(self.peer_selector.as_ref(), &connected_peer_ids).await;
426
427 let ordered_peer_ids = self.peer_selector.write().await.select_peers();
428 let mut quote_by_peer: HashMap<
429 String,
430 (
431 PendingRequestsMap,
432 Arc<webrtc::data_channel::RTCDataChannel>,
433 ),
434 > = connected_peers
435 .iter()
436 .cloned()
437 .map(|(peer_id, pending, dc)| (peer_id, (pending, dc)))
438 .collect();
439 let mut ordered_quote_peers: Vec<ConnectedPeer> = Vec::new();
440 for peer_id in &ordered_peer_ids {
441 if let Some((pending, dc)) = quote_by_peer.remove(peer_id) {
442 ordered_quote_peers.push((peer_id.clone(), pending, dc));
443 }
444 }
445 for (peer_id, (pending, dc)) in quote_by_peer {
446 ordered_quote_peers.push((peer_id, pending, dc));
447 }
448
449 let mut by_peer: HashMap<String, (MeshPeer, PeerTransport)> = connected_sessions
450 .into_iter()
451 .map(|(peer_id, peer, transport)| (peer_id, (peer, transport)))
452 .collect();
453
454 let mut ordered_peers: Vec<ConnectedSession> = Vec::new();
455 for peer_id in ordered_peer_ids {
456 if let Some((peer, transport)) = by_peer.remove(&peer_id) {
457 ordered_peers.push((peer_id, peer, transport));
458 }
459 }
460 for (peer_id, (peer, transport)) in by_peer {
461 ordered_peers.push((peer_id, peer, transport));
462 }
463
464 let dispatch = normalize_dispatch_config(self.request_dispatch, ordered_peers.len());
465 let wave_plan = build_hedged_wave_plan(ordered_peers.len(), dispatch);
466 if wave_plan.is_empty() {
467 return None;
468 }
469
470 debug!(
471 "Querying {} peers for {} (strategy order + hedged waves {:?})",
472 ordered_peers.len(),
473 &hash_hex[..8.min(hash_hex.len())],
474 wave_plan
475 );
476
477 if let Some((requested_mint, payment_sat, quote_ttl_ms)) =
478 self.cashu_quotes.requester_quote_terms().await
479 {
480 if let Some(quote) = self
481 .request_quote_from_peers(
482 &hash_bytes,
483 requested_mint,
484 payment_sat,
485 quote_ttl_ms,
486 &ordered_quote_peers,
487 )
488 .await
489 {
490 if let Some(data) = self
491 .request_from_single_peer(
492 hash_hex,
493 &hash_bytes,
494 expected_hash,
495 "e.peer_id,
496 Some("e),
497 &ordered_quote_peers,
498 )
499 .await
500 {
501 debug!(
502 "Got quoted response from peer {} for {}",
503 quote.peer_id,
504 &hash_hex[..8.min(hash_hex.len())]
505 );
506 return Some((data, quote.peer_id));
507 }
508 }
509 }
510
511 let request = DataRequest {
512 h: hash_bytes.clone(),
513 htl: BLOB_REQUEST_POLICY.max_htl,
514 q: None,
515 };
516 let wire = match encode_request(&request) {
517 Ok(w) => w,
518 Err(_) => return None,
519 };
520 let wire_len = wire.len() as u64;
521 let hedge_wait_window = Duration::from_millis(dispatch.hedge_interval_ms.max(1));
522 let mut next_peer_idx = 0usize;
523 for wave_size in wave_plan {
524 let from = next_peer_idx;
525 let to = (next_peer_idx + wave_size).min(ordered_peers.len());
526 next_peer_idx = to;
527
528 if from == to {
529 continue;
530 }
531
532 let (result_tx, mut result_rx) =
533 mpsc::channel::<(String, Instant, Result<Option<Vec<u8>>>)>(to - from);
534
535 for (peer_id, peer, transport) in &ordered_peers[from..to] {
536 if *transport != PeerTransport::Bluetooth {
537 self.record_sent(peer_id, wire_len).await;
538 }
539 self.peer_selector
540 .write()
541 .await
542 .record_request(peer_id, wire_len);
543
544 let peer_id = peer_id.clone();
545 let peer = peer.clone();
546 let hash_hex = hash_hex.to_string();
547 let result_tx = result_tx.clone();
548 let per_request_timeout = self.request_timeout;
549 tokio::spawn(async move {
550 let started = Instant::now();
551 let result = peer.request(&hash_hex, per_request_timeout).await;
552 let _ = result_tx.send((peer_id, started, result)).await;
553 });
554 }
555 drop(result_tx);
556
557 let is_last_wave = next_peer_idx >= ordered_peers.len();
558 let deadline = Instant::now()
559 + if is_last_wave {
560 self.request_timeout
561 } else {
562 hedge_wait_window
563 };
564 loop {
565 let now = Instant::now();
566 if now >= deadline {
567 break;
568 }
569 let remaining = deadline.saturating_duration_since(now);
570 match tokio::time::timeout(remaining, result_rx.recv()).await {
571 Ok(Some((peer_id, started, Ok(Some(data))))) => {
572 let rtt_ms = started.elapsed().as_millis() as u64;
573 if hashtree_core::sha256(&data) == expected_hash {
574 let should_record = {
575 let peers = self.peers.read().await;
576 peers
577 .get(&peer_id)
578 .map(|entry| entry.transport != PeerTransport::Bluetooth)
579 .unwrap_or(true)
580 };
581 if should_record {
582 self.record_received(&peer_id, data.len() as u64).await;
583 }
584 self.peer_selector.write().await.record_success(
585 &peer_id,
586 rtt_ms,
587 data.len() as u64,
588 );
589 debug!(
590 "Got response from peer {} for {}",
591 peer_id,
592 &hash_hex[..8.min(hash_hex.len())]
593 );
594 return Some((data, peer_id));
595 }
596 self.peer_selector.write().await.record_failure(&peer_id);
597 }
598 Ok(Some((peer_id, _, Ok(None)))) | Ok(Some((peer_id, _, Err(_)))) => {
599 self.peer_selector.write().await.record_timeout(&peer_id);
600 }
601 Ok(None) | Err(_) => break,
602 }
603 }
604 }
605
606 debug!(
607 "No peer had data for {}",
608 &hash_hex[..8.min(hash_hex.len())]
609 );
610 None
611 }
612
613 async fn request_quote_from_peers(
614 &self,
615 hash_bytes: &[u8],
616 requested_mint: String,
617 payment_sat: u64,
618 quote_ttl_ms: u32,
619 ordered_peers: &[ConnectedPeer],
620 ) -> Option<NegotiatedQuote> {
621 if ordered_peers.is_empty() || quote_ttl_ms == 0 {
622 return None;
623 }
624
625 let dispatch = normalize_dispatch_config(self.request_dispatch, ordered_peers.len());
626 let wave_plan = build_hedged_wave_plan(ordered_peers.len(), dispatch);
627 if wave_plan.is_empty() {
628 return None;
629 }
630
631 let hash_hex = hex::encode(hash_bytes);
632 let mut rx = self
633 .cashu_quotes
634 .register_pending_quote(hash_hex.clone(), Some(requested_mint.clone()), payment_sat)
635 .await;
636 let quote_request = DataQuoteRequest {
637 h: hash_bytes.to_vec(),
638 p: payment_sat,
639 t: quote_ttl_ms,
640 m: Some(requested_mint),
641 };
642 let wire = match encode_quote_request("e_request) {
643 Ok(wire) => wire,
644 Err(_) => {
645 self.cashu_quotes.clear_pending_quote(&hash_hex).await;
646 return None;
647 }
648 };
649 let deadline = Instant::now() + self.request_timeout;
650 let mut sent_total = 0usize;
651 let mut next_peer_idx = 0usize;
652
653 for (wave_idx, wave_size) in wave_plan.iter().copied().enumerate() {
654 let from = next_peer_idx;
655 let to = (next_peer_idx + wave_size).min(ordered_peers.len());
656 for (_, _, dc) in &ordered_peers[from..to] {
657 if dc.send(&bytes::Bytes::copy_from_slice(&wire)).await.is_ok() {
658 sent_total += 1;
659 }
660 }
661 next_peer_idx = to;
662
663 if sent_total == 0 {
664 if next_peer_idx >= ordered_peers.len() {
665 break;
666 }
667 continue;
668 }
669
670 let now = Instant::now();
671 if now >= deadline {
672 break;
673 }
674 let remaining = deadline.saturating_duration_since(now);
675 let is_last_wave =
676 wave_idx + 1 == wave_plan.len() || next_peer_idx >= ordered_peers.len();
677 let wait = if is_last_wave {
678 remaining
679 } else if dispatch.hedge_interval_ms == 0 {
680 Duration::ZERO
681 } else {
682 Duration::from_millis(dispatch.hedge_interval_ms).min(remaining)
683 };
684
685 if wait.is_zero() {
686 continue;
687 }
688
689 match tokio::time::timeout(wait, &mut rx).await {
690 Ok(Ok(Some(quote))) => {
691 self.cashu_quotes.clear_pending_quote(&hash_hex).await;
692 return Some(quote);
693 }
694 Ok(Ok(None)) | Ok(Err(_)) => break,
695 Err(_) => {}
696 }
697 }
698
699 self.cashu_quotes.clear_pending_quote(&hash_hex).await;
700 None
701 }
702
703 async fn request_from_single_peer(
704 &self,
705 hash_hex: &str,
706 hash_bytes: &[u8],
707 expected_hash: [u8; 32],
708 target_peer_id: &str,
709 quote: Option<&NegotiatedQuote>,
710 ordered_peers: &[ConnectedPeer],
711 ) -> Option<Vec<u8>> {
712 use super::types::BLOB_REQUEST_POLICY;
713
714 let (pending_requests, dc) = ordered_peers
715 .iter()
716 .find(|(peer_id, _, _)| peer_id == target_peer_id)
717 .map(|(_, pending_requests, dc)| (pending_requests.clone(), dc.clone()))?;
718
719 let request = DataRequest {
720 h: hash_bytes.to_vec(),
721 htl: BLOB_REQUEST_POLICY.max_htl,
722 q: quote.map(|quote| quote.quote_id),
723 };
724 let wire = encode_request(&request).ok()?;
725 let wire_len = wire.len() as u64;
726 let sent_at = Instant::now();
727 let (tx, mut rx) = tokio::sync::oneshot::channel();
728
729 {
730 let mut pending = pending_requests.lock().await;
731 pending.insert(
732 hash_hex.to_string(),
733 if let Some(quote) = quote {
734 PendingRequest::quoted(
735 hash_bytes.to_vec(),
736 tx,
737 quote.quote_id,
738 quote.mint_url.clone().unwrap_or_default(),
739 quote.payment_sat,
740 )
741 } else {
742 PendingRequest::standard(hash_bytes.to_vec(), tx)
743 },
744 );
745 }
746
747 if dc
748 .send(&bytes::Bytes::copy_from_slice(&wire))
749 .await
750 .is_err()
751 {
752 let mut pending = pending_requests.lock().await;
753 pending.remove(hash_hex);
754 self.peer_selector
755 .write()
756 .await
757 .record_failure(target_peer_id);
758 return None;
759 }
760
761 self.record_sent(target_peer_id, wire_len).await;
762 self.peer_selector
763 .write()
764 .await
765 .record_request(target_peer_id, wire_len);
766
767 let wait_timeout = if let Some(quote) = quote {
768 let multiplier = quote.payment_sat.clamp(1, 32) as u128;
769 let extra_ms = self
770 .cashu_quotes
771 .settlement_timeout()
772 .as_millis()
773 .saturating_mul(multiplier);
774 self.request_timeout + Duration::from_millis(extra_ms.min(u64::MAX as u128) as u64)
775 } else {
776 self.request_timeout
777 };
778
779 match tokio::time::timeout(wait_timeout, &mut rx).await {
780 Ok(Ok(Some(data))) if hashtree_core::sha256(&data) == expected_hash => {
781 let rtt_ms = sent_at.elapsed().as_millis() as u64;
782 self.record_received(target_peer_id, data.len() as u64)
783 .await;
784 self.peer_selector.write().await.record_success(
785 target_peer_id,
786 rtt_ms,
787 data.len() as u64,
788 );
789 Some(data)
790 }
791 Ok(Ok(Some(_))) => {
792 self.peer_selector
793 .write()
794 .await
795 .record_failure(target_peer_id);
796 let pending = pending_requests.lock().await.remove(hash_hex);
797 if let Some(pending) = pending {
798 if let Some(quoted) = pending.quoted {
799 if let Some(in_flight) = quoted.in_flight_payment {
800 let _ = self
801 .cashu_quotes
802 .revoke_payment_token(&in_flight.mint_url, &in_flight.operation_id)
803 .await;
804 }
805 }
806 }
807 None
808 }
809 Ok(Ok(None)) | Ok(Err(_)) | Err(_) => {
810 let pending = pending_requests.lock().await.remove(hash_hex);
811 if let Some(pending) = pending {
812 if let Some(quoted) = pending.quoted {
813 if let Some(in_flight) = quoted.in_flight_payment {
814 let _ = self
815 .cashu_quotes
816 .revoke_payment_token(&in_flight.mint_url, &in_flight.operation_id)
817 .await;
818 }
819 }
820 }
821 self.peer_selector
822 .write()
823 .await
824 .record_timeout(target_peer_id);
825 None
826 }
827 }
828 }
829
830 pub async fn resolve_root_from_peers(
832 &self,
833 owner_pubkey: &str,
834 tree_name: &str,
835 per_peer_timeout: Duration,
836 ) -> Option<PeerRootEvent> {
837 let filter = build_root_filter(owner_pubkey, tree_name)?;
838
839 let peer_refs: Vec<_> = {
840 let peers = self.peers.read().await;
841 peers
842 .values()
843 .filter(|entry| entry.state == ConnectionState::Connected)
844 .filter(|entry| {
845 !bluetooth_nostr_only_mode() || entry.transport != PeerTransport::Bluetooth
846 })
847 .filter_map(|entry| {
848 let peer = entry.peer.as_ref()?;
849 if !peer.is_ready() {
850 return None;
851 }
852 Some((entry.peer_id.short(), peer.clone()))
853 })
854 .collect()
855 };
856
857 for (peer_short, peer) in peer_refs {
858 debug!(
859 "Querying peer {} for root event {}/{}",
860 peer_short, owner_pubkey, tree_name
861 );
862 let events = match peer
863 .query_nostr_events(vec![filter.clone()], per_peer_timeout)
864 .await
865 {
866 Ok(events) => events,
867 Err(e) => {
868 debug!(
869 "Peer {} Nostr query failed for {}/{}: {}",
870 peer_short, owner_pubkey, tree_name, e
871 );
872 continue;
873 }
874 };
875 debug!(
876 "Peer {} returned {} Nostr event(s) for {}/{}",
877 peer_short,
878 events.len(),
879 owner_pubkey,
880 tree_name
881 );
882
883 let latest = pick_latest_event(events.iter().filter(|event| {
884 hashtree_event_identifier(event).as_deref() == Some(tree_name)
885 && is_hashtree_labeled_event(event)
886 }));
887 if let Some(event) = latest {
888 if let Some(root) = root_event_from_peer(event, &peer_short, tree_name) {
889 debug!(
890 "Resolved {}/{} via peer {} event {}",
891 owner_pubkey,
892 tree_name,
893 peer_short,
894 event.id.to_hex()
895 );
896 return Some(root);
897 }
898 }
899 }
900
901 None
902 }
903
904 pub async fn resolve_root_from_local_buses_with_source(
905 &self,
906 owner_pubkey: &str,
907 tree_name: &str,
908 timeout: Duration,
909 ) -> Option<(&'static str, PeerRootEvent)> {
910 let buses = self.local_buses.read().await.clone();
911 for bus in buses {
912 if let Some(root) = bus.query_root(owner_pubkey, tree_name, timeout).await {
913 return Some((bus.source_name(), root));
914 }
915 }
916 None
917 }
918
919 pub async fn resolve_root_from_local_buses(
920 &self,
921 owner_pubkey: &str,
922 tree_name: &str,
923 timeout: Duration,
924 ) -> Option<PeerRootEvent> {
925 self.resolve_root_from_local_buses_with_source(owner_pubkey, tree_name, timeout)
926 .await
927 .map(|(_, root)| root)
928 }
929
930 pub async fn resolve_root_from_multicast(
931 &self,
932 owner_pubkey: &str,
933 tree_name: &str,
934 timeout: Duration,
935 ) -> Option<PeerRootEvent> {
936 self.resolve_root_from_local_buses(owner_pubkey, tree_name, timeout)
937 .await
938 }
939}
940
941impl Default for WebRTCState {
942 fn default() -> Self {
943 Self::new()
944 }
945}
946
/// Alias for [`WebRTCState`].
pub type PeerRouterState = WebRTCState;
948
/// Owns the local identity, configuration, channels and shared state of the peer mesh.
pub struct WebRTCManager {
    /// Configuration supplied at construction.
    config: WebRTCConfig,
    /// Local peer id, built from the hex public key of `keys`.
    my_peer_id: PeerId,
    /// Local Nostr keys.
    keys: Keys,
    /// Shared runtime state; also handed out by `state()`.
    state: Arc<WebRTCState>,
    /// Sender half of the shutdown watch channel; `shutdown()` sends `true` on it.
    shutdown: Arc<tokio::sync::watch::Sender<bool>>,
    /// Receiver half of the shutdown watch channel.
    shutdown_rx: tokio::sync::watch::Receiver<bool>,
    /// Sender for signaling messages (channel capacity 100).
    signaling_tx: mpsc::Sender<SignalingMessage>,
    /// Receiver for signaling messages; `Some` after construction.
    // NOTE(review): presumably taken once by the run loop — confirm.
    signaling_rx: Option<mpsc::Receiver<SignalingMessage>>,
    /// Optional content store; `None` unless set by a constructor or `set_store`.
    store: Option<Arc<dyn ContentStore>>,
    /// Classifies peers into pools; the default maps every peer to `PeerPool::Other`.
    peer_classifier: PeerClassifier,
    /// Optional Nostr relay, set through `set_nostr_relay`.
    nostr_relay: Option<Arc<NostrRelay>>,
    /// Local buses known to the manager; empty after construction.
    local_buses: Vec<SharedLocalNostrBus>,
    /// Sender for peer state events (channel capacity 100).
    state_event_tx: mpsc::Sender<PeerStateEvent>,
    /// Receiver for peer state events; `Some` after construction.
    state_event_rx: Option<mpsc::Receiver<PeerStateEvent>>,
    /// Sender for mesh frames tagged with a peer id (channel capacity 256).
    mesh_frame_tx: mpsc::Sender<(PeerId, MeshNostrFrame)>,
    /// Receiver for mesh frames; `Some` after construction.
    mesh_frame_rx: Option<mpsc::Receiver<(PeerId, MeshNostrFrame)>>,
    /// Seen-set built from `SEEN_FRAME_CAP` and `SEEN_FRAME_TTL`.
    seen_frame_ids: Arc<Mutex<TimedSeenSet>>,
    /// Seen-set built from `SEEN_EVENT_CAP` and `SEEN_EVENT_TTL`.
    seen_event_ids: Arc<Mutex<TimedSeenSet>>,
}
976
977impl WebRTCManager {
978 pub fn new(keys: Keys, config: WebRTCConfig) -> Self {
980 let pubkey = keys.public_key().to_hex();
981 let my_peer_id = PeerId::new(pubkey, None);
982 let (shutdown, shutdown_rx) = tokio::sync::watch::channel(false);
983 let (signaling_tx, signaling_rx) = mpsc::channel(100);
984 let (state_event_tx, state_event_rx) = mpsc::channel(100);
985 let (mesh_frame_tx, mesh_frame_rx) = mpsc::channel(256);
986 let state = Arc::new(WebRTCState::new_with_routing_and_cashu(
987 config.request_selection_strategy,
988 config.request_fairness_enabled,
989 config.request_dispatch,
990 Duration::from_millis(config.message_timeout_ms),
991 CashuRoutingConfig::default(),
992 None,
993 None,
994 ));
995
996 let peer_classifier: PeerClassifier = Arc::new(|_| PeerPool::Other);
998
999 Self {
1000 config,
1001 my_peer_id,
1002 keys,
1003 state,
1004 shutdown: Arc::new(shutdown),
1005 shutdown_rx,
1006 signaling_tx,
1007 signaling_rx: Some(signaling_rx),
1008 store: None,
1009 peer_classifier,
1010 nostr_relay: None,
1011 local_buses: Vec::new(),
1012 state_event_tx,
1013 state_event_rx: Some(state_event_rx),
1014 mesh_frame_tx,
1015 mesh_frame_rx: Some(mesh_frame_rx),
1016 seen_frame_ids: Arc::new(Mutex::new(TimedSeenSet::new(
1017 SEEN_FRAME_CAP,
1018 SEEN_FRAME_TTL,
1019 ))),
1020 seen_event_ids: Arc::new(Mutex::new(TimedSeenSet::new(
1021 SEEN_EVENT_CAP,
1022 SEEN_EVENT_TTL,
1023 ))),
1024 }
1025 }
1026
1027 pub fn new_with_state(keys: Keys, config: WebRTCConfig, state: Arc<WebRTCState>) -> Self {
1029 let mut manager = Self::new(keys, config);
1030 manager.state = state;
1031 manager
1032 }
1033
1034 pub fn new_with_classifier(
1036 keys: Keys,
1037 config: WebRTCConfig,
1038 classifier: PeerClassifier,
1039 ) -> Self {
1040 let mut manager = Self::new(keys, config);
1041 manager.peer_classifier = classifier;
1042 manager
1043 }
1044
1045 pub fn new_with_store(keys: Keys, config: WebRTCConfig, store: Arc<dyn ContentStore>) -> Self {
1047 let mut manager = Self::new(keys, config);
1048 manager.store = Some(store);
1049 manager
1050 }
1051
1052 pub fn new_with_store_and_classifier(
1054 keys: Keys,
1055 config: WebRTCConfig,
1056 store: Arc<dyn ContentStore>,
1057 classifier: PeerClassifier,
1058 ) -> Self {
1059 Self::new_with_store_and_classifier_and_cashu(
1060 keys,
1061 config,
1062 store,
1063 classifier,
1064 CashuRoutingConfig::default(),
1065 None,
1066 None,
1067 )
1068 }
1069
1070 pub fn new_with_state_and_store_and_classifier(
1071 keys: Keys,
1072 config: WebRTCConfig,
1073 state: Arc<WebRTCState>,
1074 store: Arc<dyn ContentStore>,
1075 classifier: PeerClassifier,
1076 ) -> Self {
1077 let mut manager = Self::new_with_state(keys, config, state);
1078 manager.store = Some(store);
1079 manager.peer_classifier = classifier;
1080 manager
1081 }
1082
1083 pub fn new_with_store_and_classifier_and_cashu(
1084 keys: Keys,
1085 config: WebRTCConfig,
1086 store: Arc<dyn ContentStore>,
1087 classifier: PeerClassifier,
1088 cashu_routing: CashuRoutingConfig,
1089 payment_client: Option<Arc<dyn CashuPaymentClient>>,
1090 mint_metadata: Option<Arc<CashuMintMetadataStore>>,
1091 ) -> Self {
1092 let mut manager = Self::new(keys, config);
1093 manager.state = Arc::new(WebRTCState::new_with_routing_and_cashu(
1094 manager.config.request_selection_strategy,
1095 manager.config.request_fairness_enabled,
1096 manager.config.request_dispatch,
1097 Duration::from_millis(manager.config.message_timeout_ms),
1098 cashu_routing,
1099 payment_client,
1100 mint_metadata,
1101 ));
1102 manager.store = Some(store);
1103 manager.peer_classifier = classifier;
1104 manager
1105 }
1106
/// Attaches (or replaces) the content store.
pub fn set_store(&mut self, store: Arc<dyn ContentStore>) {
    self.store = Some(store);
}
1111
/// Replaces the peer classifier.
pub fn set_peer_classifier(&mut self, classifier: PeerClassifier) {
    self.peer_classifier = classifier;
}
1116
/// Attaches (or replaces) the Nostr relay.
pub fn set_nostr_relay(&mut self, relay: Arc<NostrRelay>) {
    self.nostr_relay = Some(relay);
}
1121
/// Returns the local peer id.
pub fn my_peer_id(&self) -> &PeerId {
    &self.my_peer_id
}
1126
1127 pub fn state(&self) -> Arc<WebRTCState> {
1129 self.state.clone()
1130 }
1131
1132 pub fn shutdown_signal(&self) -> Arc<tokio::sync::watch::Sender<bool>> {
1134 self.shutdown.clone()
1135 }
1136
1137 pub fn shutdown(&self) {
1139 let _ = self.shutdown.send(true);
1140 }
1141
1142 pub async fn connected_count(&self) -> usize {
1144 self.state
1145 .connected_count
1146 .load(std::sync::atomic::Ordering::Relaxed)
1147 }
1148
1149 pub async fn peer_statuses(&self) -> Vec<PeerStatus> {
1151 self.state
1152 .peers
1153 .read()
1154 .await
1155 .values()
1156 .map(|p| PeerStatus {
1157 peer_id: p.peer_id.to_string(),
1158 pubkey: p.peer_id.pubkey.clone(),
1159 state: p.state.to_string(),
1160 direction: p.direction,
1161 connected_at: Some(p.last_seen),
1162 pool: p.pool,
1163 })
1164 .collect()
1165 }
1166
1167 pub async fn get_pool_counts(&self) -> (usize, usize, usize, usize) {
1171 let peers = self.state.peers.read().await;
1172 let mut follows_connected = 0;
1173 let mut follows_active = 0;
1174 let mut other_connected = 0;
1175 let mut other_active = 0;
1176
1177 for entry in peers.values() {
1178 let is_active = entry.state == ConnectionState::Connected
1181 || entry.state == ConnectionState::Connecting;
1182
1183 match entry.pool {
1184 PeerPool::Follows => {
1185 if is_active {
1186 follows_active += 1;
1187 }
1188 if entry.state == ConnectionState::Connected {
1189 follows_connected += 1;
1190 }
1191 }
1192 PeerPool::Other => {
1193 if is_active {
1194 other_active += 1;
1195 }
1196 if entry.state == ConnectionState::Connected {
1197 other_connected += 1;
1198 }
1199 }
1200 }
1201 }
1202
1203 (
1204 follows_connected,
1205 follows_active,
1206 other_connected,
1207 other_active,
1208 )
1209 }
1210
1211 fn can_accept_peer(&self, pool: PeerPool, pool_counts: &(usize, usize, usize, usize)) -> bool {
1213 let (_, follows_active, _, other_active) = *pool_counts;
1214 match pool {
1215 PeerPool::Follows => follows_active < self.config.pools.follows.max_connections,
1216 PeerPool::Other => other_active < self.config.pools.other.max_connections,
1217 }
1218 }
1219
1220 #[allow(dead_code)]
1222 fn is_pool_satisfied(
1223 &self,
1224 pool: PeerPool,
1225 pool_counts: &(usize, usize, usize, usize),
1226 ) -> bool {
1227 let (follows_connected, _, other_connected, _) = *pool_counts;
1228 match pool {
1229 PeerPool::Follows => {
1230 follows_connected >= self.config.pools.follows.satisfied_connections
1231 }
1232 PeerPool::Other => other_connected >= self.config.pools.other.satisfied_connections,
1233 }
1234 }
1235
1236 #[allow(dead_code)]
1238 fn is_satisfied(&self, pool_counts: &(usize, usize, usize, usize)) -> bool {
1239 self.is_pool_satisfied(PeerPool::Follows, pool_counts)
1240 && self.is_pool_satisfied(PeerPool::Other, pool_counts)
1241 }
1242
1243 fn should_initiate(&self, their_uuid: &str) -> bool {
1246 self.my_peer_id.uuid.as_str() < their_uuid
1247 }
1248
1249 fn local_bus_max_peers(&self, source: &str) -> Option<usize> {
1250 match source {
1251 "multicast" => Some(self.config.multicast.max_peers),
1252 WIFI_AWARE_SOURCE => Some(self.config.wifi_aware.max_peers),
1253 _ => None,
1254 }
1255 }
1256
1257 fn can_track_local_bus_peer(
1258 &self,
1259 source: &str,
1260 peer_key: &str,
1261 peers: &HashMap<String, PeerEntry>,
1262 ) -> bool {
1263 let Some(max_peers) = self.local_bus_max_peers(source) else {
1264 return true;
1265 };
1266 if peers.contains_key(peer_key) {
1267 return true;
1268 }
1269 if max_peers == 0 {
1270 return false;
1271 }
1272 let signal_path = PeerSignalPath::from_source_name(source);
1273 peers
1274 .values()
1275 .filter(|entry| {
1276 entry.signal_paths.contains(&signal_path) && entry.state != ConnectionState::Failed
1277 })
1278 .count()
1279 < max_peers
1280 }
1281
1282 pub async fn run(&mut self) -> Result<()> {
1284 info!(
1285 "Starting peer router with peer ID: {}",
1286 self.my_peer_id.short()
1287 );
1288
1289 let (event_tx, mut event_rx) = mpsc::channel::<(String, nostr::Event)>(100);
1290
1291 let mut signaling_rx = self
1293 .signaling_rx
1294 .take()
1295 .expect("signaling_rx already taken");
1296
1297 let mut state_event_rx = self
1299 .state_event_rx
1300 .take()
1301 .expect("state_event_rx already taken");
1302 let mut mesh_frame_rx = self
1303 .mesh_frame_rx
1304 .take()
1305 .expect("mesh_frame_rx already taken");
1306
1307 if self.config.bluetooth.is_enabled() {
1308 let bluetooth = BluetoothMesh::new(self.config.bluetooth.clone());
1309 let context = BluetoothRuntimeContext {
1310 my_peer_id: self.my_peer_id.clone(),
1311 store: if bluetooth_nostr_only_mode() {
1312 None
1313 } else {
1314 self.store.clone()
1315 },
1316 nostr_relay: self.nostr_relay.clone(),
1317 mesh_frame_tx: self.mesh_frame_tx.clone(),
1318 registrar: BluetoothPeerRegistrar::new(
1319 self.state.clone(),
1320 self.peer_classifier.clone(),
1321 self.config.pools.clone(),
1322 self.config.bluetooth.max_peers,
1323 ),
1324 };
1325 let _ = bluetooth.start(context).await;
1326 }
1327
1328 let (relay_write_tx, _) = tokio::sync::broadcast::channel::<SignalingMessage>(100);
1330
1331 for relay_url in &self.config.relays {
1333 let url = relay_url.clone();
1334 let event_tx = event_tx.clone();
1335 let shutdown_rx = self.shutdown_rx.clone();
1336 let keys = self.keys.clone();
1337 let relay_write_rx = relay_write_tx.subscribe();
1338
1339 tokio::spawn(async move {
1340 if let Err(e) =
1341 Self::relay_task(url.clone(), event_tx, shutdown_rx, keys, relay_write_rx).await
1342 {
1343 error!("Relay {} error: {}", url, e);
1344 }
1345 });
1346 }
1347
1348 if self.config.multicast.is_enabled() {
1349 if let Some(relay) = self.nostr_relay.clone() {
1350 match MulticastNostrBus::bind(
1351 self.config.multicast.clone(),
1352 self.keys.clone(),
1353 relay,
1354 )
1355 .await
1356 {
1357 Ok(bus) => {
1358 let local_bus: SharedLocalNostrBus = bus.clone();
1359 self.state.add_local_bus(local_bus.clone()).await;
1360 self.local_buses.push(local_bus);
1361 let shutdown_rx = self.shutdown_rx.clone();
1362 let signaling_tx = event_tx.clone();
1363 tokio::spawn(async move {
1364 if let Err(err) = bus.run(shutdown_rx, signaling_tx).await {
1365 error!("Multicast bus error: {}", err);
1366 }
1367 });
1368 }
1369 Err(err) => {
1370 warn!("Failed to start multicast bus: {}", err);
1371 }
1372 }
1373 } else {
1374 warn!("Multicast enabled but Nostr relay is unavailable");
1375 }
1376 }
1377
1378 if self.config.wifi_aware.is_enabled() {
1379 if let Some(relay) = self.nostr_relay.clone() {
1380 if let Some(bridge) = mobile_wifi_aware_bridge() {
1381 let bus = WifiAwareNostrBus::new(
1382 self.config.wifi_aware.clone(),
1383 self.keys.clone(),
1384 relay,
1385 bridge,
1386 );
1387 let local_bus: SharedLocalNostrBus = bus.clone();
1388 self.state.add_local_bus(local_bus.clone()).await;
1389 self.local_buses.push(local_bus);
1390 let shutdown_rx = self.shutdown_rx.clone();
1391 let signaling_tx = event_tx.clone();
1392 let local_peer_id = self.my_peer_id.to_string();
1393 tokio::spawn(async move {
1394 if let Err(err) = bus.run(local_peer_id, shutdown_rx, signaling_tx).await {
1395 error!("Wi-Fi Aware bus error: {}", err);
1396 }
1397 });
1398 } else {
1399 warn!("Wi-Fi Aware enabled but no mobile bridge is installed");
1400 }
1401 } else {
1402 warn!("Wi-Fi Aware enabled but Nostr relay is unavailable");
1403 }
1404 }
1405
1406 let mut shutdown_rx = self.shutdown_rx.clone();
1408 let mut cleanup_interval = tokio::time::interval(Duration::from_secs(30));
1410 let mut hello_ticker =
1411 tokio::time::interval(Duration::from_millis(self.config.hello_interval_ms));
1412 if self.config.signaling_enabled {
1413 self.dispatch_signaling_message(
1414 SignalingMessage::hello(&self.my_peer_id.uuid),
1415 &relay_write_tx,
1416 )
1417 .await;
1418 }
1419 loop {
1420 tokio::select! {
1421 _ = shutdown_rx.changed() => {
1422 if *shutdown_rx.borrow() {
1423 info!("WebRTC manager shutting down");
1424 break;
1425 }
1426 }
1427 Some((relay, event)) = event_rx.recv() => {
1428 if let Err(e) = self.handle_event(&relay, &event, &relay_write_tx).await {
1429 debug!("Error handling event from {}: {}", relay, e);
1430 }
1431 }
1432 Some(msg) = signaling_rx.recv() => {
1433 self.dispatch_signaling_message(msg, &relay_write_tx).await;
1434 }
1435 Some(event) = state_event_rx.recv() => {
1436 self.handle_peer_state_event(event, &relay_write_tx).await;
1438 }
1439 Some((from_peer_id, frame)) = mesh_frame_rx.recv() => {
1440 self.handle_mesh_frame(from_peer_id, frame, &relay_write_tx).await;
1441 }
1442 _ = hello_ticker.tick(), if self.config.signaling_enabled => {
1443 self.dispatch_signaling_message(
1444 SignalingMessage::hello(&self.my_peer_id.uuid),
1445 &relay_write_tx,
1446 ).await;
1447 }
1448 _ = cleanup_interval.tick() => {
1449 self.cleanup_stale_peers().await;
1451 }
1452 }
1453 }
1454
1455 Ok(())
1456 }
1457
1458 async fn relay_task(
1460 url: String,
1461 event_tx: mpsc::Sender<(String, nostr::Event)>,
1462 mut shutdown_rx: tokio::sync::watch::Receiver<bool>,
1463 keys: Keys,
1464 mut signaling_rx: tokio::sync::broadcast::Receiver<SignalingMessage>,
1465 ) -> Result<()> {
1466 info!("Connecting to relay: {}", url);
1467
1468 let (ws_stream, _) = connect_async(&url).await?;
1469 let (mut write, mut read) = ws_stream.split();
1470
1471 let hello_filter = Filter::new()
1475 .kind(Kind::Ephemeral(WEBRTC_KIND as u16))
1476 .custom_tag(
1477 nostr::SingleLetterTag::lowercase(nostr::Alphabet::L),
1478 vec![HELLO_TAG],
1479 )
1480 .since(nostr::Timestamp::now() - Duration::from_secs(60));
1481
1482 let directed_filter = Filter::new()
1483 .kind(Kind::Ephemeral(WEBRTC_KIND as u16))
1484 .custom_tag(
1485 nostr::SingleLetterTag::lowercase(nostr::Alphabet::P),
1486 vec![keys.public_key().to_hex()],
1487 )
1488 .since(nostr::Timestamp::now() - Duration::from_secs(60));
1489
1490 let sub_id = nostr::SubscriptionId::generate();
1491 let sub_msg = ClientMessage::req(sub_id.clone(), vec![hello_filter, directed_filter]);
1492 write.send(Message::Text(sub_msg.as_json())).await?;
1493
1494 info!(
1495 "Subscribed to {} for WebRTC events (kind {})",
1496 url, WEBRTC_KIND
1497 );
1498
1499 loop {
1500 tokio::select! {
1501 _ = shutdown_rx.changed() => {
1502 if *shutdown_rx.borrow() {
1503 break;
1504 }
1505 }
1506 Ok(signaling_msg) = signaling_rx.recv() => {
1508 info!("Sending {} via {}", signaling_msg.msg_type(), url);
1509 if let Ok(event) = Self::create_signaling_event(&keys, &signaling_msg).await {
1510 let event_id = event.id.to_string();
1511 let msg = ClientMessage::event(event);
1512 if write.send(Message::Text(msg.as_json())).await.is_ok() {
1513 info!("Sent {} to {} (event id: {})", signaling_msg.msg_type(), url, &event_id[..16]);
1514 }
1515 }
1516 }
1517 msg = read.next() => {
1518 match msg {
1519 Some(Ok(Message::Text(text))) => {
1520 if let Ok(RelayMessage::Event { event, .. }) =
1521 RelayMessage::from_json(&text)
1522 {
1523 let _ = event_tx.send((url.clone(), *event)).await;
1524 }
1525 }
1526 Some(Err(e)) => {
1527 error!("WebSocket error from {}: {}", url, e);
1528 break;
1529 }
1530 None => {
1531 warn!("WebSocket closed: {}", url);
1532 break;
1533 }
1534 _ => {}
1535 }
1536 }
1537 }
1538 }
1539
1540 Ok(())
1541 }
1542
1543 async fn mark_seen_frame_id(&self, frame_id: String) -> bool {
1544 let mut seen = self.seen_frame_ids.lock().await;
1545 seen.insert_if_new(frame_id)
1546 }
1547
1548 async fn mark_seen_event_id(&self, event_id: String) -> bool {
1549 let mut seen = self.seen_event_ids.lock().await;
1550 seen.insert_if_new(event_id)
1551 }
1552
1553 async fn dispatch_signaling_message(
1554 &self,
1555 msg: SignalingMessage,
1556 relay_write_tx: &tokio::sync::broadcast::Sender<SignalingMessage>,
1557 ) {
1558 if !self.config.signaling_enabled {
1559 debug!(
1560 "Skipping signaling message {} because WebRTC signaling is disabled",
1561 msg.msg_type()
1562 );
1563 return;
1564 }
1565
1566 if relay_write_tx.send(msg.clone()).is_err() {
1567 debug!(
1568 "No relay subscribers for signaling message {}",
1569 msg.msg_type()
1570 );
1571 }
1572
1573 let event = match Self::create_signaling_event(&self.keys, &msg).await {
1574 Ok(event) => event,
1575 Err(e) => {
1576 debug!("Failed to create signaling event for mesh dispatch: {}", e);
1577 return;
1578 }
1579 };
1580
1581 for bus in &self.local_buses {
1582 if let Err(err) = bus.broadcast_event(&event).await {
1583 debug!(
1584 "Failed to broadcast signaling event over {} ({}): {}",
1585 bus.source_name(),
1586 msg.msg_type(),
1587 err
1588 );
1589 }
1590 }
1591
1592 let mut frame =
1593 MeshNostrFrame::new_event(event, &self.my_peer_id.to_string(), MESH_DEFAULT_HTL);
1594 if !self.mark_seen_frame_id(frame.frame_id.clone()).await {
1595 self.state.record_mesh_duplicate_drop();
1596 return;
1597 }
1598 if !self.mark_seen_event_id(frame.event().id.to_hex()).await {
1599 self.state.record_mesh_duplicate_drop();
1600 return;
1601 }
1602
1603 frame.sender_peer_id = self.my_peer_id.to_string();
1605 let forwarded = self.forward_mesh_frame(&frame, None).await;
1606 if forwarded > 0 {
1607 self.state.record_mesh_forwarded(forwarded as u64);
1608 }
1609 }
1610
1611 async fn forward_mesh_frame(
1612 &self,
1613 frame: &MeshNostrFrame,
1614 exclude_peer_id: Option<&str>,
1615 ) -> usize {
1616 let peers = self.state.peers.read().await;
1617 let peer_refs: Vec<_> = peers
1618 .values()
1619 .filter(|entry| entry.state == ConnectionState::Connected)
1620 .filter(|entry| {
1621 entry
1622 .peer
1623 .as_ref()
1624 .map(|peer| peer.is_ready())
1625 .unwrap_or(false)
1626 })
1627 .filter(|entry| {
1628 exclude_peer_id
1629 .map(|exclude| exclude != entry.peer_id.to_string())
1630 .unwrap_or(true)
1631 })
1632 .filter_map(|entry| {
1633 entry.peer.as_ref().map(|peer| {
1634 (
1635 entry.peer_id.to_string(),
1636 entry.peer_id.short(),
1637 peer.clone(),
1638 peer.htl_config(),
1639 )
1640 })
1641 })
1642 .collect();
1643 drop(peers);
1644
1645 let mut forwarded = 0usize;
1646 for (_peer_key, peer_short, peer, htl_cfg) in peer_refs {
1647 let next_htl = decrement_htl_with_policy(frame.htl, &MESH_EVENT_POLICY, &htl_cfg);
1648 if !should_forward_htl(next_htl) {
1649 continue;
1650 }
1651
1652 let mut outbound = frame.clone();
1653 outbound.htl = next_htl;
1654 if peer.send_mesh_frame_text(&outbound).await.is_ok() {
1655 forwarded += 1;
1656 } else {
1657 debug!("Failed to forward mesh frame to {}", peer_short);
1658 }
1659 }
1660
1661 forwarded
1662 }
1663
1664 async fn handle_mesh_frame(
1665 &self,
1666 from_peer_id: PeerId,
1667 frame: MeshNostrFrame,
1668 relay_write_tx: &tokio::sync::broadcast::Sender<SignalingMessage>,
1669 ) {
1670 if let Err(reason) = validate_mesh_frame(&frame) {
1671 debug!(
1672 "Ignoring mesh frame from {} (invalid: {})",
1673 from_peer_id.short(),
1674 reason
1675 );
1676 return;
1677 }
1678
1679 if !self.mark_seen_frame_id(frame.frame_id.clone()).await {
1680 self.state.record_mesh_duplicate_drop();
1681 return;
1682 }
1683
1684 let event = match &frame.payload {
1685 MeshNostrPayload::Event { event } => event.clone(),
1686 };
1687
1688 if !self.mark_seen_event_id(event.id.to_hex()).await {
1689 self.state.record_mesh_duplicate_drop();
1690 return;
1691 }
1692
1693 if event.verify().is_err() {
1694 debug!(
1695 "Ignoring mesh event from {} due to invalid signature",
1696 from_peer_id.short()
1697 );
1698 return;
1699 }
1700
1701 self.state.record_mesh_received();
1702
1703 if let Err(e) = self.handle_event("mesh", &event, relay_write_tx).await {
1704 debug!(
1705 "Error handling mesh event from {}: {}",
1706 from_peer_id.short(),
1707 e
1708 );
1709 }
1710
1711 let forwarded = self
1712 .forward_mesh_frame(&frame, Some(&from_peer_id.to_string()))
1713 .await;
1714 if forwarded > 0 {
1715 self.state.record_mesh_forwarded(forwarded as u64);
1716 }
1717 }
1718
1719 async fn create_signaling_event(keys: &Keys, msg: &SignalingMessage) -> Result<nostr::Event> {
1725 if let Some(recipient_str) = msg.recipient() {
1727 if let Some(peer_id) = PeerId::from_string(recipient_str) {
1729 let recipient_pubkey = PublicKey::from_hex(&peer_id.pubkey)?;
1730
1731 let seal = serde_json::json!({
1733 "pubkey": keys.public_key().to_hex(),
1734 "kind": WEBRTC_KIND,
1735 "content": serde_json::to_string(msg)?,
1736 "tags": []
1737 });
1738
1739 let ephemeral_keys = Keys::generate();
1741
1742 let encrypted_content = nip44::encrypt(
1744 ephemeral_keys.secret_key(),
1745 &recipient_pubkey,
1746 seal.to_string(),
1747 nip44::Version::V2,
1748 )?;
1749
1750 let created_at = nostr::Timestamp::now();
1752 let expiration = created_at + Duration::from_secs(5 * 60); let tags = vec![
1755 Tag::parse(&["p", &recipient_pubkey.to_hex()])?,
1756 Tag::parse(&["expiration", &expiration.as_u64().to_string()])?,
1757 ];
1758
1759 let event =
1760 EventBuilder::new(Kind::Ephemeral(WEBRTC_KIND as u16), encrypted_content, tags)
1761 .to_event(&ephemeral_keys)?;
1762
1763 return Ok(event);
1764 }
1765 }
1766
1767 let tags = vec![
1769 Tag::parse(&["l", HELLO_TAG])?,
1770 Tag::parse(&["peerId", msg.peer_id()])?,
1771 ];
1772
1773 let event =
1774 EventBuilder::new(Kind::Ephemeral(WEBRTC_KIND as u16), "", tags).to_event(keys)?;
1775
1776 Ok(event)
1777 }
1778
1779 async fn handle_event(
1785 &self,
1786 relay: &str,
1787 event: &nostr::Event,
1788 relay_write_tx: &tokio::sync::broadcast::Sender<SignalingMessage>,
1789 ) -> Result<()> {
1790 if !self.config.signaling_enabled {
1791 return Ok(());
1792 }
1793
1794 if event.kind != Kind::Ephemeral(WEBRTC_KIND as u16) {
1796 return Ok(());
1797 }
1798
1799 let get_tag = |name: &str| -> Option<String> {
1801 event.tags.iter().find_map(|tag| {
1802 let v: Vec<String> = tag.clone().to_vec();
1803 if v.len() >= 2 && v[0] == name {
1804 Some(v[1].clone())
1805 } else {
1806 None
1807 }
1808 })
1809 };
1810
1811 let l_tag = get_tag("l");
1813 if l_tag.as_deref() == Some(HELLO_TAG) {
1814 let sender_pubkey = event.pubkey.to_hex();
1815
1816 if sender_pubkey == self.my_peer_id.pubkey {
1818 return Ok(());
1819 }
1820
1821 if let Some(their_uuid) = get_tag("peerId") {
1822 debug!("Received hello from {} via {}", &sender_pubkey[..8], relay);
1823 self.handle_hello(&sender_pubkey, &their_uuid, relay, relay_write_tx)
1824 .await?;
1825 }
1826 return Ok(());
1827 }
1828
1829 let p_tag = get_tag("p");
1831 if p_tag.as_deref() != Some(&self.keys.public_key().to_hex()) {
1832 return Ok(());
1834 }
1835
1836 if event.content.is_empty() {
1838 return Ok(());
1839 }
1840
1841 let seal: serde_json::Value =
1843 match nip44::decrypt(self.keys.secret_key(), &event.pubkey, &event.content) {
1844 Ok(plaintext) => match serde_json::from_str(&plaintext) {
1845 Ok(v) => v,
1846 Err(_) => return Ok(()),
1847 },
1848 Err(_) => {
1849 return Ok(());
1851 }
1852 };
1853
1854 let sender_pubkey = seal
1856 .get("pubkey")
1857 .and_then(|v| v.as_str())
1858 .ok_or_else(|| anyhow::anyhow!("Missing pubkey in seal"))?;
1859
1860 if sender_pubkey == self.my_peer_id.pubkey {
1862 return Ok(());
1863 }
1864
1865 let content = seal
1866 .get("content")
1867 .and_then(|v| v.as_str())
1868 .ok_or_else(|| anyhow::anyhow!("Missing content in seal"))?;
1869
1870 let raw_msg: serde_json::Value = serde_json::from_str(content)?;
1871 let msg_type = raw_msg.get("type").and_then(|v| v.as_str()).unwrap_or("");
1872
1873 if raw_msg.get("targetPeerId").is_some() {
1875 let target_peer = raw_msg
1876 .get("targetPeerId")
1877 .and_then(|v| v.as_str())
1878 .unwrap_or("");
1879 if target_peer != self.my_peer_id.to_string() {
1880 return Ok(());
1881 }
1882
1883 let peer_id = raw_msg.get("peerId").and_then(|v| v.as_str()).unwrap_or("");
1884 let their_uuid = peer_id.split(':').nth(1).unwrap_or(peer_id);
1885
1886 match msg_type {
1887 "offer" => {
1888 let sdp = raw_msg
1889 .get("sdp")
1890 .and_then(|v| v.as_str())
1891 .ok_or_else(|| anyhow::anyhow!("Missing SDP in offer"))?;
1892 let offer = serde_json::json!({ "type": "offer", "sdp": sdp });
1893 self.handle_offer(sender_pubkey, their_uuid, offer, relay, relay_write_tx)
1894 .await?;
1895 }
1896 "answer" => {
1897 let sdp = raw_msg
1898 .get("sdp")
1899 .and_then(|v| v.as_str())
1900 .ok_or_else(|| anyhow::anyhow!("Missing SDP in answer"))?;
1901 let answer = serde_json::json!({ "type": "answer", "sdp": sdp });
1902 self.handle_answer(sender_pubkey, their_uuid, answer)
1903 .await?;
1904 }
1905 "candidate" => {
1906 let candidate = raw_msg
1907 .get("candidate")
1908 .and_then(|v| v.as_str())
1909 .unwrap_or("");
1910 if !candidate.is_empty() {
1911 let candidate_json = serde_json::json!({
1912 "candidate": candidate,
1913 "sdpMid": raw_msg.get("sdpMid"),
1914 "sdpMLineIndex": raw_msg.get("sdpMLineIndex"),
1915 });
1916 self.handle_candidate(sender_pubkey, their_uuid, candidate_json)
1917 .await?;
1918 }
1919 }
1920 "candidates" => {
1921 let candidates = raw_msg
1922 .get("candidates")
1923 .and_then(|v| v.as_array())
1924 .map(|entries| {
1925 entries
1926 .iter()
1927 .filter_map(|entry| {
1928 entry
1929 .get("candidate")
1930 .and_then(|v| v.as_str())
1931 .or_else(|| entry.as_str())
1932 .map(|candidate_str| {
1933 serde_json::json!({
1934 "candidate": candidate_str,
1935 "sdpMid": entry.get("sdpMid"),
1936 "sdpMLineIndex": entry.get("sdpMLineIndex"),
1937 })
1938 })
1939 })
1940 .collect::<Vec<_>>()
1941 })
1942 .unwrap_or_default();
1943 self.handle_candidates(sender_pubkey, their_uuid, candidates)
1944 .await?;
1945 }
1946 _ => {}
1947 }
1948
1949 return Ok(());
1950 }
1951
1952 let msg: SignalingMessage = serde_json::from_value(raw_msg)?;
1953
1954 debug!(
1955 "Received {} from {} via {} (gift-wrapped)",
1956 msg.msg_type(),
1957 &sender_pubkey[..8],
1958 relay
1959 );
1960
1961 match msg {
1962 SignalingMessage::Hello { .. } => {
1963 return Ok(());
1965 }
1966 SignalingMessage::Offer {
1967 recipient,
1968 peer_id: their_uuid,
1969 offer,
1970 } => {
1971 if recipient != self.my_peer_id.to_string() {
1972 return Ok(()); }
1974 if let Err(e) = self
1975 .handle_offer(sender_pubkey, &their_uuid, offer, relay, relay_write_tx)
1976 .await
1977 {
1978 error!(
1979 "handle_offer FAILED: sender={}, uuid={}, error={:?}",
1980 &sender_pubkey[..8.min(sender_pubkey.len())],
1981 their_uuid,
1982 e
1983 );
1984 return Err(e);
1985 }
1986 }
1987 SignalingMessage::Answer {
1988 recipient,
1989 peer_id: their_uuid,
1990 answer,
1991 } => {
1992 if recipient != self.my_peer_id.to_string() {
1993 return Ok(());
1994 }
1995 self.handle_answer(sender_pubkey, &their_uuid, answer)
1996 .await?;
1997 }
1998 SignalingMessage::Candidate {
1999 recipient,
2000 peer_id: their_uuid,
2001 candidate,
2002 } => {
2003 if recipient != self.my_peer_id.to_string() {
2004 return Ok(());
2005 }
2006 self.handle_candidate(sender_pubkey, &their_uuid, candidate)
2007 .await?;
2008 }
2009 SignalingMessage::Candidates {
2010 recipient,
2011 peer_id: their_uuid,
2012 candidates,
2013 } => {
2014 if recipient != self.my_peer_id.to_string() {
2015 return Ok(());
2016 }
2017 self.handle_candidates(sender_pubkey, &their_uuid, candidates)
2018 .await?;
2019 }
2020 }
2021
2022 Ok(())
2023 }
2024
2025 async fn handle_hello(
2027 &self,
2028 sender_pubkey: &str,
2029 their_uuid: &str,
2030 source: &str,
2031 relay_write_tx: &tokio::sync::broadcast::Sender<SignalingMessage>,
2032 ) -> Result<()> {
2033 let full_peer_id = PeerId::new(sender_pubkey.to_string(), Some(their_uuid.to_string()));
2034 let peer_key = full_peer_id.to_string();
2035 let mut already_discovered = false;
2036 let mut signal_paths = BTreeSet::from([PeerSignalPath::from_source_name(source)]);
2037
2038 {
2040 let peers = self.state.peers.read().await;
2041 if !self.can_track_local_bus_peer(source, &peer_key, &peers) {
2042 debug!(
2043 "Ignoring hello from {} via {} - local bus peer limit reached",
2044 full_peer_id.short(),
2045 source
2046 );
2047 return Ok(());
2048 }
2049 if let Some(entry) = peers.get(&peer_key) {
2050 if entry.state == ConnectionState::Connected
2052 || entry.state == ConnectionState::Connecting
2053 {
2054 return Ok(());
2055 }
2056 already_discovered = true;
2057 signal_paths.extend(entry.signal_paths.iter().copied());
2058 }
2059 }
2060
2061 let pool = (self.peer_classifier)(sender_pubkey);
2063
2064 let pool_counts = self.get_pool_counts().await;
2066 if !self.can_accept_peer(pool, &pool_counts) {
2067 debug!(
2068 "Ignoring hello from {} - pool {:?} is full",
2069 full_peer_id.short(),
2070 pool
2071 );
2072 return Ok(());
2073 }
2074
2075 let should_initiate = self.should_initiate(their_uuid);
2077
2078 let pool_satisfied = self.is_pool_satisfied(pool, &pool_counts);
2081 let will_initiate = should_initiate && !pool_satisfied;
2082
2083 info!(
2084 "Discovered peer: {} (pool: {:?}, initiate: {}, pool_satisfied: {})",
2085 full_peer_id.short(),
2086 pool,
2087 will_initiate,
2088 pool_satisfied
2089 );
2090
2091 if !will_initiate && pool_satisfied {
2094 debug!(
2095 "Pool {:?} is satisfied, not tracking peer {}",
2096 pool,
2097 full_peer_id.short()
2098 );
2099 return Ok(());
2100 }
2101
2102 {
2104 let mut peers = self.state.peers.write().await;
2105 peers.insert(
2106 peer_key.clone(),
2107 PeerEntry {
2108 peer_id: full_peer_id.clone(),
2109 direction: if will_initiate {
2110 PeerDirection::Outbound
2111 } else {
2112 PeerDirection::Inbound
2113 },
2114 state: ConnectionState::Discovered,
2115 last_seen: Instant::now(),
2116 peer: None,
2117 pool,
2118 transport: PeerTransport::WebRtc,
2119 signal_paths,
2120 bytes_sent: 0,
2121 bytes_received: 0,
2122 },
2123 );
2124 }
2125
2126 if !will_initiate && !already_discovered {
2129 self.dispatch_signaling_message(
2130 SignalingMessage::hello(&self.my_peer_id.uuid),
2131 relay_write_tx,
2132 )
2133 .await;
2134 }
2135
2136 if will_initiate {
2138 self.initiate_connection(&full_peer_id, pool, relay_write_tx)
2139 .await?;
2140 }
2141
2142 Ok(())
2143 }
2144
2145 async fn initiate_connection(
2147 &self,
2148 peer_id: &PeerId,
2149 pool: PeerPool,
2150 relay_write_tx: &tokio::sync::broadcast::Sender<SignalingMessage>,
2151 ) -> Result<()> {
2152 let peer_key = peer_id.to_string();
2153
2154 info!(
2155 "Initiating connection to {} (pool: {:?})",
2156 peer_id.short(),
2157 pool
2158 );
2159
2160 let peer = Peer::new_with_store_and_events(
2162 peer_id.clone(),
2163 PeerDirection::Outbound,
2164 self.my_peer_id.clone(),
2165 self.signaling_tx.clone(),
2166 self.config.stun_servers.clone(),
2167 self.store.clone(),
2168 Some(self.state_event_tx.clone()),
2169 self.nostr_relay.clone(),
2170 Some(self.mesh_frame_tx.clone()),
2171 Some(self.state.cashu_quotes.clone()),
2172 )
2173 .await?;
2174
2175 peer.setup_handlers().await?;
2176
2177 let offer = peer.connect().await?;
2179
2180 {
2182 let mut peers = self.state.peers.write().await;
2183 if let Some(entry) = peers.get_mut(&peer_key) {
2184 entry.state = ConnectionState::Connecting;
2185 entry.peer = Some(MeshPeer::WebRtc(Arc::new(peer)));
2186 entry.pool = pool;
2187 }
2188 }
2189
2190 let offer_msg = SignalingMessage::Offer {
2192 offer,
2193 recipient: peer_id.to_string(),
2194 peer_id: self.my_peer_id.uuid.clone(),
2195 };
2196 self.dispatch_signaling_message(offer_msg, relay_write_tx)
2197 .await;
2198
2199 info!("Sent offer to {}", peer_id.short());
2200
2201 Ok(())
2202 }
2203
2204 async fn handle_offer(
2206 &self,
2207 sender_pubkey: &str,
2208 their_uuid: &str,
2209 offer: serde_json::Value,
2210 source: &str,
2211 relay_write_tx: &tokio::sync::broadcast::Sender<SignalingMessage>,
2212 ) -> Result<()> {
2213 debug!(
2214 "handle_offer ENTRY: sender={}, uuid={}",
2215 &sender_pubkey[..8.min(sender_pubkey.len())],
2216 their_uuid
2217 );
2218 let full_peer_id = PeerId::new(sender_pubkey.to_string(), Some(their_uuid.to_string()));
2219 let peer_key = full_peer_id.to_string();
2220 let mut signal_paths = BTreeSet::from([PeerSignalPath::from_source_name(source)]);
2221
2222 let pool = (self.peer_classifier)(sender_pubkey);
2224
2225 info!(
2226 "Received offer from {} (pool: {:?})",
2227 full_peer_id.short(),
2228 pool
2229 );
2230
2231 {
2233 let peers = self.state.peers.read().await;
2234 if !self.can_track_local_bus_peer(source, &peer_key, &peers) {
2235 warn!(
2236 "Rejecting offer from {} via {} - local bus peer limit reached",
2237 full_peer_id.short(),
2238 source
2239 );
2240 return Ok(());
2241 }
2242 debug!(
2243 "Checking for existing peer, peer_key: {}, known_peers: {}",
2244 peer_key,
2245 peers.len()
2246 );
2247 if let Some(entry) = peers.get(&peer_key) {
2248 if entry.peer.is_some() {
2250 debug!(
2251 "Already have peer {} with connection, skipping offer",
2252 full_peer_id.short()
2253 );
2254 return Ok(());
2255 }
2256 signal_paths.extend(entry.signal_paths.iter().copied());
2257 debug!(
2258 "Peer {} exists but has no connection, proceeding",
2259 full_peer_id.short()
2260 );
2261 } else {
2262 debug!(
2263 "Peer {} not found in peers map, will create new entry",
2264 full_peer_id.short()
2265 );
2266 }
2267 }
2268
2269 let pool_counts = self.get_pool_counts().await;
2271 debug!(
2272 "Pool counts: {:?}, checking can_accept_peer for {:?}",
2273 pool_counts, pool
2274 );
2275 if !self.can_accept_peer(pool, &pool_counts) {
2276 warn!(
2277 "Rejecting offer from {} - pool {:?} is full",
2278 full_peer_id.short(),
2279 pool
2280 );
2281 return Ok(());
2282 }
2283 debug!("Pool check passed for {}", full_peer_id.short());
2284
2285 debug!("Creating peer connection for {}", full_peer_id.short());
2287 let peer = Peer::new_with_store_and_events(
2288 full_peer_id.clone(),
2289 PeerDirection::Inbound,
2290 self.my_peer_id.clone(),
2291 self.signaling_tx.clone(),
2292 self.config.stun_servers.clone(),
2293 self.store.clone(),
2294 Some(self.state_event_tx.clone()),
2295 self.nostr_relay.clone(),
2296 Some(self.mesh_frame_tx.clone()),
2297 Some(self.state.cashu_quotes.clone()),
2298 )
2299 .await?;
2300 debug!("Peer connection created for {}", full_peer_id.short());
2301
2302 peer.setup_handlers().await?;
2303 debug!("Handlers set up for {}", full_peer_id.short());
2304
2305 let answer = peer.handle_offer(offer).await?;
2307 debug!("Answer created for {}", full_peer_id.short());
2308
2309 {
2311 let mut peers = self.state.peers.write().await;
2312 peers.insert(
2313 peer_key,
2314 PeerEntry {
2315 peer_id: full_peer_id.clone(),
2316 direction: PeerDirection::Inbound,
2317 state: ConnectionState::Connecting,
2318 last_seen: Instant::now(),
2319 peer: Some(MeshPeer::WebRtc(Arc::new(peer))),
2320 pool,
2321 transport: PeerTransport::WebRtc,
2322 signal_paths,
2323 bytes_sent: 0,
2324 bytes_received: 0,
2325 },
2326 );
2327 }
2328
2329 let answer_msg = SignalingMessage::Answer {
2333 answer,
2334 recipient: full_peer_id.to_string(),
2335 peer_id: self.my_peer_id.uuid.clone(),
2336 };
2337 self.dispatch_signaling_message(answer_msg, relay_write_tx)
2338 .await;
2339 info!("Sent answer to {}", full_peer_id.short());
2340
2341 Ok(())
2342 }
2343
2344 async fn handle_answer(
2346 &self,
2347 sender_pubkey: &str,
2348 their_uuid: &str,
2349 answer: serde_json::Value,
2350 ) -> Result<()> {
2351 let full_peer_id = PeerId::new(sender_pubkey.to_string(), Some(their_uuid.to_string()));
2352 let peer_key = full_peer_id.to_string();
2353
2354 info!("Received answer from {}", full_peer_id.short());
2355
2356 let maybe_peer = {
2357 let peers = self.state.peers.read().await;
2358 peers.get(&peer_key).and_then(|entry| {
2359 if entry.state == ConnectionState::Connected {
2361 debug!(
2362 "Ignoring duplicate answer from {} - already connected",
2363 full_peer_id.short()
2364 );
2365 return None;
2366 }
2367 entry
2368 .peer
2369 .as_ref()
2370 .and_then(|peer| peer.as_webrtc().cloned())
2371 })
2372 };
2373
2374 if let Some(webrtc_peer) = maybe_peer {
2375 use webrtc::peer_connection::signaling_state::RTCSignalingState;
2377 let signaling_state = webrtc_peer.signaling_state();
2378 if signaling_state != RTCSignalingState::HaveLocalOffer {
2379 debug!(
2380 "Ignoring answer from {} - signaling state is {:?}, not HaveLocalOffer",
2381 full_peer_id.short(),
2382 signaling_state
2383 );
2384 return Ok(());
2385 }
2386 webrtc_peer.handle_answer(answer).await?;
2387 info!("Applied answer from {}", full_peer_id.short());
2388 } else {
2389 let peers = self.state.peers.read().await;
2390 if let Some(entry) = peers.get(&peer_key) {
2391 if entry.peer.is_some() {
2392 debug!(
2393 "Peer {} does not use answer signaling",
2394 full_peer_id.short()
2395 );
2396 } else {
2397 debug!("Peer {} has no connection object", full_peer_id.short());
2398 }
2399 } else {
2400 debug!("No peer found for key: {}", peer_key);
2401 }
2402 }
2403
2404 Ok(())
2405 }
2406
2407 async fn handle_candidate(
2409 &self,
2410 sender_pubkey: &str,
2411 their_uuid: &str,
2412 candidate: serde_json::Value,
2413 ) -> Result<()> {
2414 let full_peer_id = PeerId::new(sender_pubkey.to_string(), Some(their_uuid.to_string()));
2415 let peer_key = full_peer_id.to_string();
2416
2417 info!("Received ICE candidate from {}", full_peer_id.short());
2418
2419 let maybe_peer = {
2420 let peers = self.state.peers.read().await;
2421 peers.get(&peer_key).and_then(|entry| {
2422 entry
2423 .peer
2424 .as_ref()
2425 .and_then(|peer| peer.as_webrtc().cloned())
2426 })
2427 };
2428 if let Some(peer) = maybe_peer {
2429 peer.handle_candidate(candidate).await?;
2430 }
2431
2432 Ok(())
2433 }
2434
2435 async fn handle_candidates(
2437 &self,
2438 sender_pubkey: &str,
2439 their_uuid: &str,
2440 candidates: Vec<serde_json::Value>,
2441 ) -> Result<()> {
2442 let full_peer_id = PeerId::new(sender_pubkey.to_string(), Some(their_uuid.to_string()));
2443 let peer_key = full_peer_id.to_string();
2444
2445 debug!(
2446 "Received {} candidates from {}",
2447 candidates.len(),
2448 full_peer_id.short()
2449 );
2450
2451 let maybe_peer = {
2452 let peers = self.state.peers.read().await;
2453 peers.get(&peer_key).and_then(|entry| {
2454 entry
2455 .peer
2456 .as_ref()
2457 .and_then(|peer| peer.as_webrtc().cloned())
2458 })
2459 };
2460 if let Some(peer) = maybe_peer {
2461 for candidate in candidates {
2462 if let Err(e) = peer.handle_candidate(candidate).await {
2463 debug!("Failed to add candidate: {}", e);
2464 }
2465 }
2466 }
2467
2468 Ok(())
2469 }
2470
2471 async fn handle_peer_state_event(
2473 &self,
2474 event: PeerStateEvent,
2475 relay_write_tx: &tokio::sync::broadcast::Sender<SignalingMessage>,
2476 ) {
2477 match event {
2478 PeerStateEvent::Connected(peer_id) => {
2479 let peer_key = peer_id.to_string();
2480 let mut emit_hello = false;
2481 let mut peers = self.state.peers.write().await;
2482 if let Some(entry) = peers.get_mut(&peer_key) {
2483 if entry.state != ConnectionState::Connected {
2484 info!("Peer {} connected (via state event)", peer_id.short());
2485 entry.state = ConnectionState::Connected;
2486 emit_hello = true;
2487 self.state
2489 .connected_count
2490 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
2491 }
2492 }
2493 drop(peers);
2494 if emit_hello {
2495 self.dispatch_signaling_message(
2496 SignalingMessage::hello(&self.my_peer_id.uuid),
2497 relay_write_tx,
2498 )
2499 .await;
2500 }
2501 }
2502 PeerStateEvent::Failed(peer_id) => {
2503 let peer_key = peer_id.to_string();
2504 info!(
2505 "Peer {} connection failed - removing from pool",
2506 peer_id.short()
2507 );
2508 let removed = {
2509 let mut peers = self.state.peers.write().await;
2510 peers.remove(&peer_key)
2511 };
2512 if let Some(entry) = removed {
2513 if entry.state == ConnectionState::Connected {
2515 self.state
2516 .connected_count
2517 .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
2518 }
2519 if let Some(peer) = entry.peer {
2521 let _ = peer.close().await;
2522 }
2523 }
2524 }
2525 PeerStateEvent::Disconnected(peer_id) => {
2526 let peer_key = peer_id.to_string();
2527 info!("Peer {} disconnected - removing from pool", peer_id.short());
2528 let removed = {
2529 let mut peers = self.state.peers.write().await;
2530 peers.remove(&peer_key)
2531 };
2532 if let Some(entry) = removed {
2533 if entry.state == ConnectionState::Connected {
2535 self.state
2536 .connected_count
2537 .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
2538 }
2539 if let Some(peer) = entry.peer {
2541 let _ = peer.close().await;
2542 }
2543 }
2544 }
2545 }
2546 }
2547
2548 async fn cleanup_stale_peers(&self) {
2550 let mut peers = self.state.peers.write().await;
2551 let mut connected_count = 0;
2552 let mut to_remove = Vec::new();
2553 let stale_timeout = Duration::from_secs(60); for (key, entry) in peers.iter_mut() {
2556 if let Some(ref peer) = entry.peer {
2557 if peer.is_connected() {
2559 if entry.state != ConnectionState::Connected {
2560 info!(
2561 "Peer {} is now connected (sync fallback)",
2562 entry.peer_id.short()
2563 );
2564 entry.state = ConnectionState::Connected;
2565 }
2566 connected_count += 1;
2567 } else if entry.state == ConnectionState::Connected {
2568 info!(
2569 "Removing disconnected peer {} after transport closed",
2570 entry.peer_id.short()
2571 );
2572 to_remove.push(key.clone());
2573 } else if entry.state == ConnectionState::Connecting
2574 && entry.last_seen.elapsed() > stale_timeout
2575 {
2576 info!(
2578 "Removing stale peer {} (stuck in Connecting for {:?})",
2579 entry.peer_id.short(),
2580 entry.last_seen.elapsed()
2581 );
2582 to_remove.push(key.clone());
2583 }
2584 } else if entry.state == ConnectionState::Discovered
2585 && entry.last_seen.elapsed() > stale_timeout
2586 {
2587 debug!("Removing stale discovered peer {}", entry.peer_id.short());
2589 to_remove.push(key.clone());
2590 }
2591 }
2592
2593 let mut removed_peers = Vec::new();
2595 for key in to_remove {
2596 if let Some(entry) = peers.remove(&key) {
2597 removed_peers.push(entry);
2598 }
2599 }
2600 drop(peers);
2601
2602 for entry in removed_peers {
2603 if let Some(peer) = entry.peer {
2604 let _ = peer.close().await;
2605 }
2606 }
2607
2608 self.state
2609 .connected_count
2610 .store(connected_count, std::sync::atomic::Ordering::Relaxed);
2611 }
2612}
2613
/// Alias that exposes [`WebRTCManager`] under the name `PeerRouter`.
pub type PeerRouter = WebRTCManager;
2615
// NOTE(review): no use of this struct is visible in this part of the file, which is presumably
// why the dead-code lint is silenced — confirm the suppression is still needed.
#[allow(dead_code)]
/// Plain-data description of a peer: its identity, direction, a state label and when it was
/// last seen.
#[derive(Debug, Clone)]
pub struct PeerState {
    /// Identity of the peer.
    pub peer_id: PeerId,
    /// Direction recorded for the peer (see [`PeerDirection`]).
    pub direction: PeerDirection,
    /// State carried as free-form text.
    /// NOTE(review): presumably a rendering of `ConnectionState` — confirm against the code that
    /// fills this in.
    pub state: String,
    /// Instant at which the peer was last seen.
    pub last_seen: Instant,
}
2625
2626#[cfg(test)]
2627mod tests {
2628 use super::*;
2629 use crate::webrtc::root_events::PeerRootEvent;
2630 use crate::webrtc::session::TestMeshPeer;
2631 use crate::webrtc::SelectionStrategy;
2632 use anyhow::Result as AnyResult;
2633 use async_trait::async_trait;
2634 use nostr::{EventBuilder, Keys, Tag};
2635 use std::time::Duration;
2636
    /// Test double for `LocalNostrBus`: reports a fixed source name and answers every root query
    /// with a canned value.
    struct TestLocalBus {
        /// Name returned from `source_name`.
        source: &'static str,
        /// Value returned (cloned) from every `query_root` call; `None` simulates a miss.
        root: Option<PeerRootEvent>,
    }

    #[async_trait]
    impl super::super::LocalNostrBus for TestLocalBus {
        /// Returns the configured source name.
        fn source_name(&self) -> &'static str {
            self.source
        }

        /// Accepts and discards the event; always succeeds.
        async fn broadcast_event(&self, _event: &nostr::Event) -> AnyResult<()> {
            Ok(())
        }

        /// Ignores all arguments and returns a clone of the canned root.
        async fn query_root(
            &self,
            _owner_pubkey: &str,
            _tree_name: &str,
            _timeout: Duration,
        ) -> Option<PeerRootEvent> {
            self.root.clone()
        }
    }
2661
2662 #[test]
2663 fn root_event_from_peer_extracts_tags() {
2664 let keys = Keys::generate();
2665 let hash = "ab".repeat(32);
2666 let event = EventBuilder::new(
2667 Kind::Custom(super::super::root_events::HASHTREE_KIND),
2668 "",
2669 [
2670 Tag::parse(&["d", "repo"]).unwrap(),
2671 Tag::parse(&["l", super::super::root_events::HASHTREE_LABEL]).unwrap(),
2672 Tag::parse(&["hash", &hash]).unwrap(),
2673 Tag::parse(&["encryptedKey", &"11".repeat(32)]).unwrap(),
2674 ],
2675 )
2676 .to_event(&keys)
2677 .unwrap();
2678
2679 let parsed = root_event_from_peer(&event, "peer-a", "repo").unwrap();
2680 let expected_encrypted = "11".repeat(32);
2681 assert_eq!(parsed.hash, hash);
2682 assert_eq!(parsed.peer_id, "peer-a");
2683 assert_eq!(
2684 parsed.encrypted_key.as_deref(),
2685 Some(expected_encrypted.as_str())
2686 );
2687 assert!(parsed.key.is_none());
2688 }
2689
2690 #[test]
2691 fn pick_latest_event_prefers_higher_event_id_on_timestamp_tie() {
2692 let keys = Keys::generate();
2693 let created_at = nostr::Timestamp::from_secs(1_700_000_000);
2694 let event_a = EventBuilder::new(
2695 Kind::Custom(super::super::root_events::HASHTREE_KIND),
2696 "",
2697 [],
2698 )
2699 .custom_created_at(created_at)
2700 .to_event(&keys)
2701 .unwrap();
2702 let event_b = EventBuilder::new(
2703 Kind::Custom(super::super::root_events::HASHTREE_KIND),
2704 "",
2705 [],
2706 )
2707 .custom_created_at(created_at)
2708 .to_event(&keys)
2709 .unwrap();
2710
2711 let expected = if event_a.id > event_b.id {
2712 event_a.id
2713 } else {
2714 event_b.id
2715 };
2716 let picked = pick_latest_event([&event_a, &event_b]).unwrap();
2717 assert_eq!(picked.id, expected);
2718 }
2719
2720 #[tokio::test]
2721 async fn resolve_root_from_local_buses_returns_source_and_first_match() {
2722 let state = WebRTCState::new();
2723 let root = PeerRootEvent {
2724 hash: "ab".repeat(32),
2725 key: None,
2726 encrypted_key: None,
2727 self_encrypted_key: None,
2728 event_id: "event-1".to_string(),
2729 created_at: 1,
2730 peer_id: "bus-peer".to_string(),
2731 };
2732
2733 state
2734 .set_local_buses(vec![
2735 Arc::new(TestLocalBus {
2736 source: "empty",
2737 root: None,
2738 }),
2739 Arc::new(TestLocalBus {
2740 source: "mock-bus",
2741 root: Some(root.clone()),
2742 }),
2743 ])
2744 .await;
2745
2746 let resolved = state
2747 .resolve_root_from_local_buses_with_source("owner", "tree", Duration::from_millis(10))
2748 .await
2749 .expect("expected root from local bus");
2750
2751 assert_eq!(resolved.0, "mock-bus");
2752 assert_eq!(resolved.1.hash, root.hash);
2753 assert_eq!(resolved.1.peer_id, root.peer_id);
2754 }
2755
2756 #[tokio::test]
2757 async fn can_track_local_bus_peer_enforces_wifi_aware_limit() {
2758 let keys = Keys::generate();
2759 let mut config = WebRTCConfig::default();
2760 config.wifi_aware.enabled = true;
2761 config.wifi_aware.max_peers = 1;
2762 let manager = WebRTCManager::new(keys, config);
2763 let existing_peer = PeerId::new("peer-a".to_string(), Some("sess-a".to_string()));
2764 let existing_key = existing_peer.to_string();
2765 let mut peers = HashMap::new();
2766 peers.insert(
2767 existing_key.clone(),
2768 PeerEntry {
2769 peer_id: existing_peer,
2770 direction: PeerDirection::Outbound,
2771 state: ConnectionState::Discovered,
2772 last_seen: Instant::now(),
2773 peer: None,
2774 pool: PeerPool::Other,
2775 transport: PeerTransport::WebRtc,
2776 signal_paths: BTreeSet::from([PeerSignalPath::WifiAware]),
2777 bytes_sent: 0,
2778 bytes_received: 0,
2779 },
2780 );
2781
2782 assert!(manager.can_track_local_bus_peer(WIFI_AWARE_SOURCE, &existing_key, &peers,));
2783 assert!(!manager.can_track_local_bus_peer(WIFI_AWARE_SOURCE, "peer-b:sess-b", &peers,));
2784 assert!(manager.can_track_local_bus_peer("relay", "peer-c:sess-c", &peers));
2785 }
2786
2787 #[tokio::test]
2788 async fn request_from_peers_with_source_accepts_generic_mesh_peers() {
2789 let state = WebRTCState::new();
2790 let data = b"offline-over-ble".to_vec();
2791 let hash_hex = hex::encode(hashtree_core::sha256(&data));
2792
2793 state.peers.write().await.insert(
2794 "peer-a".to_string(),
2795 PeerEntry {
2796 peer_id: PeerId::new("peer-a-pub".to_string(), Some("session-a".to_string())),
2797 direction: PeerDirection::Outbound,
2798 state: ConnectionState::Connected,
2799 last_seen: Instant::now(),
2800 peer: Some(MeshPeer::mock_for_tests(TestMeshPeer::with_response(Some(
2801 data.clone(),
2802 )))),
2803 pool: PeerPool::Other,
2804 transport: PeerTransport::Bluetooth,
2805 signal_paths: BTreeSet::from([PeerSignalPath::Bluetooth]),
2806 bytes_sent: 0,
2807 bytes_received: 0,
2808 },
2809 );
2810
2811 let resolved = state
2812 .request_from_peers_with_source(&hash_hex)
2813 .await
2814 .expect("expected mock mesh peer response");
2815
2816 assert_eq!(resolved.0, data);
2817 assert_eq!(resolved.1, "peer-a-pub:session-a");
2818 }
2819
    /// A single mock peer that answers only after 200 ms must still be waited for: the request
    /// has to return its payload even though the hedge interval (50 ms) elapses long before.
    #[tokio::test]
    async fn request_from_peers_with_source_waits_full_timeout_for_last_generic_peer() {
        // Fan-out is pinned to one peer, so there is nobody to hedge to after the 50 ms interval.
        // NOTE(review): the 400 ms duration is presumably the overall request timeout — confirm
        // against `new_with_routing_and_cashu`.
        let state = WebRTCState::new_with_routing_and_cashu(
            SelectionStrategy::TitForTat,
            true,
            RequestDispatchConfig {
                initial_fanout: 1,
                hedge_fanout: 1,
                max_fanout: 1,
                hedge_interval_ms: 50,
            },
            Duration::from_millis(400),
            CashuRoutingConfig::default(),
            None,
            None,
        );
        let data = b"slow-offline-over-ble".to_vec();
        let hash_hex = hex::encode(hashtree_core::sha256(&data));

        // The only peer responds after 200 ms, i.e. later than the hedge interval.
        state.peers.write().await.insert(
            "peer-a".to_string(),
            PeerEntry {
                peer_id: PeerId::new("peer-a-pub".to_string(), Some("session-a".to_string())),
                direction: PeerDirection::Outbound,
                state: ConnectionState::Connected,
                last_seen: Instant::now(),
                peer: Some(MeshPeer::mock_for_tests(
                    TestMeshPeer::with_delayed_response(
                        Some(data.clone()),
                        Duration::from_millis(200),
                    ),
                )),
                pool: PeerPool::Other,
                transport: PeerTransport::Bluetooth,
                signal_paths: BTreeSet::from([PeerSignalPath::Bluetooth]),
                bytes_sent: 0,
                bytes_received: 0,
            },
        );

        let resolved = state
            .request_from_peers_with_source(&hash_hex)
            .await
            .expect("expected delayed mock mesh peer response");

        // Payload and the responding peer's full id are both reported.
        assert_eq!(resolved.0, data);
        assert_eq!(resolved.1, "peer-a-pub:session-a");
    }
2868
2869 #[tokio::test]
2870 async fn dispatch_signaling_message_is_noop_when_signaling_disabled() {
2871 let keys = Keys::generate();
2872 let mut config = WebRTCConfig::default();
2873 config.signaling_enabled = false;
2874 let manager = WebRTCManager::new(keys, config);
2875 let peer_id = PeerId::new("peer-a-pub".to_string(), Some("session-a".to_string()));
2876 let peer_key = peer_id.to_string();
2877 let peer = MeshPeer::mock_for_tests(TestMeshPeer::with_response(None));
2878 let peer_ref = peer.mock_ref().expect("mock peer").clone();
2879
2880 manager.state.peers.write().await.insert(
2881 peer_key,
2882 PeerEntry {
2883 peer_id,
2884 direction: PeerDirection::Outbound,
2885 state: ConnectionState::Connected,
2886 last_seen: Instant::now(),
2887 peer: Some(peer),
2888 pool: PeerPool::Other,
2889 transport: PeerTransport::Bluetooth,
2890 signal_paths: BTreeSet::from([PeerSignalPath::Bluetooth]),
2891 bytes_sent: 0,
2892 bytes_received: 0,
2893 },
2894 );
2895
2896 let (relay_tx, _) = tokio::sync::broadcast::channel(4);
2897 manager
2898 .dispatch_signaling_message(
2899 SignalingMessage::hello(&manager.my_peer_id.uuid),
2900 &relay_tx,
2901 )
2902 .await;
2903
2904 assert_eq!(peer_ref.sent_frame_count().await, 0);
2905 }
2906
    /// Handling a `Failed` event for a peer whose `close()` takes 200 ms must not keep the peer
    /// map locked for that long: 20 ms into the cleanup, a read of the map has to succeed within
    /// 50 ms and already observe the peer as removed.
    #[tokio::test]
    async fn failed_peer_cleanup_does_not_hold_peer_map_lock_while_closing() {
        let keys = Keys::generate();
        let manager = Arc::new(WebRTCManager::new(keys, WebRTCConfig::default()));
        let peer_id = PeerId::new("peer-a-pub".to_string(), Some("session-a".to_string()));
        let peer_key = peer_id.to_string();

        // Register a connected peer whose close() is artificially slow.
        manager.state.peers.write().await.insert(
            peer_key.clone(),
            PeerEntry {
                peer_id: peer_id.clone(),
                direction: PeerDirection::Outbound,
                state: ConnectionState::Connected,
                last_seen: Instant::now(),
                peer: Some(MeshPeer::mock_for_tests(TestMeshPeer::with_delayed_close(
                    Duration::from_millis(200),
                ))),
                pool: PeerPool::Other,
                transport: PeerTransport::Bluetooth,
                signal_paths: BTreeSet::from([PeerSignalPath::Bluetooth]),
                bytes_sent: 0,
                bytes_received: 0,
            },
        );

        // Run the failure handling concurrently so the map can be probed while close() is pending.
        let (relay_tx, _) = tokio::sync::broadcast::channel(4);
        let manager_for_task = manager.clone();
        let peer_id_for_task = peer_id.clone();
        let cleanup_task = tokio::spawn(async move {
            manager_for_task
                .handle_peer_state_event(PeerStateEvent::Failed(peer_id_for_task), &relay_tx)
                .await;
        });

        // Give the handler time to remove the entry and start closing.
        tokio::time::sleep(Duration::from_millis(20)).await;

        // The read must complete well before the 200 ms close finishes.
        let remaining = tokio::time::timeout(Duration::from_millis(50), async {
            manager.state.peers.read().await.len()
        })
        .await
        .expect("peer map read should not block on close");

        assert_eq!(remaining, 0);
        cleanup_task.await.expect("cleanup task");
    }
2952
    /// Resolving a root through a peer that takes 200 ms to answer must not keep the peer map
    /// locked meanwhile: with a writer queued on the map, a subsequent read still has to finish
    /// within 50 ms, and the resolution itself must eventually yield a root.
    #[tokio::test]
    async fn resolve_root_from_peers_does_not_hold_peer_map_lock_while_querying() {
        let keys = Keys::generate();
        let manager = Arc::new(WebRTCManager::new(keys.clone(), WebRTCConfig::default()));
        let owner_keys = Keys::generate();
        let owner_pubkey = owner_keys.public_key().to_hex();
        let tree_name = "video";
        let hash = "ab".repeat(32);
        // Root event signed by the owner, labelled for the tree that will be queried.
        let event = EventBuilder::new(
            Kind::Custom(super::super::root_events::HASHTREE_KIND),
            "",
            [
                Tag::parse(&["d", tree_name]).unwrap(),
                Tag::parse(&["l", super::super::root_events::HASHTREE_LABEL]).unwrap(),
                Tag::parse(&["hash", &hash]).unwrap(),
            ],
        )
        .to_event(&owner_keys)
        .unwrap();

        let peer_id = PeerId::new("peer-a-pub".to_string(), Some("session-a".to_string()));
        let peer_key = peer_id.to_string();

        // The mock peer hands back the event only after a 200 ms delay.
        manager.state.peers.write().await.insert(
            peer_key.clone(),
            PeerEntry {
                peer_id,
                direction: PeerDirection::Outbound,
                state: ConnectionState::Connected,
                last_seen: Instant::now(),
                peer: Some(MeshPeer::mock_for_tests(TestMeshPeer::with_delayed_events(
                    vec![event],
                    Duration::from_millis(200),
                ))),
                pool: PeerPool::Other,
                transport: PeerTransport::Bluetooth,
                signal_paths: BTreeSet::from([PeerSignalPath::Bluetooth]),
                bytes_sent: 0,
                bytes_received: 0,
            },
        );

        // Start the (slow) resolution in the background.
        let manager_for_task = manager.clone();
        let owner_pubkey_for_task = owner_pubkey.clone();
        let resolve_task = tokio::spawn(async move {
            manager_for_task
                .state
                .resolve_root_from_peers(
                    &owner_pubkey_for_task,
                    tree_name,
                    Duration::from_millis(500),
                )
                .await
        });

        tokio::time::sleep(Duration::from_millis(20)).await;

        // Queue a writer while the query is in flight; it would stall behind a long-held read
        // guard, and in turn stall the reader below.
        let manager_for_writer = manager.clone();
        let peer_key_for_writer = peer_key.clone();
        let writer_task = tokio::spawn(async move {
            let mut peers = manager_for_writer.state.peers.write().await;
            if let Some(entry) = peers.get_mut(&peer_key_for_writer) {
                entry.bytes_received += 1;
            }
        });

        tokio::time::sleep(Duration::from_millis(20)).await;

        // About 40 ms in, the 200 ms query is still pending; the map must nevertheless be readable.
        let status_count = tokio::time::timeout(Duration::from_millis(50), async {
            manager.state.peers.read().await.len()
        })
        .await
        .expect("peer map read should not block on root query");

        assert_eq!(status_count, 1);
        assert!(resolve_task.await.expect("resolve task").is_some());
        writer_task.await.expect("writer task");
    }
3031
3032 #[test]
3033 fn test_formal_timed_seen_set_rejects_duplicates() {
3034 let mut seen = TimedSeenSet::new(4, Duration::from_secs(60));
3035 assert!(seen.insert_if_new("frame-1".to_string()));
3036 assert!(!seen.insert_if_new("frame-1".to_string()));
3037 assert!(seen.insert_if_new("frame-2".to_string()));
3038 }
3039
3040 #[test]
3041 fn test_formal_timed_seen_set_evicts_oldest_when_capacity_exceeded() {
3042 let mut seen = TimedSeenSet::new(2, Duration::from_secs(60));
3043 assert!(seen.insert_if_new("a".to_string()));
3044 assert!(seen.insert_if_new("b".to_string()));
3045 assert!(seen.insert_if_new("c".to_string()));
3046
3047 assert!(seen.insert_if_new("a".to_string()));
3049 assert!(!seen.insert_if_new("a".to_string()));
3050 }
3051
3052 #[test]
3053 fn test_request_dispatch_normalization_caps_to_available_peers() {
3054 let normalized = normalize_dispatch_config(
3055 RequestDispatchConfig {
3056 initial_fanout: 8,
3057 hedge_fanout: 6,
3058 max_fanout: 5,
3059 hedge_interval_ms: 120,
3060 },
3061 3,
3062 );
3063 assert_eq!(normalized.max_fanout, 3);
3064 assert_eq!(normalized.initial_fanout, 3);
3065 assert_eq!(normalized.hedge_fanout, 3);
3066 }
3067
3068 #[test]
3069 fn test_hedged_wave_plan_matches_dispatch_policy() {
3070 let plan = build_hedged_wave_plan(
3071 7,
3072 RequestDispatchConfig {
3073 initial_fanout: 2,
3074 hedge_fanout: 3,
3075 max_fanout: 6,
3076 hedge_interval_ms: 120,
3077 },
3078 );
3079 assert_eq!(plan, vec![2, 3, 1]);
3080 }
3081}