1use anyhow::Result;
4use bytes::Bytes;
5use std::collections::HashMap;
6use std::sync::Arc;
7use tokio::sync::{mpsc, oneshot, Mutex, Notify};
8use tracing::{debug, error, info, warn};
9use webrtc::api::interceptor_registry::register_default_interceptors;
10use webrtc::api::media_engine::MediaEngine;
11use webrtc::api::setting_engine::SettingEngine;
12use webrtc::api::APIBuilder;
13use webrtc::data_channel::data_channel_init::RTCDataChannelInit;
14use webrtc::data_channel::data_channel_message::DataChannelMessage;
15use webrtc::data_channel::data_channel_state::RTCDataChannelState;
16use webrtc::data_channel::RTCDataChannel;
17use webrtc::ice_transport::ice_candidate::RTCIceCandidate;
18use webrtc::ice_transport::ice_server::RTCIceServer;
19use webrtc::interceptor::registry::Registry;
20use webrtc::peer_connection::configuration::RTCConfiguration;
21use webrtc::peer_connection::peer_connection_state::RTCPeerConnectionState;
22use webrtc::peer_connection::sdp::session_description::RTCSessionDescription;
23use webrtc::peer_connection::RTCPeerConnection;
24
25use super::types::{
26 encode_message, encode_request, encode_response, hash_to_hex, parse_message, DataMessage,
27 DataRequest, DataResponse, PeerDirection, PeerId, PeerStateEvent, SignalingMessage,
28};
29use crate::nostr_relay::NostrRelay;
30use nostr::{ClientMessage as NostrClientMessage, JsonUtil as NostrJsonUtil};
31
/// Read-only, hash-addressed lookup used to answer data requests from remote peers.
///
/// The `Send + Sync + 'static` bounds allow an implementation to be shared as
/// `Arc<dyn ContentStore>` and moved into the data-channel callbacks.
pub trait ContentStore: Send + Sync + 'static {
    /// Looks up the content stored under `hash_hex`, the hex string of the requested hash.
    ///
    /// The request handler in this file treats `Ok(Some(bytes))` as a hit,
    /// `Ok(None)` as "not in store", and `Err(_)` as a store failure that is
    /// logged and then handled like a miss.
    fn get(&self, hash_hex: &str) -> Result<Option<Vec<u8>>>;
}
37
/// Bookkeeping for one request that has been sent to the remote peer and is
/// still waiting for its response.
pub struct PendingRequest {
    /// Raw bytes of the requested hash.
    pub hash: Vec<u8>,
    /// Completion channel: the data-channel message handler sends `Some(bytes)`
    /// here when a response carrying the matching hash arrives.
    pub response_tx: oneshot::Sender<Option<Vec<u8>>>,
}
43
/// One remote peer: the WebRTC peer connection, its data channel, and the
/// request bookkeeping that rides on that channel.
pub struct Peer {
    /// Identity of the remote peer.
    pub peer_id: PeerId,
    /// Direction of this connection (see [`PeerDirection`]).
    pub direction: PeerDirection,
    /// Instant at which this `Peer` value was constructed.
    pub created_at: std::time::Instant,
    /// Initialised to `None` by the constructor; nothing in this file updates it.
    pub connected_at: Option<std::time::Instant>,

    /// Underlying WebRTC peer connection.
    pc: Arc<RTCPeerConnection>,
    /// Data channel slot. Filled by `connect` (locally created channel) or by
    /// the `on_data_channel` callback installed in `handle_offer`.
    pub data_channel: Arc<Mutex<Option<Arc<RTCDataChannel>>>>,
    /// Outgoing signaling messages; `setup_handlers` sends ICE candidates here.
    signaling_tx: mpsc::Sender<SignalingMessage>,
    /// Our own identity, used as the sender of signaling messages.
    my_peer_id: PeerId,

    /// Optional content store used to answer incoming data requests.
    store: Option<Arc<dyn ContentStore>>,

    /// Requests awaiting a response, keyed by the hex string of the hash.
    pub pending_requests: Arc<Mutex<HashMap<String, PendingRequest>>>,

    // NOTE(review): the channel pair below is created by the constructor but
    // never read in this file; presumably reserved for handing parsed messages
    // to a consumer — confirm before removing.
    #[allow(dead_code)]
    message_tx: mpsc::Sender<(DataMessage, Option<Vec<u8>>)>,
    #[allow(dead_code)]
    message_rx: Option<mpsc::Receiver<(DataMessage, Option<Vec<u8>>)>>,

