Skip to main content

hashtree_cli/webrtc/
peer.rs

1//! WebRTC peer connection for hashtree data exchange
2
3use anyhow::Result;
4use bytes::Bytes;
5use std::collections::HashMap;
6use std::sync::Arc;
7use tokio::sync::{mpsc, oneshot, Mutex, Notify};
8use tracing::{debug, error, info, warn};
9use webrtc::api::interceptor_registry::register_default_interceptors;
10use webrtc::api::media_engine::MediaEngine;
11use webrtc::api::setting_engine::SettingEngine;
12use webrtc::api::APIBuilder;
13use webrtc::data_channel::data_channel_init::RTCDataChannelInit;
14use webrtc::data_channel::data_channel_message::DataChannelMessage;
15use webrtc::data_channel::RTCDataChannel;
16use webrtc::data_channel::data_channel_state::RTCDataChannelState;
17use webrtc::ice_transport::ice_candidate::RTCIceCandidate;
18use webrtc::ice_transport::ice_server::RTCIceServer;
19use webrtc::interceptor::registry::Registry;
20use webrtc::peer_connection::configuration::RTCConfiguration;
21use webrtc::peer_connection::peer_connection_state::RTCPeerConnectionState;
22use webrtc::peer_connection::sdp::session_description::RTCSessionDescription;
23use webrtc::peer_connection::RTCPeerConnection;
24
25use crate::nostr_relay::NostrRelay;
26use super::types::{DataMessage, DataRequest, DataResponse, PeerDirection, PeerId, PeerStateEvent, SignalingMessage, encode_message, encode_request, encode_response, parse_message, hash_to_hex};
27use nostr::{ClientMessage as NostrClientMessage, JsonUtil as NostrJsonUtil};
28
/// Trait for content storage that can be used by WebRTC peers.
///
/// A store is consulted whenever a remote peer sends a data request over the
/// data channel. Implementations must be `Send + Sync + 'static` because the
/// store is shared through an `Arc` with the data channel callbacks.
pub trait ContentStore: Send + Sync + 'static {
    /// Get content by hex hash.
    ///
    /// The peer code interprets the result as follows:
    /// * `Ok(Some(bytes))` - the content is available and is sent back to the requester,
    /// * `Ok(None)` - the hash is not in the store; no response is sent,
    /// * `Err(_)` - the lookup failed; the error is logged and no response is sent.
    fn get(&self, hash_hex: &str) -> Result<Option<Vec<u8>>>;
}
34
/// Pending request tracking (keyed by hash hex).
///
/// One entry is stored in `Peer::pending_requests` for every outgoing request
/// that is still waiting for its response.
pub struct PendingRequest {
    /// Binary form of the requested hash (decoded from the hex key).
    pub hash: Vec<u8>,
    /// One-shot sender used to hand the response payload back to the task
    /// awaiting the request. Dropping it without sending makes the waiting
    /// side observe a closed channel.
    pub response_tx: oneshot::Sender<Option<Vec<u8>>>,
}
40
/// WebRTC peer connection with data channel protocol.
///
/// Owns one `RTCPeerConnection` to a single remote peer, the data channel used
/// for hash requests/responses, and the bookkeeping needed to match responses
/// to outstanding requests.
pub struct Peer {
    /// Identity of the remote peer (used as the recipient of signaling messages).
    pub peer_id: PeerId,
    /// Direction of this connection (see `PeerDirection`).
    pub direction: PeerDirection,
    /// Instant at which this `Peer` value was constructed.
    pub created_at: std::time::Instant,
    /// Initialised to `None` by the constructors; nothing in this file updates it.
    // NOTE(review): presumably set by the owner once the connection is established - confirm.
    pub connected_at: Option<std::time::Instant>,

    /// Underlying WebRTC peer connection.
    pc: Arc<RTCPeerConnection>,
    /// Data channel - can be set from callback when receiving channel from peer
    pub data_channel: Arc<Mutex<Option<Arc<RTCDataChannel>>>>,
    /// Outgoing channel towards the signaling layer (ICE candidates are sent here).
    signaling_tx: mpsc::Sender<SignalingMessage>,
    /// Our own identity, used as the sender of signaling messages.
    my_peer_id: PeerId,

