Skip to main content

hashtree_cli/webrtc/
peer.rs

1//! WebRTC peer connection for hashtree data exchange
2
3use anyhow::Result;
4use bytes::Bytes;
5use std::collections::HashMap;
6use std::sync::Arc;
7use tokio::sync::{mpsc, oneshot, Mutex, Notify};
8use tracing::{debug, error, info, warn};
9use webrtc::api::interceptor_registry::register_default_interceptors;
10use webrtc::api::media_engine::MediaEngine;
11use webrtc::api::setting_engine::SettingEngine;
12use webrtc::api::APIBuilder;
13use webrtc::data_channel::data_channel_init::RTCDataChannelInit;
14use webrtc::data_channel::data_channel_message::DataChannelMessage;
15use webrtc::data_channel::data_channel_state::RTCDataChannelState;
16use webrtc::data_channel::RTCDataChannel;
17use webrtc::ice_transport::ice_candidate::RTCIceCandidate;
18use webrtc::ice_transport::ice_server::RTCIceServer;
19use webrtc::interceptor::registry::Registry;
20use webrtc::peer_connection::configuration::RTCConfiguration;
21use webrtc::peer_connection::peer_connection_state::RTCPeerConnectionState;
22use webrtc::peer_connection::sdp::session_description::RTCSessionDescription;
23use webrtc::peer_connection::RTCPeerConnection;
24
25use super::types::{
26    encode_message, encode_request, encode_response, hash_to_hex, parse_message, DataMessage,
27    DataRequest, DataResponse, PeerDirection, PeerId, PeerStateEvent, SignalingMessage,
28};
29use crate::nostr_relay::NostrRelay;
30use nostr::{ClientMessage as NostrClientMessage, JsonUtil as NostrJsonUtil};
31
32/// Trait for content storage that can be used by WebRTC peers
33pub trait ContentStore: Send + Sync + 'static {
34    /// Get content by hex hash
35    fn get(&self, hash_hex: &str) -> Result<Option<Vec<u8>>>;
36}
37
38/// Pending request tracking (keyed by hash hex)
39pub struct PendingRequest {
40    pub hash: Vec<u8>,
41    pub response_tx: oneshot::Sender<Option<Vec<u8>>>,
42}
43
44/// WebRTC peer connection with data channel protocol
45pub struct Peer {
46    pub peer_id: PeerId,
47    pub direction: PeerDirection,
48    pub created_at: std::time::Instant,
49    pub connected_at: Option<std::time::Instant>,
50
51    pc: Arc<RTCPeerConnection>,
52    /// Data channel - can be set from callback when receiving channel from peer
53    pub data_channel: Arc<Mutex<Option<Arc<RTCDataChannel>>>>,
54    signaling_tx: mpsc::Sender<SignalingMessage>,
55    my_peer_id: PeerId,
56
57    // Content store for serving requests
58    store: Option<Arc<dyn ContentStore>>,
59
60    // Track pending outgoing requests (keyed by hash hex)
61    pub pending_requests: Arc<Mutex<HashMap<String, PendingRequest>>>,
62
63    // Channel for incoming data messages
64    #[allow(dead_code)]
65    message_tx: mpsc::Sender<(DataMessage, Option<Vec<u8>>)>,
66    #[allow(dead_code)]
67    message_rx: Option<mpsc::Receiver<(DataMessage, Option<Vec<u8>>)>>,
68
69    // Optional channel to notify signaling layer of state changes
70    state_event_tx: Option<mpsc::Sender<PeerStateEvent>>,
71
72    // Optional Nostr relay for text messages over data channel
73    nostr_relay: Option<Arc<NostrRelay>>,
74}
75
76impl Peer {
77    /// Create a new peer connection
78    pub async fn new(
79        peer_id: PeerId,
80        direction: PeerDirection,
81        my_peer_id: PeerId,
82        signaling_tx: mpsc::Sender<SignalingMessage>,
83        stun_servers: Vec<String>,
84    ) -> Result<Self> {
85        Self::new_with_store_and_events(
86            peer_id,
87            direction,
88            my_peer_id,
89            signaling_tx,
90            stun_servers,
91            None,
92            None,
93            None,
94        )
95        .await
96    }
97
98    /// Create a new peer connection with content store
99    pub async fn new_with_store(
100        peer_id: PeerId,
101        direction: PeerDirection,
102        my_peer_id: PeerId,
103        signaling_tx: mpsc::Sender<SignalingMessage>,
104        stun_servers: Vec<String>,
105        store: Option<Arc<dyn ContentStore>>,
106    ) -> Result<Self> {
107        Self::new_with_store_and_events(
108            peer_id,
109            direction,
110            my_peer_id,
111            signaling_tx,
112            stun_servers,
113            store,
114            None,
115            None,
116        )
117        .await
118    }
119
120    /// Create a new peer connection with content store and state event channel
121    pub async fn new_with_store_and_events(
122        peer_id: PeerId,
123        direction: PeerDirection,
124        my_peer_id: PeerId,
125        signaling_tx: mpsc::Sender<SignalingMessage>,
126        stun_servers: Vec<String>,
127        store: Option<Arc<dyn ContentStore>>,
128        state_event_tx: Option<mpsc::Sender<PeerStateEvent>>,
129        nostr_relay: Option<Arc<NostrRelay>>,
130    ) -> Result<Self> {
131        // Create WebRTC API
132        let mut m = MediaEngine::default();
133        m.register_default_codecs()?;
134
135        let mut registry = Registry::new();
136        registry = register_default_interceptors(registry, &mut m)?;
137
138        // Enable mDNS temporarily for debugging
139        // Previously disabled due to https://github.com/webrtc-rs/webrtc/issues/616
140        let setting_engine = SettingEngine::default();
141        // Note: mDNS enabled by default
142
143        let api = APIBuilder::new()
144            .with_media_engine(m)
145            .with_interceptor_registry(registry)
146            .with_setting_engine(setting_engine)
147            .build();
148
149        // Configure ICE servers
150        let ice_servers: Vec<RTCIceServer> = stun_servers
151            .iter()
152            .map(|url| RTCIceServer {
153                urls: vec![url.clone()],
154                ..Default::default()
155            })
156            .collect();
157
158        let config = RTCConfiguration {
159            ice_servers,
160            ..Default::default()
161        };
162
163        let pc = Arc::new(api.new_peer_connection(config).await?);
164        let (message_tx, message_rx) = mpsc::channel(100);
165        Ok(Self {
166            peer_id,
167            direction,
168            created_at: std::time::Instant::now(),
169            connected_at: None,
170            pc,
171            data_channel: Arc::new(Mutex::new(None)),
172            signaling_tx,
173            my_peer_id,
174            store,
175            pending_requests: Arc::new(Mutex::new(HashMap::new())),
176            message_tx,
177            message_rx: Some(message_rx),
178            state_event_tx,
179            nostr_relay,
180        })
181    }
182
183    /// Set content store
184    pub fn set_store(&mut self, store: Arc<dyn ContentStore>) {
185        self.store = Some(store);
186    }
187
188    /// Get connection state
189    pub fn state(&self) -> RTCPeerConnectionState {
190        self.pc.connection_state()
191    }
192
193    /// Get signaling state
194    pub fn signaling_state(&self) -> webrtc::peer_connection::signaling_state::RTCSignalingState {
195        self.pc.signaling_state()
196    }
197
198    /// Check if connected
199    pub fn is_connected(&self) -> bool {
200        self.pc.connection_state() == RTCPeerConnectionState::Connected
201    }
202
203    /// Setup event handlers for the peer connection
204    pub async fn setup_handlers(&mut self) -> Result<()> {
205        let peer_id = self.peer_id.clone();
206        let signaling_tx = self.signaling_tx.clone();
207        let my_peer_id_str = self.my_peer_id.to_string();
208        let recipient = self.peer_id.to_string();
209
210        // Handle ICE candidates - work MUST be inside the returned future
211        self.pc
212            .on_ice_candidate(Box::new(move |candidate: Option<RTCIceCandidate>| {
213                let signaling_tx = signaling_tx.clone();
214                let my_peer_id_str = my_peer_id_str.clone();
215                let recipient = recipient.clone();
216
217                Box::pin(async move {
218                    if let Some(c) = candidate {
219                        if let Some(init) = c.to_json().ok() {
220                            info!(
221                                "ICE candidate generated: {}",
222                                &init.candidate[..init.candidate.len().min(60)]
223                            );
224                            let msg = SignalingMessage::candidate(
225                                serde_json::to_value(&init).unwrap_or_default(),
226                                &recipient,
227                                &my_peer_id_str,
228                            );
229                            if let Err(e) = signaling_tx.send(msg).await {
230                                error!("Failed to send ICE candidate: {}", e);
231                            }
232                        }
233                    }
234                })
235            }));
236
237        // Handle connection state changes - work MUST be inside the returned future
238        let peer_id_log = peer_id.clone();
239        let state_event_tx = self.state_event_tx.clone();
240        self.pc
241            .on_peer_connection_state_change(Box::new(move |state: RTCPeerConnectionState| {
242                let peer_id = peer_id_log.clone();
243                let state_event_tx = state_event_tx.clone();
244                Box::pin(async move {
245                    info!("Peer {} connection state: {:?}", peer_id.short(), state);
246
247                    // Notify signaling layer of state changes
248                    if let Some(tx) = state_event_tx {
249                        let event = match state {
250                            RTCPeerConnectionState::Connected => {
251                                Some(PeerStateEvent::Connected(peer_id))
252                            }
253                            RTCPeerConnectionState::Failed => Some(PeerStateEvent::Failed(peer_id)),
254                            RTCPeerConnectionState::Disconnected
255                            | RTCPeerConnectionState::Closed => {
256                                Some(PeerStateEvent::Disconnected(peer_id))
257                            }
258                            _ => None,
259                        };
260                        if let Some(event) = event {
261                            if let Err(e) = tx.send(event).await {
262                                error!("Failed to send peer state event: {}", e);
263                            }
264                        }
265                    }
266                })
267            }));
268
269        Ok(())
270    }
271
272    /// Initiate connection (create offer) - for outbound connections
273    pub async fn connect(&mut self) -> Result<serde_json::Value> {
274        println!("[Peer {}] Creating data channel...", self.peer_id.short());
275        // Create data channel first
276        // Use unordered for better performance - protocol is stateless (each message self-describes)
277        let dc_init = RTCDataChannelInit {
278            ordered: Some(false),
279            ..Default::default()
280        };
281        let dc = self
282            .pc
283            .create_data_channel("hashtree", Some(dc_init))
284            .await?;
285        println!(
286            "[Peer {}] Data channel created, setting up handlers...",
287            self.peer_id.short()
288        );
289        self.setup_data_channel(dc.clone()).await?;
290        println!(
291            "[Peer {}] Handlers set up, storing data channel...",
292            self.peer_id.short()
293        );
294        {
295            let mut dc_guard = self.data_channel.lock().await;
296            *dc_guard = Some(dc);
297        }
298        println!("[Peer {}] Data channel stored", self.peer_id.short());
299
300        // Create offer and wait for ICE gathering to complete
301        // This ensures all ICE candidates are embedded in the SDP
302        let offer = self.pc.create_offer(None).await?;
303        let mut gathering_complete = self.pc.gathering_complete_promise().await;
304        self.pc.set_local_description(offer).await?;
305
306        // Wait for ICE gathering to complete (with timeout)
307        let _ = tokio::time::timeout(
308            std::time::Duration::from_secs(10),
309            gathering_complete.recv(),
310        )
311        .await;
312
313        // Get the local description with ICE candidates embedded
314        let local_desc = self
315            .pc
316            .local_description()
317            .await
318            .ok_or_else(|| anyhow::anyhow!("No local description after gathering"))?;
319
320        debug!(
321            "Offer created, SDP len: {}, ice_gathering: {:?}",
322            local_desc.sdp.len(),
323            self.pc.ice_gathering_state()
324        );
325
326        // Return offer as JSON
327        let offer_json = serde_json::json!({
328            "type": local_desc.sdp_type.to_string().to_lowercase(),
329            "sdp": local_desc.sdp
330        });
331
332        Ok(offer_json)
333    }
334
335    /// Handle incoming offer and create answer
336    pub async fn handle_offer(&mut self, offer: serde_json::Value) -> Result<serde_json::Value> {
337        let sdp = offer
338            .get("sdp")
339            .and_then(|s| s.as_str())
340            .ok_or_else(|| anyhow::anyhow!("Missing SDP in offer"))?;
341
342        // Setup data channel handler BEFORE set_remote_description
343        // This ensures the handler is registered before any data channel events fire
344        let peer_id = self.peer_id.clone();
345        let message_tx = self.message_tx.clone();
346        let pending_requests = self.pending_requests.clone();
347        let store = self.store.clone();
348        let data_channel_holder = self.data_channel.clone();
349        let nostr_relay = self.nostr_relay.clone();
350        let peer_pubkey = Some(self.peer_id.pubkey.clone());
351
352        self.pc
353            .on_data_channel(Box::new(move |dc: Arc<RTCDataChannel>| {
354                let peer_id = peer_id.clone();
355                let message_tx = message_tx.clone();
356                let pending_requests = pending_requests.clone();
357                let store = store.clone();
358                let data_channel_holder = data_channel_holder.clone();
359                let nostr_relay = nostr_relay.clone();
360                let peer_pubkey = peer_pubkey.clone();
361
362                // Work MUST be inside the returned future
363                Box::pin(async move {
364                    info!(
365                        "Peer {} received data channel: {}",
366                        peer_id.short(),
367                        dc.label()
368                    );
369
370                    // Store the received data channel
371                    {
372                        let mut dc_guard = data_channel_holder.lock().await;
373                        *dc_guard = Some(dc.clone());
374                    }
375
376                    // Set up message handlers
377                    Self::setup_dc_handlers(
378                        dc.clone(),
379                        peer_id,
380                        message_tx,
381                        pending_requests,
382                        store,
383                        nostr_relay,
384                        peer_pubkey,
385                    )
386                    .await;
387                })
388            }));
389
390        // Set remote description after handler is registered
391        let offer_desc = RTCSessionDescription::offer(sdp.to_string())?;
392        self.pc.set_remote_description(offer_desc).await?;
393
394        // Create answer and wait for ICE gathering to complete
395        // This ensures all ICE candidates are embedded in the SDP
396        let answer = self.pc.create_answer(None).await?;
397        let mut gathering_complete = self.pc.gathering_complete_promise().await;
398        self.pc.set_local_description(answer).await?;
399
400        // Wait for ICE gathering to complete (with timeout)
401        let _ = tokio::time::timeout(
402            std::time::Duration::from_secs(10),
403            gathering_complete.recv(),
404        )
405        .await;
406
407        // Get the local description with ICE candidates embedded
408        let local_desc = self
409            .pc
410            .local_description()
411            .await
412            .ok_or_else(|| anyhow::anyhow!("No local description after gathering"))?;
413
414        debug!(
415            "Answer created, SDP len: {}, ice_gathering: {:?}",
416            local_desc.sdp.len(),
417            self.pc.ice_gathering_state()
418        );
419
420        let answer_json = serde_json::json!({
421            "type": local_desc.sdp_type.to_string().to_lowercase(),
422            "sdp": local_desc.sdp
423        });
424
425        Ok(answer_json)
426    }
427
428    /// Handle incoming answer
429    pub async fn handle_answer(&mut self, answer: serde_json::Value) -> Result<()> {
430        let sdp = answer
431            .get("sdp")
432            .and_then(|s| s.as_str())
433            .ok_or_else(|| anyhow::anyhow!("Missing SDP in answer"))?;
434
435        let answer_desc = RTCSessionDescription::answer(sdp.to_string())?;
436        self.pc.set_remote_description(answer_desc).await?;
437
438        Ok(())
439    }
440
441    /// Handle incoming ICE candidate
442    pub async fn handle_candidate(&mut self, candidate: serde_json::Value) -> Result<()> {
443        let candidate_str = candidate
444            .get("candidate")
445            .and_then(|c| c.as_str())
446            .unwrap_or("");
447
448        let sdp_mid = candidate
449            .get("sdpMid")
450            .and_then(|m| m.as_str())
451            .map(|s| s.to_string());
452
453        let sdp_mline_index = candidate
454            .get("sdpMLineIndex")
455            .and_then(|i| i.as_u64())
456            .map(|i| i as u16);
457
458        if !candidate_str.is_empty() {
459            use webrtc::ice_transport::ice_candidate::RTCIceCandidateInit;
460            let init = RTCIceCandidateInit {
461                candidate: candidate_str.to_string(),
462                sdp_mid,
463                sdp_mline_index,
464                username_fragment: candidate
465                    .get("usernameFragment")
466                    .and_then(|u| u.as_str())
467                    .map(|s| s.to_string()),
468            };
469            self.pc.add_ice_candidate(init).await?;
470        }
471
472        Ok(())
473    }
474
475    /// Setup data channel handlers
476    async fn setup_data_channel(&mut self, dc: Arc<RTCDataChannel>) -> Result<()> {
477        let peer_id = self.peer_id.clone();
478        let message_tx = self.message_tx.clone();
479        let pending_requests = self.pending_requests.clone();
480        let store = self.store.clone();
481        let nostr_relay = self.nostr_relay.clone();
482        let peer_pubkey = Some(self.peer_id.pubkey.clone());
483
484        Self::setup_dc_handlers(
485            dc,
486            peer_id,
487            message_tx,
488            pending_requests,
489            store,
490            nostr_relay,
491            peer_pubkey,
492        )
493        .await;
494        Ok(())
495    }
496
497    /// Setup handlers for a data channel (shared between outbound and inbound)
498    async fn setup_dc_handlers(
499        dc: Arc<RTCDataChannel>,
500        peer_id: PeerId,
501        message_tx: mpsc::Sender<(DataMessage, Option<Vec<u8>>)>,
502        pending_requests: Arc<Mutex<HashMap<String, PendingRequest>>>,
503        store: Option<Arc<dyn ContentStore>>,
504        nostr_relay: Option<Arc<NostrRelay>>,
505        peer_pubkey: Option<String>,
506    ) {
507        let label = dc.label().to_string();
508        let peer_short = peer_id.short();
509
510        // Track pending binary data (request_id -> expected after response)
511        let _pending_binary: Arc<Mutex<Option<u32>>> = Arc::new(Mutex::new(None));
512
513        let open_notify = nostr_relay.as_ref().map(|_| Arc::new(Notify::new()));
514        if let Some(ref notify) = open_notify {
515            if dc.ready_state() == RTCDataChannelState::Open {
516                notify.notify_waiters();
517            }
518        }
519
520        let mut nostr_client_id: Option<u64> = None;
521        if let Some(relay) = nostr_relay.clone() {
522            let client_id = relay.next_client_id();
523            let (nostr_tx, mut nostr_rx) = mpsc::unbounded_channel::<String>();
524            relay
525                .register_client(client_id, nostr_tx, peer_pubkey.clone())
526                .await;
527            nostr_client_id = Some(client_id);
528
529            if let Some(notify) = open_notify.clone() {
530                let dc_for_send = dc.clone();
531                tokio::spawn(async move {
532                    notify.notified().await;
533                    while let Some(text) = nostr_rx.recv().await {
534                        if dc_for_send.send_text(text).await.is_err() {
535                            break;
536                        }
537                    }
538                });
539            }
540        }
541
542        if let (Some(relay), Some(client_id)) = (nostr_relay.clone(), nostr_client_id) {
543            dc.on_close(Box::new(move || {
544                let relay = relay.clone();
545                Box::pin(async move {
546                    relay.unregister_client(client_id).await;
547                })
548            }));
549        }
550
551        let open_notify_clone = open_notify.clone();
552        let peer_short_open = peer_short.clone();
553        let label_clone = label.clone();
554        dc.on_open(Box::new(move || {
555            let peer_short_open = peer_short_open.clone();
556            let label_clone = label_clone.clone();
557            let open_notify = open_notify_clone.clone();
558            // Work MUST be inside the returned future
559            Box::pin(async move {
560                info!(
561                    "[Peer {}] Data channel '{}' open",
562                    peer_short_open, label_clone
563                );
564                if let Some(notify) = open_notify {
565                    notify.notify_waiters();
566                }
567            })
568        }));
569
570        let dc_for_msg = dc.clone();
571        let peer_short_msg = peer_short.clone();
572        let _pending_binary_clone = _pending_binary.clone();
573        let store_clone = store.clone();
574        let nostr_relay_for_msg = nostr_relay.clone();
575        let nostr_client_id_for_msg = nostr_client_id;
576
577        dc.on_message(Box::new(move |msg: DataChannelMessage| {
578            let dc = dc_for_msg.clone();
579            let peer_short = peer_short_msg.clone();
580            let pending_requests = pending_requests.clone();
581            let _pending_binary = _pending_binary_clone.clone();
582            let _message_tx = message_tx.clone();
583            let store = store_clone.clone();
584            let nostr_relay = nostr_relay_for_msg.clone();
585            let nostr_client_id = nostr_client_id_for_msg;
586            let msg_data = msg.data.clone();
587
588            // Work MUST be inside the returned future
589            Box::pin(async move {
590                if msg.is_string {
591                    if let Some(relay) = nostr_relay {
592                        if let Ok(text) = std::str::from_utf8(&msg_data) {
593                            if let Ok(nostr_msg) = NostrClientMessage::from_json(text) {
594                                if let Some(client_id) = nostr_client_id {
595                                    relay.handle_client_message(client_id, nostr_msg).await;
596                                }
597                            }
598                        }
599                    }
600                    return;
601                }
602                // All messages are binary with type prefix + MessagePack body
603                debug!(
604                    "[Peer {}] Received {} bytes on data channel",
605                    peer_short,
606                    msg_data.len()
607                );
608                match parse_message(&msg_data) {
609                    Ok(data_msg) => match data_msg {
610                        DataMessage::Request(req) => {
611                            let hash_hex = hash_to_hex(&req.h);
612                            let hash_short = &hash_hex[..8.min(hash_hex.len())];
613                            info!("[Peer {}] Received request for {}", peer_short, hash_short);
614
615                            // Handle request - look up in store
616                            let data = if let Some(ref store) = store {
617                                match store.get(&hash_hex) {
618                                    Ok(Some(data)) => {
619                                        info!(
620                                            "[Peer {}] Found {} in store ({} bytes)",
621                                            peer_short,
622                                            hash_short,
623                                            data.len()
624                                        );
625                                        Some(data)
626                                    }
627                                    Ok(None) => {
628                                        info!(
629                                            "[Peer {}] Hash {} not in store",
630                                            peer_short, hash_short
631                                        );
632                                        None
633                                    }
634                                    Err(e) => {
635                                        warn!("[Peer {}] Store error: {}", peer_short, e);
636                                        None
637                                    }
638                                }
639                            } else {
640                                warn!(
641                                    "[Peer {}] No store configured - cannot serve requests",
642                                    peer_short
643                                );
644                                None
645                            };
646
647                            // Send response only if we have data
648                            if let Some(data) = data {
649                                let data_len = data.len();
650                                let response = DataResponse { h: req.h, d: data };
651                                if let Ok(wire) = encode_response(&response) {
652                                    if let Err(e) = dc.send(&Bytes::from(wire)).await {
653                                        error!(
654                                            "[Peer {}] Failed to send response: {}",
655                                            peer_short, e
656                                        );
657                                    } else {
658                                        info!(
659                                            "[Peer {}] Sent response for {} ({} bytes)",
660                                            peer_short, hash_short, data_len
661                                        );
662                                    }
663                                }
664                            } else {
665                                info!("[Peer {}] Content not found for {}", peer_short, hash_short);
666                            }
667                        }
668                        DataMessage::Response(res) => {
669                            let hash_hex = hash_to_hex(&res.h);
670                            let hash_short = &hash_hex[..8.min(hash_hex.len())];
671                            debug!(
672                                "[Peer {}] Received response for {} ({} bytes)",
673                                peer_short,
674                                hash_short,
675                                res.d.len()
676                            );
677
678                            // Resolve the pending request by hash
679                            let mut pending = pending_requests.lock().await;
680                            if let Some(req) = pending.remove(&hash_hex) {
681                                let _ = req.response_tx.send(Some(res.d));
682                            }
683                        }
684                    },
685                    Err(e) => {
686                        warn!("[Peer {}] Failed to parse message: {:?}", peer_short, e);
687                        // Log hex dump of first 50 bytes for debugging
688                        let hex_dump: String = msg_data
689                            .iter()
690                            .take(50)
691                            .map(|b| format!("{:02x}", b))
692                            .collect();
693                        warn!("[Peer {}] Message hex: {}", peer_short, hex_dump);
694                    }
695                }
696            })
697        }));
698    }
699
700    /// Check if data channel is ready
701    pub fn has_data_channel(&self) -> bool {
702        // Use try_lock for non-async context
703        self.data_channel
704            .try_lock()
705            .map(|guard| guard.is_some())
706            .unwrap_or(false)
707    }
708
709    /// Request content by hash from this peer
710    pub async fn request(&self, hash_hex: &str) -> Result<Option<Vec<u8>>> {
711        let dc_guard = self.data_channel.lock().await;
712        let dc = dc_guard
713            .as_ref()
714            .ok_or_else(|| anyhow::anyhow!("No data channel"))?
715            .clone();
716        drop(dc_guard); // Release lock before async operations
717
718        // Convert hex to binary hash
719        let hash = hex::decode(hash_hex).map_err(|e| anyhow::anyhow!("Invalid hex hash: {}", e))?;
720
721        // Create response channel
722        let (tx, rx) = oneshot::channel();
723
724        // Store pending request (keyed by hash hex)
725        {
726            let mut pending = self.pending_requests.lock().await;
727            pending.insert(
728                hash_hex.to_string(),
729                PendingRequest {
730                    hash: hash.clone(),
731                    response_tx: tx,
732                },
733            );
734        }
735
736        // Send request with MAX_HTL (fresh request from us)
737        let req = DataRequest {
738            h: hash,
739            htl: crate::webrtc::types::MAX_HTL,
740        };
741        let wire = encode_request(&req)?;
742        dc.send(&Bytes::from(wire)).await?;
743
744        debug!(
745            "[Peer {}] Sent request for {}",
746            self.peer_id.short(),
747            &hash_hex[..8.min(hash_hex.len())]
748        );
749
750        // Wait for response with timeout
751        match tokio::time::timeout(std::time::Duration::from_secs(10), rx).await {
752            Ok(Ok(data)) => Ok(data),
753            Ok(Err(_)) => {
754                // Channel closed
755                Ok(None)
756            }
757            Err(_) => {
758                // Timeout - clean up pending request
759                let mut pending = self.pending_requests.lock().await;
760                pending.remove(hash_hex);
761                Ok(None)
762            }
763        }
764    }
765
766    /// Send a message over the data channel
767    pub async fn send_message(&self, msg: &DataMessage) -> Result<()> {
768        let dc_guard = self.data_channel.lock().await;
769        if let Some(ref dc) = *dc_guard {
770            let wire = encode_message(msg)?;
771            dc.send(&Bytes::from(wire)).await?;
772        }
773        Ok(())
774    }
775
776    /// Close the connection
777    pub async fn close(&self) -> Result<()> {
778        {
779            let dc_guard = self.data_channel.lock().await;
780            if let Some(ref dc) = *dc_guard {
781                dc.close().await?;
782            }
783        }
784        self.pc.close().await?;
785        Ok(())
786    }
787}