1use anyhow::Result;
13use futures::{SinkExt, StreamExt};
14use hashtree_webrtc::{
15 build_hedged_wave_plan, normalize_dispatch_config, sync_selector_peers, PeerSelector,
16};
17use nostr::{
18 nips::nip44, Alphabet, ClientMessage, EventBuilder, Filter, JsonUtil, Keys, Kind, PublicKey,
19 RelayMessage, SingleLetterTag, Tag,
20};
21use std::collections::HashMap;
22use std::sync::Arc;
23use std::time::{Duration, Instant};
24use tokio::sync::{mpsc, Mutex, RwLock};
25use tokio_tungstenite::{connect_async, tungstenite::Message};
26use tracing::{debug, error, info, warn};
27
28use super::cashu::{CashuMintMetadataStore, CashuQuoteState, CashuRoutingConfig, NegotiatedQuote};
29use super::peer::{ContentStore, Peer, PendingRequest};
30use super::types::{
31 decrement_htl_with_policy, encode_quote_request, encode_request, should_forward_htl,
32 validate_mesh_frame, DataQuoteRequest, DataRequest, MeshNostrFrame, MeshNostrPayload,
33 PeerDirection, PeerId, PeerPool, PeerStateEvent, PeerStatus, RequestDispatchConfig,
34 SignalingMessage, TimedSeenSet, WebRTCConfig, HELLO_TAG, MESH_DEFAULT_HTL, MESH_EVENT_POLICY,
35 WEBRTC_KIND,
36};
37use crate::cashu_helper::CashuPaymentClient;
38use crate::nostr_relay::NostrRelay;
39
/// Classifies a peer into the connection pool it should be managed under.
/// The `&str` argument is presumably the peer's pubkey hex — confirm at call sites.
pub type PeerClassifier = Arc<dyn Fn(&str) -> PeerPool + Send + Sync>;
42
/// Lifecycle of a peer connection, from discovery through success or failure.
///
/// Fieldless enum: derives `Copy` and `Eq` in addition to the original
/// `Debug, Clone, PartialEq` so values can be compared and passed by value
/// without explicit clones (backward compatible — existing `.clone()` and
/// `==` uses keep working).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectionState {
    /// Peer announced via signaling but no connection attempt yet.
    Discovered,
    /// WebRTC negotiation in progress.
    Connecting,
    /// Data channel established.
    Connected,
    /// Connection attempt failed.
    Failed,
}
51
52impl std::fmt::Display for ConnectionState {
53 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
54 match self {
55 ConnectionState::Discovered => write!(f, "discovered"),
56 ConnectionState::Connecting => write!(f, "connecting"),
57 ConnectionState::Connected => write!(f, "connected"),
58 ConnectionState::Failed => write!(f, "failed"),
59 }
60 }
61}
62
/// Bookkeeping for a single known peer, whether or not it is connected.
pub struct PeerEntry {
    /// Stable identifier (pubkey + uuid) for this peer.
    pub peer_id: PeerId,
    /// Whether we dialed them or they dialed us.
    pub direction: PeerDirection,
    /// Current connection lifecycle state.
    pub state: ConnectionState,
    /// Last time we observed activity from this peer.
    pub last_seen: Instant,
    /// Live connection handle; `None` until a connection is established.
    pub peer: Option<Peer>,
    /// Pool this peer was classified into (follows vs. other).
    pub pool: PeerPool,
    /// Per-peer outbound byte counter.
    pub bytes_sent: u64,
    /// Per-peer inbound byte counter.
    pub bytes_received: u64,
}
74
/// Shared, concurrently-accessed state for the WebRTC mesh: peer table,
/// traffic/mesh counters, routing selector, and Cashu quote machinery.
pub struct WebRTCState {
    /// Peer table keyed by peer-id string.
    pub peers: RwLock<HashMap<String, PeerEntry>>,
    /// Number of peers currently in `Connected` state.
    pub connected_count: std::sync::atomic::AtomicUsize,
    /// Total bytes sent across all peers.
    pub bytes_sent: std::sync::atomic::AtomicU64,
    /// Total bytes received across all peers.
    pub bytes_received: std::sync::atomic::AtomicU64,
    /// Mesh frames received.
    pub mesh_received: std::sync::atomic::AtomicU64,
    /// Mesh frames forwarded onward.
    pub mesh_forwarded: std::sync::atomic::AtomicU64,
    /// Mesh frames dropped as duplicates.
    pub mesh_dropped_duplicate: std::sync::atomic::AtomicU64,
    /// Strategy-driven peer ordering used when dispatching requests.
    peer_selector: Arc<RwLock<PeerSelector>>,
    /// Hedged-wave dispatch configuration for blob/quote requests.
    request_dispatch: RequestDispatchConfig,
    /// Per-request timeout (derived from `WebRTCConfig::message_timeout_ms`).
    request_timeout: Duration,
    /// Cashu quote negotiation and payment state.
    cashu_quotes: Arc<CashuQuoteState>,
}
98
/// A hashtree root event resolved from a peer's embedded Nostr store.
#[derive(Debug, Clone)]
pub struct PeerRootEvent {
    /// Root hash (from the `hash` tag, falling back to event content).
    pub hash: String,
    /// Plain key from the `key` tag, if present.
    pub key: Option<String>,
    /// Value of the `encryptedKey` tag, if present.
    pub encrypted_key: Option<String>,
    /// Value of the `selfEncryptedKey` tag, if present.
    pub self_encrypted_key: Option<String>,
    /// Hex id of the source Nostr event.
    pub event_id: String,
    /// Event timestamp (seconds).
    pub created_at: u64,
    /// Short id of the peer that served the event.
    pub peer_id: String,
}
109
/// Nostr event kind used for hashtree root events.
const HASHTREE_KIND: u16 = 30078;
/// Value of the `l` label tag marking hashtree events.
const HASHTREE_LABEL: &str = "hashtree";
/// Capacity of the seen-mesh-frame dedup set.
const SEEN_FRAME_CAP: usize = 4096;
/// TTL for seen-mesh-frame entries.
const SEEN_FRAME_TTL: Duration = Duration::from_secs(120);
/// Capacity of the seen-event dedup set.
const SEEN_EVENT_CAP: usize = 8192;
/// TTL for seen-event entries.
const SEEN_EVENT_TTL: Duration = Duration::from_secs(600);
116
/// In-flight requests keyed by the requested hash (hex string).
type PendingRequestsMap = Arc<Mutex<HashMap<String, PendingRequest>>>;
/// A peer ready to serve requests: (peer id, pending-request map, open data channel).
type ConnectedPeer = (
    String,
    PendingRequestsMap,
    Arc<webrtc::data_channel::RTCDataChannel>,
);
123
124fn hashtree_event_identifier(event: &nostr::Event) -> Option<String> {
125 event.tags.iter().find_map(|tag| {
126 let slice = tag.as_slice();
127 if slice.len() >= 2 && slice[0].as_str() == "d" {
128 Some(slice[1].to_string())
129 } else {
130 None
131 }
132 })
133}
134
135fn is_hashtree_labeled_event(event: &nostr::Event) -> bool {
136 event.tags.iter().any(|tag| {
137 let slice = tag.as_slice();
138 slice.len() >= 2 && slice[0].as_str() == "l" && slice[1].as_str() == HASHTREE_LABEL
139 })
140}
141
142fn pick_latest_event<'a, I>(events: I) -> Option<&'a nostr::Event>
143where
144 I: IntoIterator<Item = &'a nostr::Event>,
145{
146 events.into_iter().max_by(|a, b| {
147 let ordering = a.created_at.cmp(&b.created_at);
148 if ordering == std::cmp::Ordering::Equal {
149 a.id.cmp(&b.id)
150 } else {
151 ordering
152 }
153 })
154}
155
156fn root_event_from_peer(
157 event: &nostr::Event,
158 peer_id: &str,
159 tree_name: &str,
160) -> Option<PeerRootEvent> {
161 if hashtree_event_identifier(event).as_deref() != Some(tree_name)
162 || !is_hashtree_labeled_event(event)
163 {
164 return None;
165 }
166
167 let mut key = None;
168 let mut encrypted_key = None;
169 let mut self_encrypted_key = None;
170 let mut hash_tag = None;
171
172 for tag in &event.tags {
173 let slice = tag.as_slice();
174 if slice.len() < 2 {
175 continue;
176 }
177 match slice[0].as_str() {
178 "hash" => hash_tag = Some(slice[1].to_string()),
179 "key" => key = Some(slice[1].to_string()),
180 "encryptedKey" => encrypted_key = Some(slice[1].to_string()),
181 "selfEncryptedKey" => self_encrypted_key = Some(slice[1].to_string()),
182 _ => {}
183 }
184 }
185
186 let hash = hash_tag.or_else(|| {
187 if event.content.is_empty() {
188 None
189 } else {
190 Some(event.content.clone())
191 }
192 })?;
193
194 Some(PeerRootEvent {
195 hash,
196 key,
197 encrypted_key,
198 self_encrypted_key,
199 event_id: event.id.to_hex(),
200 created_at: event.created_at.as_u64(),
201 peer_id: peer_id.to_string(),
202 })
203}
204
205impl WebRTCState {
206 pub fn new() -> Self {
207 let cfg = WebRTCConfig::default();
208 Self::new_with_routing_and_cashu(
209 cfg.request_selection_strategy,
210 cfg.request_fairness_enabled,
211 cfg.request_dispatch,
212 Duration::from_millis(cfg.message_timeout_ms),
213 CashuRoutingConfig::default(),
214 None,
215 None,
216 )
217 }
218
219 pub fn new_with_routing(
220 selection_strategy: super::types::SelectionStrategy,
221 fairness_enabled: bool,
222 request_dispatch: RequestDispatchConfig,
223 ) -> Self {
224 let cfg = WebRTCConfig::default();
225 Self::new_with_routing_and_cashu(
226 selection_strategy,
227 fairness_enabled,
228 request_dispatch,
229 Duration::from_millis(cfg.message_timeout_ms),
230 CashuRoutingConfig::default(),
231 None,
232 None,
233 )
234 }
235
236 pub fn new_with_routing_and_cashu(
237 selection_strategy: super::types::SelectionStrategy,
238 fairness_enabled: bool,
239 request_dispatch: RequestDispatchConfig,
240 request_timeout: Duration,
241 cashu_routing: CashuRoutingConfig,
242 payment_client: Option<Arc<dyn CashuPaymentClient>>,
243 mint_metadata: Option<Arc<CashuMintMetadataStore>>,
244 ) -> Self {
245 let mut selector = PeerSelector::with_strategy(selection_strategy);
246 selector.set_fairness(fairness_enabled);
247 let peer_selector = Arc::new(RwLock::new(selector));
248 let cashu_quotes = Arc::new(if let Some(mint_metadata) = mint_metadata {
249 CashuQuoteState::new_with_mint_metadata(
250 cashu_routing,
251 peer_selector.clone(),
252 payment_client,
253 mint_metadata,
254 )
255 } else {
256 CashuQuoteState::new(cashu_routing, peer_selector.clone(), payment_client)
257 });
258 Self {
259 peers: RwLock::new(HashMap::new()),
260 connected_count: std::sync::atomic::AtomicUsize::new(0),
261 bytes_sent: std::sync::atomic::AtomicU64::new(0),
262 bytes_received: std::sync::atomic::AtomicU64::new(0),
263 mesh_received: std::sync::atomic::AtomicU64::new(0),
264 mesh_forwarded: std::sync::atomic::AtomicU64::new(0),
265 mesh_dropped_duplicate: std::sync::atomic::AtomicU64::new(0),
266 peer_selector,
267 request_dispatch,
268 request_timeout,
269 cashu_quotes,
270 }
271 }
272
273 pub fn get_bandwidth(&self) -> (u64, u64) {
275 (
276 self.bytes_sent.load(std::sync::atomic::Ordering::Relaxed),
277 self.bytes_received
278 .load(std::sync::atomic::Ordering::Relaxed),
279 )
280 }
281
282 pub fn get_mesh_stats(&self) -> (u64, u64, u64) {
283 (
284 self.mesh_received
285 .load(std::sync::atomic::Ordering::Relaxed),
286 self.mesh_forwarded
287 .load(std::sync::atomic::Ordering::Relaxed),
288 self.mesh_dropped_duplicate
289 .load(std::sync::atomic::Ordering::Relaxed),
290 )
291 }
292
293 pub fn record_mesh_received(&self) {
294 self.mesh_received
295 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
296 }
297
298 pub fn record_mesh_forwarded(&self, count: u64) {
299 self.mesh_forwarded
300 .fetch_add(count, std::sync::atomic::Ordering::Relaxed);
301 }
302
303 pub fn record_mesh_duplicate_drop(&self) {
304 self.mesh_dropped_duplicate
305 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
306 }
307
308 pub async fn record_sent(&self, peer_id: &str, bytes: u64) {
310 self.bytes_sent
311 .fetch_add(bytes, std::sync::atomic::Ordering::Relaxed);
312 if let Some(entry) = self.peers.write().await.get_mut(peer_id) {
313 entry.bytes_sent += bytes;
314 }
315 }
316
317 pub async fn record_received(&self, peer_id: &str, bytes: u64) {
319 self.bytes_received
320 .fetch_add(bytes, std::sync::atomic::Ordering::Relaxed);
321 if let Some(entry) = self.peers.write().await.get_mut(peer_id) {
322 entry.bytes_received += bytes;
323 }
324 }
325
326 pub async fn request_from_peers(&self, hash_hex: &str) -> Option<Vec<u8>> {
330 self.request_from_peers_with_source(hash_hex)
331 .await
332 .map(|(data, _peer_id)| data)
333 }
334
    /// Requests the blob identified by `hash_hex` (hex of a 32-byte SHA-256)
    /// from connected peers, returning the hash-verified bytes together with
    /// the id of the peer that served them.
    ///
    /// Flow:
    /// 1. Snapshot connected peers that have an open data channel.
    /// 2. Order them via the shared `PeerSelector`; peers the selector does
    ///    not know yet are appended in arbitrary map order.
    /// 3. If Cashu requester terms are configured, try a quote-then-fetch
    ///    path against the quoting peer first.
    /// 4. Otherwise (or on quote-path failure) fan the request out in hedged
    ///    waves, polling each outstanding oneshot until the wave window ends.
    ///
    /// Returns `None` when no peer is connected, `hash_hex` is invalid, or no
    /// peer produced data matching the expected hash.
    pub async fn request_from_peers_with_source(
        &self,
        hash_hex: &str,
    ) -> Option<(Vec<u8>, String)> {
        use super::types::BLOB_REQUEST_POLICY;
        use tokio::sync::oneshot::error::TryRecvError;

        let peers = self.peers.read().await;

        // Snapshot (peer id, data-channel holder, pending-request map) for
        // every Connected peer; the clones are cheap Arc bumps.
        let peer_refs: Vec<_> = peers
            .values()
            .filter(|p| p.state == ConnectionState::Connected && p.peer.is_some())
            .filter_map(|p| {
                p.peer.as_ref().map(|peer| {
                    (
                        p.peer_id.to_string(),
                        peer.data_channel.clone(),
                        peer.pending_requests.clone(),
                    )
                })
            })
            .collect();

        // Release the peer-table read lock before awaiting per-peer locks.
        drop(peers);

        // Keep only peers whose data channel is actually open right now.
        let mut connected_peers: Vec<ConnectedPeer> = Vec::new();
        for (peer_id, dc_mutex, pending) in peer_refs {
            let dc_guard = dc_mutex.lock().await;
            if let Some(dc) = dc_guard.as_ref() {
                connected_peers.push((peer_id, pending, dc.clone()));
            }
        }

        if connected_peers.is_empty() {
            debug!(
                "No connected peers to query for {}",
                &hash_hex[..8.min(hash_hex.len())]
            );
            return None;
        }

        let hash_bytes = match hex::decode(hash_hex) {
            Ok(b) => b,
            Err(_) => return None,
        };

        // The hash must be exactly 32 bytes so responses can be verified.
        let expected_hash: [u8; 32] = match hash_bytes.as_slice().try_into() {
            Ok(h) => h,
            Err(_) => {
                debug!(
                    "Invalid hash length {}, expected 32 bytes",
                    hash_bytes.len()
                );
                return None;
            }
        };

        // Let the selector know the current peer set, then get its ordering.
        let connected_peer_ids: Vec<String> = connected_peers
            .iter()
            .map(|(peer_id, _, _)| peer_id.clone())
            .collect();
        sync_selector_peers(self.peer_selector.as_ref(), &connected_peer_ids).await;

        let ordered_peer_ids = self.peer_selector.write().await.select_peers();
        let mut by_peer: HashMap<
            String,
            (
                PendingRequestsMap,
                Arc<webrtc::data_channel::RTCDataChannel>,
            ),
        > = connected_peers
            .into_iter()
            .map(|(peer_id, pending, dc)| (peer_id, (pending, dc)))
            .collect();

        // Selector order first; any peers it did not rank follow in map order.
        let mut ordered_peers: Vec<ConnectedPeer> = Vec::new();
        for peer_id in ordered_peer_ids {
            if let Some((pending, dc)) = by_peer.remove(&peer_id) {
                ordered_peers.push((peer_id, pending, dc));
            }
        }
        for (peer_id, (pending, dc)) in by_peer {
            ordered_peers.push((peer_id, pending, dc));
        }

        let dispatch = normalize_dispatch_config(self.request_dispatch, ordered_peers.len());
        let wave_plan = build_hedged_wave_plan(ordered_peers.len(), dispatch);
        if wave_plan.is_empty() {
            return None;
        }

        debug!(
            "Querying {} peers for {} (strategy order + hedged waves {:?})",
            ordered_peers.len(),
            &hash_hex[..8.min(hash_hex.len())],
            wave_plan
        );

        // Paid path: negotiate a Cashu quote first, then fetch from the
        // quoting peer only. On any failure we fall through to the free path.
        if let Some((requested_mint, payment_sat, quote_ttl_ms)) =
            self.cashu_quotes.requester_quote_terms().await
        {
            if let Some(quote) = self
                .request_quote_from_peers(
                    &hash_bytes,
                    requested_mint,
                    payment_sat,
                    quote_ttl_ms,
                    &ordered_peers,
                )
                .await
            {
                if let Some(data) = self
                    .request_from_single_peer(
                        hash_hex,
                        &hash_bytes,
                        expected_hash,
                        &quote.peer_id,
                        Some(&quote),
                        &ordered_peers,
                    )
                    .await
                {
                    debug!(
                        "Got quoted response from peer {} for {}",
                        quote.peer_id,
                        &hash_hex[..8.min(hash_hex.len())]
                    );
                    return Some((data, quote.peer_id));
                }
            }
        }

        let request = DataRequest {
            h: hash_bytes.clone(),
            htl: BLOB_REQUEST_POLICY.max_htl,
            q: None,
        };
        let wire = match encode_request(&request) {
            Ok(w) => w,
            Err(_) => return None,
        };
        let wire_len = wire.len() as u64;
        // Each wave waits at most one hedge interval before widening.
        let wait_window = Duration::from_millis(dispatch.hedge_interval_ms.max(1));

        let mut next_peer_idx = 0usize;
        for wave_size in wave_plan {
            let from = next_peer_idx;
            let to = (next_peer_idx + wave_size).min(ordered_peers.len());
            next_peer_idx = to;

            // Requests sent in this wave that have not resolved yet:
            // (peer id, its pending map, send time, response receiver).
            #[allow(clippy::type_complexity)]
            let mut outstanding: Vec<(
                String,
                Arc<Mutex<HashMap<String, PendingRequest>>>,
                Instant,
                tokio::sync::oneshot::Receiver<Option<Vec<u8>>>,
            )> = Vec::new();

            for (peer_id, pending_requests, dc) in &ordered_peers[from..to] {
                let (tx, rx) = tokio::sync::oneshot::channel();
                {
                    // Register before sending so the response handler can
                    // find the pending entry.
                    let mut pending = pending_requests.lock().await;
                    pending.insert(
                        hash_hex.to_string(),
                        PendingRequest::standard(hash_bytes.clone(), tx),
                    );
                }

                if dc.send(&bytes::Bytes::copy_from_slice(&wire)).await.is_ok() {
                    self.record_sent(peer_id, wire_len).await;
                    self.peer_selector
                        .write()
                        .await
                        .record_request(peer_id, wire_len);
                    outstanding.push((
                        peer_id.clone(),
                        pending_requests.clone(),
                        Instant::now(),
                        rx,
                    ));
                } else {
                    // Send failed: undo the registration and penalize the peer.
                    let mut pending = pending_requests.lock().await;
                    pending.remove(hash_hex);
                    self.peer_selector.write().await.record_failure(peer_id);
                }
            }

            if outstanding.is_empty() {
                continue;
            }

            // Poll the outstanding oneshots until one verifies or the wave
            // window expires; 10ms sleep bounds the polling rate.
            let deadline = Instant::now() + wait_window;
            let mut success: Option<(String, Vec<u8>, u64)> = None;
            while !outstanding.is_empty() && Instant::now() < deadline {
                let mut i = 0usize;
                while i < outstanding.len() {
                    let mut drop_entry = false;
                    let (peer_id, pending_requests, sent_at, rx) = &mut outstanding[i];
                    match rx.try_recv() {
                        Ok(Some(data)) => {
                            let rtt_ms = sent_at.elapsed().as_millis() as u64;
                            if hashtree_core::sha256(&data) == expected_hash {
                                success = Some((peer_id.clone(), data, rtt_ms));
                                break;
                            }
                            // Hash mismatch counts as a peer failure.
                            self.peer_selector.write().await.record_failure(peer_id);
                            let mut pending = pending_requests.lock().await;
                            pending.remove(hash_hex);
                            drop_entry = true;
                        }
                        Ok(None) => {
                            // Peer explicitly answered "don't have it".
                            let mut pending = pending_requests.lock().await;
                            pending.remove(hash_hex);
                            drop_entry = true;
                        }
                        Err(TryRecvError::Closed) => {
                            // Sender dropped without answering.
                            let mut pending = pending_requests.lock().await;
                            pending.remove(hash_hex);
                            drop_entry = true;
                        }
                        Err(TryRecvError::Empty) => {}
                    }

                    if drop_entry {
                        outstanding.swap_remove(i);
                    } else {
                        i += 1;
                    }
                }

                if success.is_some() {
                    break;
                }

                let now = Instant::now();
                if now >= deadline {
                    break;
                }
                tokio::time::sleep(Duration::from_millis(10).min(deadline - now)).await;
            }

            if let Some((peer_id, data, rtt_ms)) = success {
                self.record_received(&peer_id, data.len() as u64).await;
                self.peer_selector.write().await.record_success(
                    &peer_id,
                    rtt_ms,
                    data.len() as u64,
                );

                // Cancel the losing peers' registrations. NOTE(review): the
                // winner's entry is not removed here — presumably the
                // response handler clears it on delivery; confirm.
                for (other_peer_id, pending_requests, _, _) in outstanding {
                    if other_peer_id != peer_id {
                        let mut pending = pending_requests.lock().await;
                        pending.remove(hash_hex);
                    }
                }

                debug!(
                    "Got response from peer {} for {}",
                    peer_id,
                    &hash_hex[..8.min(hash_hex.len())]
                );
                return Some((data, peer_id));
            }

            // Wave timed out: clean up and record timeouts before widening.
            for (peer_id, pending_requests, _, _) in outstanding {
                let mut pending = pending_requests.lock().await;
                pending.remove(hash_hex);
                self.peer_selector.write().await.record_timeout(&peer_id);
            }
        }

        debug!(
            "No peer had data for {}",
            &hash_hex[..8.min(hash_hex.len())]
        );
        None
    }
617
    /// Broadcasts a Cashu quote request for `hash_bytes` in hedged waves and
    /// returns the first negotiated quote, or `None` on timeout/failure.
    ///
    /// The pending-quote registration is always cleared before returning so
    /// a stale receiver cannot linger in `cashu_quotes`.
    async fn request_quote_from_peers(
        &self,
        hash_bytes: &[u8],
        requested_mint: String,
        payment_sat: u64,
        quote_ttl_ms: u32,
        ordered_peers: &[ConnectedPeer],
    ) -> Option<NegotiatedQuote> {
        // A zero TTL disables the quote path entirely.
        if ordered_peers.is_empty() || quote_ttl_ms == 0 {
            return None;
        }

        let dispatch = normalize_dispatch_config(self.request_dispatch, ordered_peers.len());
        let wave_plan = build_hedged_wave_plan(ordered_peers.len(), dispatch);
        if wave_plan.is_empty() {
            return None;
        }

        // Register first so a fast peer's answer is not lost.
        let hash_hex = hex::encode(hash_bytes);
        let mut rx = self
            .cashu_quotes
            .register_pending_quote(hash_hex.clone(), Some(requested_mint.clone()), payment_sat)
            .await;
        let quote_request = DataQuoteRequest {
            h: hash_bytes.to_vec(),
            p: payment_sat,
            t: quote_ttl_ms,
            m: Some(requested_mint),
        };
        let wire = match encode_quote_request(&quote_request) {
            Ok(wire) => wire,
            Err(_) => {
                self.cashu_quotes.clear_pending_quote(&hash_hex).await;
                return None;
            }
        };
        let deadline = Instant::now() + self.request_timeout;
        let mut sent_total = 0usize;
        let mut next_peer_idx = 0usize;

        for (wave_idx, wave_size) in wave_plan.iter().copied().enumerate() {
            let from = next_peer_idx;
            let to = (next_peer_idx + wave_size).min(ordered_peers.len());
            for (_, _, dc) in &ordered_peers[from..to] {
                if dc.send(&bytes::Bytes::copy_from_slice(&wire)).await.is_ok() {
                    sent_total += 1;
                }
            }
            next_peer_idx = to;

            // Nothing sent anywhere yet: widen immediately, or give up if
            // every peer has been tried.
            if sent_total == 0 {
                if next_peer_idx >= ordered_peers.len() {
                    break;
                }
                continue;
            }

            let now = Instant::now();
            if now >= deadline {
                break;
            }
            let remaining = deadline.saturating_duration_since(now);
            // The final wave waits out the full remaining budget; earlier
            // waves wait at most one hedge interval before widening.
            let is_last_wave =
                wave_idx + 1 == wave_plan.len() || next_peer_idx >= ordered_peers.len();
            let wait = if is_last_wave {
                remaining
            } else if dispatch.hedge_interval_ms == 0 {
                Duration::ZERO
            } else {
                Duration::from_millis(dispatch.hedge_interval_ms).min(remaining)
            };

            if wait.is_zero() {
                continue;
            }

            match tokio::time::timeout(wait, &mut rx).await {
                Ok(Ok(Some(quote))) => {
                    self.cashu_quotes.clear_pending_quote(&hash_hex).await;
                    return Some(quote);
                }
                // Quote state answered "no quote" or dropped the sender.
                Ok(Ok(None)) | Ok(Err(_)) => break,
                // Timed out this wave; move on to the next one.
                Err(_) => {}
            }
        }

        self.cashu_quotes.clear_pending_quote(&hash_hex).await;
        None
    }
707
    /// Sends one data request to `target_peer_id` (optionally attaching a
    /// negotiated Cashu quote id) and waits for a hash-verified response.
    ///
    /// Quoted requests get an extended timeout scaled by `payment_sat`
    /// (clamped to 1..=32) to allow for payment settlement. On mismatch,
    /// timeout, or channel failure, any in-flight payment token recorded on
    /// the pending entry is revoked best-effort.
    async fn request_from_single_peer(
        &self,
        hash_hex: &str,
        hash_bytes: &[u8],
        expected_hash: [u8; 32],
        target_peer_id: &str,
        quote: Option<&NegotiatedQuote>,
        ordered_peers: &[ConnectedPeer],
    ) -> Option<Vec<u8>> {
        use super::types::BLOB_REQUEST_POLICY;

        // Locate the target peer's channel and pending map; bail if it is no
        // longer in the ordered set.
        let (pending_requests, dc) = ordered_peers
            .iter()
            .find(|(peer_id, _, _)| peer_id == target_peer_id)
            .map(|(_, pending_requests, dc)| (pending_requests.clone(), dc.clone()))?;

        let request = DataRequest {
            h: hash_bytes.to_vec(),
            htl: BLOB_REQUEST_POLICY.max_htl,
            q: quote.map(|quote| quote.quote_id),
        };
        let wire = encode_request(&request).ok()?;
        let wire_len = wire.len() as u64;
        let sent_at = Instant::now();
        let (tx, mut rx) = tokio::sync::oneshot::channel();

        {
            // Register before sending; quoted entries carry the payment
            // details needed for settlement/revocation.
            let mut pending = pending_requests.lock().await;
            pending.insert(
                hash_hex.to_string(),
                if let Some(quote) = quote {
                    PendingRequest::quoted(
                        hash_bytes.to_vec(),
                        tx,
                        quote.quote_id,
                        quote.mint_url.clone().unwrap_or_default(),
                        quote.payment_sat,
                    )
                } else {
                    PendingRequest::standard(hash_bytes.to_vec(), tx)
                },
            );
        }

        if dc
            .send(&bytes::Bytes::copy_from_slice(&wire))
            .await
            .is_err()
        {
            // Send failed: undo registration and penalize the peer.
            let mut pending = pending_requests.lock().await;
            pending.remove(hash_hex);
            self.peer_selector
                .write()
                .await
                .record_failure(target_peer_id);
            return None;
        }

        self.record_sent(target_peer_id, wire_len).await;
        self.peer_selector
            .write()
            .await
            .record_request(target_peer_id, wire_len);

        // Quoted fetches extend the timeout by settlement time scaled with
        // the payment size (1..=32 multiplier).
        let wait_timeout = if let Some(quote) = quote {
            let multiplier = quote.payment_sat.clamp(1, 32) as u128;
            let extra_ms = self
                .cashu_quotes
                .settlement_timeout()
                .as_millis()
                .saturating_mul(multiplier);
            self.request_timeout + Duration::from_millis(extra_ms.min(u64::MAX as u128) as u64)
        } else {
            self.request_timeout
        };

        match tokio::time::timeout(wait_timeout, &mut rx).await {
            // Verified response: record stats.
            // NOTE(review): the pending entry is not removed on this path —
            // presumably the response handler clears it on delivery; confirm.
            Ok(Ok(Some(data))) if hashtree_core::sha256(&data) == expected_hash => {
                let rtt_ms = sent_at.elapsed().as_millis() as u64;
                self.record_received(target_peer_id, data.len() as u64)
                    .await;
                self.peer_selector.write().await.record_success(
                    target_peer_id,
                    rtt_ms,
                    data.len() as u64,
                );
                Some(data)
            }
            // Response arrived but failed hash verification.
            Ok(Ok(Some(_))) => {
                self.peer_selector
                    .write()
                    .await
                    .record_failure(target_peer_id);
                // Revoke any payment token already spent on this request.
                let pending = pending_requests.lock().await.remove(hash_hex);
                if let Some(pending) = pending {
                    if let Some(quoted) = pending.quoted {
                        if let Some(in_flight) = quoted.in_flight_payment {
                            let _ = self
                                .cashu_quotes
                                .revoke_payment_token(&in_flight.mint_url, &in_flight.operation_id)
                                .await;
                        }
                    }
                }
                None
            }
            // Negative answer, closed channel, or timeout: clean up and
            // revoke any in-flight payment.
            Ok(Ok(None)) | Ok(Err(_)) | Err(_) => {
                let pending = pending_requests.lock().await.remove(hash_hex);
                if let Some(pending) = pending {
                    if let Some(quoted) = pending.quoted {
                        if let Some(in_flight) = quoted.in_flight_payment {
                            let _ = self
                                .cashu_quotes
                                .revoke_payment_token(&in_flight.mint_url, &in_flight.operation_id)
                                .await;
                        }
                    }
                }
                self.peer_selector
                    .write()
                    .await
                    .record_timeout(target_peer_id);
                None
            }
        }
    }
834
    /// Asks connected peers, one at a time, for the latest hashtree root
    /// event published by `owner_pubkey` for `tree_name`.
    ///
    /// Peers are tried in map iteration order (effectively unordered); the
    /// first peer returning a usable event wins. Each peer's Nostr query is
    /// bounded by `per_peer_timeout`.
    pub async fn resolve_root_from_peers(
        &self,
        owner_pubkey: &str,
        tree_name: &str,
        per_peer_timeout: Duration,
    ) -> Option<PeerRootEvent> {
        let author = PublicKey::from_hex(owner_pubkey).ok()?;
        // kind HASHTREE_KIND, d=<tree_name>, l=hashtree, capped at 50 events.
        let filter = Filter::new()
            .kind(Kind::Custom(HASHTREE_KIND))
            .author(author)
            .custom_tag(
                SingleLetterTag::lowercase(Alphabet::D),
                vec![tree_name.to_string()],
            )
            .custom_tag(
                SingleLetterTag::lowercase(Alphabet::L),
                vec![HASHTREE_LABEL.to_string()],
            )
            .limit(50);

        let peers = self.peers.read().await;
        for entry in peers.values() {
            if entry.state != ConnectionState::Connected {
                continue;
            }
            let Some(peer) = entry.peer.as_ref() else {
                continue;
            };
            if !peer.has_data_channel() {
                continue;
            }

            debug!(
                "Querying peer {} for root event {}/{}",
                entry.peer_id.short(),
                owner_pubkey,
                tree_name
            );
            let events = match peer
                .query_nostr_events(vec![filter.clone()], per_peer_timeout)
                .await
            {
                Ok(events) => events,
                Err(e) => {
                    // Query failure on one peer is non-fatal; try the next.
                    debug!(
                        "Peer {} Nostr query failed for {}/{}: {}",
                        entry.peer_id.short(),
                        owner_pubkey,
                        tree_name,
                        e
                    );
                    continue;
                }
            };
            debug!(
                "Peer {} returned {} Nostr event(s) for {}/{}",
                entry.peer_id.short(),
                events.len(),
                owner_pubkey,
                tree_name
            );

            // Re-check the d/l tags locally — the peer's filter handling is
            // not trusted — then keep only the newest matching event.
            let latest = pick_latest_event(events.iter().filter(|event| {
                hashtree_event_identifier(event).as_deref() == Some(tree_name)
                    && is_hashtree_labeled_event(event)
            }));
            if let Some(event) = latest {
                if let Some(root) = root_event_from_peer(event, &entry.peer_id.short(), tree_name) {
                    debug!(
                        "Resolved {}/{} via peer {} event {}",
                        owner_pubkey,
                        tree_name,
                        entry.peer_id.short(),
                        event.id.to_hex()
                    );
                    return Some(root);
                }
            }
        }

        None
    }
918}
919
920impl Default for WebRTCState {
921 fn default() -> Self {
922 Self::new()
923 }
924}
925
/// Orchestrates peer discovery over Nostr relays, WebRTC signaling, and the
/// mesh event loop. Owns the channels wired up in [`WebRTCManager::new`].
pub struct WebRTCManager {
    config: WebRTCConfig,
    /// Our own identity (pubkey + session uuid).
    my_peer_id: PeerId,
    /// Nostr keys used to sign/encrypt signaling events.
    keys: Keys,
    /// Shared mesh state (peer table, counters, routing, Cashu).
    state: Arc<WebRTCState>,
    /// Shutdown broadcaster; `shutdown()` flips it to true.
    shutdown: Arc<tokio::sync::watch::Sender<bool>>,
    shutdown_rx: tokio::sync::watch::Receiver<bool>,
    /// Outbound signaling messages queued for dispatch.
    signaling_tx: mpsc::Sender<SignalingMessage>,
    // Receiver halves are `Option` so `run` can `take()` them exactly once.
    signaling_rx: Option<mpsc::Receiver<SignalingMessage>>,
    /// Optional content store used to answer peers' data requests.
    store: Option<Arc<dyn ContentStore>>,
    /// Assigns newly discovered peers to a pool.
    peer_classifier: PeerClassifier,
    /// Optional embedded Nostr relay for serving events to peers.
    nostr_relay: Option<Arc<NostrRelay>>,
    /// Peer connection state-change events from peer tasks.
    state_event_tx: mpsc::Sender<PeerStateEvent>,
    state_event_rx: Option<mpsc::Receiver<PeerStateEvent>>,
    /// Mesh frames received from peers, tagged with the sender's id.
    mesh_frame_tx: mpsc::Sender<(PeerId, MeshNostrFrame)>,
    mesh_frame_rx: Option<mpsc::Receiver<(PeerId, MeshNostrFrame)>>,
    /// Dedup set for mesh frame ids (capped + TTL'd).
    seen_frame_ids: Arc<Mutex<TimedSeenSet>>,
    /// Dedup set for Nostr event ids (capped + TTL'd).
    seen_event_ids: Arc<Mutex<TimedSeenSet>>,
}
952
953impl WebRTCManager {
954 pub fn new(keys: Keys, config: WebRTCConfig) -> Self {
956 let pubkey = keys.public_key().to_hex();
957 let my_peer_id = PeerId::new(pubkey, None);
958 let (shutdown, shutdown_rx) = tokio::sync::watch::channel(false);
959 let (signaling_tx, signaling_rx) = mpsc::channel(100);
960 let (state_event_tx, state_event_rx) = mpsc::channel(100);
961 let (mesh_frame_tx, mesh_frame_rx) = mpsc::channel(256);
962 let state = Arc::new(WebRTCState::new_with_routing_and_cashu(
963 config.request_selection_strategy,
964 config.request_fairness_enabled,
965 config.request_dispatch,
966 Duration::from_millis(config.message_timeout_ms),
967 CashuRoutingConfig::default(),
968 None,
969 None,
970 ));
971
972 let peer_classifier: PeerClassifier = Arc::new(|_| PeerPool::Other);
974
975 Self {
976 config,
977 my_peer_id,
978 keys,
979 state,
980 shutdown: Arc::new(shutdown),
981 shutdown_rx,
982 signaling_tx,
983 signaling_rx: Some(signaling_rx),
984 store: None,
985 peer_classifier,
986 nostr_relay: None,
987 state_event_tx,
988 state_event_rx: Some(state_event_rx),
989 mesh_frame_tx,
990 mesh_frame_rx: Some(mesh_frame_rx),
991 seen_frame_ids: Arc::new(Mutex::new(TimedSeenSet::new(
992 SEEN_FRAME_CAP,
993 SEEN_FRAME_TTL,
994 ))),
995 seen_event_ids: Arc::new(Mutex::new(TimedSeenSet::new(
996 SEEN_EVENT_CAP,
997 SEEN_EVENT_TTL,
998 ))),
999 }
1000 }
1001
1002 pub fn new_with_classifier(
1004 keys: Keys,
1005 config: WebRTCConfig,
1006 classifier: PeerClassifier,
1007 ) -> Self {
1008 let mut manager = Self::new(keys, config);
1009 manager.peer_classifier = classifier;
1010 manager
1011 }
1012
1013 pub fn new_with_store(keys: Keys, config: WebRTCConfig, store: Arc<dyn ContentStore>) -> Self {
1015 let mut manager = Self::new(keys, config);
1016 manager.store = Some(store);
1017 manager
1018 }
1019
1020 pub fn new_with_store_and_classifier(
1022 keys: Keys,
1023 config: WebRTCConfig,
1024 store: Arc<dyn ContentStore>,
1025 classifier: PeerClassifier,
1026 ) -> Self {
1027 Self::new_with_store_and_classifier_and_cashu(
1028 keys,
1029 config,
1030 store,
1031 classifier,
1032 CashuRoutingConfig::default(),
1033 None,
1034 None,
1035 )
1036 }
1037
1038 pub fn new_with_store_and_classifier_and_cashu(
1039 keys: Keys,
1040 config: WebRTCConfig,
1041 store: Arc<dyn ContentStore>,
1042 classifier: PeerClassifier,
1043 cashu_routing: CashuRoutingConfig,
1044 payment_client: Option<Arc<dyn CashuPaymentClient>>,
1045 mint_metadata: Option<Arc<CashuMintMetadataStore>>,
1046 ) -> Self {
1047 let mut manager = Self::new(keys, config);
1048 manager.state = Arc::new(WebRTCState::new_with_routing_and_cashu(
1049 manager.config.request_selection_strategy,
1050 manager.config.request_fairness_enabled,
1051 manager.config.request_dispatch,
1052 Duration::from_millis(manager.config.message_timeout_ms),
1053 cashu_routing,
1054 payment_client,
1055 mint_metadata,
1056 ));
1057 manager.store = Some(store);
1058 manager.peer_classifier = classifier;
1059 manager
1060 }
1061
    /// Installs the content store used to answer peers' data requests.
    pub fn set_store(&mut self, store: Arc<dyn ContentStore>) {
        self.store = Some(store);
    }
1066
    /// Replaces the function that assigns peers to connection pools.
    pub fn set_peer_classifier(&mut self, classifier: PeerClassifier) {
        self.peer_classifier = classifier;
    }
1071
    /// Attaches an embedded Nostr relay for serving events to peers.
    pub fn set_nostr_relay(&mut self, relay: Arc<NostrRelay>) {
        self.nostr_relay = Some(relay);
    }
1076
    /// Our own peer identity.
    pub fn my_peer_id(&self) -> &PeerId {
        &self.my_peer_id
    }
1081
1082 pub fn state(&self) -> Arc<WebRTCState> {
1084 self.state.clone()
1085 }
1086
    /// Signals the event loop (and relay tasks) to stop.
    /// The send only fails when every receiver is gone, which is harmless here.
    pub fn shutdown(&self) {
        let _ = self.shutdown.send(true);
    }
1091
1092 pub async fn connected_count(&self) -> usize {
1094 self.state
1095 .connected_count
1096 .load(std::sync::atomic::Ordering::Relaxed)
1097 }
1098
1099 pub async fn peer_statuses(&self) -> Vec<PeerStatus> {
1101 self.state
1102 .peers
1103 .read()
1104 .await
1105 .values()
1106 .map(|p| PeerStatus {
1107 peer_id: p.peer_id.to_string(),
1108 pubkey: p.peer_id.pubkey.clone(),
1109 state: p.state.to_string(),
1110 direction: p.direction,
1111 connected_at: Some(p.last_seen),
1112 pool: p.pool,
1113 })
1114 .collect()
1115 }
1116
1117 pub async fn get_pool_counts(&self) -> (usize, usize, usize, usize) {
1121 let peers = self.state.peers.read().await;
1122 let mut follows_connected = 0;
1123 let mut follows_active = 0;
1124 let mut other_connected = 0;
1125 let mut other_active = 0;
1126
1127 for entry in peers.values() {
1128 let is_active = entry.state == ConnectionState::Connected
1131 || entry.state == ConnectionState::Connecting;
1132
1133 match entry.pool {
1134 PeerPool::Follows => {
1135 if is_active {
1136 follows_active += 1;
1137 }
1138 if entry.state == ConnectionState::Connected {
1139 follows_connected += 1;
1140 }
1141 }
1142 PeerPool::Other => {
1143 if is_active {
1144 other_active += 1;
1145 }
1146 if entry.state == ConnectionState::Connected {
1147 other_connected += 1;
1148 }
1149 }
1150 }
1151 }
1152
1153 (
1154 follows_connected,
1155 follows_active,
1156 other_connected,
1157 other_active,
1158 )
1159 }
1160
1161 fn can_accept_peer(&self, pool: PeerPool, pool_counts: &(usize, usize, usize, usize)) -> bool {
1163 let (_, follows_active, _, other_active) = *pool_counts;
1164 match pool {
1165 PeerPool::Follows => follows_active < self.config.pools.follows.max_connections,
1166 PeerPool::Other => other_active < self.config.pools.other.max_connections,
1167 }
1168 }
1169
1170 #[allow(dead_code)]
1172 fn is_pool_satisfied(
1173 &self,
1174 pool: PeerPool,
1175 pool_counts: &(usize, usize, usize, usize),
1176 ) -> bool {
1177 let (follows_connected, _, other_connected, _) = *pool_counts;
1178 match pool {
1179 PeerPool::Follows => {
1180 follows_connected >= self.config.pools.follows.satisfied_connections
1181 }
1182 PeerPool::Other => other_connected >= self.config.pools.other.satisfied_connections,
1183 }
1184 }
1185
1186 #[allow(dead_code)]
1188 fn is_satisfied(&self, pool_counts: &(usize, usize, usize, usize)) -> bool {
1189 self.is_pool_satisfied(PeerPool::Follows, pool_counts)
1190 && self.is_pool_satisfied(PeerPool::Other, pool_counts)
1191 }
1192
    /// Deterministic tie-break for simultaneous discovery: the side with the
    /// lexicographically smaller uuid dials, so exactly one of the two peers
    /// initiates the connection.
    fn should_initiate(&self, their_uuid: &str) -> bool {
        self.my_peer_id.uuid.as_str() < their_uuid
    }
1198
    /// Main event loop: spawns one task per configured signaling relay, sends
    /// periodic HELLO beacons, and dispatches relay events, outbound
    /// signaling, peer state changes, mesh frames, and periodic cleanup until
    /// `shutdown` flips to true.
    ///
    /// May only be called once per manager — it `take()`s the receiver halves
    /// created in `new` and panics on a second call.
    pub async fn run(&mut self) -> Result<()> {
        info!(
            "Starting WebRTC manager with peer ID: {}",
            self.my_peer_id.short()
        );

        // (relay url, event) pairs produced by the per-relay reader tasks.
        let (event_tx, mut event_rx) = mpsc::channel::<(String, nostr::Event)>(100);

        let mut signaling_rx = self
            .signaling_rx
            .take()
            .expect("signaling_rx already taken");

        let mut state_event_rx = self
            .state_event_rx
            .take()
            .expect("state_event_rx already taken");
        let mut mesh_frame_rx = self
            .mesh_frame_rx
            .take()
            .expect("mesh_frame_rx already taken");

        // Broadcast fan-out of outbound signaling to every relay writer; the
        // initial receiver is unused, each relay task subscribes its own.
        let (relay_write_tx, _) = tokio::sync::broadcast::channel::<SignalingMessage>(100);

        // One long-lived task per configured relay.
        for relay_url in &self.config.relays {
            let url = relay_url.clone();
            let event_tx = event_tx.clone();
            let shutdown_rx = self.shutdown_rx.clone();
            let keys = self.keys.clone();
            let relay_write_rx = relay_write_tx.subscribe();

            tokio::spawn(async move {
                if let Err(e) =
                    Self::relay_task(url.clone(), event_tx, shutdown_rx, keys, relay_write_rx).await
                {
                    error!("Relay {} error: {}", url, e);
                }
            });
        }

        let mut shutdown_rx = self.shutdown_rx.clone();
        let mut cleanup_interval = tokio::time::interval(Duration::from_secs(30));
        let mut hello_ticker =
            tokio::time::interval(Duration::from_millis(self.config.hello_interval_ms));
        // Announce ourselves before entering the loop.
        // NOTE(review): tokio intervals also tick immediately on first poll,
        // so the first HELLO may effectively be sent twice — confirm intended.
        self.dispatch_signaling_message(
            SignalingMessage::hello(&self.my_peer_id.uuid),
            &relay_write_tx,
        )
        .await;
        loop {
            tokio::select! {
                // Shutdown flag flipped.
                _ = shutdown_rx.changed() => {
                    if *shutdown_rx.borrow() {
                        info!("WebRTC manager shutting down");
                        break;
                    }
                }
                // Inbound signaling event from one of the relay tasks.
                Some((relay, event)) = event_rx.recv() => {
                    if let Err(e) = self.handle_event(&relay, &event, &relay_write_tx).await {
                        debug!("Error handling event from {}: {}", relay, e);
                    }
                }
                // Outbound signaling queued by peer tasks.
                Some(msg) = signaling_rx.recv() => {
                    self.dispatch_signaling_message(msg, &relay_write_tx).await;
                }
                // Peer connection state change.
                Some(event) = state_event_rx.recv() => {
                    self.handle_peer_state_event(event, &relay_write_tx).await;
                }
                // Mesh frame received from a peer.
                Some((from_peer_id, frame)) = mesh_frame_rx.recv() => {
                    self.handle_mesh_frame(from_peer_id, frame, &relay_write_tx).await;
                }
                // Periodic HELLO beacon.
                _ = hello_ticker.tick() => {
                    self.dispatch_signaling_message(
                        SignalingMessage::hello(&self.my_peer_id.uuid),
                        &relay_write_tx,
                    ).await;
                }
                // Periodic stale-peer cleanup (every 30s).
                _ = cleanup_interval.tick() => {
                    self.cleanup_stale_peers().await;
                }
            }
        }

        Ok(())
    }
1293
    /// Per-relay worker: connect, subscribe, and pump messages until shutdown
    /// or socket failure.
    ///
    /// Subscribes with two filters: broadcast hello events (matched by the
    /// lowercase `l` tag) and directed events `p`-tagged with our pubkey,
    /// both limited to the last 60 seconds. Outbound signaling messages
    /// arrive on the broadcast receiver, are signed/wrapped via
    /// `create_signaling_event`, and published to this relay.
    ///
    /// NOTE(review): the task simply ends (no reconnect/backoff) when the
    /// websocket errors or closes — confirm something above restarts relay
    /// connections.
    async fn relay_task(
        url: String,
        event_tx: mpsc::Sender<(String, nostr::Event)>,
        mut shutdown_rx: tokio::sync::watch::Receiver<bool>,
        keys: Keys,
        mut signaling_rx: tokio::sync::broadcast::Receiver<SignalingMessage>,
    ) -> Result<()> {
        info!("Connecting to relay: {}", url);

        let (ws_stream, _) = connect_async(&url).await?;
        let (mut write, mut read) = ws_stream.split();

        // Hello broadcasts from anyone, at most a minute old.
        let hello_filter = Filter::new()
            .kind(Kind::Ephemeral(WEBRTC_KIND as u16))
            .custom_tag(
                nostr::SingleLetterTag::lowercase(nostr::Alphabet::L),
                vec![HELLO_TAG],
            )
            .since(nostr::Timestamp::now() - Duration::from_secs(60));

        // Directed signaling addressed to our pubkey.
        let directed_filter = Filter::new()
            .kind(Kind::Ephemeral(WEBRTC_KIND as u16))
            .custom_tag(
                nostr::SingleLetterTag::lowercase(nostr::Alphabet::P),
                vec![keys.public_key().to_hex()],
            )
            .since(nostr::Timestamp::now() - Duration::from_secs(60));

        let sub_id = nostr::SubscriptionId::generate();
        let sub_msg = ClientMessage::req(sub_id.clone(), vec![hello_filter, directed_filter]);
        write.send(Message::Text(sub_msg.as_json())).await?;

        info!(
            "Subscribed to {} for WebRTC events (kind {})",
            url, WEBRTC_KIND
        );

        loop {
            tokio::select! {
                _ = shutdown_rx.changed() => {
                    if *shutdown_rx.borrow() {
                        break;
                    }
                }
                // Outbound path: sign and publish our signaling messages.
                // Non-Ok recv results (lagged/closed receiver) skip this
                // branch for the current poll; send failures surface as a
                // read-side error on the next loop iteration.
                Ok(signaling_msg) = signaling_rx.recv() => {
                    info!("Sending {} via {}", signaling_msg.msg_type(), url);
                    if let Ok(event) = Self::create_signaling_event(&keys, &signaling_msg).await {
                        let event_id = event.id.to_string();
                        let msg = ClientMessage::event(event);
                        if write.send(Message::Text(msg.as_json())).await.is_ok() {
                            info!("Sent {} to {} (event id: {})", signaling_msg.msg_type(), url, &event_id[..16]);
                        }
                    }
                }
                // Inbound path: forward EVENT messages to the manager loop;
                // other relay messages (EOSE, OK, NOTICE) are ignored.
                msg = read.next() => {
                    match msg {
                        Some(Ok(Message::Text(text))) => {
                            if let Ok(RelayMessage::Event { event, .. }) =
                                RelayMessage::from_json(&text)
                            {
                                let _ = event_tx.send((url.clone(), *event)).await;
                            }
                        }
                        Some(Err(e)) => {
                            error!("WebSocket error from {}: {}", url, e);
                            break;
                        }
                        None => {
                            warn!("WebSocket closed: {}", url);
                            break;
                        }
                        _ => {}
                    }
                }
            }
        }

        Ok(())
    }
1378
1379 async fn mark_seen_frame_id(&self, frame_id: String) -> bool {
1380 let mut seen = self.seen_frame_ids.lock().await;
1381 seen.insert_if_new(frame_id)
1382 }
1383
1384 async fn mark_seen_event_id(&self, event_id: String) -> bool {
1385 let mut seen = self.seen_event_ids.lock().await;
1386 seen.insert_if_new(event_id)
1387 }
1388
    /// Publish a signaling message on every available transport: all relay
    /// websocket tasks (via the broadcast channel) and the peer mesh (as a
    /// flooded `MeshNostrFrame`).
    async fn dispatch_signaling_message(
        &self,
        msg: SignalingMessage,
        relay_write_tx: &tokio::sync::broadcast::Sender<SignalingMessage>,
    ) {
        // Relay path is best-effort; having zero relay subscribers is normal
        // (e.g. all relay sockets down) and only logged at debug level.
        if relay_write_tx.send(msg.clone()).is_err() {
            debug!(
                "No relay subscribers for signaling message {}",
                msg.msg_type()
            );
        }

        // Mesh path: wrap the signed event in a frame and flood it to peers.
        let event = match Self::create_signaling_event(&self.keys, &msg).await {
            Ok(event) => event,
            Err(e) => {
                debug!("Failed to create signaling event for mesh dispatch: {}", e);
                return;
            }
        };

        let mut frame =
            MeshNostrFrame::new_event(event, &self.my_peer_id.to_string(), MESH_DEFAULT_HTL);
        // Pre-mark our own frame/event ids as seen so the flood does not
        // loop back through handle_mesh_frame when peers echo it.
        if !self.mark_seen_frame_id(frame.frame_id.clone()).await {
            self.state.record_mesh_duplicate_drop();
            return;
        }
        if !self.mark_seen_event_id(frame.event().id.to_hex()).await {
            self.state.record_mesh_duplicate_drop();
            return;
        }

        // NOTE(review): `new_event` already received our peer id as the
        // sender argument; this assignment looks redundant — confirm
        // `new_event` stores it before removing this line.
        frame.sender_peer_id = self.my_peer_id.to_string();
        let forwarded = self.forward_mesh_frame(&frame, None).await;
        if forwarded > 0 {
            self.state.record_mesh_forwarded(forwarded as u64);
        }
    }
1427
1428 async fn forward_mesh_frame(
1429 &self,
1430 frame: &MeshNostrFrame,
1431 exclude_peer_id: Option<&str>,
1432 ) -> usize {
1433 let peers = self.state.peers.read().await;
1434 let peer_refs: Vec<_> = peers
1435 .values()
1436 .filter(|entry| entry.state == ConnectionState::Connected)
1437 .filter(|entry| {
1438 entry
1439 .peer
1440 .as_ref()
1441 .map(|peer| peer.has_data_channel())
1442 .unwrap_or(false)
1443 })
1444 .filter(|entry| {
1445 exclude_peer_id
1446 .map(|exclude| exclude != entry.peer_id.to_string())
1447 .unwrap_or(true)
1448 })
1449 .filter_map(|entry| {
1450 entry.peer.as_ref().map(|peer| {
1451 (
1452 entry.peer_id.to_string(),
1453 entry.peer_id.short(),
1454 peer.data_channel.clone(),
1455 *peer.htl_config(),
1456 )
1457 })
1458 })
1459 .collect();
1460 drop(peers);
1461
1462 let mut forwarded = 0usize;
1463 for (_peer_key, peer_short, dc_mutex, htl_cfg) in peer_refs {
1464 let next_htl = decrement_htl_with_policy(frame.htl, &MESH_EVENT_POLICY, &htl_cfg);
1465 if !should_forward_htl(next_htl) {
1466 continue;
1467 }
1468
1469 let mut outbound = frame.clone();
1470 outbound.htl = next_htl;
1471 let text = match serde_json::to_string(&outbound) {
1472 Ok(text) => text,
1473 Err(e) => {
1474 debug!("Failed to serialize mesh frame for {}: {}", peer_short, e);
1475 continue;
1476 }
1477 };
1478
1479 let dc_guard = dc_mutex.lock().await;
1480 let Some(dc) = dc_guard.as_ref() else {
1481 continue;
1482 };
1483 if dc.ready_state()
1484 != webrtc::data_channel::data_channel_state::RTCDataChannelState::Open
1485 {
1486 continue;
1487 }
1488 if dc.send_text(text).await.is_ok() {
1489 forwarded += 1;
1490 }
1491 }
1492
1493 forwarded
1494 }
1495
    /// Process a mesh frame received from a directly connected peer.
    ///
    /// Pipeline: structural validation, dedupe by frame id, dedupe by the
    /// embedded event id, signature verification of the embedded event,
    /// local handling (as if the event had arrived from a relay named
    /// "mesh"), then re-flooding to the rest of the mesh excluding the peer
    /// the frame came from. Every rejection returns silently so a bad frame
    /// never disturbs the caller.
    async fn handle_mesh_frame(
        &self,
        from_peer_id: PeerId,
        frame: MeshNostrFrame,
        relay_write_tx: &tokio::sync::broadcast::Sender<SignalingMessage>,
    ) {
        if let Err(reason) = validate_mesh_frame(&frame) {
            debug!(
                "Ignoring mesh frame from {} (invalid: {})",
                from_peer_id.short(),
                reason
            );
            return;
        }

        // Drop frames we have already seen (flood loops, retransmits).
        if !self.mark_seen_frame_id(frame.frame_id.clone()).await {
            self.state.record_mesh_duplicate_drop();
            return;
        }

        let event = match &frame.payload {
            MeshNostrPayload::Event { event } => event.clone(),
        };

        // Also dedupe by event id: the same event may arrive in distinct
        // frames (e.g. via a relay and via the mesh).
        if !self.mark_seen_event_id(event.id.to_hex()).await {
            self.state.record_mesh_duplicate_drop();
            return;
        }

        // Mesh peers are untrusted transports; verify the event signature.
        if event.verify().is_err() {
            debug!(
                "Ignoring mesh event from {} due to invalid signature",
                from_peer_id.short()
            );
            return;
        }

        self.state.record_mesh_received();

        if let Err(e) = self.handle_event("mesh", &event, relay_write_tx).await {
            debug!(
                "Error handling mesh event from {}: {}",
                from_peer_id.short(),
                e
            );
        }

        // Continue the flood, never sending the frame back where it came from.
        let forwarded = self
            .forward_mesh_frame(&frame, Some(&from_peer_id.to_string()))
            .await;
        if forwarded > 0 {
            self.state.record_mesh_forwarded(forwarded as u64);
        }
    }
1550
    /// Build the nostr event for a signaling message.
    ///
    /// Directed messages (those with a recipient that parses as a `PeerId`)
    /// are wrapped: a JSON "seal" carrying our real pubkey and the
    /// serialized message is NIP-44-encrypted to the recipient and published
    /// from a freshly generated ephemeral key, with a `p` tag for addressing
    /// and a 5-minute expiration tag. NOTE(review): the seal is plain JSON,
    /// not a signed NIP-59 seal — the receiver trusts the embedded pubkey
    /// (see `handle_event`); confirm this matches the intended threat model.
    ///
    /// All other messages (hellos) are published unencrypted under our own
    /// key, tagged with the hello label and our peer UUID.
    async fn create_signaling_event(keys: &Keys, msg: &SignalingMessage) -> Result<nostr::Event> {
        if let Some(recipient_str) = msg.recipient() {
            if let Some(peer_id) = PeerId::from_string(recipient_str) {
                let recipient_pubkey = PublicKey::from_hex(&peer_id.pubkey)?;

                // Inner payload: our real identity plus the message itself,
                // visible only after NIP-44 decryption.
                let seal = serde_json::json!({
                    "pubkey": keys.public_key().to_hex(),
                    "kind": WEBRTC_KIND,
                    "content": serde_json::to_string(msg)?,
                    "tags": []
                });

                // Fresh key per message hides our identity from relays.
                let ephemeral_keys = Keys::generate();

                let encrypted_content = nip44::encrypt(
                    ephemeral_keys.secret_key(),
                    &recipient_pubkey,
                    seal.to_string(),
                    nip44::Version::V2,
                )?;

                let created_at = nostr::Timestamp::now();
                // Let relays drop the event after five minutes.
                let expiration = created_at + Duration::from_secs(5 * 60);
                let tags = vec![
                    Tag::parse(&["p", &recipient_pubkey.to_hex()])?,
                    Tag::parse(&["expiration", &expiration.as_u64().to_string()])?,
                ];

                let event =
                    EventBuilder::new(Kind::Ephemeral(WEBRTC_KIND as u16), encrypted_content, tags)
                        .to_event(&ephemeral_keys)?;

                return Ok(event);
            }
        }

        // Broadcast hello: plaintext, signed with our long-lived key.
        let tags = vec![
            Tag::parse(&["l", HELLO_TAG])?,
            Tag::parse(&["peerId", msg.peer_id()])?,
        ];

        let event =
            EventBuilder::new(Kind::Ephemeral(WEBRTC_KIND as u16), "", tags).to_event(keys)?;

        Ok(event)
    }
1610
    /// Route a single nostr event: either a broadcast hello or a directed,
    /// NIP-44-encrypted signaling message addressed to our pubkey.
    ///
    /// Events of the wrong kind, our own echoes, and undecryptable or
    /// malformed payloads are silently ignored (returning `Ok(())`) so one
    /// bad event never disturbs the relay loop. `relay` is only used for
    /// logging and may be the pseudo-name "mesh".
    async fn handle_event(
        &self,
        relay: &str,
        event: &nostr::Event,
        relay_write_tx: &tokio::sync::broadcast::Sender<SignalingMessage>,
    ) -> Result<()> {
        // Only the dedicated ephemeral signaling kind is of interest.
        if event.kind != Kind::Ephemeral(WEBRTC_KIND as u16) {
            return Ok(());
        }

        // Helper: value of the first tag whose name matches.
        let get_tag = |name: &str| -> Option<String> {
            event.tags.iter().find_map(|tag| {
                let v: Vec<String> = tag.clone().to_vec();
                if v.len() >= 2 && v[0] == name {
                    Some(v[1].clone())
                } else {
                    None
                }
            })
        };

        // Broadcast hello path: plaintext event tagged with the hello label.
        let l_tag = get_tag("l");
        if l_tag.as_deref() == Some(HELLO_TAG) {
            let sender_pubkey = event.pubkey.to_hex();

            // Ignore our own hellos echoed back by relays or the mesh.
            if sender_pubkey == self.my_peer_id.pubkey {
                return Ok(());
            }

            if let Some(their_uuid) = get_tag("peerId") {
                debug!("Received hello from {} via {}", &sender_pubkey[..8], relay);
                self.handle_hello(&sender_pubkey, &their_uuid, relay_write_tx)
                    .await?;
            }
            return Ok(());
        }

        // Everything else must be explicitly addressed to us via a `p` tag.
        let p_tag = get_tag("p");
        if p_tag.as_deref() != Some(&self.keys.public_key().to_hex()) {
            return Ok(());
        }

        if event.content.is_empty() {
            return Ok(());
        }

        // Decrypt the gift-wrapped payload. The outer event was signed with
        // an ephemeral key, so the sender identity lives inside the seal.
        let seal: serde_json::Value =
            match nip44::decrypt(self.keys.secret_key(), &event.pubkey, &event.content) {
                Ok(plaintext) => match serde_json::from_str(&plaintext) {
                    Ok(v) => v,
                    Err(_) => return Ok(()),
                },
                Err(_) => {
                    return Ok(());
                }
            };

        // NOTE(review): the sender pubkey is read from the decrypted JSON
        // and is not cryptographically verified here — confirm this matches
        // the intended threat model for signaling.
        let sender_pubkey = seal
            .get("pubkey")
            .and_then(|v| v.as_str())
            .ok_or_else(|| anyhow::anyhow!("Missing pubkey in seal"))?;

        if sender_pubkey == self.my_peer_id.pubkey {
            return Ok(());
        }

        let content = seal
            .get("content")
            .and_then(|v| v.as_str())
            .ok_or_else(|| anyhow::anyhow!("Missing content in seal"))?;

        let raw_msg: serde_json::Value = serde_json::from_str(content)?;
        let msg_type = raw_msg.get("type").and_then(|v| v.as_str()).unwrap_or("");

        // Compatibility path: messages carrying `targetPeerId` use a flat
        // JSON shape rather than the native `SignalingMessage` encoding —
        // presumably emitted by non-Rust peers; TODO confirm which clients.
        if raw_msg.get("targetPeerId").is_some() {
            let target_peer = raw_msg
                .get("targetPeerId")
                .and_then(|v| v.as_str())
                .unwrap_or("");
            if target_peer != self.my_peer_id.to_string() {
                return Ok(());
            }

            // `peerId` is "<pubkey>:<uuid>"; fall back to the raw value when
            // there is no colon-separated UUID part.
            let peer_id = raw_msg.get("peerId").and_then(|v| v.as_str()).unwrap_or("");
            let their_uuid = peer_id.split(':').nth(1).unwrap_or(peer_id);

            match msg_type {
                "offer" => {
                    let sdp = raw_msg
                        .get("sdp")
                        .and_then(|v| v.as_str())
                        .ok_or_else(|| anyhow::anyhow!("Missing SDP in offer"))?;
                    let offer = serde_json::json!({ "type": "offer", "sdp": sdp });
                    self.handle_offer(sender_pubkey, their_uuid, offer, relay_write_tx)
                        .await?;
                }
                "answer" => {
                    let sdp = raw_msg
                        .get("sdp")
                        .and_then(|v| v.as_str())
                        .ok_or_else(|| anyhow::anyhow!("Missing SDP in answer"))?;
                    let answer = serde_json::json!({ "type": "answer", "sdp": sdp });
                    self.handle_answer(sender_pubkey, their_uuid, answer)
                        .await?;
                }
                "candidate" => {
                    let candidate = raw_msg
                        .get("candidate")
                        .and_then(|v| v.as_str())
                        .unwrap_or("");
                    if !candidate.is_empty() {
                        let candidate_json = serde_json::json!({
                            "candidate": candidate,
                            "sdpMid": raw_msg.get("sdpMid"),
                            "sdpMLineIndex": raw_msg.get("sdpMLineIndex"),
                        });
                        self.handle_candidate(sender_pubkey, their_uuid, candidate_json)
                            .await?;
                    }
                }
                "candidates" => {
                    // Entries may be objects with a "candidate" field or bare
                    // candidate strings; normalize both to the object form.
                    let candidates = raw_msg
                        .get("candidates")
                        .and_then(|v| v.as_array())
                        .map(|entries| {
                            entries
                                .iter()
                                .filter_map(|entry| {
                                    entry
                                        .get("candidate")
                                        .and_then(|v| v.as_str())
                                        .or_else(|| entry.as_str())
                                        .map(|candidate_str| {
                                            serde_json::json!({
                                                "candidate": candidate_str,
                                                "sdpMid": entry.get("sdpMid"),
                                                "sdpMLineIndex": entry.get("sdpMLineIndex"),
                                            })
                                        })
                                })
                                .collect::<Vec<_>>()
                        })
                        .unwrap_or_default();
                    self.handle_candidates(sender_pubkey, their_uuid, candidates)
                        .await?;
                }
                _ => {}
            }

            return Ok(());
        }

        // Native path: deserialize into a typed SignalingMessage.
        let msg: SignalingMessage = serde_json::from_value(raw_msg)?;

        debug!(
            "Received {} from {} via {} (gift-wrapped)",
            msg.msg_type(),
            &sender_pubkey[..8],
            relay
        );

        match msg {
            // Hellos are handled on the plaintext path above; a wrapped one
            // is unexpected and ignored.
            SignalingMessage::Hello { .. } => {
                return Ok(());
            }
            SignalingMessage::Offer {
                recipient,
                peer_id: their_uuid,
                offer,
            } => {
                if recipient != self.my_peer_id.to_string() {
                    return Ok(());
                }
                if let Err(e) = self
                    .handle_offer(sender_pubkey, &their_uuid, offer, relay_write_tx)
                    .await
                {
                    error!(
                        "handle_offer FAILED: sender={}, uuid={}, error={:?}",
                        &sender_pubkey[..8.min(sender_pubkey.len())],
                        their_uuid,
                        e
                    );
                    return Err(e);
                }
            }
            SignalingMessage::Answer {
                recipient,
                peer_id: their_uuid,
                answer,
            } => {
                if recipient != self.my_peer_id.to_string() {
                    return Ok(());
                }
                self.handle_answer(sender_pubkey, &their_uuid, answer)
                    .await?;
            }
            SignalingMessage::Candidate {
                recipient,
                peer_id: their_uuid,
                candidate,
            } => {
                if recipient != self.my_peer_id.to_string() {
                    return Ok(());
                }
                self.handle_candidate(sender_pubkey, &their_uuid, candidate)
                    .await?;
            }
            SignalingMessage::Candidates {
                recipient,
                peer_id: their_uuid,
                candidates,
            } => {
                if recipient != self.my_peer_id.to_string() {
                    return Ok(());
                }
                self.handle_candidates(sender_pubkey, &their_uuid, candidates)
                    .await?;
            }
        }

        Ok(())
    }
1852
    /// Process a hello announcement from another peer.
    ///
    /// Decides whether to ignore it (connection already underway, pool full,
    /// or pool satisfied), track the peer and answer with our own hello so
    /// the remote side can dial, or initiate the connection ourselves when
    /// the UUID tie-break elects us as the dialer.
    async fn handle_hello(
        &self,
        sender_pubkey: &str,
        their_uuid: &str,
        relay_write_tx: &tokio::sync::broadcast::Sender<SignalingMessage>,
    ) -> Result<()> {
        let full_peer_id = PeerId::new(sender_pubkey.to_string(), Some(their_uuid.to_string()));
        let peer_key = full_peer_id.to_string();
        let mut already_discovered = false;

        // Fast path: nothing to do if a connection attempt is in progress.
        {
            let peers = self.state.peers.read().await;
            if let Some(entry) = peers.get(&peer_key) {
                if entry.state == ConnectionState::Connected
                    || entry.state == ConnectionState::Connecting
                {
                    return Ok(());
                }
                already_discovered = true;
            }
        }

        // Classify the sender into a connection pool (follows vs other).
        let pool = (self.peer_classifier)(sender_pubkey);

        let pool_counts = self.get_pool_counts().await;
        if !self.can_accept_peer(pool, &pool_counts) {
            debug!(
                "Ignoring hello from {} - pool {:?} is full",
                full_peer_id.short(),
                pool
            );
            return Ok(());
        }

        // The UUID tie-break elects the dialer, but we only actually dial
        // while the pool still wants more connections.
        let should_initiate = self.should_initiate(their_uuid);

        let pool_satisfied = self.is_pool_satisfied(pool, &pool_counts);
        let will_initiate = should_initiate && !pool_satisfied;

        info!(
            "Discovered peer: {} (pool: {:?}, initiate: {}, pool_satisfied: {})",
            full_peer_id.short(),
            pool,
            will_initiate,
            pool_satisfied
        );

        // Not dialing and no capacity wanted: don't even track the peer.
        if !will_initiate && pool_satisfied {
            debug!(
                "Pool {:?} is satisfied, not tracking peer {}",
                pool,
                full_peer_id.short()
            );
            return Ok(());
        }

        // Record (or refresh) the discovered peer entry.
        {
            let mut peers = self.state.peers.write().await;
            peers.insert(
                peer_key.clone(),
                PeerEntry {
                    peer_id: full_peer_id.clone(),
                    direction: if will_initiate {
                        PeerDirection::Outbound
                    } else {
                        PeerDirection::Inbound
                    },
                    state: ConnectionState::Discovered,
                    last_seen: Instant::now(),
                    peer: None,
                    pool,
                    bytes_sent: 0,
                    bytes_received: 0,
                },
            );
        }

        // If the remote side is the dialer and we are seeing it for the
        // first time, answer with a hello so it learns about us.
        if !will_initiate && !already_discovered {
            self.dispatch_signaling_message(
                SignalingMessage::hello(&self.my_peer_id.uuid),
                relay_write_tx,
            )
            .await;
        }

        if will_initiate {
            self.initiate_connection(&full_peer_id, pool, relay_write_tx)
                .await?;
        }

        Ok(())
    }
1959
    /// Dial a discovered peer: create an outbound `Peer`, generate the SDP
    /// offer, mark the table entry `Connecting`, and dispatch the offer.
    ///
    /// NOTE(review): if the peers entry disappeared between discovery and
    /// this call, `get_mut` misses — the freshly built peer is dropped while
    /// the offer is still sent; confirm that is acceptable.
    async fn initiate_connection(
        &self,
        peer_id: &PeerId,
        pool: PeerPool,
        relay_write_tx: &tokio::sync::broadcast::Sender<SignalingMessage>,
    ) -> Result<()> {
        let peer_key = peer_id.to_string();

        info!(
            "Initiating connection to {} (pool: {:?})",
            peer_id.short(),
            pool
        );

        let mut peer = Peer::new_with_store_and_events(
            peer_id.clone(),
            PeerDirection::Outbound,
            self.my_peer_id.clone(),
            self.signaling_tx.clone(),
            self.config.stun_servers.clone(),
            self.store.clone(),
            Some(self.state_event_tx.clone()),
            self.nostr_relay.clone(),
            Some(self.mesh_frame_tx.clone()),
            Some(self.state.cashu_quotes.clone()),
        )
        .await?;

        peer.setup_handlers().await?;

        // connect() produces the local SDP offer.
        let offer = peer.connect().await?;

        // Store the live peer on the existing entry before sending the offer
        // so an early answer finds it.
        {
            let mut peers = self.state.peers.write().await;
            if let Some(entry) = peers.get_mut(&peer_key) {
                entry.state = ConnectionState::Connecting;
                entry.peer = Some(peer);
                entry.pool = pool;
            }
        }

        let offer_msg = SignalingMessage::Offer {
            offer,
            recipient: peer_id.to_string(),
            peer_id: self.my_peer_id.uuid.clone(),
        };
        self.dispatch_signaling_message(offer_msg, relay_write_tx)
            .await;

        info!("Sent offer to {}", peer_id.short());

        Ok(())
    }
2018
    /// Accept an incoming SDP offer: build an inbound `Peer`, produce an
    /// answer, record the entry as `Connecting`, and send the answer back.
    ///
    /// The offer is dropped (returning `Ok(())`) when we already hold a live
    /// connection object for this peer or when the peer's pool is full.
    async fn handle_offer(
        &self,
        sender_pubkey: &str,
        their_uuid: &str,
        offer: serde_json::Value,
        relay_write_tx: &tokio::sync::broadcast::Sender<SignalingMessage>,
    ) -> Result<()> {
        debug!(
            "handle_offer ENTRY: sender={}, uuid={}",
            &sender_pubkey[..8.min(sender_pubkey.len())],
            their_uuid
        );
        let full_peer_id = PeerId::new(sender_pubkey.to_string(), Some(their_uuid.to_string()));
        let peer_key = full_peer_id.to_string();

        let pool = (self.peer_classifier)(sender_pubkey);

        info!(
            "Received offer from {} (pool: {:?})",
            full_peer_id.short(),
            pool
        );

        // Dedupe: only skip when the entry already carries a connection
        // object; a bare Discovered entry is overwritten below.
        {
            let peers = self.state.peers.read().await;
            debug!(
                "Checking for existing peer, peer_key: {}, known_peers: {}",
                peer_key,
                peers.len()
            );
            if let Some(entry) = peers.get(&peer_key) {
                if entry.peer.is_some() {
                    debug!(
                        "Already have peer {} with connection, skipping offer",
                        full_peer_id.short()
                    );
                    return Ok(());
                }
                debug!(
                    "Peer {} exists but has no connection, proceeding",
                    full_peer_id.short()
                );
            } else {
                debug!(
                    "Peer {} not found in peers map, will create new entry",
                    full_peer_id.short()
                );
            }
        }

        // Enforce per-pool capacity before allocating any resources.
        let pool_counts = self.get_pool_counts().await;
        debug!(
            "Pool counts: {:?}, checking can_accept_peer for {:?}",
            pool_counts, pool
        );
        if !self.can_accept_peer(pool, &pool_counts) {
            warn!(
                "Rejecting offer from {} - pool {:?} is full",
                full_peer_id.short(),
                pool
            );
            return Ok(());
        }
        debug!("Pool check passed for {}", full_peer_id.short());

        debug!("Creating peer connection for {}", full_peer_id.short());
        let mut peer = Peer::new_with_store_and_events(
            full_peer_id.clone(),
            PeerDirection::Inbound,
            self.my_peer_id.clone(),
            self.signaling_tx.clone(),
            self.config.stun_servers.clone(),
            self.store.clone(),
            Some(self.state_event_tx.clone()),
            self.nostr_relay.clone(),
            Some(self.mesh_frame_tx.clone()),
            Some(self.state.cashu_quotes.clone()),
        )
        .await?;
        debug!("Peer connection created for {}", full_peer_id.short());

        peer.setup_handlers().await?;
        debug!("Handlers set up for {}", full_peer_id.short());

        // Applying the remote offer yields our local answer.
        let answer = peer.handle_offer(offer).await?;
        debug!("Answer created for {}", full_peer_id.short());

        // Insert (or replace) the entry with the live connection attached.
        {
            let mut peers = self.state.peers.write().await;
            peers.insert(
                peer_key,
                PeerEntry {
                    peer_id: full_peer_id.clone(),
                    direction: PeerDirection::Inbound,
                    state: ConnectionState::Connecting,
                    last_seen: Instant::now(),
                    peer: Some(peer),
                    pool,
                    bytes_sent: 0,
                    bytes_received: 0,
                },
            );
        }

        let answer_msg = SignalingMessage::Answer {
            answer,
            recipient: full_peer_id.to_string(),
            peer_id: self.my_peer_id.uuid.clone(),
        };
        self.dispatch_signaling_message(answer_msg, relay_write_tx)
            .await;
        info!("Sent answer to {}", full_peer_id.short());

        Ok(())
    }
2145
    /// Apply a remote SDP answer to our pending outbound offer.
    ///
    /// Duplicate answers (entry already `Connected`) and answers arriving
    /// outside the `HaveLocalOffer` signaling state (retransmits / glare)
    /// are ignored; a missing entry or connection object is logged only.
    async fn handle_answer(
        &self,
        sender_pubkey: &str,
        their_uuid: &str,
        answer: serde_json::Value,
    ) -> Result<()> {
        let full_peer_id = PeerId::new(sender_pubkey.to_string(), Some(their_uuid.to_string()));
        let peer_key = full_peer_id.to_string();

        info!("Received answer from {}", full_peer_id.short());

        let mut peers = self.state.peers.write().await;
        if let Some(entry) = peers.get_mut(&peer_key) {
            if entry.state == ConnectionState::Connected {
                debug!(
                    "Ignoring duplicate answer from {} - already connected",
                    full_peer_id.short()
                );
                return Ok(());
            }
            if let Some(ref mut peer) = entry.peer {
                use webrtc::peer_connection::signaling_state::RTCSignalingState;
                // Only a connection that has sent an offer may take an answer.
                let signaling_state = peer.signaling_state();
                if signaling_state != RTCSignalingState::HaveLocalOffer {
                    debug!(
                        "Ignoring answer from {} - signaling state is {:?}, not HaveLocalOffer",
                        full_peer_id.short(),
                        signaling_state
                    );
                    return Ok(());
                }
                peer.handle_answer(answer).await?;
                info!("Applied answer from {}", full_peer_id.short());
            } else {
                debug!("Peer {} has no connection object", full_peer_id.short());
            }
        } else {
            debug!("No peer found for key: {}", peer_key);
        }

        Ok(())
    }
2191
2192 async fn handle_candidate(
2194 &self,
2195 sender_pubkey: &str,
2196 their_uuid: &str,
2197 candidate: serde_json::Value,
2198 ) -> Result<()> {
2199 let full_peer_id = PeerId::new(sender_pubkey.to_string(), Some(their_uuid.to_string()));
2200 let peer_key = full_peer_id.to_string();
2201
2202 info!("Received ICE candidate from {}", full_peer_id.short());
2203
2204 let mut peers = self.state.peers.write().await;
2205 if let Some(entry) = peers.get_mut(&peer_key) {
2206 if let Some(ref mut peer) = entry.peer {
2207 peer.handle_candidate(candidate).await?;
2208 }
2209 }
2210
2211 Ok(())
2212 }
2213
2214 async fn handle_candidates(
2216 &self,
2217 sender_pubkey: &str,
2218 their_uuid: &str,
2219 candidates: Vec<serde_json::Value>,
2220 ) -> Result<()> {
2221 let full_peer_id = PeerId::new(sender_pubkey.to_string(), Some(their_uuid.to_string()));
2222 let peer_key = full_peer_id.to_string();
2223
2224 debug!(
2225 "Received {} candidates from {}",
2226 candidates.len(),
2227 full_peer_id.short()
2228 );
2229
2230 let mut peers = self.state.peers.write().await;
2231 if let Some(entry) = peers.get_mut(&peer_key) {
2232 if let Some(ref mut peer) = entry.peer {
2233 for candidate in candidates {
2234 if let Err(e) = peer.handle_candidate(candidate).await {
2235 debug!("Failed to add candidate: {}", e);
2236 }
2237 }
2238 }
2239 }
2240
2241 Ok(())
2242 }
2243
2244 async fn handle_peer_state_event(
2246 &self,
2247 event: PeerStateEvent,
2248 relay_write_tx: &tokio::sync::broadcast::Sender<SignalingMessage>,
2249 ) {
2250 match event {
2251 PeerStateEvent::Connected(peer_id) => {
2252 let peer_key = peer_id.to_string();
2253 let mut emit_hello = false;
2254 let mut peers = self.state.peers.write().await;
2255 if let Some(entry) = peers.get_mut(&peer_key) {
2256 if entry.state != ConnectionState::Connected {
2257 info!("Peer {} connected (via state event)", peer_id.short());
2258 entry.state = ConnectionState::Connected;
2259 emit_hello = true;
2260 self.state
2262 .connected_count
2263 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
2264 }
2265 }
2266 drop(peers);
2267 if emit_hello {
2268 self.dispatch_signaling_message(
2269 SignalingMessage::hello(&self.my_peer_id.uuid),
2270 relay_write_tx,
2271 )
2272 .await;
2273 }
2274 }
2275 PeerStateEvent::Failed(peer_id) => {
2276 let peer_key = peer_id.to_string();
2277 info!(
2278 "Peer {} connection failed - removing from pool",
2279 peer_id.short()
2280 );
2281 let mut peers = self.state.peers.write().await;
2282 if let Some(entry) = peers.remove(&peer_key) {
2283 if entry.state == ConnectionState::Connected {
2285 self.state
2286 .connected_count
2287 .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
2288 }
2289 if let Some(peer) = entry.peer {
2291 let _ = peer.close().await;
2292 }
2293 }
2294 }
2295 PeerStateEvent::Disconnected(peer_id) => {
2296 let peer_key = peer_id.to_string();
2297 info!("Peer {} disconnected - removing from pool", peer_id.short());
2298 let mut peers = self.state.peers.write().await;
2299 if let Some(entry) = peers.remove(&peer_key) {
2300 if entry.state == ConnectionState::Connected {
2302 self.state
2303 .connected_count
2304 .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
2305 }
2306 if let Some(peer) = entry.peer {
2308 let _ = peer.close().await;
2309 }
2310 }
2311 }
2312 }
2313 }
2314
    /// Periodic maintenance pass over the peer table (driven by the 30s
    /// interval in `run`).
    ///
    /// - Promotes entries whose transport reports connected but whose state
    ///   lagged behind (fallback for a missed state event).
    /// - Evicts peers stuck in `Connecting`, and discovered-but-never-dialed
    ///   peers, after a 60s timeout based on `last_seen`.
    /// - Rewrites the shared connected counter from the observed truth.
    async fn cleanup_stale_peers(&self) {
        let mut peers = self.state.peers.write().await;
        let mut connected_count = 0;
        let mut to_remove = Vec::new();
        let stale_timeout = Duration::from_secs(60);
        for (key, entry) in peers.iter_mut() {
            if let Some(ref peer) = entry.peer {
                if peer.is_connected() {
                    if entry.state != ConnectionState::Connected {
                        info!(
                            "Peer {} is now connected (sync fallback)",
                            entry.peer_id.short()
                        );
                        entry.state = ConnectionState::Connected;
                    }
                    connected_count += 1;
                } else if entry.state == ConnectionState::Connecting
                    && entry.last_seen.elapsed() > stale_timeout
                {
                    info!(
                        "Removing stale peer {} (stuck in Connecting for {:?})",
                        entry.peer_id.short(),
                        entry.last_seen.elapsed()
                    );
                    to_remove.push(key.clone());
                }
            } else if entry.state == ConnectionState::Discovered
                && entry.last_seen.elapsed() > stale_timeout
            {
                debug!("Removing stale discovered peer {}", entry.peer_id.short());
                to_remove.push(key.clone());
            }
        }

        // Close connections of everything we decided to evict.
        for key in to_remove {
            if let Some(entry) = peers.remove(&key) {
                if let Some(peer) = entry.peer {
                    let _ = peer.close().await;
                }
            }
        }

        // Resynchronize the atomic counter with what we actually observed.
        self.state
            .connected_count
            .store(connected_count, std::sync::atomic::Ordering::Relaxed);
    }
2367}
2368
/// Lightweight snapshot of a peer's status for external inspection.
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub struct PeerState {
    // Stable identifier (pubkey + session UUID) of the peer.
    pub peer_id: PeerId,
    // Whether we dialed the peer or it dialed us.
    pub direction: PeerDirection,
    // Textual connection-state label.
    pub state: String,
    // When the peer was last heard from.
    pub last_seen: Instant,
}
2378
#[cfg(test)]
mod tests {
    use super::*;
    use nostr::{EventBuilder, Keys, Tag};

