1use anyhow::Result;
13use futures::{SinkExt, StreamExt};
14use nostr::{
15 nips::nip44, ClientMessage, EventBuilder, Filter, JsonUtil, Keys, Kind, PublicKey,
16 RelayMessage, Tag,
17};
18use std::collections::HashMap;
19use std::sync::Arc;
20use std::time::{Duration, Instant};
21use tokio::sync::{mpsc, Mutex, RwLock};
22use tokio_tungstenite::{connect_async, tungstenite::Message};
23use tracing::{debug, error, info, warn};
24
25use super::peer::{ContentStore, Peer, PendingRequest};
26use super::types::{
27 PeerDirection, PeerId, PeerPool, PeerStateEvent, PeerStatus, SignalingMessage, WebRTCConfig,
28 HELLO_TAG, WEBRTC_KIND,
29};
30use crate::nostr_relay::NostrRelay;
31
/// Shared, thread-safe callback that assigns a peer to a [`PeerPool`].
///
/// In this file it is invoked with the sender's public key string.
pub type PeerClassifier = Arc<dyn Fn(&str) -> PeerPool + Send + Sync>;
34
/// Lifecycle stage of a tracked peer connection.
///
/// The enum carries no data, so it is `Copy` and can be used as a hash key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ConnectionState {
    /// The peer has been seen (hello received) but no connection exists yet.
    Discovered,
    /// An offer/answer exchange is in progress.
    Connecting,
    /// The connection is established.
    Connected,
    /// The connection attempt failed.
    Failed,
}
43
44impl std::fmt::Display for ConnectionState {
45 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
46 match self {
47 ConnectionState::Discovered => write!(f, "discovered"),
48 ConnectionState::Connecting => write!(f, "connecting"),
49 ConnectionState::Connected => write!(f, "connected"),
50 ConnectionState::Failed => write!(f, "failed"),
51 }
52 }
53}
54
/// Bookkeeping record for one remote peer tracked in [`WebRTCState::peers`].
pub struct PeerEntry {
    /// Full identity of the remote peer.
    pub peer_id: PeerId,
    /// Whether we initiated the connection (outbound) or the remote side did (inbound).
    pub direction: PeerDirection,
    /// Current stage of the connection lifecycle.
    pub state: ConnectionState,
    /// Set when the entry is inserted; stale-peer cleanup measures elapsed time from it.
    pub last_seen: Instant,
    /// Connection object; `None` while the peer is only discovered.
    pub peer: Option<Peer>,
    /// Pool the peer was classified into.
    pub pool: PeerPool,
    /// Bytes sent to this peer, accumulated by `WebRTCState::record_sent`.
    pub bytes_sent: u64,
    /// Bytes received from this peer, accumulated by `WebRTCState::record_received`.
    pub bytes_received: u64,
}
66
/// State shared between the manager and other users of the peer table.
pub struct WebRTCState {
    /// Tracked peers, keyed by the string form of their `PeerId`.
    pub peers: RwLock<HashMap<String, PeerEntry>>,
    /// Number of peers currently counted as connected.
    pub connected_count: std::sync::atomic::AtomicUsize,
    /// Total bytes sent across all peers.
    pub bytes_sent: std::sync::atomic::AtomicU64,
    /// Total bytes received across all peers.
    pub bytes_received: std::sync::atomic::AtomicU64,
}
76
77impl WebRTCState {
78 pub fn new() -> Self {
79 Self {
80 peers: RwLock::new(HashMap::new()),
81 connected_count: std::sync::atomic::AtomicUsize::new(0),
82 bytes_sent: std::sync::atomic::AtomicU64::new(0),
83 bytes_received: std::sync::atomic::AtomicU64::new(0),
84 }
85 }
86
87 pub fn get_bandwidth(&self) -> (u64, u64) {
89 (
90 self.bytes_sent.load(std::sync::atomic::Ordering::Relaxed),
91 self.bytes_received
92 .load(std::sync::atomic::Ordering::Relaxed),
93 )
94 }
95
96 pub async fn record_sent(&self, peer_id: &str, bytes: u64) {
98 self.bytes_sent
99 .fetch_add(bytes, std::sync::atomic::Ordering::Relaxed);
100 if let Some(entry) = self.peers.write().await.get_mut(peer_id) {
101 entry.bytes_sent += bytes;
102 }
103 }
104
105 pub async fn record_received(&self, peer_id: &str, bytes: u64) {
107 self.bytes_received
108 .fetch_add(bytes, std::sync::atomic::Ordering::Relaxed);
109 if let Some(entry) = self.peers.write().await.get_mut(peer_id) {
110 entry.bytes_received += bytes;
111 }
112 }
113
114 pub async fn request_from_peers(&self, hash_hex: &str) -> Option<Vec<u8>> {
118 use super::types::{encode_request, DataRequest, MAX_HTL};
119
120 let peers = self.peers.read().await;
121
122 let peer_refs: Vec<_> = peers
125 .values()
126 .filter(|p| p.state == ConnectionState::Connected && p.peer.is_some())
127 .filter_map(|p| {
128 p.peer.as_ref().map(|peer| {
129 (
130 p.peer_id.short(),
131 peer.data_channel.clone(),
132 peer.pending_requests.clone(),
133 )
134 })
135 })
136 .collect();
137
138 drop(peers); let mut connected_peers: Vec<(
142 String,
143 Arc<Mutex<HashMap<String, PendingRequest>>>,
144 Arc<webrtc::data_channel::RTCDataChannel>,
145 )> = Vec::new();
146 for (peer_id, dc_mutex, pending) in peer_refs {
147 let dc_guard = dc_mutex.lock().await;
148 if let Some(dc) = dc_guard.as_ref() {
149 connected_peers.push((peer_id, pending, dc.clone()));
150 }
151 }
152
153 if connected_peers.is_empty() {
154 debug!(
155 "No connected peers to query for {}",
156 &hash_hex[..8.min(hash_hex.len())]
157 );
158 return None;
159 }
160
161 debug!(
162 "Querying {} connected peers for {} (sequential with 500ms delay)",
163 connected_peers.len(),
164 &hash_hex[..8.min(hash_hex.len())]
165 );
166
167 let hash_bytes = match hex::decode(hash_hex) {
169 Ok(b) => b,
170 Err(_) => return None,
171 };
172
173 for (_i, (peer_id, pending_requests, dc)) in connected_peers.into_iter().enumerate() {
175 debug!(
176 "Querying peer {} for {}",
177 peer_id,
178 &hash_hex[..8.min(hash_hex.len())]
179 );
180
181 let (tx, rx) = tokio::sync::oneshot::channel();
183
184 {
186 let mut pending = pending_requests.lock().await;
187 pending.insert(
188 hash_hex.to_string(),
189 super::PendingRequest {
190 hash: hash_bytes.clone(),
191 response_tx: tx,
192 },
193 );
194 }
195
196 let req = DataRequest {
198 h: hash_bytes.clone(),
199 htl: MAX_HTL,
200 };
201 if let Ok(wire) = encode_request(&req) {
202 let wire_len = wire.len() as u64;
203 if dc.send(&bytes::Bytes::from(wire)).await.is_ok() {
204 self.record_sent(&peer_id, wire_len).await;
205 match tokio::time::timeout(std::time::Duration::from_millis(500), rx).await {
207 Ok(Ok(Some(data))) => {
208 self.record_received(&peer_id, data.len() as u64).await;
209 debug!(
210 "Got response from peer {} for {}",
211 peer_id,
212 &hash_hex[..8.min(hash_hex.len())]
213 );
214 return Some(data);
215 }
216 _ => {
217 debug!(
219 "No response from peer {} for {}",
220 peer_id,
221 &hash_hex[..8.min(hash_hex.len())]
222 );
223 }
224 }
225 }
226 }
227
228 let mut pending = pending_requests.lock().await;
230 pending.remove(hash_hex);
231 }
232
233 debug!(
234 "No peer had data for {}",
235 &hash_hex[..8.min(hash_hex.len())]
236 );
237 None
238 }
239}
240
/// Discovers peers over Nostr relays and manages the resulting WebRTC connections.
pub struct WebRTCManager {
    /// Relay list, STUN servers, pool limits and hello interval.
    config: WebRTCConfig,
    /// Our own peer identity, derived from `keys` at construction.
    my_peer_id: PeerId,
    /// Nostr keys used to sign hello events and decrypt directed messages.
    keys: Keys,
    /// Peer table and counters, shared with callers through `state()`.
    state: Arc<WebRTCState>,
    /// Sending half of the shutdown flag; `shutdown()` sets it to `true`.
    shutdown: Arc<tokio::sync::watch::Sender<bool>>,
    /// Receiving half of the shutdown flag; cloned for the main loop and each relay task.
    shutdown_rx: tokio::sync::watch::Receiver<bool>,
    /// Handed to every `Peer` so it can emit outgoing signaling messages.
    signaling_tx: mpsc::Sender<SignalingMessage>,
    /// Receiving end of `signaling_tx`; taken by `run()` on first call.
    signaling_rx: Option<mpsc::Receiver<SignalingMessage>>,
    /// Optional content store passed to each `Peer`.
    store: Option<Arc<dyn ContentStore>>,
    /// Decides which pool a peer belongs to.
    peer_classifier: PeerClassifier,
    /// Optional Nostr relay handle passed to each `Peer`.
    nostr_relay: Option<Arc<NostrRelay>>,
    /// Handed to every `Peer` so it can report connection state changes.
    state_event_tx: mpsc::Sender<PeerStateEvent>,
    /// Receiving end of `state_event_tx`; taken by `run()` on first call.
    state_event_rx: Option<mpsc::Receiver<PeerStateEvent>>,
}
262
263impl WebRTCManager {
264 pub fn new(keys: Keys, config: WebRTCConfig) -> Self {
266 let pubkey = keys.public_key().to_hex();
267 let my_peer_id = PeerId::new(pubkey, None);
268 let (shutdown, shutdown_rx) = tokio::sync::watch::channel(false);
269 let (signaling_tx, signaling_rx) = mpsc::channel(100);
270 let (state_event_tx, state_event_rx) = mpsc::channel(100);
271
272 let peer_classifier: PeerClassifier = Arc::new(|_| PeerPool::Other);
274
275 Self {
276 config,
277 my_peer_id,
278 keys,
279 state: Arc::new(WebRTCState::new()),
280 shutdown: Arc::new(shutdown),
281 shutdown_rx,
282 signaling_tx,
283 signaling_rx: Some(signaling_rx),
284 store: None,
285 peer_classifier,
286 nostr_relay: None,
287 state_event_tx,
288 state_event_rx: Some(state_event_rx),
289 }
290 }
291
292 pub fn new_with_classifier(
294 keys: Keys,
295 config: WebRTCConfig,
296 classifier: PeerClassifier,
297 ) -> Self {
298 let mut manager = Self::new(keys, config);
299 manager.peer_classifier = classifier;
300 manager
301 }
302
303 pub fn new_with_store(keys: Keys, config: WebRTCConfig, store: Arc<dyn ContentStore>) -> Self {
305 let mut manager = Self::new(keys, config);
306 manager.store = Some(store);
307 manager
308 }
309
310 pub fn new_with_store_and_classifier(
312 keys: Keys,
313 config: WebRTCConfig,
314 store: Arc<dyn ContentStore>,
315 classifier: PeerClassifier,
316 ) -> Self {
317 let mut manager = Self::new(keys, config);
318 manager.store = Some(store);
319 manager.peer_classifier = classifier;
320 manager
321 }
322
    /// Replaces the content store handed to peers created from now on.
    pub fn set_store(&mut self, store: Arc<dyn ContentStore>) {
        self.store = Some(store);
    }
327
    /// Replaces the callback used to assign peers to pools.
    pub fn set_peer_classifier(&mut self, classifier: PeerClassifier) {
        self.peer_classifier = classifier;
    }
332
    /// Sets the Nostr relay handle handed to peers created from now on.
    pub fn set_nostr_relay(&mut self, relay: Arc<NostrRelay>) {
        self.nostr_relay = Some(relay);
    }
337
    /// Returns this manager's own peer identity.
    pub fn my_peer_id(&self) -> &PeerId {
        &self.my_peer_id
    }
342
343 pub fn state(&self) -> Arc<WebRTCState> {
345 self.state.clone()
346 }
347
    /// Sets the shutdown flag to `true`; the main loop and relay tasks exit
    /// when they observe it. A failed send is ignored.
    pub fn shutdown(&self) {
        let _ = self.shutdown.send(true);
    }
352
353 pub async fn connected_count(&self) -> usize {
355 self.state
356 .connected_count
357 .load(std::sync::atomic::Ordering::Relaxed)
358 }
359
360 pub async fn peer_statuses(&self) -> Vec<PeerStatus> {
362 self.state
363 .peers
364 .read()
365 .await
366 .values()
367 .map(|p| PeerStatus {
368 peer_id: p.peer_id.to_string(),
369 pubkey: p.peer_id.pubkey.clone(),
370 state: p.state.to_string(),
371 direction: p.direction,
372 connected_at: Some(p.last_seen),
373 pool: p.pool,
374 })
375 .collect()
376 }
377
378 pub async fn get_pool_counts(&self) -> (usize, usize, usize, usize) {
382 let peers = self.state.peers.read().await;
383 let mut follows_connected = 0;
384 let mut follows_active = 0;
385 let mut other_connected = 0;
386 let mut other_active = 0;
387
388 for entry in peers.values() {
389 let is_active = entry.state == ConnectionState::Connected
392 || entry.state == ConnectionState::Connecting;
393
394 match entry.pool {
395 PeerPool::Follows => {
396 if is_active {
397 follows_active += 1;
398 }
399 if entry.state == ConnectionState::Connected {
400 follows_connected += 1;
401 }
402 }
403 PeerPool::Other => {
404 if is_active {
405 other_active += 1;
406 }
407 if entry.state == ConnectionState::Connected {
408 other_connected += 1;
409 }
410 }
411 }
412 }
413
414 (
415 follows_connected,
416 follows_active,
417 other_connected,
418 other_active,
419 )
420 }
421
422 fn can_accept_peer(&self, pool: PeerPool, pool_counts: &(usize, usize, usize, usize)) -> bool {
424 let (_, follows_active, _, other_active) = *pool_counts;
425 match pool {
426 PeerPool::Follows => follows_active < self.config.pools.follows.max_connections,
427 PeerPool::Other => other_active < self.config.pools.other.max_connections,
428 }
429 }
430
431 #[allow(dead_code)]
433 fn is_pool_satisfied(
434 &self,
435 pool: PeerPool,
436 pool_counts: &(usize, usize, usize, usize),
437 ) -> bool {
438 let (follows_connected, _, other_connected, _) = *pool_counts;
439 match pool {
440 PeerPool::Follows => {
441 follows_connected >= self.config.pools.follows.satisfied_connections
442 }
443 PeerPool::Other => other_connected >= self.config.pools.other.satisfied_connections,
444 }
445 }
446
447 #[allow(dead_code)]
449 fn is_satisfied(&self, pool_counts: &(usize, usize, usize, usize)) -> bool {
450 self.is_pool_satisfied(PeerPool::Follows, pool_counts)
451 && self.is_pool_satisfied(PeerPool::Other, pool_counts)
452 }
453
454 fn should_initiate(&self, their_uuid: &str) -> bool {
457 self.my_peer_id.uuid < their_uuid.to_string()
458 }
459
    /// Runs the manager until the shutdown flag is set.
    ///
    /// Spawns one relay task per configured relay, then loops over incoming
    /// relay events, outgoing signaling messages, peer state events and a
    /// 30-second cleanup timer.
    ///
    /// # Panics
    ///
    /// Panics if called a second time on the same manager, because the
    /// signaling and state-event receivers are taken on the first call.
    pub async fn run(&mut self) -> Result<()> {
        info!(
            "Starting WebRTC manager with peer ID: {}",
            self.my_peer_id.short()
        );

        // Relay tasks forward every received event here, paired with the relay URL.
        let (event_tx, mut event_rx) = mpsc::channel::<(String, nostr::Event)>(100);

        let mut signaling_rx = self
            .signaling_rx
            .take()
            .expect("signaling_rx already taken");

        let mut state_event_rx = self
            .state_event_rx
            .take()
            .expect("state_event_rx already taken");

        // Fan-out channel: each relay task subscribes, so every outgoing
        // signaling message is offered to every relay.
        let (relay_write_tx, _) = tokio::sync::broadcast::channel::<SignalingMessage>(100);

        for relay_url in &self.config.relays {
            let url = relay_url.clone();
            let event_tx = event_tx.clone();
            let shutdown_rx = self.shutdown_rx.clone();
            let keys = self.keys.clone();
            let my_peer_id = self.my_peer_id.clone();
            let hello_interval = Duration::from_millis(self.config.hello_interval_ms);
            let relay_write_rx = relay_write_tx.subscribe();

            // The task is detached; a failure is only logged.
            tokio::spawn(async move {
                if let Err(e) = Self::relay_task(
                    url.clone(),
                    event_tx,
                    shutdown_rx,
                    keys,
                    my_peer_id,
                    hello_interval,
                    relay_write_rx,
                )
                .await
                {
                    error!("Relay {} error: {}", url, e);
                }
            });
        }

        let mut shutdown_rx = self.shutdown_rx.clone();
        let mut cleanup_interval = tokio::time::interval(Duration::from_secs(30));
        loop {
            tokio::select! {
                _ = shutdown_rx.changed() => {
                    if *shutdown_rx.borrow() {
                        info!("WebRTC manager shutting down");
                        break;
                    }
                }
                Some((relay, event)) = event_rx.recv() => {
                    // Handling errors are logged and do not stop the loop.
                    if let Err(e) = self.handle_event(&relay, &event, &relay_write_tx).await {
                        debug!("Error handling event from {}: {}", relay, e);
                    }
                }
                Some(msg) = signaling_rx.recv() => {
                    // Messages produced by peers are forwarded to the relay tasks;
                    // a send error (no subscribed relay task) is ignored.
                    let _ = relay_write_tx.send(msg);
                }
                Some(event) = state_event_rx.recv() => {
                    self.handle_peer_state_event(event).await;
                }
                _ = cleanup_interval.tick() => {
                    self.cleanup_stale_peers().await;
                }
            }
        }

        Ok(())
    }
545
546 async fn relay_task(
548 url: String,
549 event_tx: mpsc::Sender<(String, nostr::Event)>,
550 mut shutdown_rx: tokio::sync::watch::Receiver<bool>,
551 keys: Keys,
552 my_peer_id: PeerId,
553 hello_interval: Duration,
554 mut signaling_rx: tokio::sync::broadcast::Receiver<SignalingMessage>,
555 ) -> Result<()> {
556 info!("Connecting to relay: {}", url);
557
558 let (ws_stream, _) = connect_async(&url).await?;
559 let (mut write, mut read) = ws_stream.split();
560
561 let hello_filter = Filter::new()
565 .kind(Kind::Ephemeral(WEBRTC_KIND as u16))
566 .custom_tag(
567 nostr::SingleLetterTag::lowercase(nostr::Alphabet::L),
568 vec![HELLO_TAG],
569 )
570 .since(nostr::Timestamp::now() - Duration::from_secs(60));
571
572 let directed_filter = Filter::new()
573 .kind(Kind::Ephemeral(WEBRTC_KIND as u16))
574 .custom_tag(
575 nostr::SingleLetterTag::lowercase(nostr::Alphabet::P),
576 vec![keys.public_key().to_hex()],
577 )
578 .since(nostr::Timestamp::now() - Duration::from_secs(60));
579
580 let sub_id = nostr::SubscriptionId::generate();
581 let sub_msg = ClientMessage::req(sub_id.clone(), vec![hello_filter, directed_filter]);
582 write.send(Message::Text(sub_msg.as_json().into())).await?;
583
584 info!(
585 "Subscribed to {} for WebRTC events (kind {})",
586 url, WEBRTC_KIND
587 );
588
589 let mut last_hello = Instant::now() - hello_interval; let mut hello_ticker = tokio::time::interval(Duration::from_secs(1));
591
592 loop {
593 tokio::select! {
594 _ = shutdown_rx.changed() => {
595 if *shutdown_rx.borrow() {
596 break;
597 }
598 }
599 _ = hello_ticker.tick() => {
600 if last_hello.elapsed() >= hello_interval {
602 let hello = SignalingMessage::hello(&my_peer_id.uuid);
603 if let Ok(event) = Self::create_signaling_event(&keys, &hello).await {
604 let msg = ClientMessage::event(event);
605 if write.send(Message::Text(msg.as_json().into())).await.is_ok() {
606 debug!("Sent hello to {}", url);
607 }
608 }
609 last_hello = Instant::now();
610 }
611 }
612 Ok(signaling_msg) = signaling_rx.recv() => {
614 info!("Sending {} via {}", signaling_msg.msg_type(), url);
615 if let Ok(event) = Self::create_signaling_event(&keys, &signaling_msg).await {
616 let event_id = event.id.to_string();
617 let msg = ClientMessage::event(event);
618 if write.send(Message::Text(msg.as_json().into())).await.is_ok() {
619 info!("Sent {} to {} (event id: {})", signaling_msg.msg_type(), url, &event_id[..16]);
620 }
621 }
622 }
623 msg = read.next() => {
624 match msg {
625 Some(Ok(Message::Text(text))) => {
626 if let Ok(relay_msg) = RelayMessage::from_json(&text) {
627 if let RelayMessage::Event { event, .. } = relay_msg {
628 let _ = event_tx.send((url.clone(), *event)).await;
629 }
630 }
631 }
632 Some(Err(e)) => {
633 error!("WebSocket error from {}: {}", url, e);
634 break;
635 }
636 None => {
637 warn!("WebSocket closed: {}", url);
638 break;
639 }
640 _ => {}
641 }
642 }
643 }
644 }
645
646 Ok(())
647 }
648
649 async fn create_signaling_event(keys: &Keys, msg: &SignalingMessage) -> Result<nostr::Event> {
655 if let Some(recipient_str) = msg.recipient() {
657 if let Some(peer_id) = PeerId::from_string(recipient_str) {
659 let recipient_pubkey = PublicKey::from_hex(&peer_id.pubkey)?;
660
661 let seal = serde_json::json!({
663 "pubkey": keys.public_key().to_hex(),
664 "kind": WEBRTC_KIND,
665 "content": serde_json::to_string(msg)?,
666 "tags": []
667 });
668
669 let ephemeral_keys = Keys::generate();
671
672 let encrypted_content = nip44::encrypt(
674 ephemeral_keys.secret_key(),
675 &recipient_pubkey,
676 &seal.to_string(),
677 nip44::Version::V2,
678 )?;
679
680 let created_at = nostr::Timestamp::now();
682 let expiration = created_at + Duration::from_secs(5 * 60); let tags = vec![
685 Tag::parse(&["p", &recipient_pubkey.to_hex()])?,
686 Tag::parse(&["expiration", &expiration.as_u64().to_string()])?,
687 ];
688
689 let event =
690 EventBuilder::new(Kind::Ephemeral(WEBRTC_KIND as u16), encrypted_content, tags)
691 .to_event(&ephemeral_keys)?;
692
693 return Ok(event);
694 }
695 }
696
697 let tags = vec![
699 Tag::parse(&["l", HELLO_TAG])?,
700 Tag::parse(&["peerId", msg.peer_id()])?,
701 ];
702
703 let event =
704 EventBuilder::new(Kind::Ephemeral(WEBRTC_KIND as u16), "", tags).to_event(keys)?;
705
706 Ok(event)
707 }
708
709 async fn handle_event(
715 &self,
716 relay: &str,
717 event: &nostr::Event,
718 relay_write_tx: &tokio::sync::broadcast::Sender<SignalingMessage>,
719 ) -> Result<()> {
720 if event.kind != Kind::Ephemeral(WEBRTC_KIND as u16) {
722 return Ok(());
723 }
724
725 let get_tag = |name: &str| -> Option<String> {
727 event.tags.iter().find_map(|tag| {
728 let v: Vec<String> = tag.clone().to_vec();
729 if v.len() >= 2 && v[0] == name {
730 Some(v[1].clone())
731 } else {
732 None
733 }
734 })
735 };
736
737 let l_tag = get_tag("l");
739 if l_tag.as_deref() == Some(HELLO_TAG) {
740 let sender_pubkey = event.pubkey.to_hex();
741
742 if sender_pubkey == self.my_peer_id.pubkey {
744 return Ok(());
745 }
746
747 if let Some(their_uuid) = get_tag("peerId") {
748 debug!("Received hello from {} via {}", &sender_pubkey[..8], relay);
749 self.handle_hello(&sender_pubkey, &their_uuid, relay_write_tx)
750 .await?;
751 }
752 return Ok(());
753 }
754
755 let p_tag = get_tag("p");
757 if p_tag.as_deref() != Some(&self.keys.public_key().to_hex()) {
758 return Ok(());
760 }
761
762 if event.content.is_empty() {
764 return Ok(());
765 }
766
767 let seal: serde_json::Value =
769 match nip44::decrypt(self.keys.secret_key(), &event.pubkey, &event.content) {
770 Ok(plaintext) => match serde_json::from_str(&plaintext) {
771 Ok(v) => v,
772 Err(_) => return Ok(()),
773 },
774 Err(_) => {
775 return Ok(());
777 }
778 };
779
780 let sender_pubkey = seal
782 .get("pubkey")
783 .and_then(|v| v.as_str())
784 .ok_or_else(|| anyhow::anyhow!("Missing pubkey in seal"))?;
785
786 if sender_pubkey == self.my_peer_id.pubkey {
788 return Ok(());
789 }
790
791 let content = seal
792 .get("content")
793 .and_then(|v| v.as_str())
794 .ok_or_else(|| anyhow::anyhow!("Missing content in seal"))?;
795
796 let raw_msg: serde_json::Value = serde_json::from_str(content)?;
797 let msg_type = raw_msg.get("type").and_then(|v| v.as_str()).unwrap_or("");
798
799 if raw_msg.get("targetPeerId").is_some() {
801 let target_peer = raw_msg
802 .get("targetPeerId")
803 .and_then(|v| v.as_str())
804 .unwrap_or("");
805 if target_peer != self.my_peer_id.to_string() {
806 return Ok(());
807 }
808
809 let peer_id = raw_msg.get("peerId").and_then(|v| v.as_str()).unwrap_or("");
810 let their_uuid = peer_id.split(':').nth(1).unwrap_or(peer_id);
811
812 match msg_type {
813 "offer" => {
814 let sdp = raw_msg
815 .get("sdp")
816 .and_then(|v| v.as_str())
817 .ok_or_else(|| anyhow::anyhow!("Missing SDP in offer"))?;
818 let offer = serde_json::json!({ "type": "offer", "sdp": sdp });
819 self.handle_offer(&sender_pubkey, their_uuid, offer, relay_write_tx)
820 .await?;
821 }
822 "answer" => {
823 let sdp = raw_msg
824 .get("sdp")
825 .and_then(|v| v.as_str())
826 .ok_or_else(|| anyhow::anyhow!("Missing SDP in answer"))?;
827 let answer = serde_json::json!({ "type": "answer", "sdp": sdp });
828 self.handle_answer(&sender_pubkey, their_uuid, answer)
829 .await?;
830 }
831 "candidate" => {
832 let candidate = raw_msg
833 .get("candidate")
834 .and_then(|v| v.as_str())
835 .unwrap_or("");
836 if !candidate.is_empty() {
837 let candidate_json = serde_json::json!({
838 "candidate": candidate,
839 "sdpMid": raw_msg.get("sdpMid"),
840 "sdpMLineIndex": raw_msg.get("sdpMLineIndex"),
841 });
842 self.handle_candidate(&sender_pubkey, their_uuid, candidate_json)
843 .await?;
844 }
845 }
846 "candidates" => {
847 let candidates = raw_msg
848 .get("candidates")
849 .and_then(|v| v.as_array())
850 .map(|entries| {
851 entries
852 .iter()
853 .filter_map(|entry| {
854 if let Some(candidate_str) = entry
855 .get("candidate")
856 .and_then(|v| v.as_str())
857 .or_else(|| entry.as_str())
858 {
859 Some(serde_json::json!({
860 "candidate": candidate_str,
861 "sdpMid": entry.get("sdpMid"),
862 "sdpMLineIndex": entry.get("sdpMLineIndex"),
863 }))
864 } else {
865 None
866 }
867 })
868 .collect::<Vec<_>>()
869 })
870 .unwrap_or_default();
871 self.handle_candidates(&sender_pubkey, their_uuid, candidates)
872 .await?;
873 }
874 _ => {}
875 }
876
877 return Ok(());
878 }
879
880 let msg: SignalingMessage = serde_json::from_value(raw_msg)?;
881
882 debug!(
883 "Received {} from {} via {} (gift-wrapped)",
884 msg.msg_type(),
885 &sender_pubkey[..8],
886 relay
887 );
888
889 match msg {
890 SignalingMessage::Hello { .. } => {
891 return Ok(());
893 }
894 SignalingMessage::Offer {
895 recipient,
896 peer_id: their_uuid,
897 offer,
898 } => {
899 if recipient != self.my_peer_id.to_string() {
900 return Ok(()); }
902 if let Err(e) = self
903 .handle_offer(&sender_pubkey, &their_uuid, offer, relay_write_tx)
904 .await
905 {
906 error!(
907 "handle_offer FAILED: sender={}, uuid={}, error={:?}",
908 &sender_pubkey[..8.min(sender_pubkey.len())],
909 their_uuid,
910 e
911 );
912 return Err(e);
913 }
914 }
915 SignalingMessage::Answer {
916 recipient,
917 peer_id: their_uuid,
918 answer,
919 } => {
920 if recipient != self.my_peer_id.to_string() {
921 return Ok(());
922 }
923 self.handle_answer(&sender_pubkey, &their_uuid, answer)
924 .await?;
925 }
926 SignalingMessage::Candidate {
927 recipient,
928 peer_id: their_uuid,
929 candidate,
930 } => {
931 if recipient != self.my_peer_id.to_string() {
932 return Ok(());
933 }
934 self.handle_candidate(&sender_pubkey, &their_uuid, candidate)
935 .await?;
936 }
937 SignalingMessage::Candidates {
938 recipient,
939 peer_id: their_uuid,
940 candidates,
941 } => {
942 if recipient != self.my_peer_id.to_string() {
943 return Ok(());
944 }
945 self.handle_candidates(&sender_pubkey, &their_uuid, candidates)
946 .await?;
947 }
948 }
949
950 Ok(())
951 }
952
953 async fn handle_hello(
955 &self,
956 sender_pubkey: &str,
957 their_uuid: &str,
958 relay_write_tx: &tokio::sync::broadcast::Sender<SignalingMessage>,
959 ) -> Result<()> {
960 let full_peer_id = PeerId::new(sender_pubkey.to_string(), Some(their_uuid.to_string()));
961 let peer_key = full_peer_id.to_string();
962
963 {
965 let peers = self.state.peers.read().await;
966 if let Some(entry) = peers.get(&peer_key) {
967 if entry.state == ConnectionState::Connected
969 || entry.state == ConnectionState::Connecting
970 {
971 return Ok(());
972 }
973 }
974 }
975
976 let pool = (self.peer_classifier)(sender_pubkey);
978
979 let pool_counts = self.get_pool_counts().await;
981 if !self.can_accept_peer(pool, &pool_counts) {
982 debug!(
983 "Ignoring hello from {} - pool {:?} is full",
984 full_peer_id.short(),
985 pool
986 );
987 return Ok(());
988 }
989
990 let should_initiate = self.should_initiate(their_uuid);
992
993 let pool_satisfied = self.is_pool_satisfied(pool, &pool_counts);
996 let will_initiate = should_initiate && !pool_satisfied;
997
998 info!(
999 "Discovered peer: {} (pool: {:?}, initiate: {}, pool_satisfied: {})",
1000 full_peer_id.short(),
1001 pool,
1002 will_initiate,
1003 pool_satisfied
1004 );
1005
1006 if !will_initiate && pool_satisfied {
1009 debug!(
1010 "Pool {:?} is satisfied, not tracking peer {}",
1011 pool,
1012 full_peer_id.short()
1013 );
1014 return Ok(());
1015 }
1016
1017 {
1019 let mut peers = self.state.peers.write().await;
1020 peers.insert(
1021 peer_key.clone(),
1022 PeerEntry {
1023 peer_id: full_peer_id.clone(),
1024 direction: if will_initiate {
1025 PeerDirection::Outbound
1026 } else {
1027 PeerDirection::Inbound
1028 },
1029 state: ConnectionState::Discovered,
1030 last_seen: Instant::now(),
1031 peer: None,
1032 pool,
1033 bytes_sent: 0,
1034 bytes_received: 0,
1035 },
1036 );
1037 }
1038
1039 if will_initiate {
1041 self.initiate_connection(&full_peer_id, pool, relay_write_tx)
1042 .await?;
1043 }
1044
1045 Ok(())
1046 }
1047
1048 async fn initiate_connection(
1050 &self,
1051 peer_id: &PeerId,
1052 pool: PeerPool,
1053 relay_write_tx: &tokio::sync::broadcast::Sender<SignalingMessage>,
1054 ) -> Result<()> {
1055 let peer_key = peer_id.to_string();
1056
1057 info!(
1058 "Initiating connection to {} (pool: {:?})",
1059 peer_id.short(),
1060 pool
1061 );
1062
1063 let mut peer = Peer::new_with_store_and_events(
1065 peer_id.clone(),
1066 PeerDirection::Outbound,
1067 self.my_peer_id.clone(),
1068 self.signaling_tx.clone(),
1069 self.config.stun_servers.clone(),
1070 self.store.clone(),
1071 Some(self.state_event_tx.clone()),
1072 self.nostr_relay.clone(),
1073 )
1074 .await?;
1075
1076 peer.setup_handlers().await?;
1077
1078 let offer = peer.connect().await?;
1080
1081 {
1083 let mut peers = self.state.peers.write().await;
1084 if let Some(entry) = peers.get_mut(&peer_key) {
1085 entry.state = ConnectionState::Connecting;
1086 entry.peer = Some(peer);
1087 entry.pool = pool;
1088 }
1089 }
1090
1091 let offer_msg = SignalingMessage::Offer {
1093 offer,
1094 recipient: peer_id.to_string(),
1095 peer_id: self.my_peer_id.uuid.clone(),
1096 };
1097 if relay_write_tx.send(offer_msg).is_err() {
1098 warn!("Failed to broadcast offer to {}", peer_id.short());
1099 }
1100
1101 info!("Sent offer to {}", peer_id.short());
1102
1103 Ok(())
1104 }
1105
1106 async fn handle_offer(
1108 &self,
1109 sender_pubkey: &str,
1110 their_uuid: &str,
1111 offer: serde_json::Value,
1112 relay_write_tx: &tokio::sync::broadcast::Sender<SignalingMessage>,
1113 ) -> Result<()> {
1114 debug!(
1115 "handle_offer ENTRY: sender={}, uuid={}",
1116 &sender_pubkey[..8.min(sender_pubkey.len())],
1117 their_uuid
1118 );
1119 let full_peer_id = PeerId::new(sender_pubkey.to_string(), Some(their_uuid.to_string()));
1120 let peer_key = full_peer_id.to_string();
1121
1122 let pool = (self.peer_classifier)(sender_pubkey);
1124
1125 info!(
1126 "Received offer from {} (pool: {:?})",
1127 full_peer_id.short(),
1128 pool
1129 );
1130
1131 {
1133 let peers = self.state.peers.read().await;
1134 debug!(
1135 "Checking for existing peer, peer_key: {}, known_peers: {}",
1136 peer_key,
1137 peers.len()
1138 );
1139 if let Some(entry) = peers.get(&peer_key) {
1140 if entry.peer.is_some() {
1142 debug!(
1143 "Already have peer {} with connection, skipping offer",
1144 full_peer_id.short()
1145 );
1146 return Ok(());
1147 }
1148 debug!(
1149 "Peer {} exists but has no connection, proceeding",
1150 full_peer_id.short()
1151 );
1152 } else {
1153 debug!(
1154 "Peer {} not found in peers map, will create new entry",
1155 full_peer_id.short()
1156 );
1157 }
1158 }
1159
1160 let pool_counts = self.get_pool_counts().await;
1162 debug!(
1163 "Pool counts: {:?}, checking can_accept_peer for {:?}",
1164 pool_counts, pool
1165 );
1166 if !self.can_accept_peer(pool, &pool_counts) {
1167 warn!(
1168 "Rejecting offer from {} - pool {:?} is full",
1169 full_peer_id.short(),
1170 pool
1171 );
1172 return Ok(());
1173 }
1174 debug!("Pool check passed for {}", full_peer_id.short());
1175
1176 debug!("Creating peer connection for {}", full_peer_id.short());
1178 let mut peer = Peer::new_with_store_and_events(
1179 full_peer_id.clone(),
1180 PeerDirection::Inbound,
1181 self.my_peer_id.clone(),
1182 self.signaling_tx.clone(),
1183 self.config.stun_servers.clone(),
1184 self.store.clone(),
1185 Some(self.state_event_tx.clone()),
1186 self.nostr_relay.clone(),
1187 )
1188 .await?;
1189 debug!("Peer connection created for {}", full_peer_id.short());
1190
1191 peer.setup_handlers().await?;
1192 debug!("Handlers set up for {}", full_peer_id.short());
1193
1194 let answer = peer.handle_offer(offer).await?;
1196 debug!("Answer created for {}", full_peer_id.short());
1197
1198 {
1200 let mut peers = self.state.peers.write().await;
1201 peers.insert(
1202 peer_key,
1203 PeerEntry {
1204 peer_id: full_peer_id.clone(),
1205 direction: PeerDirection::Inbound,
1206 state: ConnectionState::Connecting,
1207 last_seen: Instant::now(),
1208 peer: Some(peer),
1209 pool,
1210 bytes_sent: 0,
1211 bytes_received: 0,
1212 },
1213 );
1214 }
1215
1216 let answer_msg = SignalingMessage::Answer {
1220 answer,
1221 recipient: full_peer_id.to_string(),
1222 peer_id: self.my_peer_id.uuid.clone(),
1223 };
1224 if relay_write_tx.send(answer_msg).is_err() {
1225 warn!("Failed to send answer to {}", full_peer_id.short());
1226 }
1227 info!("Sent answer to {}", full_peer_id.short());
1228
1229 Ok(())
1230 }
1231
1232 async fn handle_answer(
1234 &self,
1235 sender_pubkey: &str,
1236 their_uuid: &str,
1237 answer: serde_json::Value,
1238 ) -> Result<()> {
1239 let full_peer_id = PeerId::new(sender_pubkey.to_string(), Some(their_uuid.to_string()));
1240 let peer_key = full_peer_id.to_string();
1241
1242 info!("Received answer from {}", full_peer_id.short());
1243
1244 let mut peers = self.state.peers.write().await;
1245 if let Some(entry) = peers.get_mut(&peer_key) {
1246 if entry.state == ConnectionState::Connected {
1248 debug!(
1249 "Ignoring duplicate answer from {} - already connected",
1250 full_peer_id.short()
1251 );
1252 return Ok(());
1253 }
1254 if let Some(ref mut peer) = entry.peer {
1255 use webrtc::peer_connection::signaling_state::RTCSignalingState;
1257 let signaling_state = peer.signaling_state();
1258 if signaling_state != RTCSignalingState::HaveLocalOffer {
1259 debug!(
1260 "Ignoring answer from {} - signaling state is {:?}, not HaveLocalOffer",
1261 full_peer_id.short(),
1262 signaling_state
1263 );
1264 return Ok(());
1265 }
1266 peer.handle_answer(answer).await?;
1267 info!("Applied answer from {}", full_peer_id.short());
1268 } else {
1269 debug!("Peer {} has no connection object", full_peer_id.short());
1270 }
1271 } else {
1272 debug!("No peer found for key: {}", peer_key);
1273 }
1274
1275 Ok(())
1276 }
1277
1278 async fn handle_candidate(
1280 &self,
1281 sender_pubkey: &str,
1282 their_uuid: &str,
1283 candidate: serde_json::Value,
1284 ) -> Result<()> {
1285 let full_peer_id = PeerId::new(sender_pubkey.to_string(), Some(their_uuid.to_string()));
1286 let peer_key = full_peer_id.to_string();
1287
1288 info!("Received ICE candidate from {}", full_peer_id.short());
1289
1290 let mut peers = self.state.peers.write().await;
1291 if let Some(entry) = peers.get_mut(&peer_key) {
1292 if let Some(ref mut peer) = entry.peer {
1293 peer.handle_candidate(candidate).await?;
1294 }
1295 }
1296
1297 Ok(())
1298 }
1299
1300 async fn handle_candidates(
1302 &self,
1303 sender_pubkey: &str,
1304 their_uuid: &str,
1305 candidates: Vec<serde_json::Value>,
1306 ) -> Result<()> {
1307 let full_peer_id = PeerId::new(sender_pubkey.to_string(), Some(their_uuid.to_string()));
1308 let peer_key = full_peer_id.to_string();
1309
1310 debug!(
1311 "Received {} candidates from {}",
1312 candidates.len(),
1313 full_peer_id.short()
1314 );
1315
1316 let mut peers = self.state.peers.write().await;
1317 if let Some(entry) = peers.get_mut(&peer_key) {
1318 if let Some(ref mut peer) = entry.peer {
1319 for candidate in candidates {
1320 if let Err(e) = peer.handle_candidate(candidate).await {
1321 debug!("Failed to add candidate: {}", e);
1322 }
1323 }
1324 }
1325 }
1326
1327 Ok(())
1328 }
1329
1330 async fn handle_peer_state_event(&self, event: PeerStateEvent) {
1332 match event {
1333 PeerStateEvent::Connected(peer_id) => {
1334 let peer_key = peer_id.to_string();
1335 let mut peers = self.state.peers.write().await;
1336 if let Some(entry) = peers.get_mut(&peer_key) {
1337 if entry.state != ConnectionState::Connected {
1338 info!("Peer {} connected (via state event)", peer_id.short());
1339 entry.state = ConnectionState::Connected;
1340 self.state
1342 .connected_count
1343 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1344 }
1345 }
1346 }
1347 PeerStateEvent::Failed(peer_id) => {
1348 let peer_key = peer_id.to_string();
1349 info!(
1350 "Peer {} connection failed - removing from pool",
1351 peer_id.short()
1352 );
1353 let mut peers = self.state.peers.write().await;
1354 if let Some(entry) = peers.remove(&peer_key) {
1355 if entry.state == ConnectionState::Connected {
1357 self.state
1358 .connected_count
1359 .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
1360 }
1361 if let Some(peer) = entry.peer {
1363 let _ = peer.close().await;
1364 }
1365 }
1366 }
1367 PeerStateEvent::Disconnected(peer_id) => {
1368 let peer_key = peer_id.to_string();
1369 info!("Peer {} disconnected - removing from pool", peer_id.short());
1370 let mut peers = self.state.peers.write().await;
1371 if let Some(entry) = peers.remove(&peer_key) {
1372 if entry.state == ConnectionState::Connected {
1374 self.state
1375 .connected_count
1376 .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
1377 }
1378 if let Some(peer) = entry.peer {
1380 let _ = peer.close().await;
1381 }
1382 }
1383 }
1384 }
1385 }
1386
1387 async fn cleanup_stale_peers(&self) {
1389 let mut peers = self.state.peers.write().await;
1390 let mut connected_count = 0;
1391 let mut to_remove = Vec::new();
1392 let stale_timeout = Duration::from_secs(60); for (key, entry) in peers.iter_mut() {
1395 if let Some(ref peer) = entry.peer {
1396 if peer.is_connected() {
1398 if entry.state != ConnectionState::Connected {
1399 info!(
1400 "Peer {} is now connected (sync fallback)",
1401 entry.peer_id.short()
1402 );
1403 entry.state = ConnectionState::Connected;
1404 }
1405 connected_count += 1;
1406 } else if entry.state == ConnectionState::Connecting
1407 && entry.last_seen.elapsed() > stale_timeout
1408 {
1409 info!(
1411 "Removing stale peer {} (stuck in Connecting for {:?})",
1412 entry.peer_id.short(),
1413 entry.last_seen.elapsed()
1414 );
1415 to_remove.push(key.clone());
1416 }
1417 } else if entry.state == ConnectionState::Discovered
1418 && entry.last_seen.elapsed() > stale_timeout
1419 {
1420 debug!("Removing stale discovered peer {}", entry.peer_id.short());
1422 to_remove.push(key.clone());
1423 }
1424 }
1425
1426 for key in to_remove {
1428 if let Some(entry) = peers.remove(&key) {
1429 if let Some(peer) = entry.peer {
1430 let _ = peer.close().await;
1431 }
1432 }
1433 }
1434
1435 self.state
1436 .connected_count
1437 .store(connected_count, std::sync::atomic::Ordering::Relaxed);
1438 }
1439}
1440
/// Cloneable summary of a peer with its state held as a string.
///
/// Not referenced anywhere in the visible part of this file.
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub struct PeerState {
    /// Identity of the peer.
    pub peer_id: PeerId,
    /// Direction of the connection.
    pub direction: PeerDirection,
    /// Connection state as text.
    pub state: String,
    /// Timestamp associated with the peer.
    pub last_seen: Instant,
}