1use anyhow::Result;
13use futures::{SinkExt, StreamExt};
14use nostr::{nips::nip44, ClientMessage, EventBuilder, Filter, JsonUtil, Keys, Kind, PublicKey, RelayMessage, Tag};
15use std::collections::HashMap;
16use std::sync::Arc;
17use std::time::{Duration, Instant};
18use tokio::sync::{mpsc, Mutex, RwLock};
19use tokio_tungstenite::{connect_async, tungstenite::Message};
20use tracing::{debug, error, info, warn};
21
22use crate::nostr_relay::NostrRelay;
23use super::peer::{ContentStore, Peer, PendingRequest};
24use super::types::{
25 PeerDirection, PeerId, PeerPool, PeerStateEvent, PeerStatus, SignalingMessage, WebRTCConfig, WEBRTC_KIND, HELLO_TAG,
26};
27
/// Shared, thread-safe callback that maps a `&str` to the [`PeerPool`] a peer belongs to.
///
/// Within this file it is invoked with the sender's public key string when a hello
/// or an offer is processed.
pub type PeerClassifier = Arc<dyn Fn(&str) -> PeerPool + Send + Sync>;
30
/// Lifecycle stage of a tracked peer connection.
///
/// The type is a fieldless enum, so it is `Copy` and `Eq` in addition to the
/// previously derived traits. Its [`std::fmt::Display`] output is the lowercase
/// variant name.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectionState {
    /// The peer is known but no connection attempt is in progress yet.
    Discovered,
    /// A connection attempt (offer/answer exchange) is in progress.
    Connecting,
    /// The connection is established.
    Connected,
    /// The connection attempt failed.
    Failed,
}