    /// Optional sink for connection state events emitted by `setup_handlers`.
    state_event_tx: Option<mpsc::Sender<PeerStateEvent>>,

    /// Optional Nostr relay; when set, text frames on the data channel are
    /// routed to it and its outgoing messages are sent back as text frames.
    nostr_relay: Option<Arc<NostrRelay>>,
}
75
76impl Peer {
77 pub async fn new(
79 peer_id: PeerId,
80 direction: PeerDirection,
81 my_peer_id: PeerId,
82 signaling_tx: mpsc::Sender<SignalingMessage>,
83 stun_servers: Vec<String>,
84 ) -> Result<Self> {
85 Self::new_with_store_and_events(
86 peer_id,
87 direction,
88 my_peer_id,
89 signaling_tx,
90 stun_servers,
91 None,
92 None,
93 None,
94 )
95 .await
96 }
97
98 pub async fn new_with_store(
100 peer_id: PeerId,
101 direction: PeerDirection,
102 my_peer_id: PeerId,
103 signaling_tx: mpsc::Sender<SignalingMessage>,
104 stun_servers: Vec<String>,
105 store: Option<Arc<dyn ContentStore>>,
106 ) -> Result<Self> {
107 Self::new_with_store_and_events(
108 peer_id,
109 direction,
110 my_peer_id,
111 signaling_tx,
112 stun_servers,
113 store,
114 None,
115 None,
116 )
117 .await
118 }
119
120 pub async fn new_with_store_and_events(
122 peer_id: PeerId,
123 direction: PeerDirection,
124 my_peer_id: PeerId,
125 signaling_tx: mpsc::Sender<SignalingMessage>,
126 stun_servers: Vec<String>,
127 store: Option<Arc<dyn ContentStore>>,
128 state_event_tx: Option<mpsc::Sender<PeerStateEvent>>,
129 nostr_relay: Option<Arc<NostrRelay>>,
130 ) -> Result<Self> {
131 let mut m = MediaEngine::default();
133 m.register_default_codecs()?;
134
135 let mut registry = Registry::new();
136 registry = register_default_interceptors(registry, &mut m)?;
137
138 let setting_engine = SettingEngine::default();
141 let api = APIBuilder::new()
144 .with_media_engine(m)
145 .with_interceptor_registry(registry)
146 .with_setting_engine(setting_engine)
147 .build();
148
149 let ice_servers: Vec<RTCIceServer> = stun_servers
151 .iter()
152 .map(|url| RTCIceServer {
153 urls: vec![url.clone()],
154 ..Default::default()
155 })
156 .collect();
157
158 let config = RTCConfiguration {
159 ice_servers,
160 ..Default::default()
161 };
162
163 let pc = Arc::new(api.new_peer_connection(config).await?);
164 let (message_tx, message_rx) = mpsc::channel(100);
165 Ok(Self {
166 peer_id,
167 direction,
168 created_at: std::time::Instant::now(),
169 connected_at: None,
170 pc,
171 data_channel: Arc::new(Mutex::new(None)),
172 signaling_tx,
173 my_peer_id,
174 store,
175 pending_requests: Arc::new(Mutex::new(HashMap::new())),
176 message_tx,
177 message_rx: Some(message_rx),
178 state_event_tx,
179 nostr_relay,
180 })
181 }
182
183 pub fn set_store(&mut self, store: Arc<dyn ContentStore>) {
185 self.store = Some(store);
186 }
187
188 pub fn state(&self) -> RTCPeerConnectionState {
190 self.pc.connection_state()
191 }
192
193 pub fn signaling_state(&self) -> webrtc::peer_connection::signaling_state::RTCSignalingState {
195 self.pc.signaling_state()
196 }
197
198 pub fn is_connected(&self) -> bool {
200 self.pc.connection_state() == RTCPeerConnectionState::Connected
201 }
202
203 pub async fn setup_handlers(&mut self) -> Result<()> {
205 let peer_id = self.peer_id.clone();
206 let signaling_tx = self.signaling_tx.clone();
207 let my_peer_id_str = self.my_peer_id.to_string();
208 let recipient = self.peer_id.to_string();
209
210 self.pc
212 .on_ice_candidate(Box::new(move |candidate: Option<RTCIceCandidate>| {
213 let signaling_tx = signaling_tx.clone();
214 let my_peer_id_str = my_peer_id_str.clone();
215 let recipient = recipient.clone();
216
217 Box::pin(async move {
218 if let Some(c) = candidate {
219 if let Some(init) = c.to_json().ok() {
220 info!(
221 "ICE candidate generated: {}",
222 &init.candidate[..init.candidate.len().min(60)]
223 );
224 let msg = SignalingMessage::candidate(
225 serde_json::to_value(&init).unwrap_or_default(),
226 &recipient,
227 &my_peer_id_str,
228 );
229 if let Err(e) = signaling_tx.send(msg).await {
230 error!("Failed to send ICE candidate: {}", e);
231 }
232 }
233 }
234 })
235 }));
236
237 let peer_id_log = peer_id.clone();
239 let state_event_tx = self.state_event_tx.clone();
240 self.pc
241 .on_peer_connection_state_change(Box::new(move |state: RTCPeerConnectionState| {
242 let peer_id = peer_id_log.clone();
243 let state_event_tx = state_event_tx.clone();
244 Box::pin(async move {
245 info!("Peer {} connection state: {:?}", peer_id.short(), state);
246
247 if let Some(tx) = state_event_tx {
249 let event = match state {
250 RTCPeerConnectionState::Connected => {
251 Some(PeerStateEvent::Connected(peer_id))
252 }
253 RTCPeerConnectionState::Failed => Some(PeerStateEvent::Failed(peer_id)),
254 RTCPeerConnectionState::Disconnected
255 | RTCPeerConnectionState::Closed => {
256 Some(PeerStateEvent::Disconnected(peer_id))
257 }
258 _ => None,
259 };
260 if let Some(event) = event {
261 if let Err(e) = tx.send(event).await {
262 error!("Failed to send peer state event: {}", e);
263 }
264 }
265 }
266 })
267 }));
268
269 Ok(())
270 }
271
272 pub async fn connect(&mut self) -> Result<serde_json::Value> {
274 println!("[Peer {}] Creating data channel...", self.peer_id.short());
275 let dc_init = RTCDataChannelInit {
278 ordered: Some(false),
279 ..Default::default()
280 };
281 let dc = self
282 .pc
283 .create_data_channel("hashtree", Some(dc_init))
284 .await?;
285 println!(
286 "[Peer {}] Data channel created, setting up handlers...",
287 self.peer_id.short()
288 );
289 self.setup_data_channel(dc.clone()).await?;
290 println!(
291 "[Peer {}] Handlers set up, storing data channel...",
292 self.peer_id.short()
293 );
294 {
295 let mut dc_guard = self.data_channel.lock().await;
296 *dc_guard = Some(dc);
297 }
298 println!("[Peer {}] Data channel stored", self.peer_id.short());
299
300 let offer = self.pc.create_offer(None).await?;
303 let mut gathering_complete = self.pc.gathering_complete_promise().await;
304 self.pc.set_local_description(offer).await?;
305
306 let _ = tokio::time::timeout(
308 std::time::Duration::from_secs(10),
309 gathering_complete.recv(),
310 )
311 .await;
312
313 let local_desc = self
315 .pc
316 .local_description()
317 .await
318 .ok_or_else(|| anyhow::anyhow!("No local description after gathering"))?;
319
320 debug!(
321 "Offer created, SDP len: {}, ice_gathering: {:?}",
322 local_desc.sdp.len(),
323 self.pc.ice_gathering_state()
324 );
325
326 let offer_json = serde_json::json!({
328 "type": local_desc.sdp_type.to_string().to_lowercase(),
329 "sdp": local_desc.sdp
330 });
331
332 Ok(offer_json)
333 }
334
335 pub async fn handle_offer(&mut self, offer: serde_json::Value) -> Result<serde_json::Value> {
337 let sdp = offer
338 .get("sdp")
339 .and_then(|s| s.as_str())
340 .ok_or_else(|| anyhow::anyhow!("Missing SDP in offer"))?;
341
342 let peer_id = self.peer_id.clone();
345 let message_tx = self.message_tx.clone();
346 let pending_requests = self.pending_requests.clone();
347 let store = self.store.clone();
348 let data_channel_holder = self.data_channel.clone();
349 let nostr_relay = self.nostr_relay.clone();
350 let peer_pubkey = Some(self.peer_id.pubkey.clone());
351
352 self.pc
353 .on_data_channel(Box::new(move |dc: Arc<RTCDataChannel>| {
354 let peer_id = peer_id.clone();
355 let message_tx = message_tx.clone();
356 let pending_requests = pending_requests.clone();
357 let store = store.clone();
358 let data_channel_holder = data_channel_holder.clone();
359 let nostr_relay = nostr_relay.clone();
360 let peer_pubkey = peer_pubkey.clone();
361
362 Box::pin(async move {
364 info!(
365 "Peer {} received data channel: {}",
366 peer_id.short(),
367 dc.label()
368 );
369
370 {
372 let mut dc_guard = data_channel_holder.lock().await;
373 *dc_guard = Some(dc.clone());
374 }
375
376 Self::setup_dc_handlers(
378 dc.clone(),
379 peer_id,
380 message_tx,
381 pending_requests,
382 store,
383 nostr_relay,
384 peer_pubkey,
385 )
386 .await;
387 })
388 }));
389
390 let offer_desc = RTCSessionDescription::offer(sdp.to_string())?;
392 self.pc.set_remote_description(offer_desc).await?;
393
394 let answer = self.pc.create_answer(None).await?;
397 let mut gathering_complete = self.pc.gathering_complete_promise().await;
398 self.pc.set_local_description(answer).await?;
399
400 let _ = tokio::time::timeout(
402 std::time::Duration::from_secs(10),
403 gathering_complete.recv(),
404 )
405 .await;
406
407 let local_desc = self
409 .pc
410 .local_description()
411 .await
412 .ok_or_else(|| anyhow::anyhow!("No local description after gathering"))?;
413
414 debug!(
415 "Answer created, SDP len: {}, ice_gathering: {:?}",
416 local_desc.sdp.len(),
417 self.pc.ice_gathering_state()
418 );
419
420 let answer_json = serde_json::json!({
421 "type": local_desc.sdp_type.to_string().to_lowercase(),
422 "sdp": local_desc.sdp
423 });
424
425 Ok(answer_json)
426 }
427
428 pub async fn handle_answer(&mut self, answer: serde_json::Value) -> Result<()> {
430 let sdp = answer
431 .get("sdp")
432 .and_then(|s| s.as_str())
433 .ok_or_else(|| anyhow::anyhow!("Missing SDP in answer"))?;
434
435 let answer_desc = RTCSessionDescription::answer(sdp.to_string())?;
436 self.pc.set_remote_description(answer_desc).await?;
437
438 Ok(())
439 }
440
441 pub async fn handle_candidate(&mut self, candidate: serde_json::Value) -> Result<()> {
443 let candidate_str = candidate
444 .get("candidate")
445 .and_then(|c| c.as_str())
446 .unwrap_or("");
447
448 let sdp_mid = candidate
449 .get("sdpMid")
450 .and_then(|m| m.as_str())
451 .map(|s| s.to_string());
452
453 let sdp_mline_index = candidate
454 .get("sdpMLineIndex")
455 .and_then(|i| i.as_u64())
456 .map(|i| i as u16);
457
458 if !candidate_str.is_empty() {
459 use webrtc::ice_transport::ice_candidate::RTCIceCandidateInit;
460 let init = RTCIceCandidateInit {
461 candidate: candidate_str.to_string(),
462 sdp_mid,
463 sdp_mline_index,
464 username_fragment: candidate
465 .get("usernameFragment")
466 .and_then(|u| u.as_str())
467 .map(|s| s.to_string()),
468 };
469 self.pc.add_ice_candidate(init).await?;
470 }
471
472 Ok(())
473 }
474
475 async fn setup_data_channel(&mut self, dc: Arc<RTCDataChannel>) -> Result<()> {
477 let peer_id = self.peer_id.clone();
478 let message_tx = self.message_tx.clone();
479 let pending_requests = self.pending_requests.clone();
480 let store = self.store.clone();
481 let nostr_relay = self.nostr_relay.clone();
482 let peer_pubkey = Some(self.peer_id.pubkey.clone());
483
484 Self::setup_dc_handlers(
485 dc,
486 peer_id,
487 message_tx,
488 pending_requests,
489 store,
490 nostr_relay,
491 peer_pubkey,
492 )
493 .await;
494 Ok(())
495 }
496
497 async fn setup_dc_handlers(
499 dc: Arc<RTCDataChannel>,
500 peer_id: PeerId,
501 message_tx: mpsc::Sender<(DataMessage, Option<Vec<u8>>)>,
502 pending_requests: Arc<Mutex<HashMap<String, PendingRequest>>>,
503 store: Option<Arc<dyn ContentStore>>,
504 nostr_relay: Option<Arc<NostrRelay>>,
505 peer_pubkey: Option<String>,
506 ) {
507 let label = dc.label().to_string();
508 let peer_short = peer_id.short();
509
510 let _pending_binary: Arc<Mutex<Option<u32>>> = Arc::new(Mutex::new(None));
512
513 let open_notify = nostr_relay.as_ref().map(|_| Arc::new(Notify::new()));
514 if let Some(ref notify) = open_notify {
515 if dc.ready_state() == RTCDataChannelState::Open {
516 notify.notify_waiters();
517 }
518 }
519
520 let mut nostr_client_id: Option<u64> = None;
521 if let Some(relay) = nostr_relay.clone() {
522 let client_id = relay.next_client_id();
523 let (nostr_tx, mut nostr_rx) = mpsc::unbounded_channel::<String>();
524 relay
525 .register_client(client_id, nostr_tx, peer_pubkey.clone())
526 .await;
527 nostr_client_id = Some(client_id);
528
529 if let Some(notify) = open_notify.clone() {
530 let dc_for_send = dc.clone();
531 tokio::spawn(async move {
532 notify.notified().await;
533 while let Some(text) = nostr_rx.recv().await {
534 if dc_for_send.send_text(text).await.is_err() {
535 break;
536 }
537 }
538 });
539 }
540 }
541
542 if let (Some(relay), Some(client_id)) = (nostr_relay.clone(), nostr_client_id) {
543 dc.on_close(Box::new(move || {
544 let relay = relay.clone();
545 Box::pin(async move {
546 relay.unregister_client(client_id).await;
547 })
548 }));
549 }
550
551 let open_notify_clone = open_notify.clone();
552 let peer_short_open = peer_short.clone();
553 let label_clone = label.clone();
554 dc.on_open(Box::new(move || {
555 let peer_short_open = peer_short_open.clone();
556 let label_clone = label_clone.clone();
557 let open_notify = open_notify_clone.clone();
558 Box::pin(async move {
560 info!(
561 "[Peer {}] Data channel '{}' open",
562 peer_short_open, label_clone
563 );
564 if let Some(notify) = open_notify {
565 notify.notify_waiters();
566 }
567 })
568 }));
569
570 let dc_for_msg = dc.clone();
571 let peer_short_msg = peer_short.clone();
572 let _pending_binary_clone = _pending_binary.clone();
573 let store_clone = store.clone();
574 let nostr_relay_for_msg = nostr_relay.clone();
575 let nostr_client_id_for_msg = nostr_client_id;
576
577 dc.on_message(Box::new(move |msg: DataChannelMessage| {
578 let dc = dc_for_msg.clone();
579 let peer_short = peer_short_msg.clone();
580 let pending_requests = pending_requests.clone();
581 let _pending_binary = _pending_binary_clone.clone();
582 let _message_tx = message_tx.clone();
583 let store = store_clone.clone();
584 let nostr_relay = nostr_relay_for_msg.clone();
585 let nostr_client_id = nostr_client_id_for_msg;
586 let msg_data = msg.data.clone();
587
588 Box::pin(async move {
590 if msg.is_string {
591 if let Some(relay) = nostr_relay {
592 if let Ok(text) = std::str::from_utf8(&msg_data) {
593 if let Ok(nostr_msg) = NostrClientMessage::from_json(text) {
594 if let Some(client_id) = nostr_client_id {
595 relay.handle_client_message(client_id, nostr_msg).await;
596 }
597 }
598 }
599 }
600 return;
601 }
602 debug!(
604 "[Peer {}] Received {} bytes on data channel",
605 peer_short,
606 msg_data.len()
607 );
608 match parse_message(&msg_data) {
609 Ok(data_msg) => match data_msg {
610 DataMessage::Request(req) => {
611 let hash_hex = hash_to_hex(&req.h);
612 let hash_short = &hash_hex[..8.min(hash_hex.len())];
613 info!("[Peer {}] Received request for {}", peer_short, hash_short);
614
615 let data = if let Some(ref store) = store {
617 match store.get(&hash_hex) {
618 Ok(Some(data)) => {
619 info!(
620 "[Peer {}] Found {} in store ({} bytes)",
621 peer_short,
622 hash_short,
623 data.len()
624 );
625 Some(data)
626 }
627 Ok(None) => {
628 info!(
629 "[Peer {}] Hash {} not in store",
630 peer_short, hash_short
631 );
632 None
633 }
634 Err(e) => {
635 warn!("[Peer {}] Store error: {}", peer_short, e);
636 None
637 }
638 }
639 } else {
640 warn!(
641 "[Peer {}] No store configured - cannot serve requests",
642 peer_short
643 );
644 None
645 };
646
647 if let Some(data) = data {
649 let data_len = data.len();
650 let response = DataResponse { h: req.h, d: data };
651 if let Ok(wire) = encode_response(&response) {
652 if let Err(e) = dc.send(&Bytes::from(wire)).await {
653 error!(
654 "[Peer {}] Failed to send response: {}",
655 peer_short, e
656 );
657 } else {
658 info!(
659 "[Peer {}] Sent response for {} ({} bytes)",
660 peer_short, hash_short, data_len
661 );
662 }
663 }
664 } else {
665 info!("[Peer {}] Content not found for {}", peer_short, hash_short);
666 }
667 }
668 DataMessage::Response(res) => {
669 let hash_hex = hash_to_hex(&res.h);
670 let hash_short = &hash_hex[..8.min(hash_hex.len())];
671 debug!(
672 "[Peer {}] Received response for {} ({} bytes)",
673 peer_short,
674 hash_short,
675 res.d.len()
676 );
677
678 let mut pending = pending_requests.lock().await;
680 if let Some(req) = pending.remove(&hash_hex) {
681 let _ = req.response_tx.send(Some(res.d));
682 }
683 }
684 },
685 Err(e) => {
686 warn!("[Peer {}] Failed to parse message: {:?}", peer_short, e);
687 let hex_dump: String = msg_data
689 .iter()
690 .take(50)
691 .map(|b| format!("{:02x}", b))
692 .collect();
693 warn!("[Peer {}] Message hex: {}", peer_short, hex_dump);
694 }
695 }
696 })
697 }));
698 }
699
700 pub fn has_data_channel(&self) -> bool {
702 self.data_channel
704 .try_lock()
705 .map(|guard| guard.is_some())
706 .unwrap_or(false)
707 }
708
709 pub async fn request(&self, hash_hex: &str) -> Result<Option<Vec<u8>>> {
711 let dc_guard = self.data_channel.lock().await;
712 let dc = dc_guard
713 .as_ref()
714 .ok_or_else(|| anyhow::anyhow!("No data channel"))?
715 .clone();
716 drop(dc_guard); let hash = hex::decode(hash_hex).map_err(|e| anyhow::anyhow!("Invalid hex hash: {}", e))?;
720
721 let (tx, rx) = oneshot::channel();
723
724 {
726 let mut pending = self.pending_requests.lock().await;
727 pending.insert(
728 hash_hex.to_string(),
729 PendingRequest {
730 hash: hash.clone(),
731 response_tx: tx,
732 },
733 );
734 }
735
736 let req = DataRequest {
738 h: hash,
739 htl: crate::webrtc::types::MAX_HTL,
740 };
741 let wire = encode_request(&req)?;
742 dc.send(&Bytes::from(wire)).await?;
743
744 debug!(
745 "[Peer {}] Sent request for {}",
746 self.peer_id.short(),
747 &hash_hex[..8.min(hash_hex.len())]
748 );
749
750 match tokio::time::timeout(std::time::Duration::from_secs(10), rx).await {
752 Ok(Ok(data)) => Ok(data),
753 Ok(Err(_)) => {
754 Ok(None)
756 }
757 Err(_) => {
758 let mut pending = self.pending_requests.lock().await;
760 pending.remove(hash_hex);
761 Ok(None)
762 }
763 }
764 }
765
766 pub async fn send_message(&self, msg: &DataMessage) -> Result<()> {
768 let dc_guard = self.data_channel.lock().await;
769 if let Some(ref dc) = *dc_guard {
770 let wire = encode_message(msg)?;
771 dc.send(&Bytes::from(wire)).await?;
772 }
773 Ok(())
774 }
775
776 pub async fn close(&self) -> Result<()> {
778 {
779 let dc_guard = self.data_channel.lock().await;
780 if let Some(ref dc) = *dc_guard {
781 dc.close().await?;
782 }
783 }
784 self.pc.close().await?;
785 Ok(())
786 }
787}