    // root_event_from_peer should pull the hash and encryptedKey tags out of
    // a well-formed hashtree root event and attribute it to the given peer,
    // leaving the plaintext key unset.
    #[test]
    fn root_event_from_peer_extracts_tags() {
        let keys = Keys::generate();
        let hash = "ab".repeat(32);
        let event = EventBuilder::new(
            Kind::Custom(HASHTREE_KIND),
            "",
            [
                Tag::parse(&["d", "repo"]).unwrap(),
                Tag::parse(&["l", HASHTREE_LABEL]).unwrap(),
                Tag::parse(&["hash", &hash]).unwrap(),
                Tag::parse(&["encryptedKey", &"11".repeat(32)]).unwrap(),
            ],
        )
        .to_event(&keys)
        .unwrap();

        let parsed = root_event_from_peer(&event, "peer-a", "repo").unwrap();
        let expected_encrypted = "11".repeat(32);
        assert_eq!(parsed.hash, hash);
        assert_eq!(parsed.peer_id, "peer-a");
        assert_eq!(
            parsed.encrypted_key.as_deref(),
            Some(expected_encrypted.as_str())
        );
        assert!(parsed.key.is_none());
    }

    // When two events share a created_at timestamp, pick_latest_event must
    // break the tie deterministically by the larger event id.
    #[test]
    fn pick_latest_event_prefers_higher_event_id_on_timestamp_tie() {
        let keys = Keys::generate();
        let created_at = nostr::Timestamp::from_secs(1_700_000_000);
        let event_a = EventBuilder::new(Kind::Custom(HASHTREE_KIND), "", [])
            .custom_created_at(created_at)
            .to_event(&keys)
            .unwrap();
        let event_b = EventBuilder::new(Kind::Custom(HASHTREE_KIND), "", [])
            .custom_created_at(created_at)
            .to_event(&keys)
            .unwrap();

        let expected = if event_a.id > event_b.id {
            event_a.id
        } else {
            event_b.id
        };
        let picked = pick_latest_event([&event_a, &event_b]).unwrap();
        assert_eq!(picked.id, expected);
    }