impl std::fmt::Display for ConnectionState {
    /// Writes the lowercase variant name (`"discovered"`, `"connecting"`, ...).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            ConnectionState::Discovered => "discovered",
            ConnectionState::Connecting => "connecting",
            ConnectionState::Connected => "connected",
            ConnectionState::Failed => "failed",
        };
        f.write_str(label)
    }
}
50
/// Bookkeeping record for one peer tracked in [`WebRTCState::peers`].
pub struct PeerEntry {
    /// Identity of the remote peer.
    pub peer_id: PeerId,
    /// Whether this side initiates the connection (outbound) or waits for an offer (inbound).
    pub direction: PeerDirection,
    /// Current lifecycle stage of the connection.
    pub state: ConnectionState,
    /// Set to `Instant::now()` when the entry is inserted; within this file it is not
    /// refreshed afterwards and is compared against the stale timeout during cleanup.
    pub last_seen: Instant,
    /// Connection object; `None` until an offer has been created or received.
    pub peer: Option<Peer>,
    /// Pool the peer was classified into.
    pub pool: PeerPool,
    /// Bytes recorded as sent to this peer via `WebRTCState::record_sent`.
    pub bytes_sent: u64,
    /// Bytes recorded as received from this peer via `WebRTCState::record_received`.
    pub bytes_received: u64,
}
62
/// Shared state of the WebRTC layer: the peer table plus aggregate counters.
pub struct WebRTCState {
    /// Tracked peers, keyed by the string form of their `PeerId`.
    pub peers: RwLock<HashMap<String, PeerEntry>>,
    /// Number of peers currently considered connected (maintained by the manager).
    pub connected_count: std::sync::atomic::AtomicUsize,
    /// Total bytes recorded as sent across all peers.
    pub bytes_sent: std::sync::atomic::AtomicU64,
    /// Total bytes recorded as received across all peers.
    pub bytes_received: std::sync::atomic::AtomicU64,
}
72
73impl WebRTCState {
74 pub fn new() -> Self {
75 Self {
76 peers: RwLock::new(HashMap::new()),
77 connected_count: std::sync::atomic::AtomicUsize::new(0),
78 bytes_sent: std::sync::atomic::AtomicU64::new(0),
79 bytes_received: std::sync::atomic::AtomicU64::new(0),
80 }
81 }
82
83 pub fn get_bandwidth(&self) -> (u64, u64) {
85 (
86 self.bytes_sent.load(std::sync::atomic::Ordering::Relaxed),
87 self.bytes_received.load(std::sync::atomic::Ordering::Relaxed),
88 )
89 }
90
91 pub async fn record_sent(&self, peer_id: &str, bytes: u64) {
93 self.bytes_sent.fetch_add(bytes, std::sync::atomic::Ordering::Relaxed);
94 if let Some(entry) = self.peers.write().await.get_mut(peer_id) {
95 entry.bytes_sent += bytes;
96 }
97 }
98
99 pub async fn record_received(&self, peer_id: &str, bytes: u64) {
101 self.bytes_received.fetch_add(bytes, std::sync::atomic::Ordering::Relaxed);
102 if let Some(entry) = self.peers.write().await.get_mut(peer_id) {
103 entry.bytes_received += bytes;
104 }
105 }
106
107 pub async fn request_from_peers(&self, hash_hex: &str) -> Option<Vec<u8>> {
111 use super::types::{DataRequest, MAX_HTL, encode_request};
112
113 let peers = self.peers.read().await;
114
115 let peer_refs: Vec<_> = peers
118 .values()
119 .filter(|p| p.state == ConnectionState::Connected && p.peer.is_some())
120 .filter_map(|p| {
121 p.peer.as_ref().map(|peer| {
122 (p.peer_id.short(), peer.data_channel.clone(), peer.pending_requests.clone())
123 })
124 })
125 .collect();
126
127 drop(peers); let mut connected_peers: Vec<(String, Arc<Mutex<HashMap<String, PendingRequest>>>, Arc<webrtc::data_channel::RTCDataChannel>)> = Vec::new();
131 for (peer_id, dc_mutex, pending) in peer_refs {
132 let dc_guard = dc_mutex.lock().await;
133 if let Some(dc) = dc_guard.as_ref() {
134 connected_peers.push((peer_id, pending, dc.clone()));
135 }
136 }
137
138 if connected_peers.is_empty() {
139 debug!("No connected peers to query for {}", &hash_hex[..8.min(hash_hex.len())]);
140 return None;
141 }
142
143 debug!(
144 "Querying {} connected peers for {} (sequential with 500ms delay)",
145 connected_peers.len(),
146 &hash_hex[..8.min(hash_hex.len())]
147 );
148
149 let hash_bytes = match hex::decode(hash_hex) {
151 Ok(b) => b,
152 Err(_) => return None,
153 };
154
155 for (_i, (peer_id, pending_requests, dc)) in connected_peers.into_iter().enumerate() {
157 debug!("Querying peer {} for {}", peer_id, &hash_hex[..8.min(hash_hex.len())]);
158
159 let (tx, rx) = tokio::sync::oneshot::channel();
161
162 {
164 let mut pending = pending_requests.lock().await;
165 pending.insert(
166 hash_hex.to_string(),
167 super::PendingRequest {
168 hash: hash_bytes.clone(),
169 response_tx: tx,
170 },
171 );
172 }
173
174 let req = DataRequest {
176 h: hash_bytes.clone(),
177 htl: MAX_HTL,
178 };
179 if let Ok(wire) = encode_request(&req) {
180 let wire_len = wire.len() as u64;
181 if dc.send(&bytes::Bytes::from(wire)).await.is_ok() {
182 self.record_sent(&peer_id, wire_len).await;
183 match tokio::time::timeout(std::time::Duration::from_millis(500), rx).await {
185 Ok(Ok(Some(data))) => {
186 self.record_received(&peer_id, data.len() as u64).await;
187 debug!("Got response from peer {} for {}", peer_id, &hash_hex[..8.min(hash_hex.len())]);
188 return Some(data);
189 }
190 _ => {
191 debug!("No response from peer {} for {}", peer_id, &hash_hex[..8.min(hash_hex.len())]);
193 }
194 }
195 }
196 }
197
198 let mut pending = pending_requests.lock().await;
200 pending.remove(hash_hex);
201 }
202
203 debug!("No peer had data for {}", &hash_hex[..8.min(hash_hex.len())]);
204 None
205 }
206}
207
/// Drives peer discovery and WebRTC signaling over Nostr relays.
pub struct WebRTCManager {
    /// Relay list, STUN servers, pool limits and hello interval.
    config: WebRTCConfig,
    /// This node's peer identity (built from the public key of `keys`).
    my_peer_id: PeerId,
    /// Nostr keys used to sign hello events and to decrypt directed messages.
    keys: Keys,
    /// Peer table and counters shared with other components via `state()`.
    state: Arc<WebRTCState>,
    /// Sender side of the shutdown flag.
    shutdown: Arc<tokio::sync::watch::Sender<bool>>,
    /// Receiver side of the shutdown flag; cloned for the main loop and each relay task.
    shutdown_rx: tokio::sync::watch::Receiver<bool>,
    /// Handed to every `Peer` so it can emit outgoing signaling messages.
    signaling_tx: mpsc::Sender<SignalingMessage>,
    /// Receiving end of `signaling_tx`; taken by `run()`.
    signaling_rx: Option<mpsc::Receiver<SignalingMessage>>,
    /// Optional content store passed to every `Peer`.
    store: Option<Arc<dyn ContentStore>>,
    /// Callback that assigns a peer to a pool.
    peer_classifier: PeerClassifier,
    /// Optional Nostr relay handle passed to every `Peer`.
    nostr_relay: Option<Arc<NostrRelay>>,
    /// Handed to every `Peer` so it can report connection state changes.
    state_event_tx: mpsc::Sender<PeerStateEvent>,
    /// Receiving end of `state_event_tx`; taken by `run()`.
    state_event_rx: Option<mpsc::Receiver<PeerStateEvent>>,
}
229
230impl WebRTCManager {
231 pub fn new(keys: Keys, config: WebRTCConfig) -> Self {
233 let pubkey = keys.public_key().to_hex();
234 let my_peer_id = PeerId::new(pubkey, None);
235 let (shutdown, shutdown_rx) = tokio::sync::watch::channel(false);
236 let (signaling_tx, signaling_rx) = mpsc::channel(100);
237 let (state_event_tx, state_event_rx) = mpsc::channel(100);
238
239 let peer_classifier: PeerClassifier = Arc::new(|_| PeerPool::Other);
241
242 Self {
243 config,
244 my_peer_id,
245 keys,
246 state: Arc::new(WebRTCState::new()),
247 shutdown: Arc::new(shutdown),
248 shutdown_rx,
249 signaling_tx,
250 signaling_rx: Some(signaling_rx),
251 store: None,
252 peer_classifier,
253 nostr_relay: None,
254 state_event_tx,
255 state_event_rx: Some(state_event_rx),
256 }
257 }
258
259 pub fn new_with_classifier(keys: Keys, config: WebRTCConfig, classifier: PeerClassifier) -> Self {
261 let mut manager = Self::new(keys, config);
262 manager.peer_classifier = classifier;
263 manager
264 }
265
266 pub fn new_with_store(keys: Keys, config: WebRTCConfig, store: Arc<dyn ContentStore>) -> Self {
268 let mut manager = Self::new(keys, config);
269 manager.store = Some(store);
270 manager
271 }
272
273 pub fn new_with_store_and_classifier(
275 keys: Keys,
276 config: WebRTCConfig,
277 store: Arc<dyn ContentStore>,
278 classifier: PeerClassifier,
279 ) -> Self {
280 let mut manager = Self::new(keys, config);
281 manager.store = Some(store);
282 manager.peer_classifier = classifier;
283 manager
284 }
285
    /// Replaces the content store handed to peers created from now on.
    pub fn set_store(&mut self, store: Arc<dyn ContentStore>) {
        self.store = Some(store);
    }
290
    /// Replaces the callback used to assign peers to pools.
    pub fn set_peer_classifier(&mut self, classifier: PeerClassifier) {
        self.peer_classifier = classifier;
    }
295
    /// Sets the Nostr relay handle handed to peers created from now on.
    pub fn set_nostr_relay(&mut self, relay: Arc<NostrRelay>) {
        self.nostr_relay = Some(relay);
    }
300
    /// Returns this node's peer identity.
    pub fn my_peer_id(&self) -> &PeerId {
        &self.my_peer_id
    }
305
306 pub fn state(&self) -> Arc<WebRTCState> {
308 self.state.clone()
309 }
310
311 pub fn shutdown(&self) {
313 let _ = self.shutdown.send(true);
314 }
315
316 pub async fn connected_count(&self) -> usize {
318 self.state
319 .connected_count
320 .load(std::sync::atomic::Ordering::Relaxed)
321 }
322
323 pub async fn peer_statuses(&self) -> Vec<PeerStatus> {
325 self.state
326 .peers
327 .read()
328 .await
329 .values()
330 .map(|p| PeerStatus {
331 peer_id: p.peer_id.to_string(),
332 pubkey: p.peer_id.pubkey.clone(),
333 state: p.state.to_string(),
334 direction: p.direction,
335 connected_at: Some(p.last_seen),
336 pool: p.pool,
337 })
338 .collect()
339 }
340
341 pub async fn get_pool_counts(&self) -> (usize, usize, usize, usize) {
345 let peers = self.state.peers.read().await;
346 let mut follows_connected = 0;
347 let mut follows_active = 0;
348 let mut other_connected = 0;
349 let mut other_active = 0;
350
351 for entry in peers.values() {
352 let is_active = entry.state == ConnectionState::Connected
355 || entry.state == ConnectionState::Connecting;
356
357 match entry.pool {
358 PeerPool::Follows => {
359 if is_active {
360 follows_active += 1;
361 }
362 if entry.state == ConnectionState::Connected {
363 follows_connected += 1;
364 }
365 }
366 PeerPool::Other => {
367 if is_active {
368 other_active += 1;
369 }
370 if entry.state == ConnectionState::Connected {
371 other_connected += 1;
372 }
373 }
374 }
375 }
376
377 (follows_connected, follows_active, other_connected, other_active)
378 }
379
380 fn can_accept_peer(&self, pool: PeerPool, pool_counts: &(usize, usize, usize, usize)) -> bool {
382 let (_, follows_active, _, other_active) = *pool_counts;
383 match pool {
384 PeerPool::Follows => follows_active < self.config.pools.follows.max_connections,
385 PeerPool::Other => other_active < self.config.pools.other.max_connections,
386 }
387 }
388
389 #[allow(dead_code)]
391 fn is_pool_satisfied(&self, pool: PeerPool, pool_counts: &(usize, usize, usize, usize)) -> bool {
392 let (follows_connected, _, other_connected, _) = *pool_counts;
393 match pool {
394 PeerPool::Follows => follows_connected >= self.config.pools.follows.satisfied_connections,
395 PeerPool::Other => other_connected >= self.config.pools.other.satisfied_connections,
396 }
397 }
398
399 #[allow(dead_code)]
401 fn is_satisfied(&self, pool_counts: &(usize, usize, usize, usize)) -> bool {
402 self.is_pool_satisfied(PeerPool::Follows, pool_counts)
403 && self.is_pool_satisfied(PeerPool::Other, pool_counts)
404 }
405
406 fn should_initiate(&self, their_uuid: &str) -> bool {
409 self.my_peer_id.uuid < their_uuid.to_string()
410 }
411
    /// Runs the manager until shutdown is signalled.
    ///
    /// Spawns one relay task per configured relay URL, then loops over:
    /// shutdown notifications, events forwarded by relay tasks, outgoing signaling
    /// messages produced by peers, peer state events, and a 30-second cleanup tick.
    ///
    /// # Panics
    ///
    /// Panics if called a second time on the same manager, because the signaling and
    /// state-event receivers are taken on the first call.
    pub async fn run(&mut self) -> Result<()> {
        info!(
            "Starting WebRTC manager with peer ID: {}",
            self.my_peer_id.short()
        );

        // Relay tasks forward every received event, tagged with its relay URL, here.
        let (event_tx, mut event_rx) = mpsc::channel::<(String, nostr::Event)>(100);

        let mut signaling_rx = self.signaling_rx.take().expect("signaling_rx already taken");

        let mut state_event_rx = self.state_event_rx.take().expect("state_event_rx already taken");

        // Fan-out channel: each relay task subscribes and publishes every outgoing message.
        let (relay_write_tx, _) = tokio::sync::broadcast::channel::<SignalingMessage>(100);

        for relay_url in &self.config.relays {
            let url = relay_url.clone();
            let event_tx = event_tx.clone();
            let shutdown_rx = self.shutdown_rx.clone();
            let keys = self.keys.clone();
            let my_peer_id = self.my_peer_id.clone();
            let hello_interval = Duration::from_millis(self.config.hello_interval_ms);
            let relay_write_rx = relay_write_tx.subscribe();

            // The task is detached; a failure is only logged.
            tokio::spawn(async move {
                if let Err(e) = Self::relay_task(
                    url.clone(),
                    event_tx,
                    shutdown_rx,
                    keys,
                    my_peer_id,
                    hello_interval,
                    relay_write_rx,
                )
                .await
                {
                    error!("Relay {} error: {}", url, e);
                }
            });
        }

        let mut shutdown_rx = self.shutdown_rx.clone();
        let mut cleanup_interval = tokio::time::interval(Duration::from_secs(30));
        loop {
            tokio::select! {
                _ = shutdown_rx.changed() => {
                    if *shutdown_rx.borrow() {
                        info!("WebRTC manager shutting down");
                        break;
                    }
                }
                Some((relay, event)) = event_rx.recv() => {
                    // Handling errors are logged and do not stop the loop.
                    if let Err(e) = self.handle_event(&relay, &event, &relay_write_tx).await {
                        debug!("Error handling event from {}: {}", relay, e);
                    }
                }
                Some(msg) = signaling_rx.recv() => {
                    // Forward peer-generated signaling to all relay tasks; a send error is ignored.
                    let _ = relay_write_tx.send(msg);
                }
                Some(event) = state_event_rx.recv() => {
                    self.handle_peer_state_event(event).await;
                }
                _ = cleanup_interval.tick() => {
                    self.cleanup_stale_peers().await;
                }
            }
        }

        Ok(())
    }
491
492 async fn relay_task(
494 url: String,
495 event_tx: mpsc::Sender<(String, nostr::Event)>,
496 mut shutdown_rx: tokio::sync::watch::Receiver<bool>,
497 keys: Keys,
498 my_peer_id: PeerId,
499 hello_interval: Duration,
500 mut signaling_rx: tokio::sync::broadcast::Receiver<SignalingMessage>,
501 ) -> Result<()> {
502 info!("Connecting to relay: {}", url);
503
504 let (ws_stream, _) = connect_async(&url).await?;
505 let (mut write, mut read) = ws_stream.split();
506
507 let hello_filter = Filter::new()
511 .kind(Kind::Ephemeral(WEBRTC_KIND as u16))
512 .custom_tag(
513 nostr::SingleLetterTag::lowercase(nostr::Alphabet::L),
514 vec![HELLO_TAG],
515 )
516 .since(nostr::Timestamp::now() - Duration::from_secs(60));
517
518 let directed_filter = Filter::new()
519 .kind(Kind::Ephemeral(WEBRTC_KIND as u16))
520 .custom_tag(
521 nostr::SingleLetterTag::lowercase(nostr::Alphabet::P),
522 vec![keys.public_key().to_hex()],
523 )
524 .since(nostr::Timestamp::now() - Duration::from_secs(60));
525
526 let sub_id = nostr::SubscriptionId::generate();
527 let sub_msg = ClientMessage::req(sub_id.clone(), vec![hello_filter, directed_filter]);
528 write.send(Message::Text(sub_msg.as_json().into())).await?;
529
530 info!("Subscribed to {} for WebRTC events (kind {})", url, WEBRTC_KIND);
531
532 let mut last_hello = Instant::now() - hello_interval; let mut hello_ticker = tokio::time::interval(Duration::from_secs(1));
534
535 loop {
536 tokio::select! {
537 _ = shutdown_rx.changed() => {
538 if *shutdown_rx.borrow() {
539 break;
540 }
541 }
542 _ = hello_ticker.tick() => {
543 if last_hello.elapsed() >= hello_interval {
545 let hello = SignalingMessage::hello(&my_peer_id.uuid);
546 if let Ok(event) = Self::create_signaling_event(&keys, &hello).await {
547 let msg = ClientMessage::event(event);
548 if write.send(Message::Text(msg.as_json().into())).await.is_ok() {
549 debug!("Sent hello to {}", url);
550 }
551 }
552 last_hello = Instant::now();
553 }
554 }
555 Ok(signaling_msg) = signaling_rx.recv() => {
557 info!("Sending {} via {}", signaling_msg.msg_type(), url);
558 if let Ok(event) = Self::create_signaling_event(&keys, &signaling_msg).await {
559 let event_id = event.id.to_string();
560 let msg = ClientMessage::event(event);
561 if write.send(Message::Text(msg.as_json().into())).await.is_ok() {
562 info!("Sent {} to {} (event id: {})", signaling_msg.msg_type(), url, &event_id[..16]);
563 }
564 }
565 }
566 msg = read.next() => {
567 match msg {
568 Some(Ok(Message::Text(text))) => {
569 if let Ok(relay_msg) = RelayMessage::from_json(&text) {
570 if let RelayMessage::Event { event, .. } = relay_msg {
571 let _ = event_tx.send((url.clone(), *event)).await;
572 }
573 }
574 }
575 Some(Err(e)) => {
576 error!("WebSocket error from {}: {}", url, e);
577 break;
578 }
579 None => {
580 warn!("WebSocket closed: {}", url);
581 break;
582 }
583 _ => {}
584 }
585 }
586 }
587 }
588
589 Ok(())
590 }
591
592 async fn create_signaling_event(keys: &Keys, msg: &SignalingMessage) -> Result<nostr::Event> {
598 if let Some(recipient_str) = msg.recipient() {
600 if let Some(peer_id) = PeerId::from_string(recipient_str) {
602 let recipient_pubkey = PublicKey::from_hex(&peer_id.pubkey)?;
603
604 let seal = serde_json::json!({
606 "pubkey": keys.public_key().to_hex(),
607 "kind": WEBRTC_KIND,
608 "content": serde_json::to_string(msg)?,
609 "tags": []
610 });
611
612 let ephemeral_keys = Keys::generate();
614
615 let encrypted_content = nip44::encrypt(
617 ephemeral_keys.secret_key(),
618 &recipient_pubkey,
619 &seal.to_string(),
620 nip44::Version::V2
621 )?;
622
623 let created_at = nostr::Timestamp::now();
625 let expiration = created_at + Duration::from_secs(5 * 60); let tags = vec![
628 Tag::parse(&["p", &recipient_pubkey.to_hex()])?,
629 Tag::parse(&["expiration", &expiration.as_u64().to_string()])?,
630 ];
631
632 let event = EventBuilder::new(Kind::Ephemeral(WEBRTC_KIND as u16), encrypted_content, tags)
633 .to_event(&ephemeral_keys)?;
634
635 return Ok(event);
636 }
637 }
638
639 let tags = vec![
641 Tag::parse(&["l", HELLO_TAG])?,
642 Tag::parse(&["peerId", msg.peer_id()])?,
643 ];
644
645 let event = EventBuilder::new(Kind::Ephemeral(WEBRTC_KIND as u16), "", tags)
646 .to_event(keys)?;
647
648 Ok(event)
649 }
650
651 async fn handle_event(
657 &self,
658 relay: &str,
659 event: &nostr::Event,
660 relay_write_tx: &tokio::sync::broadcast::Sender<SignalingMessage>,
661 ) -> Result<()> {
662 if event.kind != Kind::Ephemeral(WEBRTC_KIND as u16) {
664 return Ok(());
665 }
666
667 let get_tag = |name: &str| -> Option<String> {
669 event.tags.iter().find_map(|tag| {
670 let v: Vec<String> = tag.clone().to_vec();
671 if v.len() >= 2 && v[0] == name {
672 Some(v[1].clone())
673 } else {
674 None
675 }
676 })
677 };
678
679 let l_tag = get_tag("l");
681 if l_tag.as_deref() == Some(HELLO_TAG) {
682 let sender_pubkey = event.pubkey.to_hex();
683
684 if sender_pubkey == self.my_peer_id.pubkey {
686 return Ok(());
687 }
688
689 if let Some(their_uuid) = get_tag("peerId") {
690 debug!("Received hello from {} via {}", &sender_pubkey[..8], relay);
691 self.handle_hello(&sender_pubkey, &their_uuid, relay_write_tx)
692 .await?;
693 }
694 return Ok(());
695 }
696
697 let p_tag = get_tag("p");
699 if p_tag.as_deref() != Some(&self.keys.public_key().to_hex()) {
700 return Ok(());
702 }
703
704 if event.content.is_empty() {
706 return Ok(());
707 }
708
709 let seal: serde_json::Value = match nip44::decrypt(self.keys.secret_key(), &event.pubkey, &event.content) {
711 Ok(plaintext) => {
712 match serde_json::from_str(&plaintext) {
713 Ok(v) => v,
714 Err(_) => return Ok(()),
715 }
716 }
717 Err(_) => {
718 return Ok(());
720 }
721 };
722
723 let sender_pubkey = seal.get("pubkey")
725 .and_then(|v| v.as_str())
726 .ok_or_else(|| anyhow::anyhow!("Missing pubkey in seal"))?;
727
728 if sender_pubkey == self.my_peer_id.pubkey {
730 return Ok(());
731 }
732
733 let content = seal.get("content")
734 .and_then(|v| v.as_str())
735 .ok_or_else(|| anyhow::anyhow!("Missing content in seal"))?;
736
737 let raw_msg: serde_json::Value = serde_json::from_str(content)?;
738 let msg_type = raw_msg.get("type").and_then(|v| v.as_str()).unwrap_or("");
739
740 if raw_msg.get("targetPeerId").is_some() {
742 let target_peer = raw_msg
743 .get("targetPeerId")
744 .and_then(|v| v.as_str())
745 .unwrap_or("");
746 if target_peer != self.my_peer_id.to_string() {
747 return Ok(());
748 }
749
750 let peer_id = raw_msg.get("peerId").and_then(|v| v.as_str()).unwrap_or("");
751 let their_uuid = peer_id.split(':').nth(1).unwrap_or(peer_id);
752
753 match msg_type {
754 "offer" => {
755 let sdp = raw_msg.get("sdp").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing SDP in offer"))?;
756 let offer = serde_json::json!({ "type": "offer", "sdp": sdp });
757 self.handle_offer(&sender_pubkey, their_uuid, offer, relay_write_tx).await?;
758 }
759 "answer" => {
760 let sdp = raw_msg.get("sdp").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing SDP in answer"))?;
761 let answer = serde_json::json!({ "type": "answer", "sdp": sdp });
762 self.handle_answer(&sender_pubkey, their_uuid, answer).await?;
763 }
764 "candidate" => {
765 let candidate = raw_msg.get("candidate").and_then(|v| v.as_str()).unwrap_or("");
766 if !candidate.is_empty() {
767 let candidate_json = serde_json::json!({
768 "candidate": candidate,
769 "sdpMid": raw_msg.get("sdpMid"),
770 "sdpMLineIndex": raw_msg.get("sdpMLineIndex"),
771 });
772 self.handle_candidate(&sender_pubkey, their_uuid, candidate_json).await?;
773 }
774 }
775 "candidates" => {
776 let candidates = raw_msg
777 .get("candidates")
778 .and_then(|v| v.as_array())
779 .map(|entries| {
780 entries.iter().filter_map(|entry| {
781 if let Some(candidate_str) = entry.get("candidate").and_then(|v| v.as_str()).or_else(|| entry.as_str()) {
782 Some(serde_json::json!({
783 "candidate": candidate_str,
784 "sdpMid": entry.get("sdpMid"),
785 "sdpMLineIndex": entry.get("sdpMLineIndex"),
786 }))
787 } else {
788 None
789 }
790 }).collect::<Vec<_>>()
791 })
792 .unwrap_or_default();
793 self.handle_candidates(&sender_pubkey, their_uuid, candidates).await?;
794 }
795 _ => {}
796 }
797
798 return Ok(());
799 }
800
801 let msg: SignalingMessage = serde_json::from_value(raw_msg)?;
802
803 debug!(
804 "Received {} from {} via {} (gift-wrapped)",
805 msg.msg_type(),
806 &sender_pubkey[..8],
807 relay
808 );
809
810 match msg {
811 SignalingMessage::Hello { .. } => {
812 return Ok(());
814 }
815 SignalingMessage::Offer {
816 recipient,
817 peer_id: their_uuid,
818 offer,
819 } => {
820 if recipient != self.my_peer_id.to_string() {
821 return Ok(()); }
823 if let Err(e) = self.handle_offer(&sender_pubkey, &their_uuid, offer, relay_write_tx)
824 .await {
825 error!("handle_offer FAILED: sender={}, uuid={}, error={:?}", &sender_pubkey[..8.min(sender_pubkey.len())], their_uuid, e);
826 return Err(e);
827 }
828 }
829 SignalingMessage::Answer {
830 recipient,
831 peer_id: their_uuid,
832 answer,
833 } => {
834 if recipient != self.my_peer_id.to_string() {
835 return Ok(());
836 }
837 self.handle_answer(&sender_pubkey, &their_uuid, answer)
838 .await?;
839 }
840 SignalingMessage::Candidate {
841 recipient,
842 peer_id: their_uuid,
843 candidate,
844 } => {
845 if recipient != self.my_peer_id.to_string() {
846 return Ok(());
847 }
848 self.handle_candidate(&sender_pubkey, &their_uuid, candidate)
849 .await?;
850 }
851 SignalingMessage::Candidates {
852 recipient,
853 peer_id: their_uuid,
854 candidates,
855 } => {
856 if recipient != self.my_peer_id.to_string() {
857 return Ok(());
858 }
859 self.handle_candidates(&sender_pubkey, &their_uuid, candidates)
860 .await?;
861 }
862 }
863
864 Ok(())
865 }
866
867 async fn handle_hello(
869 &self,
870 sender_pubkey: &str,
871 their_uuid: &str,
872 relay_write_tx: &tokio::sync::broadcast::Sender<SignalingMessage>,
873 ) -> Result<()> {
874 let full_peer_id = PeerId::new(sender_pubkey.to_string(), Some(their_uuid.to_string()));
875 let peer_key = full_peer_id.to_string();
876
877 {
879 let peers = self.state.peers.read().await;
880 if let Some(entry) = peers.get(&peer_key) {
881 if entry.state == ConnectionState::Connected
883 || entry.state == ConnectionState::Connecting
884 {
885 return Ok(());
886 }
887 }
888 }
889
890 let pool = (self.peer_classifier)(sender_pubkey);
892
893 let pool_counts = self.get_pool_counts().await;
895 if !self.can_accept_peer(pool, &pool_counts) {
896 debug!("Ignoring hello from {} - pool {:?} is full", full_peer_id.short(), pool);
897 return Ok(());
898 }
899
900 let should_initiate = self.should_initiate(their_uuid);
902
903 let pool_satisfied = self.is_pool_satisfied(pool, &pool_counts);
906 let will_initiate = should_initiate && !pool_satisfied;
907
908 info!(
909 "Discovered peer: {} (pool: {:?}, initiate: {}, pool_satisfied: {})",
910 full_peer_id.short(),
911 pool,
912 will_initiate,
913 pool_satisfied
914 );
915
916 if !will_initiate && pool_satisfied {
919 debug!("Pool {:?} is satisfied, not tracking peer {}", pool, full_peer_id.short());
920 return Ok(());
921 }
922
923 {
925 let mut peers = self.state.peers.write().await;
926 peers.insert(
927 peer_key.clone(),
928 PeerEntry {
929 peer_id: full_peer_id.clone(),
930 direction: if will_initiate {
931 PeerDirection::Outbound
932 } else {
933 PeerDirection::Inbound
934 },
935 state: ConnectionState::Discovered,
936 last_seen: Instant::now(),
937 peer: None,
938 pool,
939 bytes_sent: 0,
940 bytes_received: 0,
941 },
942 );
943 }
944
945 if will_initiate {
947 self.initiate_connection(&full_peer_id, pool, relay_write_tx)
948 .await?;
949 }
950
951 Ok(())
952 }
953
954 async fn initiate_connection(
956 &self,
957 peer_id: &PeerId,
958 pool: PeerPool,
959 relay_write_tx: &tokio::sync::broadcast::Sender<SignalingMessage>,
960 ) -> Result<()> {
961 let peer_key = peer_id.to_string();
962
963 info!("Initiating connection to {} (pool: {:?})", peer_id.short(), pool);
964
965 let mut peer = Peer::new_with_store_and_events(
967 peer_id.clone(),
968 PeerDirection::Outbound,
969 self.my_peer_id.clone(),
970 self.signaling_tx.clone(),
971 self.config.stun_servers.clone(),
972 self.store.clone(),
973 Some(self.state_event_tx.clone()),
974 self.nostr_relay.clone(),
975 )
976 .await?;
977
978 peer.setup_handlers().await?;
979
980 let offer = peer.connect().await?;
982
983 {
985 let mut peers = self.state.peers.write().await;
986 if let Some(entry) = peers.get_mut(&peer_key) {
987 entry.state = ConnectionState::Connecting;
988 entry.peer = Some(peer);
989 entry.pool = pool;
990 }
991 }
992
993 let offer_msg = SignalingMessage::Offer {
995 offer,
996 recipient: peer_id.to_string(),
997 peer_id: self.my_peer_id.uuid.clone(),
998 };
999 if relay_write_tx.send(offer_msg).is_err() {
1000 warn!("Failed to broadcast offer to {}", peer_id.short());
1001 }
1002
1003 info!("Sent offer to {}", peer_id.short());
1004
1005 Ok(())
1006 }
1007
1008 async fn handle_offer(
1010 &self,
1011 sender_pubkey: &str,
1012 their_uuid: &str,
1013 offer: serde_json::Value,
1014 relay_write_tx: &tokio::sync::broadcast::Sender<SignalingMessage>,
1015 ) -> Result<()> {
1016 debug!("handle_offer ENTRY: sender={}, uuid={}", &sender_pubkey[..8.min(sender_pubkey.len())], their_uuid);
1017 let full_peer_id = PeerId::new(sender_pubkey.to_string(), Some(their_uuid.to_string()));
1018 let peer_key = full_peer_id.to_string();
1019
1020 let pool = (self.peer_classifier)(sender_pubkey);
1022
1023 info!("Received offer from {} (pool: {:?})", full_peer_id.short(), pool);
1024
1025 {
1027 let peers = self.state.peers.read().await;
1028 debug!("Checking for existing peer, peer_key: {}, known_peers: {}", peer_key, peers.len());
1029 if let Some(entry) = peers.get(&peer_key) {
1030 if entry.peer.is_some() {
1032 debug!("Already have peer {} with connection, skipping offer", full_peer_id.short());
1033 return Ok(());
1034 }
1035 debug!("Peer {} exists but has no connection, proceeding", full_peer_id.short());
1036 } else {
1037 debug!("Peer {} not found in peers map, will create new entry", full_peer_id.short());
1038 }
1039 }
1040
1041 let pool_counts = self.get_pool_counts().await;
1043 debug!("Pool counts: {:?}, checking can_accept_peer for {:?}", pool_counts, pool);
1044 if !self.can_accept_peer(pool, &pool_counts) {
1045 warn!("Rejecting offer from {} - pool {:?} is full", full_peer_id.short(), pool);
1046 return Ok(());
1047 }
1048 debug!("Pool check passed for {}", full_peer_id.short());
1049
1050 debug!("Creating peer connection for {}", full_peer_id.short());
1052 let mut peer = Peer::new_with_store_and_events(
1053 full_peer_id.clone(),
1054 PeerDirection::Inbound,
1055 self.my_peer_id.clone(),
1056 self.signaling_tx.clone(),
1057 self.config.stun_servers.clone(),
1058 self.store.clone(),
1059 Some(self.state_event_tx.clone()),
1060 self.nostr_relay.clone(),
1061 )
1062 .await?;
1063 debug!("Peer connection created for {}", full_peer_id.short());
1064
1065 peer.setup_handlers().await?;
1066 debug!("Handlers set up for {}", full_peer_id.short());
1067
1068 let answer = peer.handle_offer(offer).await?;
1070 debug!("Answer created for {}", full_peer_id.short());
1071
1072 {
1074 let mut peers = self.state.peers.write().await;
1075 peers.insert(
1076 peer_key,
1077 PeerEntry {
1078 peer_id: full_peer_id.clone(),
1079 direction: PeerDirection::Inbound,
1080 state: ConnectionState::Connecting,
1081 last_seen: Instant::now(),
1082 peer: Some(peer),
1083 pool,
1084 bytes_sent: 0,
1085 bytes_received: 0,
1086 },
1087 );
1088 }
1089
1090 let answer_msg = SignalingMessage::Answer {
1094 answer,
1095 recipient: full_peer_id.to_string(),
1096 peer_id: self.my_peer_id.uuid.clone(),
1097 };
1098 if relay_write_tx.send(answer_msg).is_err() {
1099 warn!("Failed to send answer to {}", full_peer_id.short());
1100 }
1101 info!("Sent answer to {}", full_peer_id.short());
1102
1103 Ok(())
1104 }
1105
1106 async fn handle_answer(
1108 &self,
1109 sender_pubkey: &str,
1110 their_uuid: &str,
1111 answer: serde_json::Value,
1112 ) -> Result<()> {
1113 let full_peer_id = PeerId::new(sender_pubkey.to_string(), Some(their_uuid.to_string()));
1114 let peer_key = full_peer_id.to_string();
1115
1116 info!("Received answer from {}", full_peer_id.short());
1117
1118 let mut peers = self.state.peers.write().await;
1119 if let Some(entry) = peers.get_mut(&peer_key) {
1120 if entry.state == ConnectionState::Connected {
1122 debug!("Ignoring duplicate answer from {} - already connected", full_peer_id.short());
1123 return Ok(());
1124 }
1125 if let Some(ref mut peer) = entry.peer {
1126 use webrtc::peer_connection::signaling_state::RTCSignalingState;
1128 let signaling_state = peer.signaling_state();
1129 if signaling_state != RTCSignalingState::HaveLocalOffer {
1130 debug!(
1131 "Ignoring answer from {} - signaling state is {:?}, not HaveLocalOffer",
1132 full_peer_id.short(),
1133 signaling_state
1134 );
1135 return Ok(());
1136 }
1137 peer.handle_answer(answer).await?;
1138 info!("Applied answer from {}", full_peer_id.short());
1139 } else {
1140 debug!("Peer {} has no connection object", full_peer_id.short());
1141 }
1142 } else {
1143 debug!("No peer found for key: {}", peer_key);
1144 }
1145
1146 Ok(())
1147 }
1148
1149 async fn handle_candidate(
1151 &self,
1152 sender_pubkey: &str,
1153 their_uuid: &str,
1154 candidate: serde_json::Value,
1155 ) -> Result<()> {
1156 let full_peer_id = PeerId::new(sender_pubkey.to_string(), Some(their_uuid.to_string()));
1157 let peer_key = full_peer_id.to_string();
1158
1159 info!("Received ICE candidate from {}", full_peer_id.short());
1160
1161 let mut peers = self.state.peers.write().await;
1162 if let Some(entry) = peers.get_mut(&peer_key) {
1163 if let Some(ref mut peer) = entry.peer {
1164 peer.handle_candidate(candidate).await?;
1165 }
1166 }
1167
1168 Ok(())
1169 }
1170
1171 async fn handle_candidates(
1173 &self,
1174 sender_pubkey: &str,
1175 their_uuid: &str,
1176 candidates: Vec<serde_json::Value>,
1177 ) -> Result<()> {
1178 let full_peer_id = PeerId::new(sender_pubkey.to_string(), Some(their_uuid.to_string()));
1179 let peer_key = full_peer_id.to_string();
1180
1181 debug!(
1182 "Received {} candidates from {}",
1183 candidates.len(),
1184 full_peer_id.short()
1185 );
1186
1187 let mut peers = self.state.peers.write().await;
1188 if let Some(entry) = peers.get_mut(&peer_key) {
1189 if let Some(ref mut peer) = entry.peer {
1190 for candidate in candidates {
1191 if let Err(e) = peer.handle_candidate(candidate).await {
1192 debug!("Failed to add candidate: {}", e);
1193 }
1194 }
1195 }
1196 }
1197
1198 Ok(())
1199 }
1200
1201 async fn handle_peer_state_event(&self, event: PeerStateEvent) {
1203 match event {
1204 PeerStateEvent::Connected(peer_id) => {
1205 let peer_key = peer_id.to_string();
1206 let mut peers = self.state.peers.write().await;
1207 if let Some(entry) = peers.get_mut(&peer_key) {
1208 if entry.state != ConnectionState::Connected {
1209 info!("Peer {} connected (via state event)", peer_id.short());
1210 entry.state = ConnectionState::Connected;
1211 self.state.connected_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1213 }
1214 }
1215 }
1216 PeerStateEvent::Failed(peer_id) => {
1217 let peer_key = peer_id.to_string();
1218 info!("Peer {} connection failed - removing from pool", peer_id.short());
1219 let mut peers = self.state.peers.write().await;
1220 if let Some(entry) = peers.remove(&peer_key) {
1221 if entry.state == ConnectionState::Connected {
1223 self.state.connected_count.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
1224 }
1225 if let Some(peer) = entry.peer {
1227 let _ = peer.close().await;
1228 }
1229 }
1230 }
1231 PeerStateEvent::Disconnected(peer_id) => {
1232 let peer_key = peer_id.to_string();
1233 info!("Peer {} disconnected - removing from pool", peer_id.short());
1234 let mut peers = self.state.peers.write().await;
1235 if let Some(entry) = peers.remove(&peer_key) {
1236 if entry.state == ConnectionState::Connected {
1238 self.state.connected_count.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
1239 }
1240 if let Some(peer) = entry.peer {
1242 let _ = peer.close().await;
1243 }
1244 }
1245 }
1246 }
1247 }
1248
1249 async fn cleanup_stale_peers(&self) {
1251 let mut peers = self.state.peers.write().await;
1252 let mut connected_count = 0;
1253 let mut to_remove = Vec::new();
1254 let stale_timeout = Duration::from_secs(60); for (key, entry) in peers.iter_mut() {
1257 if let Some(ref peer) = entry.peer {
1258 if peer.is_connected() {
1260 if entry.state != ConnectionState::Connected {
1261 info!("Peer {} is now connected (sync fallback)", entry.peer_id.short());
1262 entry.state = ConnectionState::Connected;
1263 }
1264 connected_count += 1;
1265 } else if entry.state == ConnectionState::Connecting && entry.last_seen.elapsed() > stale_timeout {
1266 info!("Removing stale peer {} (stuck in Connecting for {:?})", entry.peer_id.short(), entry.last_seen.elapsed());
1268 to_remove.push(key.clone());
1269 }
1270 } else if entry.state == ConnectionState::Discovered && entry.last_seen.elapsed() > stale_timeout {
1271 debug!("Removing stale discovered peer {}", entry.peer_id.short());
1273 to_remove.push(key.clone());
1274 }
1275 }
1276
1277 for key in to_remove {
1279 if let Some(entry) = peers.remove(&key) {
1280 if let Some(peer) = entry.peer {
1281 let _ = peer.close().await;
1282 }
1283 }
1284 }
1285
1286 self.state
1287 .connected_count
1288 .store(connected_count, std::sync::atomic::Ordering::Relaxed);
1289 }
1290}
1291
/// Plain snapshot of a peer: identity, direction, state as a string, and a timestamp.
///
/// Not referenced elsewhere in the visible part of this file (hence the
/// `dead_code` allowance).
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub struct PeerState {
    /// Identity of the peer.
    pub peer_id: PeerId,
    /// Connection direction of the peer.
    pub direction: PeerDirection,
    /// Connection state in string form.
    pub state: String,
    /// Timestamp associated with the peer; presumably when it was last seen — TODO confirm.
    pub last_seen: Instant,
}