1use anyhow::Result;
4use bytes::Bytes;
5use std::collections::HashMap;
6use std::sync::Arc;
7use tokio::sync::{mpsc, oneshot, Mutex, Notify};
8use tracing::{debug, error, info, warn};
9use webrtc::api::interceptor_registry::register_default_interceptors;
10use webrtc::api::media_engine::MediaEngine;
11use webrtc::api::setting_engine::SettingEngine;
12use webrtc::api::APIBuilder;
13use webrtc::data_channel::data_channel_init::RTCDataChannelInit;
14use webrtc::data_channel::data_channel_message::DataChannelMessage;
15use webrtc::data_channel::RTCDataChannel;
16use webrtc::data_channel::data_channel_state::RTCDataChannelState;
17use webrtc::ice_transport::ice_candidate::RTCIceCandidate;
18use webrtc::ice_transport::ice_server::RTCIceServer;
19use webrtc::interceptor::registry::Registry;
20use webrtc::peer_connection::configuration::RTCConfiguration;
21use webrtc::peer_connection::peer_connection_state::RTCPeerConnectionState;
22use webrtc::peer_connection::sdp::session_description::RTCSessionDescription;
23use webrtc::peer_connection::RTCPeerConnection;
24
25use crate::nostr_relay::NostrRelay;
26use super::types::{DataMessage, DataRequest, DataResponse, PeerDirection, PeerId, PeerStateEvent, SignalingMessage, encode_message, encode_request, encode_response, parse_message, hash_to_hex};
27use nostr::{ClientMessage as NostrClientMessage, JsonUtil as NostrJsonUtil};
28
29pub trait ContentStore: Send + Sync + 'static {
31 fn get(&self, hash_hex: &str) -> Result<Option<Vec<u8>>>;
33}
34
35pub struct PendingRequest {
37 pub hash: Vec<u8>,
38 pub response_tx: oneshot::Sender<Option<Vec<u8>>>,
39}
40
41pub struct Peer {
43 pub peer_id: PeerId,
44 pub direction: PeerDirection,
45 pub created_at: std::time::Instant,
46 pub connected_at: Option<std::time::Instant>,
47
48 pc: Arc<RTCPeerConnection>,
49 pub data_channel: Arc<Mutex<Option<Arc<RTCDataChannel>>>>,
51 signaling_tx: mpsc::Sender<SignalingMessage>,
52 my_peer_id: PeerId,
53
54 store: Option<Arc<dyn ContentStore>>,
56
57 pub pending_requests: Arc<Mutex<HashMap<String, PendingRequest>>>,
59
60 #[allow(dead_code)]
62 message_tx: mpsc::Sender<(DataMessage, Option<Vec<u8>>)>,
63 #[allow(dead_code)]
64 message_rx: Option<mpsc::Receiver<(DataMessage, Option<Vec<u8>>)>>,
65
66 state_event_tx: Option<mpsc::Sender<PeerStateEvent>>,
68
69 nostr_relay: Option<Arc<NostrRelay>>,
71}
72
73impl Peer {
74 pub async fn new(
76 peer_id: PeerId,
77 direction: PeerDirection,
78 my_peer_id: PeerId,
79 signaling_tx: mpsc::Sender<SignalingMessage>,
80 stun_servers: Vec<String>,
81 ) -> Result<Self> {
82 Self::new_with_store_and_events(
83 peer_id,
84 direction,
85 my_peer_id,
86 signaling_tx,
87 stun_servers,
88 None,
89 None,
90 None,
91 )
92 .await
93 }
94
95 pub async fn new_with_store(
97 peer_id: PeerId,
98 direction: PeerDirection,
99 my_peer_id: PeerId,
100 signaling_tx: mpsc::Sender<SignalingMessage>,
101 stun_servers: Vec<String>,
102 store: Option<Arc<dyn ContentStore>>,
103 ) -> Result<Self> {
104 Self::new_with_store_and_events(
105 peer_id,
106 direction,
107 my_peer_id,
108 signaling_tx,
109 stun_servers,
110 store,
111 None,
112 None,
113 )
114 .await
115 }
116
117 pub async fn new_with_store_and_events(
119 peer_id: PeerId,
120 direction: PeerDirection,
121 my_peer_id: PeerId,
122 signaling_tx: mpsc::Sender<SignalingMessage>,
123 stun_servers: Vec<String>,
124 store: Option<Arc<dyn ContentStore>>,
125 state_event_tx: Option<mpsc::Sender<PeerStateEvent>>,
126 nostr_relay: Option<Arc<NostrRelay>>,
127 ) -> Result<Self> {
128 let mut m = MediaEngine::default();
130 m.register_default_codecs()?;
131
132 let mut registry = Registry::new();
133 registry = register_default_interceptors(registry, &mut m)?;
134
135 let setting_engine = SettingEngine::default();
138 let api = APIBuilder::new()
141 .with_media_engine(m)
142 .with_interceptor_registry(registry)
143 .with_setting_engine(setting_engine)
144 .build();
145
146 let ice_servers: Vec<RTCIceServer> = stun_servers
148 .iter()
149 .map(|url| RTCIceServer {
150 urls: vec![url.clone()],
151 ..Default::default()
152 })
153 .collect();
154
155 let config = RTCConfiguration {
156 ice_servers,
157 ..Default::default()
158 };
159
160 let pc = Arc::new(api.new_peer_connection(config).await?);
161 let (message_tx, message_rx) = mpsc::channel(100);
162 Ok(Self {
163 peer_id,
164 direction,
165 created_at: std::time::Instant::now(),
166 connected_at: None,
167 pc,
168 data_channel: Arc::new(Mutex::new(None)),
169 signaling_tx,
170 my_peer_id,
171 store,
172 pending_requests: Arc::new(Mutex::new(HashMap::new())),
173 message_tx,
174 message_rx: Some(message_rx),
175 state_event_tx,
176 nostr_relay,
177 })
178 }
179
180 pub fn set_store(&mut self, store: Arc<dyn ContentStore>) {
182 self.store = Some(store);
183 }
184
185 pub fn state(&self) -> RTCPeerConnectionState {
187 self.pc.connection_state()
188 }
189
190 pub fn signaling_state(&self) -> webrtc::peer_connection::signaling_state::RTCSignalingState {
192 self.pc.signaling_state()
193 }
194
195 pub fn is_connected(&self) -> bool {
197 self.pc.connection_state() == RTCPeerConnectionState::Connected
198 }
199
200 pub async fn setup_handlers(&mut self) -> Result<()> {
202 let peer_id = self.peer_id.clone();
203 let signaling_tx = self.signaling_tx.clone();
204 let my_peer_id_str = self.my_peer_id.to_string();
205 let recipient = self.peer_id.to_string();
206
207 self.pc
209 .on_ice_candidate(Box::new(move |candidate: Option<RTCIceCandidate>| {
210 let signaling_tx = signaling_tx.clone();
211 let my_peer_id_str = my_peer_id_str.clone();
212 let recipient = recipient.clone();
213
214 Box::pin(async move {
215 if let Some(c) = candidate {
216 if let Some(init) = c.to_json().ok() {
217 info!("ICE candidate generated: {}", &init.candidate[..init.candidate.len().min(60)]);
218 let msg = SignalingMessage::candidate(
219 serde_json::to_value(&init).unwrap_or_default(),
220 &recipient,
221 &my_peer_id_str,
222 );
223 if let Err(e) = signaling_tx.send(msg).await {
224 error!("Failed to send ICE candidate: {}", e);
225 }
226 }
227 }
228 })
229 }));
230
231 let peer_id_log = peer_id.clone();
233 let state_event_tx = self.state_event_tx.clone();
234 self.pc
235 .on_peer_connection_state_change(Box::new(move |state: RTCPeerConnectionState| {
236 let peer_id = peer_id_log.clone();
237 let state_event_tx = state_event_tx.clone();
238 Box::pin(async move {
239 info!("Peer {} connection state: {:?}", peer_id.short(), state);
240
241 if let Some(tx) = state_event_tx {
243 let event = match state {
244 RTCPeerConnectionState::Connected => Some(PeerStateEvent::Connected(peer_id)),
245 RTCPeerConnectionState::Failed => Some(PeerStateEvent::Failed(peer_id)),
246 RTCPeerConnectionState::Disconnected | RTCPeerConnectionState::Closed => {
247 Some(PeerStateEvent::Disconnected(peer_id))
248 }
249 _ => None,
250 };
251 if let Some(event) = event {
252 if let Err(e) = tx.send(event).await {
253 error!("Failed to send peer state event: {}", e);
254 }
255 }
256 }
257 })
258 }));
259
260 Ok(())
261 }
262
263 pub async fn connect(&mut self) -> Result<serde_json::Value> {
265 println!("[Peer {}] Creating data channel...", self.peer_id.short());
266 let dc_init = RTCDataChannelInit {
269 ordered: Some(false),
270 ..Default::default()
271 };
272 let dc = self.pc.create_data_channel("hashtree", Some(dc_init)).await?;
273 println!("[Peer {}] Data channel created, setting up handlers...", self.peer_id.short());
274 self.setup_data_channel(dc.clone()).await?;
275 println!("[Peer {}] Handlers set up, storing data channel...", self.peer_id.short());
276 {
277 let mut dc_guard = self.data_channel.lock().await;
278 *dc_guard = Some(dc);
279 }
280 println!("[Peer {}] Data channel stored", self.peer_id.short());
281
282 let offer = self.pc.create_offer(None).await?;
285 let mut gathering_complete = self.pc.gathering_complete_promise().await;
286 self.pc.set_local_description(offer).await?;
287
288 let _ = tokio::time::timeout(
290 std::time::Duration::from_secs(10),
291 gathering_complete.recv()
292 ).await;
293
294 let local_desc = self.pc.local_description().await
296 .ok_or_else(|| anyhow::anyhow!("No local description after gathering"))?;
297
298 debug!("Offer created, SDP len: {}, ice_gathering: {:?}",
299 local_desc.sdp.len(), self.pc.ice_gathering_state());
300
301 let offer_json = serde_json::json!({
303 "type": local_desc.sdp_type.to_string().to_lowercase(),
304 "sdp": local_desc.sdp
305 });
306
307 Ok(offer_json)
308 }
309
310 pub async fn handle_offer(&mut self, offer: serde_json::Value) -> Result<serde_json::Value> {
312 let sdp = offer
313 .get("sdp")
314 .and_then(|s| s.as_str())
315 .ok_or_else(|| anyhow::anyhow!("Missing SDP in offer"))?;
316
317 let peer_id = self.peer_id.clone();
320 let message_tx = self.message_tx.clone();
321 let pending_requests = self.pending_requests.clone();
322 let store = self.store.clone();
323 let data_channel_holder = self.data_channel.clone();
324 let nostr_relay = self.nostr_relay.clone();
325 let peer_pubkey = Some(self.peer_id.pubkey.clone());
326
327 self.pc
328 .on_data_channel(Box::new(move |dc: Arc<RTCDataChannel>| {
329 let peer_id = peer_id.clone();
330 let message_tx = message_tx.clone();
331 let pending_requests = pending_requests.clone();
332 let store = store.clone();
333 let data_channel_holder = data_channel_holder.clone();
334 let nostr_relay = nostr_relay.clone();
335 let peer_pubkey = peer_pubkey.clone();
336
337 Box::pin(async move {
339 info!("Peer {} received data channel: {}", peer_id.short(), dc.label());
340
341 {
343 let mut dc_guard = data_channel_holder.lock().await;
344 *dc_guard = Some(dc.clone());
345 }
346
347 Self::setup_dc_handlers(
349 dc.clone(),
350 peer_id,
351 message_tx,
352 pending_requests,
353 store,
354 nostr_relay,
355 peer_pubkey,
356 )
357 .await;
358 })
359 }));
360
361 let offer_desc = RTCSessionDescription::offer(sdp.to_string())?;
363 self.pc.set_remote_description(offer_desc).await?;
364
365 let answer = self.pc.create_answer(None).await?;
368 let mut gathering_complete = self.pc.gathering_complete_promise().await;
369 self.pc.set_local_description(answer).await?;
370
371 let _ = tokio::time::timeout(
373 std::time::Duration::from_secs(10),
374 gathering_complete.recv()
375 ).await;
376
377 let local_desc = self.pc.local_description().await
379 .ok_or_else(|| anyhow::anyhow!("No local description after gathering"))?;
380
381 debug!("Answer created, SDP len: {}, ice_gathering: {:?}",
382 local_desc.sdp.len(), self.pc.ice_gathering_state());
383
384 let answer_json = serde_json::json!({
385 "type": local_desc.sdp_type.to_string().to_lowercase(),
386 "sdp": local_desc.sdp
387 });
388
389 Ok(answer_json)
390 }
391
392 pub async fn handle_answer(&mut self, answer: serde_json::Value) -> Result<()> {
394 let sdp = answer
395 .get("sdp")
396 .and_then(|s| s.as_str())
397 .ok_or_else(|| anyhow::anyhow!("Missing SDP in answer"))?;
398
399 let answer_desc = RTCSessionDescription::answer(sdp.to_string())?;
400 self.pc.set_remote_description(answer_desc).await?;
401
402 Ok(())
403 }
404
405 pub async fn handle_candidate(&mut self, candidate: serde_json::Value) -> Result<()> {
407 let candidate_str = candidate
408 .get("candidate")
409 .and_then(|c| c.as_str())
410 .unwrap_or("");
411
412 let sdp_mid = candidate
413 .get("sdpMid")
414 .and_then(|m| m.as_str())
415 .map(|s| s.to_string());
416
417 let sdp_mline_index = candidate
418 .get("sdpMLineIndex")
419 .and_then(|i| i.as_u64())
420 .map(|i| i as u16);
421
422 if !candidate_str.is_empty() {
423 use webrtc::ice_transport::ice_candidate::RTCIceCandidateInit;
424 let init = RTCIceCandidateInit {
425 candidate: candidate_str.to_string(),
426 sdp_mid,
427 sdp_mline_index,
428 username_fragment: candidate
429 .get("usernameFragment")
430 .and_then(|u| u.as_str())
431 .map(|s| s.to_string()),
432 };
433 self.pc.add_ice_candidate(init).await?;
434 }
435
436 Ok(())
437 }
438
439 async fn setup_data_channel(&mut self, dc: Arc<RTCDataChannel>) -> Result<()> {
441 let peer_id = self.peer_id.clone();
442 let message_tx = self.message_tx.clone();
443 let pending_requests = self.pending_requests.clone();
444 let store = self.store.clone();
445 let nostr_relay = self.nostr_relay.clone();
446 let peer_pubkey = Some(self.peer_id.pubkey.clone());
447
448 Self::setup_dc_handlers(
449 dc,
450 peer_id,
451 message_tx,
452 pending_requests,
453 store,
454 nostr_relay,
455 peer_pubkey,
456 )
457 .await;
458 Ok(())
459 }
460
461 async fn setup_dc_handlers(
463 dc: Arc<RTCDataChannel>,
464 peer_id: PeerId,
465 message_tx: mpsc::Sender<(DataMessage, Option<Vec<u8>>)>,
466 pending_requests: Arc<Mutex<HashMap<String, PendingRequest>>>,
467 store: Option<Arc<dyn ContentStore>>,
468 nostr_relay: Option<Arc<NostrRelay>>,
469 peer_pubkey: Option<String>,
470 ) {
471 let label = dc.label().to_string();
472 let peer_short = peer_id.short();
473
474 let _pending_binary: Arc<Mutex<Option<u32>>> = Arc::new(Mutex::new(None));
476
477 let open_notify = nostr_relay.as_ref().map(|_| Arc::new(Notify::new()));
478 if let Some(ref notify) = open_notify {
479 if dc.ready_state() == RTCDataChannelState::Open {
480 notify.notify_waiters();
481 }
482 }
483
484 let mut nostr_client_id: Option<u64> = None;
485 if let Some(relay) = nostr_relay.clone() {
486 let client_id = relay.next_client_id();
487 let (nostr_tx, mut nostr_rx) = mpsc::unbounded_channel::<String>();
488 relay.register_client(client_id, nostr_tx, peer_pubkey.clone()).await;
489 nostr_client_id = Some(client_id);
490
491 if let Some(notify) = open_notify.clone() {
492 let dc_for_send = dc.clone();
493 tokio::spawn(async move {
494 notify.notified().await;
495 while let Some(text) = nostr_rx.recv().await {
496 if dc_for_send.send_text(text).await.is_err() {
497 break;
498 }
499 }
500 });
501 }
502 }
503
504 if let (Some(relay), Some(client_id)) = (nostr_relay.clone(), nostr_client_id) {
505 dc.on_close(Box::new(move || {
506 let relay = relay.clone();
507 Box::pin(async move {
508 relay.unregister_client(client_id).await;
509 })
510 }));
511 }
512
513 let open_notify_clone = open_notify.clone();
514 let peer_short_open = peer_short.clone();
515 let label_clone = label.clone();
516 dc.on_open(Box::new(move || {
517 let peer_short_open = peer_short_open.clone();
518 let label_clone = label_clone.clone();
519 let open_notify = open_notify_clone.clone();
520 Box::pin(async move {
522 info!("[Peer {}] Data channel '{}' open", peer_short_open, label_clone);
523 if let Some(notify) = open_notify {
524 notify.notify_waiters();
525 }
526 })
527 }));
528
529 let dc_for_msg = dc.clone();
530 let peer_short_msg = peer_short.clone();
531 let _pending_binary_clone = _pending_binary.clone();
532 let store_clone = store.clone();
533 let nostr_relay_for_msg = nostr_relay.clone();
534 let nostr_client_id_for_msg = nostr_client_id;
535
536 dc.on_message(Box::new(move |msg: DataChannelMessage| {
537 let dc = dc_for_msg.clone();
538 let peer_short = peer_short_msg.clone();
539 let pending_requests = pending_requests.clone();
540 let _pending_binary = _pending_binary_clone.clone();
541 let _message_tx = message_tx.clone();
542 let store = store_clone.clone();
543 let nostr_relay = nostr_relay_for_msg.clone();
544 let nostr_client_id = nostr_client_id_for_msg;
545 let msg_data = msg.data.clone();
546
547 Box::pin(async move {
549 if msg.is_string {
550 if let Some(relay) = nostr_relay {
551 if let Ok(text) = std::str::from_utf8(&msg_data) {
552 if let Ok(nostr_msg) = NostrClientMessage::from_json(text) {
553 if let Some(client_id) = nostr_client_id {
554 relay.handle_client_message(client_id, nostr_msg).await;
555 }
556 }
557 }
558 }
559 return;
560 }
561 debug!("[Peer {}] Received {} bytes on data channel", peer_short, msg_data.len());
563 match parse_message(&msg_data) {
564 Ok(data_msg) => match data_msg {
565 DataMessage::Request(req) => {
566 let hash_hex = hash_to_hex(&req.h);
567 let hash_short = &hash_hex[..8.min(hash_hex.len())];
568 info!(
569 "[Peer {}] Received request for {}",
570 peer_short, hash_short
571 );
572
573 let data = if let Some(ref store) = store {
575 match store.get(&hash_hex) {
576 Ok(Some(data)) => {
577 info!("[Peer {}] Found {} in store ({} bytes)", peer_short, hash_short, data.len());
578 Some(data)
579 },
580 Ok(None) => {
581 info!("[Peer {}] Hash {} not in store", peer_short, hash_short);
582 None
583 },
584 Err(e) => {
585 warn!("[Peer {}] Store error: {}", peer_short, e);
586 None
587 }
588 }
589 } else {
590 warn!("[Peer {}] No store configured - cannot serve requests", peer_short);
591 None
592 };
593
594 if let Some(data) = data {
596 let data_len = data.len();
597 let response = DataResponse {
598 h: req.h,
599 d: data,
600 };
601 if let Ok(wire) = encode_response(&response) {
602 if let Err(e) = dc.send(&Bytes::from(wire)).await {
603 error!(
604 "[Peer {}] Failed to send response: {}",
605 peer_short, e
606 );
607 } else {
608 info!(
609 "[Peer {}] Sent response for {} ({} bytes)",
610 peer_short, hash_short, data_len
611 );
612 }
613 }
614 } else {
615 info!("[Peer {}] Content not found for {}", peer_short, hash_short);
616 }
617 }
618 DataMessage::Response(res) => {
619 let hash_hex = hash_to_hex(&res.h);
620 let hash_short = &hash_hex[..8.min(hash_hex.len())];
621 debug!(
622 "[Peer {}] Received response for {} ({} bytes)",
623 peer_short, hash_short, res.d.len()
624 );
625
626 let mut pending = pending_requests.lock().await;
628 if let Some(req) = pending.remove(&hash_hex) {
629 let _ = req.response_tx.send(Some(res.d));
630 }
631 }
632 },
633 Err(e) => {
634 warn!("[Peer {}] Failed to parse message: {:?}", peer_short, e);
635 let hex_dump: String = msg_data.iter().take(50).map(|b| format!("{:02x}", b)).collect();
637 warn!("[Peer {}] Message hex: {}", peer_short, hex_dump);
638 }
639 }
640 })
641 }));
642 }
643
644 pub fn has_data_channel(&self) -> bool {
646 self.data_channel
648 .try_lock()
649 .map(|guard| guard.is_some())
650 .unwrap_or(false)
651 }
652
653 pub async fn request(&self, hash_hex: &str) -> Result<Option<Vec<u8>>> {
655 let dc_guard = self.data_channel.lock().await;
656 let dc = dc_guard
657 .as_ref()
658 .ok_or_else(|| anyhow::anyhow!("No data channel"))?
659 .clone();
660 drop(dc_guard); let hash = hex::decode(hash_hex)
664 .map_err(|e| anyhow::anyhow!("Invalid hex hash: {}", e))?;
665
666 let (tx, rx) = oneshot::channel();
668
669 {
671 let mut pending = self.pending_requests.lock().await;
672 pending.insert(
673 hash_hex.to_string(),
674 PendingRequest {
675 hash: hash.clone(),
676 response_tx: tx,
677 },
678 );
679 }
680
681 let req = DataRequest {
683 h: hash,
684 htl: crate::webrtc::types::MAX_HTL,
685 };
686 let wire = encode_request(&req)?;
687 dc.send(&Bytes::from(wire)).await?;
688
689 debug!(
690 "[Peer {}] Sent request for {}",
691 self.peer_id.short(),
692 &hash_hex[..8.min(hash_hex.len())]
693 );
694
695 match tokio::time::timeout(std::time::Duration::from_secs(10), rx).await {
697 Ok(Ok(data)) => Ok(data),
698 Ok(Err(_)) => {
699 Ok(None)
701 }
702 Err(_) => {
703 let mut pending = self.pending_requests.lock().await;
705 pending.remove(hash_hex);
706 Ok(None)
707 }
708 }
709 }
710
711 pub async fn send_message(&self, msg: &DataMessage) -> Result<()> {
713 let dc_guard = self.data_channel.lock().await;
714 if let Some(ref dc) = *dc_guard {
715 let wire = encode_message(msg)?;
716 dc.send(&Bytes::from(wire)).await?;
717 }
718 Ok(())
719 }
720
721 pub async fn close(&self) -> Result<()> {
723 {
724 let dc_guard = self.data_channel.lock().await;
725 if let Some(ref dc) = *dc_guard {
726 dc.close().await?;
727 }
728 }
729 self.pc.close().await?;
730 Ok(())
731 }
732}