    // Content store for serving requests
    store: Option<Arc<dyn ContentStore>>,

    // Track pending outgoing requests (keyed by hash hex)
    pub pending_requests: Arc<Mutex<HashMap<String, PendingRequest>>>,

    // Channel for incoming data messages.
    // The sender is passed to the data channel handlers, which currently do not
    // publish anything on it; the receiver is never read in this file.
    #[allow(dead_code)]
    message_tx: mpsc::Sender<(DataMessage, Option<Vec<u8>>)>,
    #[allow(dead_code)]
    message_rx: Option<mpsc::Receiver<(DataMessage, Option<Vec<u8>>)>>,

    // Optional channel to notify signaling layer of state changes
    state_event_tx: Option<mpsc::Sender<PeerStateEvent>>,

    // Optional Nostr relay for text messages over data channel
    nostr_relay: Option<Arc<NostrRelay>>,
}
72
73impl Peer {
74    /// Create a new peer connection
75    pub async fn new(
76        peer_id: PeerId,
77        direction: PeerDirection,
78        my_peer_id: PeerId,
79        signaling_tx: mpsc::Sender<SignalingMessage>,
80        stun_servers: Vec<String>,
81    ) -> Result<Self> {
82        Self::new_with_store_and_events(
83            peer_id,
84            direction,
85            my_peer_id,
86            signaling_tx,
87            stun_servers,
88            None,
89            None,
90            None,
91        )
92        .await
93    }
94
95    /// Create a new peer connection with content store
96    pub async fn new_with_store(
97        peer_id: PeerId,
98        direction: PeerDirection,
99        my_peer_id: PeerId,
100        signaling_tx: mpsc::Sender<SignalingMessage>,
101        stun_servers: Vec<String>,
102        store: Option<Arc<dyn ContentStore>>,
103    ) -> Result<Self> {
104        Self::new_with_store_and_events(
105            peer_id,
106            direction,
107            my_peer_id,
108            signaling_tx,
109            stun_servers,
110            store,
111            None,
112            None,
113        )
114        .await
115    }
116
117    /// Create a new peer connection with content store and state event channel
118    pub async fn new_with_store_and_events(
119        peer_id: PeerId,
120        direction: PeerDirection,
121        my_peer_id: PeerId,
122        signaling_tx: mpsc::Sender<SignalingMessage>,
123        stun_servers: Vec<String>,
124        store: Option<Arc<dyn ContentStore>>,
125        state_event_tx: Option<mpsc::Sender<PeerStateEvent>>,
126        nostr_relay: Option<Arc<NostrRelay>>,
127    ) -> Result<Self> {
128        // Create WebRTC API
129        let mut m = MediaEngine::default();
130        m.register_default_codecs()?;
131
132        let mut registry = Registry::new();
133        registry = register_default_interceptors(registry, &mut m)?;
134
135        // Enable mDNS temporarily for debugging
136        // Previously disabled due to https://github.com/webrtc-rs/webrtc/issues/616
137        let setting_engine = SettingEngine::default();
138        // Note: mDNS enabled by default
139
140        let api = APIBuilder::new()
141            .with_media_engine(m)
142            .with_interceptor_registry(registry)
143            .with_setting_engine(setting_engine)
144            .build();
145
146        // Configure ICE servers
147        let ice_servers: Vec<RTCIceServer> = stun_servers
148            .iter()
149            .map(|url| RTCIceServer {
150                urls: vec![url.clone()],
151                ..Default::default()
152            })
153            .collect();
154
155        let config = RTCConfiguration {
156            ice_servers,
157            ..Default::default()
158        };
159
160        let pc = Arc::new(api.new_peer_connection(config).await?);
161        let (message_tx, message_rx) = mpsc::channel(100);
162        Ok(Self {
163            peer_id,
164            direction,
165            created_at: std::time::Instant::now(),
166            connected_at: None,
167            pc,
168            data_channel: Arc::new(Mutex::new(None)),
169            signaling_tx,
170            my_peer_id,
171            store,
172            pending_requests: Arc::new(Mutex::new(HashMap::new())),
173            message_tx,
174            message_rx: Some(message_rx),
175            state_event_tx,
176            nostr_relay,
177        })
178    }
179
180    /// Set content store
181    pub fn set_store(&mut self, store: Arc<dyn ContentStore>) {
182        self.store = Some(store);
183    }
184
185    /// Get connection state
186    pub fn state(&self) -> RTCPeerConnectionState {
187        self.pc.connection_state()
188    }
189
190    /// Get signaling state
191    pub fn signaling_state(&self) -> webrtc::peer_connection::signaling_state::RTCSignalingState {
192        self.pc.signaling_state()
193    }
194
195    /// Check if connected
196    pub fn is_connected(&self) -> bool {
197        self.pc.connection_state() == RTCPeerConnectionState::Connected
198    }
199
200    /// Setup event handlers for the peer connection
201    pub async fn setup_handlers(&mut self) -> Result<()> {
202        let peer_id = self.peer_id.clone();
203        let signaling_tx = self.signaling_tx.clone();
204        let my_peer_id_str = self.my_peer_id.to_string();
205        let recipient = self.peer_id.to_string();
206
207        // Handle ICE candidates - work MUST be inside the returned future
208        self.pc
209            .on_ice_candidate(Box::new(move |candidate: Option<RTCIceCandidate>| {
210                let signaling_tx = signaling_tx.clone();
211                let my_peer_id_str = my_peer_id_str.clone();
212                let recipient = recipient.clone();
213
214                Box::pin(async move {
215                    if let Some(c) = candidate {
216                        if let Some(init) = c.to_json().ok() {
217                            info!("ICE candidate generated: {}", &init.candidate[..init.candidate.len().min(60)]);
218                            let msg = SignalingMessage::candidate(
219                                serde_json::to_value(&init).unwrap_or_default(),
220                                &recipient,
221                                &my_peer_id_str,
222                            );
223                            if let Err(e) = signaling_tx.send(msg).await {
224                                error!("Failed to send ICE candidate: {}", e);
225                            }
226                        }
227                    }
228                })
229            }));
230
231        // Handle connection state changes - work MUST be inside the returned future
232        let peer_id_log = peer_id.clone();
233        let state_event_tx = self.state_event_tx.clone();
234        self.pc
235            .on_peer_connection_state_change(Box::new(move |state: RTCPeerConnectionState| {
236                let peer_id = peer_id_log.clone();
237                let state_event_tx = state_event_tx.clone();
238                Box::pin(async move {
239                    info!("Peer {} connection state: {:?}", peer_id.short(), state);
240
241                    // Notify signaling layer of state changes
242                    if let Some(tx) = state_event_tx {
243                        let event = match state {
244                            RTCPeerConnectionState::Connected => Some(PeerStateEvent::Connected(peer_id)),
245                            RTCPeerConnectionState::Failed => Some(PeerStateEvent::Failed(peer_id)),
246                            RTCPeerConnectionState::Disconnected | RTCPeerConnectionState::Closed => {
247                                Some(PeerStateEvent::Disconnected(peer_id))
248                            }
249                            _ => None,
250                        };
251                        if let Some(event) = event {
252                            if let Err(e) = tx.send(event).await {
253                                error!("Failed to send peer state event: {}", e);
254                            }
255                        }
256                    }
257                })
258            }));
259
260        Ok(())
261    }
262
263    /// Initiate connection (create offer) - for outbound connections
264    pub async fn connect(&mut self) -> Result<serde_json::Value> {
265        println!("[Peer {}] Creating data channel...", self.peer_id.short());
266        // Create data channel first
267        // Use unordered for better performance - protocol is stateless (each message self-describes)
268        let dc_init = RTCDataChannelInit {
269            ordered: Some(false),
270            ..Default::default()
271        };
272        let dc = self.pc.create_data_channel("hashtree", Some(dc_init)).await?;
273        println!("[Peer {}] Data channel created, setting up handlers...", self.peer_id.short());
274        self.setup_data_channel(dc.clone()).await?;
275        println!("[Peer {}] Handlers set up, storing data channel...", self.peer_id.short());
276        {
277            let mut dc_guard = self.data_channel.lock().await;
278            *dc_guard = Some(dc);
279        }
280        println!("[Peer {}] Data channel stored", self.peer_id.short());
281
282        // Create offer and wait for ICE gathering to complete
283        // This ensures all ICE candidates are embedded in the SDP
284        let offer = self.pc.create_offer(None).await?;
285        let mut gathering_complete = self.pc.gathering_complete_promise().await;
286        self.pc.set_local_description(offer).await?;
287
288        // Wait for ICE gathering to complete (with timeout)
289        let _ = tokio::time::timeout(
290            std::time::Duration::from_secs(10),
291            gathering_complete.recv()
292        ).await;
293
294        // Get the local description with ICE candidates embedded
295        let local_desc = self.pc.local_description().await
296            .ok_or_else(|| anyhow::anyhow!("No local description after gathering"))?;
297
298        debug!("Offer created, SDP len: {}, ice_gathering: {:?}",
299            local_desc.sdp.len(), self.pc.ice_gathering_state());
300
301        // Return offer as JSON
302        let offer_json = serde_json::json!({
303            "type": local_desc.sdp_type.to_string().to_lowercase(),
304            "sdp": local_desc.sdp
305        });
306
307        Ok(offer_json)
308    }
309
310    /// Handle incoming offer and create answer
311    pub async fn handle_offer(&mut self, offer: serde_json::Value) -> Result<serde_json::Value> {
312        let sdp = offer
313            .get("sdp")
314            .and_then(|s| s.as_str())
315            .ok_or_else(|| anyhow::anyhow!("Missing SDP in offer"))?;
316
317        // Setup data channel handler BEFORE set_remote_description
318        // This ensures the handler is registered before any data channel events fire
319        let peer_id = self.peer_id.clone();
320        let message_tx = self.message_tx.clone();
321        let pending_requests = self.pending_requests.clone();
322        let store = self.store.clone();
323        let data_channel_holder = self.data_channel.clone();
324        let nostr_relay = self.nostr_relay.clone();
325        let peer_pubkey = Some(self.peer_id.pubkey.clone());
326
327        self.pc
328            .on_data_channel(Box::new(move |dc: Arc<RTCDataChannel>| {
329                let peer_id = peer_id.clone();
330                let message_tx = message_tx.clone();
331                let pending_requests = pending_requests.clone();
332                let store = store.clone();
333                let data_channel_holder = data_channel_holder.clone();
334                let nostr_relay = nostr_relay.clone();
335                let peer_pubkey = peer_pubkey.clone();
336
337                // Work MUST be inside the returned future
338                Box::pin(async move {
339                    info!("Peer {} received data channel: {}", peer_id.short(), dc.label());
340
341                    // Store the received data channel
342                    {
343                        let mut dc_guard = data_channel_holder.lock().await;
344                        *dc_guard = Some(dc.clone());
345                    }
346
347                    // Set up message handlers
348                    Self::setup_dc_handlers(
349                        dc.clone(),
350                        peer_id,
351                        message_tx,
352                        pending_requests,
353                        store,
354                        nostr_relay,
355                        peer_pubkey,
356                    )
357                    .await;
358                })
359            }));
360
361        // Set remote description after handler is registered
362        let offer_desc = RTCSessionDescription::offer(sdp.to_string())?;
363        self.pc.set_remote_description(offer_desc).await?;
364
365        // Create answer and wait for ICE gathering to complete
366        // This ensures all ICE candidates are embedded in the SDP
367        let answer = self.pc.create_answer(None).await?;
368        let mut gathering_complete = self.pc.gathering_complete_promise().await;
369        self.pc.set_local_description(answer).await?;
370
371        // Wait for ICE gathering to complete (with timeout)
372        let _ = tokio::time::timeout(
373            std::time::Duration::from_secs(10),
374            gathering_complete.recv()
375        ).await;
376
377        // Get the local description with ICE candidates embedded
378        let local_desc = self.pc.local_description().await
379            .ok_or_else(|| anyhow::anyhow!("No local description after gathering"))?;
380
381        debug!("Answer created, SDP len: {}, ice_gathering: {:?}",
382            local_desc.sdp.len(), self.pc.ice_gathering_state());
383
384        let answer_json = serde_json::json!({
385            "type": local_desc.sdp_type.to_string().to_lowercase(),
386            "sdp": local_desc.sdp
387        });
388
389        Ok(answer_json)
390    }
391
392    /// Handle incoming answer
393    pub async fn handle_answer(&mut self, answer: serde_json::Value) -> Result<()> {
394        let sdp = answer
395            .get("sdp")
396            .and_then(|s| s.as_str())
397            .ok_or_else(|| anyhow::anyhow!("Missing SDP in answer"))?;
398
399        let answer_desc = RTCSessionDescription::answer(sdp.to_string())?;
400        self.pc.set_remote_description(answer_desc).await?;
401
402        Ok(())
403    }
404
405    /// Handle incoming ICE candidate
406    pub async fn handle_candidate(&mut self, candidate: serde_json::Value) -> Result<()> {
407        let candidate_str = candidate
408            .get("candidate")
409            .and_then(|c| c.as_str())
410            .unwrap_or("");
411
412        let sdp_mid = candidate
413            .get("sdpMid")
414            .and_then(|m| m.as_str())
415            .map(|s| s.to_string());
416
417        let sdp_mline_index = candidate
418            .get("sdpMLineIndex")
419            .and_then(|i| i.as_u64())
420            .map(|i| i as u16);
421
422        if !candidate_str.is_empty() {
423            use webrtc::ice_transport::ice_candidate::RTCIceCandidateInit;
424            let init = RTCIceCandidateInit {
425                candidate: candidate_str.to_string(),
426                sdp_mid,
427                sdp_mline_index,
428                username_fragment: candidate
429                    .get("usernameFragment")
430                    .and_then(|u| u.as_str())
431                    .map(|s| s.to_string()),
432            };
433            self.pc.add_ice_candidate(init).await?;
434        }
435
436        Ok(())
437    }
438
439    /// Setup data channel handlers
440    async fn setup_data_channel(&mut self, dc: Arc<RTCDataChannel>) -> Result<()> {
441        let peer_id = self.peer_id.clone();
442        let message_tx = self.message_tx.clone();
443        let pending_requests = self.pending_requests.clone();
444        let store = self.store.clone();
445        let nostr_relay = self.nostr_relay.clone();
446        let peer_pubkey = Some(self.peer_id.pubkey.clone());
447
448        Self::setup_dc_handlers(
449            dc,
450            peer_id,
451            message_tx,
452            pending_requests,
453            store,
454            nostr_relay,
455            peer_pubkey,
456        )
457        .await;
458        Ok(())
459    }
460
461    /// Setup handlers for a data channel (shared between outbound and inbound)
462    async fn setup_dc_handlers(
463        dc: Arc<RTCDataChannel>,
464        peer_id: PeerId,
465        message_tx: mpsc::Sender<(DataMessage, Option<Vec<u8>>)>,
466        pending_requests: Arc<Mutex<HashMap<String, PendingRequest>>>,
467        store: Option<Arc<dyn ContentStore>>,
468        nostr_relay: Option<Arc<NostrRelay>>,
469        peer_pubkey: Option<String>,
470    ) {
471        let label = dc.label().to_string();
472        let peer_short = peer_id.short();
473
474        // Track pending binary data (request_id -> expected after response)
475        let _pending_binary: Arc<Mutex<Option<u32>>> = Arc::new(Mutex::new(None));
476
477        let open_notify = nostr_relay.as_ref().map(|_| Arc::new(Notify::new()));
478        if let Some(ref notify) = open_notify {
479            if dc.ready_state() == RTCDataChannelState::Open {
480                notify.notify_waiters();
481            }
482        }
483
484        let mut nostr_client_id: Option<u64> = None;
485        if let Some(relay) = nostr_relay.clone() {
486            let client_id = relay.next_client_id();
487            let (nostr_tx, mut nostr_rx) = mpsc::unbounded_channel::<String>();
488            relay.register_client(client_id, nostr_tx, peer_pubkey.clone()).await;
489            nostr_client_id = Some(client_id);
490
491            if let Some(notify) = open_notify.clone() {
492                let dc_for_send = dc.clone();
493                tokio::spawn(async move {
494                    notify.notified().await;
495                    while let Some(text) = nostr_rx.recv().await {
496                        if dc_for_send.send_text(text).await.is_err() {
497                            break;
498                        }
499                    }
500                });
501            }
502        }
503
504        if let (Some(relay), Some(client_id)) = (nostr_relay.clone(), nostr_client_id) {
505            dc.on_close(Box::new(move || {
506                let relay = relay.clone();
507                Box::pin(async move {
508                    relay.unregister_client(client_id).await;
509                })
510            }));
511        }
512
513        let open_notify_clone = open_notify.clone();
514        let peer_short_open = peer_short.clone();
515        let label_clone = label.clone();
516        dc.on_open(Box::new(move || {
517            let peer_short_open = peer_short_open.clone();
518            let label_clone = label_clone.clone();
519            let open_notify = open_notify_clone.clone();
520            // Work MUST be inside the returned future
521            Box::pin(async move {
522                info!("[Peer {}] Data channel '{}' open", peer_short_open, label_clone);
523                if let Some(notify) = open_notify {
524                    notify.notify_waiters();
525                }
526            })
527        }));
528
529        let dc_for_msg = dc.clone();
530        let peer_short_msg = peer_short.clone();
531        let _pending_binary_clone = _pending_binary.clone();
532        let store_clone = store.clone();
533        let nostr_relay_for_msg = nostr_relay.clone();
534        let nostr_client_id_for_msg = nostr_client_id;
535
536        dc.on_message(Box::new(move |msg: DataChannelMessage| {
537            let dc = dc_for_msg.clone();
538            let peer_short = peer_short_msg.clone();
539            let pending_requests = pending_requests.clone();
540            let _pending_binary = _pending_binary_clone.clone();
541            let _message_tx = message_tx.clone();
542            let store = store_clone.clone();
543            let nostr_relay = nostr_relay_for_msg.clone();
544            let nostr_client_id = nostr_client_id_for_msg;
545            let msg_data = msg.data.clone();
546
547            // Work MUST be inside the returned future
548            Box::pin(async move {
549                if msg.is_string {
550                    if let Some(relay) = nostr_relay {
551                        if let Ok(text) = std::str::from_utf8(&msg_data) {
552                            if let Ok(nostr_msg) = NostrClientMessage::from_json(text) {
553                                if let Some(client_id) = nostr_client_id {
554                                    relay.handle_client_message(client_id, nostr_msg).await;
555                                }
556                            }
557                        }
558                    }
559                    return;
560                }
561                // All messages are binary with type prefix + MessagePack body
562                debug!("[Peer {}] Received {} bytes on data channel", peer_short, msg_data.len());
563                match parse_message(&msg_data) {
564                    Ok(data_msg) => match data_msg {
565                        DataMessage::Request(req) => {
566                            let hash_hex = hash_to_hex(&req.h);
567                            let hash_short = &hash_hex[..8.min(hash_hex.len())];
568                            info!(
569                                "[Peer {}] Received request for {}",
570                                peer_short, hash_short
571                            );
572
573                            // Handle request - look up in store
574                            let data = if let Some(ref store) = store {
575                                match store.get(&hash_hex) {
576                                    Ok(Some(data)) => {
577                                        info!("[Peer {}] Found {} in store ({} bytes)", peer_short, hash_short, data.len());
578                                        Some(data)
579                                    },
580                                    Ok(None) => {
581                                        info!("[Peer {}] Hash {} not in store", peer_short, hash_short);
582                                        None
583                                    },
584                                    Err(e) => {
585                                        warn!("[Peer {}] Store error: {}", peer_short, e);
586                                        None
587                                    }
588                                }
589                            } else {
590                                warn!("[Peer {}] No store configured - cannot serve requests", peer_short);
591                                None
592                            };
593
594                            // Send response only if we have data
595                            if let Some(data) = data {
596                                let data_len = data.len();
597                                let response = DataResponse {
598                                    h: req.h,
599                                    d: data,
600                                };
601                                if let Ok(wire) = encode_response(&response) {
602                                    if let Err(e) = dc.send(&Bytes::from(wire)).await {
603                                        error!(
604                                            "[Peer {}] Failed to send response: {}",
605                                            peer_short, e
606                                        );
607                                    } else {
608                                        info!(
609                                            "[Peer {}] Sent response for {} ({} bytes)",
610                                            peer_short, hash_short, data_len
611                                        );
612                                    }
613                                }
614                            } else {
615                                info!("[Peer {}] Content not found for {}", peer_short, hash_short);
616                            }
617                        }
618                        DataMessage::Response(res) => {
619                            let hash_hex = hash_to_hex(&res.h);
620                            let hash_short = &hash_hex[..8.min(hash_hex.len())];
621                            debug!(
622                                "[Peer {}] Received response for {} ({} bytes)",
623                                peer_short, hash_short, res.d.len()
624                            );
625
626                            // Resolve the pending request by hash
627                            let mut pending = pending_requests.lock().await;
628                            if let Some(req) = pending.remove(&hash_hex) {
629                                let _ = req.response_tx.send(Some(res.d));
630                            }
631                        }
632                    },
633                    Err(e) => {
634                        warn!("[Peer {}] Failed to parse message: {:?}", peer_short, e);
635                        // Log hex dump of first 50 bytes for debugging
636                        let hex_dump: String = msg_data.iter().take(50).map(|b| format!("{:02x}", b)).collect();
637                        warn!("[Peer {}] Message hex: {}", peer_short, hex_dump);
638                    }
639                }
640            })
641        }));
642    }
643
644    /// Check if data channel is ready
645    pub fn has_data_channel(&self) -> bool {
646        // Use try_lock for non-async context
647        self.data_channel
648            .try_lock()
649            .map(|guard| guard.is_some())
650            .unwrap_or(false)
651    }
652
653    /// Request content by hash from this peer
654    pub async fn request(&self, hash_hex: &str) -> Result<Option<Vec<u8>>> {
655        let dc_guard = self.data_channel.lock().await;
656        let dc = dc_guard
657            .as_ref()
658            .ok_or_else(|| anyhow::anyhow!("No data channel"))?
659            .clone();
660        drop(dc_guard);  // Release lock before async operations
661
662        // Convert hex to binary hash
663        let hash = hex::decode(hash_hex)
664            .map_err(|e| anyhow::anyhow!("Invalid hex hash: {}", e))?;
665
666        // Create response channel
667        let (tx, rx) = oneshot::channel();
668
669        // Store pending request (keyed by hash hex)
670        {
671            let mut pending = self.pending_requests.lock().await;
672            pending.insert(
673                hash_hex.to_string(),
674                PendingRequest {
675                    hash: hash.clone(),
676                    response_tx: tx,
677                },
678            );
679        }
680
681        // Send request with MAX_HTL (fresh request from us)
682        let req = DataRequest {
683            h: hash,
684            htl: crate::webrtc::types::MAX_HTL,
685        };
686        let wire = encode_request(&req)?;
687        dc.send(&Bytes::from(wire)).await?;
688
689        debug!(
690            "[Peer {}] Sent request for {}",
691            self.peer_id.short(),
692            &hash_hex[..8.min(hash_hex.len())]
693        );
694
695        // Wait for response with timeout
696        match tokio::time::timeout(std::time::Duration::from_secs(10), rx).await {
697            Ok(Ok(data)) => Ok(data),
698            Ok(Err(_)) => {
699                // Channel closed
700                Ok(None)
701            }
702            Err(_) => {
703                // Timeout - clean up pending request
704                let mut pending = self.pending_requests.lock().await;
705                pending.remove(hash_hex);
706                Ok(None)
707            }
708        }
709    }
710
711    /// Send a message over the data channel
712    pub async fn send_message(&self, msg: &DataMessage) -> Result<()> {
713        let dc_guard = self.data_channel.lock().await;
714        if let Some(ref dc) = *dc_guard {
715            let wire = encode_message(msg)?;
716            dc.send(&Bytes::from(wire)).await?;
717        }
718        Ok(())
719    }
720
721    /// Close the connection
722    pub async fn close(&self) -> Result<()> {
723        {
724            let dc_guard = self.data_channel.lock().await;
725            if let Some(ref dc) = *dc_guard {
726                dc.close().await?;
727            }
728        }
729        self.pc.close().await?;
730        Ok(())
731    }
732}