1use anyhow::Result;
13use futures::{SinkExt, StreamExt};
14use nostr::{nips::nip44, ClientMessage, EventBuilder, Filter, JsonUtil, Keys, Kind, PublicKey, RelayMessage, Tag};
15use std::collections::HashMap;
16use std::sync::Arc;
17use std::time::{Duration, Instant};
18use tokio::sync::{mpsc, Mutex, RwLock};
19use tokio_tungstenite::{connect_async, tungstenite::Message};
20use tracing::{debug, error, info, warn};
21
22use super::peer::{ContentStore, Peer, PendingRequest};
23use super::types::{
24 PeerDirection, PeerId, PeerPool, PeerStateEvent, PeerStatus, SignalingMessage, WebRTCConfig, WEBRTC_KIND, HELLO_TAG,
25};
26
27pub type PeerClassifier = Arc<dyn Fn(&str) -> PeerPool + Send + Sync>;
29
30#[derive(Debug, Clone, PartialEq)]
32pub enum ConnectionState {
33 Discovered,
34 Connecting,
35 Connected,
36 Failed,
37}
38
39impl std::fmt::Display for ConnectionState {
40 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
41 match self {
42 ConnectionState::Discovered => write!(f, "discovered"),
43 ConnectionState::Connecting => write!(f, "connecting"),
44 ConnectionState::Connected => write!(f, "connected"),
45 ConnectionState::Failed => write!(f, "failed"),
46 }
47 }
48}
49
50pub struct PeerEntry {
52 pub peer_id: PeerId,
53 pub direction: PeerDirection,
54 pub state: ConnectionState,
55 pub last_seen: Instant,
56 pub peer: Option<Peer>,
57 pub pool: PeerPool,
58 pub bytes_sent: u64,
59 pub bytes_received: u64,
60}
61
62pub struct WebRTCState {
64 pub peers: RwLock<HashMap<String, PeerEntry>>,
65 pub connected_count: std::sync::atomic::AtomicUsize,
66 pub bytes_sent: std::sync::atomic::AtomicU64,
68 pub bytes_received: std::sync::atomic::AtomicU64,
70}
71
72impl WebRTCState {
73 pub fn new() -> Self {
74 Self {
75 peers: RwLock::new(HashMap::new()),
76 connected_count: std::sync::atomic::AtomicUsize::new(0),
77 bytes_sent: std::sync::atomic::AtomicU64::new(0),
78 bytes_received: std::sync::atomic::AtomicU64::new(0),
79 }
80 }
81
82 pub fn get_bandwidth(&self) -> (u64, u64) {
84 (
85 self.bytes_sent.load(std::sync::atomic::Ordering::Relaxed),
86 self.bytes_received.load(std::sync::atomic::Ordering::Relaxed),
87 )
88 }
89
90 pub async fn record_sent(&self, peer_id: &str, bytes: u64) {
92 self.bytes_sent.fetch_add(bytes, std::sync::atomic::Ordering::Relaxed);
93 if let Some(entry) = self.peers.write().await.get_mut(peer_id) {
94 entry.bytes_sent += bytes;
95 }
96 }
97
98 pub async fn record_received(&self, peer_id: &str, bytes: u64) {
100 self.bytes_received.fetch_add(bytes, std::sync::atomic::Ordering::Relaxed);
101 if let Some(entry) = self.peers.write().await.get_mut(peer_id) {
102 entry.bytes_received += bytes;
103 }
104 }
105
106 pub async fn request_from_peers(&self, hash_hex: &str) -> Option<Vec<u8>> {
110 use super::types::{DataRequest, MAX_HTL, encode_request};
111
112 let peers = self.peers.read().await;
113
114 let peer_refs: Vec<_> = peers
117 .values()
118 .filter(|p| p.state == ConnectionState::Connected && p.peer.is_some())
119 .filter_map(|p| {
120 p.peer.as_ref().map(|peer| {
121 (p.peer_id.short(), peer.data_channel.clone(), peer.pending_requests.clone())
122 })
123 })
124 .collect();
125
126 drop(peers); let mut connected_peers: Vec<(String, Arc<Mutex<HashMap<String, PendingRequest>>>, Arc<webrtc::data_channel::RTCDataChannel>)> = Vec::new();
130 for (peer_id, dc_mutex, pending) in peer_refs {
131 let dc_guard = dc_mutex.lock().await;
132 if let Some(dc) = dc_guard.as_ref() {
133 connected_peers.push((peer_id, pending, dc.clone()));
134 }
135 }
136
137 if connected_peers.is_empty() {
138 debug!("No connected peers to query for {}", &hash_hex[..8.min(hash_hex.len())]);
139 return None;
140 }
141
142 debug!(
143 "Querying {} connected peers for {} (sequential with 500ms delay)",
144 connected_peers.len(),
145 &hash_hex[..8.min(hash_hex.len())]
146 );
147
148 let hash_bytes = match hex::decode(hash_hex) {
150 Ok(b) => b,
151 Err(_) => return None,
152 };
153
154 for (_i, (peer_id, pending_requests, dc)) in connected_peers.into_iter().enumerate() {
156 debug!("Querying peer {} for {}", peer_id, &hash_hex[..8.min(hash_hex.len())]);
157
158 let (tx, rx) = tokio::sync::oneshot::channel();
160
161 {
163 let mut pending = pending_requests.lock().await;
164 pending.insert(
165 hash_hex.to_string(),
166 super::PendingRequest {
167 hash: hash_bytes.clone(),
168 response_tx: tx,
169 },
170 );
171 }
172
173 let req = DataRequest {
175 h: hash_bytes.clone(),
176 htl: MAX_HTL,
177 };
178 if let Ok(wire) = encode_request(&req) {
179 let wire_len = wire.len() as u64;
180 if dc.send(&bytes::Bytes::from(wire)).await.is_ok() {
181 self.record_sent(&peer_id, wire_len).await;
182 match tokio::time::timeout(std::time::Duration::from_millis(500), rx).await {
184 Ok(Ok(Some(data))) => {
185 self.record_received(&peer_id, data.len() as u64).await;
186 debug!("Got response from peer {} for {}", peer_id, &hash_hex[..8.min(hash_hex.len())]);
187 return Some(data);
188 }
189 _ => {
190 debug!("No response from peer {} for {}", peer_id, &hash_hex[..8.min(hash_hex.len())]);
192 }
193 }
194 }
195 }
196
197 let mut pending = pending_requests.lock().await;
199 pending.remove(hash_hex);
200 }
201
202 debug!("No peer had data for {}", &hash_hex[..8.min(hash_hex.len())]);
203 None
204 }
205}
206
207pub struct WebRTCManager {
209 config: WebRTCConfig,
210 my_peer_id: PeerId,
211 keys: Keys,
212 state: Arc<WebRTCState>,
213 shutdown: Arc<tokio::sync::watch::Sender<bool>>,
214 shutdown_rx: tokio::sync::watch::Receiver<bool>,
215 signaling_tx: mpsc::Sender<SignalingMessage>,
217 signaling_rx: Option<mpsc::Receiver<SignalingMessage>>,
218 store: Option<Arc<dyn ContentStore>>,
220 peer_classifier: PeerClassifier,
222 state_event_tx: mpsc::Sender<PeerStateEvent>,
224 state_event_rx: Option<mpsc::Receiver<PeerStateEvent>>,
225}
226
227impl WebRTCManager {
228 pub fn new(keys: Keys, config: WebRTCConfig) -> Self {
230 let pubkey = keys.public_key().to_hex();
231 let my_peer_id = PeerId::new(pubkey, None);
232 let (shutdown, shutdown_rx) = tokio::sync::watch::channel(false);
233 let (signaling_tx, signaling_rx) = mpsc::channel(100);
234 let (state_event_tx, state_event_rx) = mpsc::channel(100);
235
236 let peer_classifier: PeerClassifier = Arc::new(|_| PeerPool::Other);
238
239 Self {
240 config,
241 my_peer_id,
242 keys,
243 state: Arc::new(WebRTCState::new()),
244 shutdown: Arc::new(shutdown),
245 shutdown_rx,
246 signaling_tx,
247 signaling_rx: Some(signaling_rx),
248 store: None,
249 peer_classifier,
250 state_event_tx,
251 state_event_rx: Some(state_event_rx),
252 }
253 }
254
255 pub fn new_with_classifier(keys: Keys, config: WebRTCConfig, classifier: PeerClassifier) -> Self {
257 let mut manager = Self::new(keys, config);
258 manager.peer_classifier = classifier;
259 manager
260 }
261
262 pub fn new_with_store(keys: Keys, config: WebRTCConfig, store: Arc<dyn ContentStore>) -> Self {
264 let mut manager = Self::new(keys, config);
265 manager.store = Some(store);
266 manager
267 }
268
269 pub fn new_with_store_and_classifier(
271 keys: Keys,
272 config: WebRTCConfig,
273 store: Arc<dyn ContentStore>,
274 classifier: PeerClassifier,
275 ) -> Self {
276 let mut manager = Self::new(keys, config);
277 manager.store = Some(store);
278 manager.peer_classifier = classifier;
279 manager
280 }
281
282 pub fn set_store(&mut self, store: Arc<dyn ContentStore>) {
284 self.store = Some(store);
285 }
286
287 pub fn set_peer_classifier(&mut self, classifier: PeerClassifier) {
289 self.peer_classifier = classifier;
290 }
291
292 pub fn my_peer_id(&self) -> &PeerId {
294 &self.my_peer_id
295 }
296
297 pub fn state(&self) -> Arc<WebRTCState> {
299 self.state.clone()
300 }
301
302 pub fn shutdown(&self) {
304 let _ = self.shutdown.send(true);
305 }
306
307 pub async fn connected_count(&self) -> usize {
309 self.state
310 .connected_count
311 .load(std::sync::atomic::Ordering::Relaxed)
312 }
313
314 pub async fn peer_statuses(&self) -> Vec<PeerStatus> {
316 self.state
317 .peers
318 .read()
319 .await
320 .values()
321 .map(|p| PeerStatus {
322 peer_id: p.peer_id.to_string(),
323 pubkey: p.peer_id.pubkey.clone(),
324 state: p.state.to_string(),
325 direction: p.direction,
326 connected_at: Some(p.last_seen),
327 pool: p.pool,
328 })
329 .collect()
330 }
331
332 pub async fn get_pool_counts(&self) -> (usize, usize, usize, usize) {
336 let peers = self.state.peers.read().await;
337 let mut follows_connected = 0;
338 let mut follows_active = 0;
339 let mut other_connected = 0;
340 let mut other_active = 0;
341
342 for entry in peers.values() {
343 let is_active = entry.state == ConnectionState::Connected
346 || entry.state == ConnectionState::Connecting;
347
348 match entry.pool {
349 PeerPool::Follows => {
350 if is_active {
351 follows_active += 1;
352 }
353 if entry.state == ConnectionState::Connected {
354 follows_connected += 1;
355 }
356 }
357 PeerPool::Other => {
358 if is_active {
359 other_active += 1;
360 }
361 if entry.state == ConnectionState::Connected {
362 other_connected += 1;
363 }
364 }
365 }
366 }
367
368 (follows_connected, follows_active, other_connected, other_active)
369 }
370
371 fn can_accept_peer(&self, pool: PeerPool, pool_counts: &(usize, usize, usize, usize)) -> bool {
373 let (_, follows_active, _, other_active) = *pool_counts;
374 match pool {
375 PeerPool::Follows => follows_active < self.config.pools.follows.max_connections,
376 PeerPool::Other => other_active < self.config.pools.other.max_connections,
377 }
378 }
379
380 #[allow(dead_code)]
382 fn is_pool_satisfied(&self, pool: PeerPool, pool_counts: &(usize, usize, usize, usize)) -> bool {
383 let (follows_connected, _, other_connected, _) = *pool_counts;
384 match pool {
385 PeerPool::Follows => follows_connected >= self.config.pools.follows.satisfied_connections,
386 PeerPool::Other => other_connected >= self.config.pools.other.satisfied_connections,
387 }
388 }
389
390 #[allow(dead_code)]
392 fn is_satisfied(&self, pool_counts: &(usize, usize, usize, usize)) -> bool {
393 self.is_pool_satisfied(PeerPool::Follows, pool_counts)
394 && self.is_pool_satisfied(PeerPool::Other, pool_counts)
395 }
396
397 fn should_initiate(&self, their_uuid: &str) -> bool {
400 self.my_peer_id.uuid < their_uuid.to_string()
401 }
402
403 pub async fn run(&mut self) -> Result<()> {
405 info!(
406 "Starting WebRTC manager with peer ID: {}",
407 self.my_peer_id.short()
408 );
409
410 let (event_tx, mut event_rx) = mpsc::channel::<(String, nostr::Event)>(100);
411
412 let mut signaling_rx = self.signaling_rx.take().expect("signaling_rx already taken");
414
415 let mut state_event_rx = self.state_event_rx.take().expect("state_event_rx already taken");
417
418 let (relay_write_tx, _) = tokio::sync::broadcast::channel::<SignalingMessage>(100);
420
421 for relay_url in &self.config.relays {
423 let url = relay_url.clone();
424 let event_tx = event_tx.clone();
425 let shutdown_rx = self.shutdown_rx.clone();
426 let keys = self.keys.clone();
427 let my_peer_id = self.my_peer_id.clone();
428 let hello_interval = Duration::from_millis(self.config.hello_interval_ms);
429 let relay_write_rx = relay_write_tx.subscribe();
430
431 tokio::spawn(async move {
432 if let Err(e) = Self::relay_task(
433 url.clone(),
434 event_tx,
435 shutdown_rx,
436 keys,
437 my_peer_id,
438 hello_interval,
439 relay_write_rx,
440 )
441 .await
442 {
443 error!("Relay {} error: {}", url, e);
444 }
445 });
446 }
447
448 let mut shutdown_rx = self.shutdown_rx.clone();
450 let mut cleanup_interval = tokio::time::interval(Duration::from_secs(30));
452 loop {
453 tokio::select! {
454 _ = shutdown_rx.changed() => {
455 if *shutdown_rx.borrow() {
456 info!("WebRTC manager shutting down");
457 break;
458 }
459 }
460 Some((relay, event)) = event_rx.recv() => {
461 if let Err(e) = self.handle_event(&relay, &event, &relay_write_tx).await {
462 debug!("Error handling event from {}: {}", relay, e);
463 }
464 }
465 Some(msg) = signaling_rx.recv() => {
466 let _ = relay_write_tx.send(msg);
468 }
469 Some(event) = state_event_rx.recv() => {
470 self.handle_peer_state_event(event).await;
472 }
473 _ = cleanup_interval.tick() => {
474 self.cleanup_stale_peers().await;
476 }
477 }
478 }
479
480 Ok(())
481 }
482
483 async fn relay_task(
485 url: String,
486 event_tx: mpsc::Sender<(String, nostr::Event)>,
487 mut shutdown_rx: tokio::sync::watch::Receiver<bool>,
488 keys: Keys,
489 my_peer_id: PeerId,
490 hello_interval: Duration,
491 mut signaling_rx: tokio::sync::broadcast::Receiver<SignalingMessage>,
492 ) -> Result<()> {
493 info!("Connecting to relay: {}", url);
494
495 let (ws_stream, _) = connect_async(&url).await?;
496 let (mut write, mut read) = ws_stream.split();
497
498 let hello_filter = Filter::new()
502 .kind(Kind::Ephemeral(WEBRTC_KIND as u16))
503 .custom_tag(
504 nostr::SingleLetterTag::lowercase(nostr::Alphabet::L),
505 vec![HELLO_TAG],
506 )
507 .since(nostr::Timestamp::now() - Duration::from_secs(60));
508
509 let directed_filter = Filter::new()
510 .kind(Kind::Ephemeral(WEBRTC_KIND as u16))
511 .custom_tag(
512 nostr::SingleLetterTag::lowercase(nostr::Alphabet::P),
513 vec![keys.public_key().to_hex()],
514 )
515 .since(nostr::Timestamp::now() - Duration::from_secs(60));
516
517 let sub_id = nostr::SubscriptionId::generate();
518 let sub_msg = ClientMessage::req(sub_id.clone(), vec![hello_filter, directed_filter]);
519 write.send(Message::Text(sub_msg.as_json().into())).await?;
520
521 info!("Subscribed to {} for WebRTC events (kind {})", url, WEBRTC_KIND);
522
523 let mut last_hello = Instant::now() - hello_interval; let mut hello_ticker = tokio::time::interval(Duration::from_secs(1));
525
526 loop {
527 tokio::select! {
528 _ = shutdown_rx.changed() => {
529 if *shutdown_rx.borrow() {
530 break;
531 }
532 }
533 _ = hello_ticker.tick() => {
534 if last_hello.elapsed() >= hello_interval {
536 let hello = SignalingMessage::hello(&my_peer_id.uuid);
537 if let Ok(event) = Self::create_signaling_event(&keys, &hello).await {
538 let msg = ClientMessage::event(event);
539 if write.send(Message::Text(msg.as_json().into())).await.is_ok() {
540 debug!("Sent hello to {}", url);
541 }
542 }
543 last_hello = Instant::now();
544 }
545 }
546 Ok(signaling_msg) = signaling_rx.recv() => {
548 info!("Sending {} via {}", signaling_msg.msg_type(), url);
549 if let Ok(event) = Self::create_signaling_event(&keys, &signaling_msg).await {
550 let event_id = event.id.to_string();
551 let msg = ClientMessage::event(event);
552 if write.send(Message::Text(msg.as_json().into())).await.is_ok() {
553 info!("Sent {} to {} (event id: {})", signaling_msg.msg_type(), url, &event_id[..16]);
554 }
555 }
556 }
557 msg = read.next() => {
558 match msg {
559 Some(Ok(Message::Text(text))) => {
560 if let Ok(relay_msg) = RelayMessage::from_json(&text) {
561 if let RelayMessage::Event { event, .. } = relay_msg {
562 let _ = event_tx.send((url.clone(), *event)).await;
563 }
564 }
565 }
566 Some(Err(e)) => {
567 error!("WebSocket error from {}: {}", url, e);
568 break;
569 }
570 None => {
571 warn!("WebSocket closed: {}", url);
572 break;
573 }
574 _ => {}
575 }
576 }
577 }
578 }
579
580 Ok(())
581 }
582
583 async fn create_signaling_event(keys: &Keys, msg: &SignalingMessage) -> Result<nostr::Event> {
589 if let Some(recipient_str) = msg.recipient() {
591 if let Some(peer_id) = PeerId::from_string(recipient_str) {
593 let recipient_pubkey = PublicKey::from_hex(&peer_id.pubkey)?;
594
595 let seal = serde_json::json!({
597 "pubkey": keys.public_key().to_hex(),
598 "kind": WEBRTC_KIND,
599 "content": serde_json::to_string(msg)?,
600 "tags": []
601 });
602
603 let ephemeral_keys = Keys::generate();
605
606 let encrypted_content = nip44::encrypt(
608 ephemeral_keys.secret_key(),
609 &recipient_pubkey,
610 &seal.to_string(),
611 nip44::Version::V2
612 )?;
613
614 let created_at = nostr::Timestamp::now();
616 let expiration = created_at + Duration::from_secs(5 * 60); let tags = vec![
619 Tag::parse(&["p", &recipient_pubkey.to_hex()])?,
620 Tag::parse(&["expiration", &expiration.as_u64().to_string()])?,
621 ];
622
623 let event = EventBuilder::new(Kind::Ephemeral(WEBRTC_KIND as u16), encrypted_content, tags)
624 .to_event(&ephemeral_keys)?;
625
626 return Ok(event);
627 }
628 }
629
630 let tags = vec![
632 Tag::parse(&["l", HELLO_TAG])?,
633 Tag::parse(&["peerId", msg.peer_id()])?,
634 ];
635
636 let event = EventBuilder::new(Kind::Ephemeral(WEBRTC_KIND as u16), "", tags)
637 .to_event(keys)?;
638
639 Ok(event)
640 }
641
642 async fn handle_event(
648 &self,
649 relay: &str,
650 event: &nostr::Event,
651 relay_write_tx: &tokio::sync::broadcast::Sender<SignalingMessage>,
652 ) -> Result<()> {
653 if event.kind != Kind::Ephemeral(WEBRTC_KIND as u16) {
655 return Ok(());
656 }
657
658 let get_tag = |name: &str| -> Option<String> {
660 event.tags.iter().find_map(|tag| {
661 let v: Vec<String> = tag.clone().to_vec();
662 if v.len() >= 2 && v[0] == name {
663 Some(v[1].clone())
664 } else {
665 None
666 }
667 })
668 };
669
670 let l_tag = get_tag("l");
672 if l_tag.as_deref() == Some(HELLO_TAG) {
673 let sender_pubkey = event.pubkey.to_hex();
674
675 if sender_pubkey == self.my_peer_id.pubkey {
677 return Ok(());
678 }
679
680 if let Some(their_uuid) = get_tag("peerId") {
681 debug!("Received hello from {} via {}", &sender_pubkey[..8], relay);
682 self.handle_hello(&sender_pubkey, &their_uuid, relay_write_tx)
683 .await?;
684 }
685 return Ok(());
686 }
687
688 let p_tag = get_tag("p");
690 if p_tag.as_deref() != Some(&self.keys.public_key().to_hex()) {
691 return Ok(());
693 }
694
695 if event.content.is_empty() {
697 return Ok(());
698 }
699
700 let seal: serde_json::Value = match nip44::decrypt(self.keys.secret_key(), &event.pubkey, &event.content) {
702 Ok(plaintext) => {
703 match serde_json::from_str(&plaintext) {
704 Ok(v) => v,
705 Err(_) => return Ok(()),
706 }
707 }
708 Err(_) => {
709 return Ok(());
711 }
712 };
713
714 let sender_pubkey = seal.get("pubkey")
716 .and_then(|v| v.as_str())
717 .ok_or_else(|| anyhow::anyhow!("Missing pubkey in seal"))?;
718
719 if sender_pubkey == self.my_peer_id.pubkey {
721 return Ok(());
722 }
723
724 let content = seal.get("content")
725 .and_then(|v| v.as_str())
726 .ok_or_else(|| anyhow::anyhow!("Missing content in seal"))?;
727
728 let raw_msg: serde_json::Value = serde_json::from_str(content)?;
729 let msg_type = raw_msg.get("type").and_then(|v| v.as_str()).unwrap_or("");
730
731 if raw_msg.get("targetPeerId").is_some() {
733 let target_peer = raw_msg
734 .get("targetPeerId")
735 .and_then(|v| v.as_str())
736 .unwrap_or("");
737 if target_peer != self.my_peer_id.to_string() {
738 return Ok(());
739 }
740
741 let peer_id = raw_msg.get("peerId").and_then(|v| v.as_str()).unwrap_or("");
742 let their_uuid = peer_id.split(':').nth(1).unwrap_or(peer_id);
743
744 match msg_type {
745 "offer" => {
746 let sdp = raw_msg.get("sdp").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing SDP in offer"))?;
747 let offer = serde_json::json!({ "type": "offer", "sdp": sdp });
748 self.handle_offer(&sender_pubkey, their_uuid, offer, relay_write_tx).await?;
749 }
750 "answer" => {
751 let sdp = raw_msg.get("sdp").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing SDP in answer"))?;
752 let answer = serde_json::json!({ "type": "answer", "sdp": sdp });
753 self.handle_answer(&sender_pubkey, their_uuid, answer).await?;
754 }
755 "candidate" => {
756 let candidate = raw_msg.get("candidate").and_then(|v| v.as_str()).unwrap_or("");
757 if !candidate.is_empty() {
758 let candidate_json = serde_json::json!({
759 "candidate": candidate,
760 "sdpMid": raw_msg.get("sdpMid"),
761 "sdpMLineIndex": raw_msg.get("sdpMLineIndex"),
762 });
763 self.handle_candidate(&sender_pubkey, their_uuid, candidate_json).await?;
764 }
765 }
766 "candidates" => {
767 let candidates = raw_msg
768 .get("candidates")
769 .and_then(|v| v.as_array())
770 .map(|entries| {
771 entries.iter().filter_map(|entry| {
772 if let Some(candidate_str) = entry.get("candidate").and_then(|v| v.as_str()).or_else(|| entry.as_str()) {
773 Some(serde_json::json!({
774 "candidate": candidate_str,
775 "sdpMid": entry.get("sdpMid"),
776 "sdpMLineIndex": entry.get("sdpMLineIndex"),
777 }))
778 } else {
779 None
780 }
781 }).collect::<Vec<_>>()
782 })
783 .unwrap_or_default();
784 self.handle_candidates(&sender_pubkey, their_uuid, candidates).await?;
785 }
786 _ => {}
787 }
788
789 return Ok(());
790 }
791
792 let msg: SignalingMessage = serde_json::from_value(raw_msg)?;
793
794 debug!(
795 "Received {} from {} via {} (gift-wrapped)",
796 msg.msg_type(),
797 &sender_pubkey[..8],
798 relay
799 );
800
801 match msg {
802 SignalingMessage::Hello { .. } => {
803 return Ok(());
805 }
806 SignalingMessage::Offer {
807 recipient,
808 peer_id: their_uuid,
809 offer,
810 } => {
811 if recipient != self.my_peer_id.to_string() {
812 return Ok(()); }
814 if let Err(e) = self.handle_offer(&sender_pubkey, &their_uuid, offer, relay_write_tx)
815 .await {
816 error!("handle_offer FAILED: sender={}, uuid={}, error={:?}", &sender_pubkey[..8.min(sender_pubkey.len())], their_uuid, e);
817 return Err(e);
818 }
819 }
820 SignalingMessage::Answer {
821 recipient,
822 peer_id: their_uuid,
823 answer,
824 } => {
825 if recipient != self.my_peer_id.to_string() {
826 return Ok(());
827 }
828 self.handle_answer(&sender_pubkey, &their_uuid, answer)
829 .await?;
830 }
831 SignalingMessage::Candidate {
832 recipient,
833 peer_id: their_uuid,
834 candidate,
835 } => {
836 if recipient != self.my_peer_id.to_string() {
837 return Ok(());
838 }
839 self.handle_candidate(&sender_pubkey, &their_uuid, candidate)
840 .await?;
841 }
842 SignalingMessage::Candidates {
843 recipient,
844 peer_id: their_uuid,
845 candidates,
846 } => {
847 if recipient != self.my_peer_id.to_string() {
848 return Ok(());
849 }
850 self.handle_candidates(&sender_pubkey, &their_uuid, candidates)
851 .await?;
852 }
853 }
854
855 Ok(())
856 }
857
858 async fn handle_hello(
860 &self,
861 sender_pubkey: &str,
862 their_uuid: &str,
863 relay_write_tx: &tokio::sync::broadcast::Sender<SignalingMessage>,
864 ) -> Result<()> {
865 let full_peer_id = PeerId::new(sender_pubkey.to_string(), Some(their_uuid.to_string()));
866 let peer_key = full_peer_id.to_string();
867
868 {
870 let peers = self.state.peers.read().await;
871 if let Some(entry) = peers.get(&peer_key) {
872 if entry.state == ConnectionState::Connected
874 || entry.state == ConnectionState::Connecting
875 {
876 return Ok(());
877 }
878 }
879 }
880
881 let pool = (self.peer_classifier)(sender_pubkey);
883
884 let pool_counts = self.get_pool_counts().await;
886 if !self.can_accept_peer(pool, &pool_counts) {
887 debug!("Ignoring hello from {} - pool {:?} is full", full_peer_id.short(), pool);
888 return Ok(());
889 }
890
891 let should_initiate = self.should_initiate(their_uuid);
893
894 let pool_satisfied = self.is_pool_satisfied(pool, &pool_counts);
897 let will_initiate = should_initiate && !pool_satisfied;
898
899 info!(
900 "Discovered peer: {} (pool: {:?}, initiate: {}, pool_satisfied: {})",
901 full_peer_id.short(),
902 pool,
903 will_initiate,
904 pool_satisfied
905 );
906
907 if !will_initiate && pool_satisfied {
910 debug!("Pool {:?} is satisfied, not tracking peer {}", pool, full_peer_id.short());
911 return Ok(());
912 }
913
914 {
916 let mut peers = self.state.peers.write().await;
917 peers.insert(
918 peer_key.clone(),
919 PeerEntry {
920 peer_id: full_peer_id.clone(),
921 direction: if will_initiate {
922 PeerDirection::Outbound
923 } else {
924 PeerDirection::Inbound
925 },
926 state: ConnectionState::Discovered,
927 last_seen: Instant::now(),
928 peer: None,
929 pool,
930 bytes_sent: 0,
931 bytes_received: 0,
932 },
933 );
934 }
935
936 if will_initiate {
938 self.initiate_connection(&full_peer_id, pool, relay_write_tx)
939 .await?;
940 }
941
942 Ok(())
943 }
944
945 async fn initiate_connection(
947 &self,
948 peer_id: &PeerId,
949 pool: PeerPool,
950 relay_write_tx: &tokio::sync::broadcast::Sender<SignalingMessage>,
951 ) -> Result<()> {
952 let peer_key = peer_id.to_string();
953
954 info!("Initiating connection to {} (pool: {:?})", peer_id.short(), pool);
955
956 let mut peer = Peer::new_with_store_and_events(
958 peer_id.clone(),
959 PeerDirection::Outbound,
960 self.my_peer_id.clone(),
961 self.signaling_tx.clone(),
962 self.config.stun_servers.clone(),
963 self.store.clone(),
964 Some(self.state_event_tx.clone()),
965 )
966 .await?;
967
968 peer.setup_handlers().await?;
969
970 let offer = peer.connect().await?;
972
973 {
975 let mut peers = self.state.peers.write().await;
976 if let Some(entry) = peers.get_mut(&peer_key) {
977 entry.state = ConnectionState::Connecting;
978 entry.peer = Some(peer);
979 entry.pool = pool;
980 }
981 }
982
983 let offer_msg = SignalingMessage::Offer {
985 offer,
986 recipient: peer_id.to_string(),
987 peer_id: self.my_peer_id.uuid.clone(),
988 };
989 if relay_write_tx.send(offer_msg).is_err() {
990 warn!("Failed to broadcast offer to {}", peer_id.short());
991 }
992
993 info!("Sent offer to {}", peer_id.short());
994
995 Ok(())
996 }
997
998 async fn handle_offer(
1000 &self,
1001 sender_pubkey: &str,
1002 their_uuid: &str,
1003 offer: serde_json::Value,
1004 relay_write_tx: &tokio::sync::broadcast::Sender<SignalingMessage>,
1005 ) -> Result<()> {
1006 debug!("handle_offer ENTRY: sender={}, uuid={}", &sender_pubkey[..8.min(sender_pubkey.len())], their_uuid);
1007 let full_peer_id = PeerId::new(sender_pubkey.to_string(), Some(their_uuid.to_string()));
1008 let peer_key = full_peer_id.to_string();
1009
1010 let pool = (self.peer_classifier)(sender_pubkey);
1012
1013 info!("Received offer from {} (pool: {:?})", full_peer_id.short(), pool);
1014
1015 {
1017 let peers = self.state.peers.read().await;
1018 debug!("Checking for existing peer, peer_key: {}, known_peers: {}", peer_key, peers.len());
1019 if let Some(entry) = peers.get(&peer_key) {
1020 if entry.peer.is_some() {
1022 debug!("Already have peer {} with connection, skipping offer", full_peer_id.short());
1023 return Ok(());
1024 }
1025 debug!("Peer {} exists but has no connection, proceeding", full_peer_id.short());
1026 } else {
1027 debug!("Peer {} not found in peers map, will create new entry", full_peer_id.short());
1028 }
1029 }
1030
1031 let pool_counts = self.get_pool_counts().await;
1033 debug!("Pool counts: {:?}, checking can_accept_peer for {:?}", pool_counts, pool);
1034 if !self.can_accept_peer(pool, &pool_counts) {
1035 warn!("Rejecting offer from {} - pool {:?} is full", full_peer_id.short(), pool);
1036 return Ok(());
1037 }
1038 debug!("Pool check passed for {}", full_peer_id.short());
1039
1040 debug!("Creating peer connection for {}", full_peer_id.short());
1042 let mut peer = Peer::new_with_store_and_events(
1043 full_peer_id.clone(),
1044 PeerDirection::Inbound,
1045 self.my_peer_id.clone(),
1046 self.signaling_tx.clone(),
1047 self.config.stun_servers.clone(),
1048 self.store.clone(),
1049 Some(self.state_event_tx.clone()),
1050 )
1051 .await?;
1052 debug!("Peer connection created for {}", full_peer_id.short());
1053
1054 peer.setup_handlers().await?;
1055 debug!("Handlers set up for {}", full_peer_id.short());
1056
1057 let answer = peer.handle_offer(offer).await?;
1059 debug!("Answer created for {}", full_peer_id.short());
1060
1061 {
1063 let mut peers = self.state.peers.write().await;
1064 peers.insert(
1065 peer_key,
1066 PeerEntry {
1067 peer_id: full_peer_id.clone(),
1068 direction: PeerDirection::Inbound,
1069 state: ConnectionState::Connecting,
1070 last_seen: Instant::now(),
1071 peer: Some(peer),
1072 pool,
1073 bytes_sent: 0,
1074 bytes_received: 0,
1075 },
1076 );
1077 }
1078
1079 let answer_msg = SignalingMessage::Answer {
1083 answer,
1084 recipient: full_peer_id.to_string(),
1085 peer_id: self.my_peer_id.uuid.clone(),
1086 };
1087 if relay_write_tx.send(answer_msg).is_err() {
1088 warn!("Failed to send answer to {}", full_peer_id.short());
1089 }
1090 info!("Sent answer to {}", full_peer_id.short());
1091
1092 Ok(())
1093 }
1094
1095 async fn handle_answer(
1097 &self,
1098 sender_pubkey: &str,
1099 their_uuid: &str,
1100 answer: serde_json::Value,
1101 ) -> Result<()> {
1102 let full_peer_id = PeerId::new(sender_pubkey.to_string(), Some(their_uuid.to_string()));
1103 let peer_key = full_peer_id.to_string();
1104
1105 info!("Received answer from {}", full_peer_id.short());
1106
1107 let mut peers = self.state.peers.write().await;
1108 if let Some(entry) = peers.get_mut(&peer_key) {
1109 if entry.state == ConnectionState::Connected {
1111 debug!("Ignoring duplicate answer from {} - already connected", full_peer_id.short());
1112 return Ok(());
1113 }
1114 if let Some(ref mut peer) = entry.peer {
1115 use webrtc::peer_connection::signaling_state::RTCSignalingState;
1117 let signaling_state = peer.signaling_state();
1118 if signaling_state != RTCSignalingState::HaveLocalOffer {
1119 debug!(
1120 "Ignoring answer from {} - signaling state is {:?}, not HaveLocalOffer",
1121 full_peer_id.short(),
1122 signaling_state
1123 );
1124 return Ok(());
1125 }
1126 peer.handle_answer(answer).await?;
1127 info!("Applied answer from {}", full_peer_id.short());
1128 } else {
1129 debug!("Peer {} has no connection object", full_peer_id.short());
1130 }
1131 } else {
1132 debug!("No peer found for key: {}", peer_key);
1133 }
1134
1135 Ok(())
1136 }
1137
1138 async fn handle_candidate(
1140 &self,
1141 sender_pubkey: &str,
1142 their_uuid: &str,
1143 candidate: serde_json::Value,
1144 ) -> Result<()> {
1145 let full_peer_id = PeerId::new(sender_pubkey.to_string(), Some(their_uuid.to_string()));
1146 let peer_key = full_peer_id.to_string();
1147
1148 info!("Received ICE candidate from {}", full_peer_id.short());
1149
1150 let mut peers = self.state.peers.write().await;
1151 if let Some(entry) = peers.get_mut(&peer_key) {
1152 if let Some(ref mut peer) = entry.peer {
1153 peer.handle_candidate(candidate).await?;
1154 }
1155 }
1156
1157 Ok(())
1158 }
1159
1160 async fn handle_candidates(
1162 &self,
1163 sender_pubkey: &str,
1164 their_uuid: &str,
1165 candidates: Vec<serde_json::Value>,
1166 ) -> Result<()> {
1167 let full_peer_id = PeerId::new(sender_pubkey.to_string(), Some(their_uuid.to_string()));
1168 let peer_key = full_peer_id.to_string();
1169
1170 debug!(
1171 "Received {} candidates from {}",
1172 candidates.len(),
1173 full_peer_id.short()
1174 );
1175
1176 let mut peers = self.state.peers.write().await;
1177 if let Some(entry) = peers.get_mut(&peer_key) {
1178 if let Some(ref mut peer) = entry.peer {
1179 for candidate in candidates {
1180 if let Err(e) = peer.handle_candidate(candidate).await {
1181 debug!("Failed to add candidate: {}", e);
1182 }
1183 }
1184 }
1185 }
1186
1187 Ok(())
1188 }
1189
1190 async fn handle_peer_state_event(&self, event: PeerStateEvent) {
1192 match event {
1193 PeerStateEvent::Connected(peer_id) => {
1194 let peer_key = peer_id.to_string();
1195 let mut peers = self.state.peers.write().await;
1196 if let Some(entry) = peers.get_mut(&peer_key) {
1197 if entry.state != ConnectionState::Connected {
1198 info!("Peer {} connected (via state event)", peer_id.short());
1199 entry.state = ConnectionState::Connected;
1200 self.state.connected_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1202 }
1203 }
1204 }
1205 PeerStateEvent::Failed(peer_id) => {
1206 let peer_key = peer_id.to_string();
1207 info!("Peer {} connection failed - removing from pool", peer_id.short());
1208 let mut peers = self.state.peers.write().await;
1209 if let Some(entry) = peers.remove(&peer_key) {
1210 if entry.state == ConnectionState::Connected {
1212 self.state.connected_count.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
1213 }
1214 if let Some(peer) = entry.peer {
1216 let _ = peer.close().await;
1217 }
1218 }
1219 }
1220 PeerStateEvent::Disconnected(peer_id) => {
1221 let peer_key = peer_id.to_string();
1222 info!("Peer {} disconnected - removing from pool", peer_id.short());
1223 let mut peers = self.state.peers.write().await;
1224 if let Some(entry) = peers.remove(&peer_key) {
1225 if entry.state == ConnectionState::Connected {
1227 self.state.connected_count.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
1228 }
1229 if let Some(peer) = entry.peer {
1231 let _ = peer.close().await;
1232 }
1233 }
1234 }
1235 }
1236 }
1237
1238 async fn cleanup_stale_peers(&self) {
1240 let mut peers = self.state.peers.write().await;
1241 let mut connected_count = 0;
1242 let mut to_remove = Vec::new();
1243 let stale_timeout = Duration::from_secs(60); for (key, entry) in peers.iter_mut() {
1246 if let Some(ref peer) = entry.peer {
1247 if peer.is_connected() {
1249 if entry.state != ConnectionState::Connected {
1250 info!("Peer {} is now connected (sync fallback)", entry.peer_id.short());
1251 entry.state = ConnectionState::Connected;
1252 }
1253 connected_count += 1;
1254 } else if entry.state == ConnectionState::Connecting && entry.last_seen.elapsed() > stale_timeout {
1255 info!("Removing stale peer {} (stuck in Connecting for {:?})", entry.peer_id.short(), entry.last_seen.elapsed());
1257 to_remove.push(key.clone());
1258 }
1259 } else if entry.state == ConnectionState::Discovered && entry.last_seen.elapsed() > stale_timeout {
1260 debug!("Removing stale discovered peer {}", entry.peer_id.short());
1262 to_remove.push(key.clone());
1263 }
1264 }
1265
1266 for key in to_remove {
1268 if let Some(entry) = peers.remove(&key) {
1269 if let Some(peer) = entry.peer {
1270 let _ = peer.close().await;
1271 }
1272 }
1273 }
1274
1275 self.state
1276 .connected_count
1277 .store(connected_count, std::sync::atomic::Ordering::Relaxed);
1278 }
1279}
1280
1281#[allow(dead_code)]
1283#[derive(Debug, Clone)]
1284pub struct PeerState {
1285 pub peer_id: PeerId,
1286 pub direction: PeerDirection,
1287 pub state: String,
1288 pub last_seen: Instant,
1289}