    // insert_if_new returns true only on first sight of an id.
    #[test]
    fn test_formal_timed_seen_set_rejects_duplicates() {
        let mut seen = TimedSeenSet::new(4, Duration::from_secs(60));
        assert!(seen.insert_if_new("frame-1".to_string()));
        assert!(!seen.insert_if_new("frame-1".to_string()));
        assert!(seen.insert_if_new("frame-2".to_string()));
    }

    // Once capacity is exceeded the oldest entry is evicted, so a previously
    // seen id can be inserted again (and then deduped again).
    #[test]
    fn test_formal_timed_seen_set_evicts_oldest_when_capacity_exceeded() {
        let mut seen = TimedSeenSet::new(2, Duration::from_secs(60));
        assert!(seen.insert_if_new("a".to_string()));
        assert!(seen.insert_if_new("b".to_string()));
        assert!(seen.insert_if_new("c".to_string()));

        assert!(seen.insert_if_new("a".to_string()));
        assert!(!seen.insert_if_new("a".to_string()));
    }

    // Fanout settings larger than the available peer count are clamped.
    #[test]
    fn test_request_dispatch_normalization_caps_to_available_peers() {
        let normalized = normalize_dispatch_config(
            RequestDispatchConfig {
                initial_fanout: 8,
                hedge_fanout: 6,
                max_fanout: 5,
                hedge_interval_ms: 120,
            },
            3,
        );
        assert_eq!(normalized.max_fanout, 3);
        assert_eq!(normalized.initial_fanout, 3);
        assert_eq!(normalized.hedge_fanout, 3);
    }

    // Wave plan: initial wave, hedge wave, then the remainder up to max_fanout.
    #[test]
    fn test_hedged_wave_plan_matches_dispatch_policy() {
        let plan = build_hedged_wave_plan(
            7,
            RequestDispatchConfig {
                initial_fanout: 2,
                hedge_fanout: 3,
                max_fanout: 6,
                hedge_interval_ms: 120,
            },
        );
        assert_eq!(plan, vec![2, 3, 1]);
    }
}