Skip to main content

hashtree_cli/webrtc/
peer.rs

1//! WebRTC peer connection for hashtree data exchange
2
3use anyhow::Result;
4use async_trait::async_trait;
5use bytes::Bytes;
6use hashtree_network::{PeerLink as RoutedPeerLink, TransportError as RoutedTransportError};
7use std::collections::HashMap;
8use std::sync::Arc;
9use tokio::sync::{mpsc, oneshot, Mutex, Notify};
10use tracing::{debug, error, info, warn};
11use webrtc::api::interceptor_registry::register_default_interceptors;
12use webrtc::api::media_engine::MediaEngine;
13use webrtc::api::setting_engine::SettingEngine;
14use webrtc::api::APIBuilder;
15use webrtc::data_channel::data_channel_init::RTCDataChannelInit;
16use webrtc::data_channel::data_channel_message::DataChannelMessage;
17use webrtc::data_channel::data_channel_state::RTCDataChannelState;
18use webrtc::data_channel::RTCDataChannel;
19use webrtc::ice_transport::ice_candidate::RTCIceCandidate;
20use webrtc::ice_transport::ice_server::RTCIceServer;
21use webrtc::interceptor::registry::Registry;
22use webrtc::peer_connection::configuration::RTCConfiguration;
23use webrtc::peer_connection::peer_connection_state::RTCPeerConnectionState;
24use webrtc::peer_connection::sdp::session_description::RTCSessionDescription;
25use webrtc::peer_connection::RTCPeerConnection;
26
27use super::cashu::{CashuQuoteState, ExpectedSettlement};
28use super::types::{
29    encode_chunk, encode_message, encode_payment, encode_payment_ack, encode_quote_response,
30    encode_request, encode_response, hash_to_hex, parse_message, validate_mesh_frame, DataChunk,
31    DataMessage, DataPayment, DataPaymentAck, DataQuoteRequest, DataRequest, DataResponse,
32    MeshNostrFrame, PeerDirection, PeerHTLConfig, PeerId, PeerStateEvent, SignalingMessage,
33    BLOB_REQUEST_POLICY,
34};
35use crate::nostr_relay::NostrRelay;
36use nostr::{
37    ClientMessage as NostrClientMessage, Filter as NostrFilter, JsonUtil as NostrJsonUtil,
38    RelayMessage as NostrRelayMessage, SubscriptionId as NostrSubscriptionId,
39};
40
41/// Trait for content storage that can be used by WebRTC peers
42pub trait ContentStore: Send + Sync + 'static {
43    /// Get content by hex hash
44    fn get(&self, hash_hex: &str) -> Result<Option<Vec<u8>>>;
45}
46
47/// Pending request tracking (keyed by hash hex)
48pub struct PendingRequest {
49    pub hash: Vec<u8>,
50    pub response_tx: oneshot::Sender<Option<Vec<u8>>>,
51    pub quoted: Option<PendingQuotedRequest>,
52}
53
54pub struct PendingQuotedRequest {
55    pub quote_id: u64,
56    pub mint_url: String,
57    pub total_payment_sat: u64,
58    pub confirmed_payment_sat: u64,
59    pub next_chunk_index: u32,
60    pub total_chunks: Option<u32>,
61    pub assembled_data: Vec<u8>,
62    pub in_flight_payment: Option<PendingChunkPayment>,
63    pub buffered_chunk: Option<DataChunk>,
64}
65
/// A chunk payment that has been issued but not yet acknowledged by the peer.
///
/// Plain data only, so the common value-type traits are derived to make the
/// type loggable, copyable and comparable in tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PendingChunkPayment {
    /// Index of the chunk this payment settles.
    pub chunk_index: u32,
    /// Amount paid for the chunk, in sat.
    pub amount_sat: u64,
    /// Mint the payment token was created against.
    pub mint_url: String,
    /// Identifier used to revoke the payment token if the request fails.
    pub operation_id: String,
    /// `true` when the settled chunk is the last one of the transfer.
    pub final_chunk: bool,
}
73
74impl PendingRequest {
75    pub fn standard(hash: Vec<u8>, response_tx: oneshot::Sender<Option<Vec<u8>>>) -> Self {
76        Self {
77            hash,
78            response_tx,
79            quoted: None,
80        }
81    }
82
83    pub fn quoted(
84        hash: Vec<u8>,
85        response_tx: oneshot::Sender<Option<Vec<u8>>>,
86        quote_id: u64,
87        mint_url: String,
88        total_payment_sat: u64,
89    ) -> Self {
90        Self {
91            hash,
92            response_tx,
93            quoted: Some(PendingQuotedRequest {
94                quote_id,
95                mint_url,
96                total_payment_sat,
97                confirmed_payment_sat: 0,
98                next_chunk_index: 0,
99                total_chunks: None,
100                assembled_data: Vec::new(),
101                in_flight_payment: None,
102                buffered_chunk: None,
103            }),
104        }
105    }
106}
107
108async fn handle_quote_request_message(
109    peer_short: &str,
110    peer_id: &PeerId,
111    store: &Option<Arc<dyn ContentStore>>,
112    cashu_quotes: Option<&Arc<CashuQuoteState>>,
113    req: &DataQuoteRequest,
114) -> Option<super::types::DataQuoteResponse> {
115    let Some(cashu_quotes) = cashu_quotes else {
116        debug!(
117            "[Peer {}] Ignoring quote request without Cashu policy",
118            peer_short
119        );
120        return None;
121    };
122
123    if cashu_quotes
124        .should_refuse_requests_from_peer(&peer_id.to_string())
125        .await
126    {
127        return Some(
128            cashu_quotes
129                .build_quote_response(&peer_id.to_string(), req, false)
130                .await,
131        );
132    }
133
134    let hash_hex = hash_to_hex(&req.h);
135    let can_serve = if let Some(store) = store {
136        match store.get(&hash_hex) {
137            Ok(Some(_)) => true,
138            Ok(None) => false,
139            Err(e) => {
140                warn!("[Peer {}] Store error during quote: {}", peer_short, e);
141                false
142            }
143        }
144    } else {
145        false
146    };
147
148    Some(
149        cashu_quotes
150            .build_quote_response(&peer_id.to_string(), req, can_serve)
151            .await,
152    )
153}
154
155struct PaymentHandlingOutcome {
156    ack: DataPaymentAck,
157    next_chunk: Option<(DataChunk, ExpectedSettlement)>,
158}
159
160async fn send_quoted_chunk(
161    dc: &Arc<RTCDataChannel>,
162    peer_id: &PeerId,
163    peer_short: &str,
164    cashu_quotes: &Arc<CashuQuoteState>,
165    chunk: DataChunk,
166    expected: ExpectedSettlement,
167) -> bool {
168    let hash_hex = hash_to_hex(&chunk.h);
169    let wire = match encode_chunk(&chunk) {
170        Ok(wire) => wire,
171        Err(err) => {
172            warn!(
173                "[Peer {}] Failed to encode quoted chunk {} for quote {}: {}",
174                peer_short, chunk.c, chunk.q, err
175            );
176            return false;
177        }
178    };
179
180    if let Err(err) = dc.send(&Bytes::from(wire)).await {
181        warn!(
182            "[Peer {}] Failed to send quoted chunk {} for quote {}: {}",
183            peer_short, chunk.c, chunk.q, err
184        );
185        return false;
186    }
187
188    cashu_quotes
189        .register_expected_payment(peer_id.to_string(), hash_hex, chunk.q, expected)
190        .await;
191    true
192}
193
194async fn fail_pending_request(
195    pending_requests: &Arc<Mutex<HashMap<String, PendingRequest>>>,
196    cashu_quotes: Option<&Arc<CashuQuoteState>>,
197    hash_hex: &str,
198) {
199    let pending = pending_requests.lock().await.remove(hash_hex);
200    let Some(pending) = pending else {
201        return;
202    };
203
204    if let (Some(cashu_quotes), Some(quoted)) = (cashu_quotes, pending.quoted) {
205        if let Some(in_flight) = quoted.in_flight_payment {
206            let _ = cashu_quotes
207                .revoke_payment_token(&in_flight.mint_url, &in_flight.operation_id)
208                .await;
209        }
210    }
211    let _ = pending.response_tx.send(None);
212}
213
214async fn process_chunk_message(
215    peer_short: &str,
216    _peer_id: &PeerId,
217    dc: &Arc<RTCDataChannel>,
218    pending_requests: &Arc<Mutex<HashMap<String, PendingRequest>>>,
219    cashu_quotes: Option<&Arc<CashuQuoteState>>,
220    chunk: DataChunk,
221) {
222    let hash_hex = hash_to_hex(&chunk.h);
223    let Some(cashu_quotes) = cashu_quotes else {
224        fail_pending_request(pending_requests, None, &hash_hex).await;
225        return;
226    };
227
228    enum ChunkAction {
229        BufferOnly,
230        Fail,
231        Pay {
232            mint_url: String,
233            amount_sat: u64,
234            final_chunk: bool,
235        },
236    }
237
238    let action = {
239        let mut pending = pending_requests.lock().await;
240        let Some(request) = pending.get_mut(&hash_hex) else {
241            return;
242        };
243        match request.quoted.as_mut() {
244            None => ChunkAction::Fail,
245            Some(quoted) if quoted.quote_id != chunk.q || chunk.n == 0 => ChunkAction::Fail,
246            Some(quoted) => {
247                if let Some(in_flight) = quoted.in_flight_payment.as_ref() {
248                    let expected_buffer_index = in_flight.chunk_index + 1;
249                    if chunk.c == expected_buffer_index && quoted.buffered_chunk.is_none() {
250                        quoted.buffered_chunk = Some(chunk.clone());
251                        ChunkAction::BufferOnly
252                    } else {
253                        ChunkAction::Fail
254                    }
255                } else if chunk.c != quoted.next_chunk_index {
256                    ChunkAction::Fail
257                } else if let Some(total_chunks) = quoted.total_chunks {
258                    if total_chunks != chunk.n {
259                        ChunkAction::Fail
260                    } else {
261                        let next_total = quoted.confirmed_payment_sat.saturating_add(chunk.p);
262                        if next_total > quoted.total_payment_sat
263                            || (chunk.c + 1 == chunk.n && next_total != quoted.total_payment_sat)
264                        {
265                            ChunkAction::Fail
266                        } else {
267                            quoted.total_chunks = Some(chunk.n);
268                            quoted.assembled_data.extend_from_slice(&chunk.d);
269                            quoted.next_chunk_index += 1;
270                            ChunkAction::Pay {
271                                mint_url: quoted.mint_url.clone(),
272                                amount_sat: chunk.p,
273                                final_chunk: chunk.c + 1 == chunk.n,
274                            }
275                        }
276                    }
277                } else {
278                    let next_total = quoted.confirmed_payment_sat.saturating_add(chunk.p);
279                    if next_total > quoted.total_payment_sat
280                        || (chunk.c + 1 == chunk.n && next_total != quoted.total_payment_sat)
281                    {
282                        ChunkAction::Fail
283                    } else {
284                        quoted.total_chunks = Some(chunk.n);
285                        quoted.assembled_data.extend_from_slice(&chunk.d);
286                        quoted.next_chunk_index += 1;
287                        ChunkAction::Pay {
288                            mint_url: quoted.mint_url.clone(),
289                            amount_sat: chunk.p,
290                            final_chunk: chunk.c + 1 == chunk.n,
291                        }
292                    }
293                }
294            }
295        }
296    };
297
298    match action {
299        ChunkAction::BufferOnly => (),
300        ChunkAction::Fail => {
301            warn!(
302                "[Peer {}] Invalid quoted chunk {} for hash {}",
303                peer_short, chunk.c, hash_hex
304            );
305            fail_pending_request(pending_requests, Some(cashu_quotes), &hash_hex).await;
306        }
307        ChunkAction::Pay {
308            mint_url,
309            amount_sat,
310            final_chunk,
311        } => {
312            let payment = match cashu_quotes
313                .create_payment_token(&mint_url, amount_sat)
314                .await
315            {
316                Ok(payment) => payment,
317                Err(err) => {
318                    warn!(
319                        "[Peer {}] Failed to create payment token for chunk {} of {}: {}",
320                        peer_short, chunk.c, hash_hex, err
321                    );
322                    fail_pending_request(pending_requests, Some(cashu_quotes), &hash_hex).await;
323                    return;
324                }
325            };
326
327            {
328                let mut pending = pending_requests.lock().await;
329                let Some(request) = pending.get_mut(&hash_hex) else {
330                    let _ = cashu_quotes
331                        .revoke_payment_token(&payment.mint_url, &payment.operation_id)
332                        .await;
333                    return;
334                };
335                let Some(quoted) = request.quoted.as_mut() else {
336                    let _ = cashu_quotes
337                        .revoke_payment_token(&payment.mint_url, &payment.operation_id)
338                        .await;
339                    return;
340                };
341                quoted.in_flight_payment = Some(PendingChunkPayment {
342                    chunk_index: chunk.c,
343                    amount_sat,
344                    mint_url: payment.mint_url.clone(),
345                    operation_id: payment.operation_id.clone(),
346                    final_chunk,
347                });
348            }
349
350            let payment_msg = DataPayment {
351                h: chunk.h,
352                q: chunk.q,
353                c: chunk.c,
354                p: amount_sat,
355                m: Some(payment.mint_url.clone()),
356                tok: payment.token,
357            };
358            let wire = match encode_payment(&payment_msg) {
359                Ok(wire) => wire,
360                Err(err) => {
361                    warn!(
362                        "[Peer {}] Failed to encode payment for chunk {} of {}: {}",
363                        peer_short, chunk.c, hash_hex, err
364                    );
365                    fail_pending_request(pending_requests, Some(cashu_quotes), &hash_hex).await;
366                    return;
367                }
368            };
369            if let Err(err) = dc.send(&Bytes::from(wire)).await {
370                warn!(
371                    "[Peer {}] Failed to send payment for chunk {} of {}: {}",
372                    peer_short, chunk.c, hash_hex, err
373                );
374                fail_pending_request(pending_requests, Some(cashu_quotes), &hash_hex).await;
375            }
376        }
377    }
378}
379
380async fn handle_payment_ack_message(
381    peer_short: &str,
382    peer_id: &PeerId,
383    dc: &Arc<RTCDataChannel>,
384    pending_requests: &Arc<Mutex<HashMap<String, PendingRequest>>>,
385    cashu_quotes: Option<&Arc<CashuQuoteState>>,
386    ack: DataPaymentAck,
387) {
388    let Some(cashu_quotes) = cashu_quotes else {
389        return;
390    };
391    let hash_hex = hash_to_hex(&ack.h);
392    let mut buffered_next = None;
393    let mut completed = None;
394    let mut failed = None;
395    let mut confirmed_amount = None;
396    let mut completed_data = None;
397
398    {
399        let mut pending = pending_requests.lock().await;
400        let Some(request) = pending.get_mut(&hash_hex) else {
401            return;
402        };
403        let Some(quoted) = request.quoted.as_mut() else {
404            return;
405        };
406        let Some(in_flight) = quoted.in_flight_payment.take() else {
407            return;
408        };
409        if ack.q != quoted.quote_id || ack.c != in_flight.chunk_index {
410            quoted.in_flight_payment = Some(in_flight);
411            return;
412        }
413
414        if !ack.a {
415            failed = Some(in_flight);
416        } else {
417            quoted.confirmed_payment_sat = quoted
418                .confirmed_payment_sat
419                .saturating_add(in_flight.amount_sat);
420            confirmed_amount = Some(in_flight.amount_sat);
421            if in_flight.final_chunk {
422                completed_data = Some(quoted.assembled_data.clone());
423            } else if let Some(next_chunk) = quoted.buffered_chunk.take() {
424                buffered_next = Some(next_chunk);
425            }
426        }
427
428        if let Some(data) = completed_data.take() {
429            let finished = pending
430                .remove(&hash_hex)
431                .expect("pending request must exist");
432            completed = Some((finished.response_tx, data));
433        }
434    }
435
436    if let Some(amount_sat) = confirmed_amount {
437        cashu_quotes
438            .record_paid_peer(&peer_id.to_string(), amount_sat)
439            .await;
440    }
441
442    if let Some(in_flight) = failed {
443        warn!(
444            "[Peer {}] Payment ack rejected chunk {} for {}: {}",
445            peer_short,
446            ack.c,
447            hash_hex,
448            ack.e.as_deref().unwrap_or("payment rejected")
449        );
450        let _ = cashu_quotes
451            .revoke_payment_token(&in_flight.mint_url, &in_flight.operation_id)
452            .await;
453        let removed = pending_requests.lock().await.remove(&hash_hex);
454        if let Some(removed) = removed {
455            let _ = removed.response_tx.send(None);
456        }
457        return;
458    }
459
460    if let Some((tx, data)) = completed {
461        let _ = tx.send(Some(data));
462        return;
463    }
464
465    if let Some(next_chunk) = buffered_next {
466        process_chunk_message(
467            peer_short,
468            peer_id,
469            dc,
470            pending_requests,
471            Some(cashu_quotes),
472            next_chunk,
473        )
474        .await;
475    }
476}
477
478async fn handle_payment_message(
479    peer_id: &PeerId,
480    cashu_quotes: Option<&Arc<CashuQuoteState>>,
481    req: &DataPayment,
482) -> PaymentHandlingOutcome {
483    let nack = |err: String| PaymentHandlingOutcome {
484        ack: DataPaymentAck {
485            h: req.h.clone(),
486            q: req.q,
487            c: req.c,
488            a: false,
489            e: Some(err),
490        },
491        next_chunk: None,
492    };
493
494    let Some(cashu_quotes) = cashu_quotes else {
495        return nack("Cashu settlement unavailable".to_string());
496    };
497
498    let expected = match cashu_quotes
499        .claim_expected_payment(
500            &peer_id.to_string(),
501            &req.h,
502            req.q,
503            req.c,
504            req.p,
505            req.m.as_deref(),
506        )
507        .await
508    {
509        Ok(expected) => expected,
510        Err(err) => {
511            cashu_quotes
512                .record_payment_default_from_peer(&peer_id.to_string())
513                .await;
514            return nack(err.to_string());
515        }
516    };
517
518    match cashu_quotes.receive_payment_token(&req.tok).await {
519        Ok(received) if received.amount_sat >= expected.payment_sat => {
520            if expected.mint_url.as_deref() != Some(received.mint_url.as_str()) {
521                cashu_quotes
522                    .record_payment_default_from_peer(&peer_id.to_string())
523                    .await;
524                return nack("Received payment mint did not match quoted mint".to_string());
525            }
526            if let Err(err) = cashu_quotes
527                .record_receipt_from_peer(
528                    &peer_id.to_string(),
529                    &received.mint_url,
530                    received.amount_sat,
531                )
532                .await
533            {
534                warn!(
535                    "[Peer {}] Failed to persist Cashu mint success for {}: {}",
536                    peer_id.short(),
537                    received.mint_url,
538                    err
539                );
540            }
541
542            let next_chunk = if expected.final_chunk {
543                None
544            } else {
545                cashu_quotes
546                    .next_outgoing_chunk(&peer_id.to_string(), &req.h, req.q)
547                    .await
548            };
549
550            PaymentHandlingOutcome {
551                ack: DataPaymentAck {
552                    h: req.h.clone(),
553                    q: req.q,
554                    c: req.c,
555                    a: true,
556                    e: None,
557                },
558                next_chunk,
559            }
560        }
561        Ok(_) => {
562            cashu_quotes
563                .record_payment_default_from_peer(&peer_id.to_string())
564                .await;
565            nack("Received payment amount was below the quoted amount".to_string())
566        }
567        Err(err) => {
568            if let Some(mint_url) = expected.mint_url.as_deref().or(req.m.as_deref()) {
569                let _ = cashu_quotes.record_mint_receive_failure(mint_url).await;
570            }
571            nack(err.to_string())
572        }
573    }
574}
575
576/// WebRTC peer connection with data channel protocol
577pub struct Peer {
578    pub peer_id: PeerId,
579    pub direction: PeerDirection,
580    pub created_at: std::time::Instant,
581    pub connected_at: Option<std::time::Instant>,
582
583    pc: Arc<RTCPeerConnection>,
584    /// Data channel - can be set from callback when receiving channel from peer
585    pub data_channel: Arc<Mutex<Option<Arc<RTCDataChannel>>>>,
586    signaling_tx: mpsc::Sender<SignalingMessage>,
587    my_peer_id: PeerId,
588
589    // Content store for serving requests
590    store: Option<Arc<dyn ContentStore>>,
591
592    // Track pending outgoing requests (keyed by hash hex)
593    pub pending_requests: Arc<Mutex<HashMap<String, PendingRequest>>>,
594    // Track pending Nostr relay queries over data channel (keyed by subscription id)
595    pending_nostr_queries: Arc<Mutex<HashMap<String, mpsc::UnboundedSender<NostrRelayMessage>>>>,
596
597    // Channel for incoming data messages
598    #[allow(dead_code)]
599    message_tx: mpsc::Sender<(DataMessage, Option<Vec<u8>>)>,
600    #[allow(dead_code)]
601    message_rx: Option<mpsc::Receiver<(DataMessage, Option<Vec<u8>>)>>,
602
603    // Optional channel to notify signaling layer of state changes
604    state_event_tx: Option<mpsc::Sender<PeerStateEvent>>,
605
606    // Optional Nostr relay for text messages over data channel
607    nostr_relay: Option<Arc<NostrRelay>>,
608    // Optional channel for inbound relayless signaling mesh frames
609    mesh_frame_tx: Option<mpsc::Sender<(PeerId, MeshNostrFrame)>>,
610    // Optional Cashu quote negotiation state shared with signaling.
611    cashu_quotes: Option<Arc<CashuQuoteState>>,
612    // Per-peer HTL randomness profile (reused across traffic classes)
613    htl_config: PeerHTLConfig,
614}
615
616impl Peer {
617    /// Create a new peer connection
618    pub async fn new(
619        peer_id: PeerId,
620        direction: PeerDirection,
621        my_peer_id: PeerId,
622        signaling_tx: mpsc::Sender<SignalingMessage>,
623        stun_servers: Vec<String>,
624    ) -> Result<Self> {
625        Self::new_with_store_and_events(
626            peer_id,
627            direction,
628            my_peer_id,
629            signaling_tx,
630            stun_servers,
631            None,
632            None,
633            None,
634            None,
635            None,
636        )
637        .await
638    }
639
640    /// Create a new peer connection with content store
641    pub async fn new_with_store(
642        peer_id: PeerId,
643        direction: PeerDirection,
644        my_peer_id: PeerId,
645        signaling_tx: mpsc::Sender<SignalingMessage>,
646        stun_servers: Vec<String>,
647        store: Option<Arc<dyn ContentStore>>,
648    ) -> Result<Self> {
649        Self::new_with_store_and_events(
650            peer_id,
651            direction,
652            my_peer_id,
653            signaling_tx,
654            stun_servers,
655            store,
656            None,
657            None,
658            None,
659            None,
660        )
661        .await
662    }
663
664    /// Create a new peer connection with content store and state event channel
665    #[allow(clippy::too_many_arguments)]
666    pub(crate) async fn new_with_store_and_events(
667        peer_id: PeerId,
668        direction: PeerDirection,
669        my_peer_id: PeerId,
670        signaling_tx: mpsc::Sender<SignalingMessage>,
671        stun_servers: Vec<String>,
672        store: Option<Arc<dyn ContentStore>>,
673        state_event_tx: Option<mpsc::Sender<PeerStateEvent>>,
674        nostr_relay: Option<Arc<NostrRelay>>,
675        mesh_frame_tx: Option<mpsc::Sender<(PeerId, MeshNostrFrame)>>,
676        cashu_quotes: Option<Arc<CashuQuoteState>>,
677    ) -> Result<Self> {
678        // Create WebRTC API
679        let mut m = MediaEngine::default();
680        m.register_default_codecs()?;
681
682        let mut registry = Registry::new();
683        registry = register_default_interceptors(registry, &mut m)?;
684
685        // Enable mDNS temporarily for debugging
686        // Previously disabled due to https://github.com/webrtc-rs/webrtc/issues/616
687        let setting_engine = SettingEngine::default();
688        // Note: mDNS enabled by default
689
690        let api = APIBuilder::new()
691            .with_media_engine(m)
692            .with_interceptor_registry(registry)
693            .with_setting_engine(setting_engine)
694            .build();
695
696        // Configure ICE servers
697        let ice_servers: Vec<RTCIceServer> = stun_servers
698            .iter()
699            .map(|url| RTCIceServer {
700                urls: vec![url.clone()],
701                ..Default::default()
702            })
703            .collect();
704
705        let config = RTCConfiguration {
706            ice_servers,
707            ..Default::default()
708        };
709
710        let pc = Arc::new(api.new_peer_connection(config).await?);
711        let (message_tx, message_rx) = mpsc::channel(100);
712        Ok(Self {
713            peer_id,
714            direction,
715            created_at: std::time::Instant::now(),
716            connected_at: None,
717            pc,
718            data_channel: Arc::new(Mutex::new(None)),
719            signaling_tx,
720            my_peer_id,
721            store,
722            pending_requests: Arc::new(Mutex::new(HashMap::new())),
723            pending_nostr_queries: Arc::new(Mutex::new(HashMap::new())),
724            message_tx,
725            message_rx: Some(message_rx),
726            state_event_tx,
727            nostr_relay,
728            mesh_frame_tx,
729            cashu_quotes,
730            htl_config: PeerHTLConfig::random(),
731        })
732    }
733
734    /// Set content store
735    pub fn set_store(&mut self, store: Arc<dyn ContentStore>) {
736        self.store = Some(store);
737    }
738
739    /// Get connection state
740    pub fn state(&self) -> RTCPeerConnectionState {
741        self.pc.connection_state()
742    }
743
744    /// Get signaling state
745    pub fn signaling_state(&self) -> webrtc::peer_connection::signaling_state::RTCSignalingState {
746        self.pc.signaling_state()
747    }
748
749    /// Check if connected
750    pub fn is_connected(&self) -> bool {
751        self.pc.connection_state() == RTCPeerConnectionState::Connected
752    }
753
754    pub fn htl_config(&self) -> &PeerHTLConfig {
755        &self.htl_config
756    }
757
758    /// Setup event handlers for the peer connection
759    pub async fn setup_handlers(&self) -> Result<()> {
760        let peer_id = self.peer_id.clone();
761        let signaling_tx = self.signaling_tx.clone();
762        let my_peer_id_str = self.my_peer_id.to_string();
763        let target_peer_id = self.peer_id.to_string();
764
765        // Handle ICE candidates - work MUST be inside the returned future
766        self.pc
767            .on_ice_candidate(Box::new(move |candidate: Option<RTCIceCandidate>| {
768                let signaling_tx = signaling_tx.clone();
769                let my_peer_id_str = my_peer_id_str.clone();
770                let target_peer_id = target_peer_id.clone();
771
772                Box::pin(async move {
773                    if let Some(c) = candidate {
774                        if let Ok(init) = c.to_json() {
775                            info!(
776                                "ICE candidate generated: {}",
777                                &init.candidate[..init.candidate.len().min(60)]
778                            );
779                            let msg = SignalingMessage::Candidate {
780                                peer_id: my_peer_id_str.clone(),
781                                target_peer_id: target_peer_id.clone(),
782                                candidate: init.candidate,
783                                sdp_m_line_index: init.sdp_mline_index,
784                                sdp_mid: init.sdp_mid,
785                            };
786                            if let Err(e) = signaling_tx.send(msg).await {
787                                error!("Failed to send ICE candidate: {}", e);
788                            }
789                        }
790                    }
791                })
792            }));
793
794        // Handle connection state changes - work MUST be inside the returned future
795        let peer_id_log = peer_id.clone();
796        let state_event_tx = self.state_event_tx.clone();
797        self.pc
798            .on_peer_connection_state_change(Box::new(move |state: RTCPeerConnectionState| {
799                let peer_id = peer_id_log.clone();
800                let state_event_tx = state_event_tx.clone();
801                Box::pin(async move {
802                    info!("Peer {} connection state: {:?}", peer_id.short(), state);
803
804                    // Notify signaling layer of state changes
805                    if let Some(tx) = state_event_tx {
806                        let event = match state {
807                            RTCPeerConnectionState::Connected => {
808                                Some(PeerStateEvent::Connected(peer_id))
809                            }
810                            RTCPeerConnectionState::Failed => Some(PeerStateEvent::Failed(peer_id)),
811                            RTCPeerConnectionState::Disconnected
812                            | RTCPeerConnectionState::Closed => {
813                                Some(PeerStateEvent::Disconnected(peer_id))
814                            }
815                            _ => None,
816                        };
817                        if let Some(event) = event {
818                            if let Err(e) = tx.send(event).await {
819                                error!("Failed to send peer state event: {}", e);
820                            }
821                        }
822                    }
823                })
824            }));
825
826        Ok(())
827    }
828
829    /// Initiate connection (create offer) - for outbound connections
830    pub async fn connect(&self) -> Result<serde_json::Value> {
831        println!("[Peer {}] Creating data channel...", self.peer_id.short());
832        // Create data channel first
833        // Use unordered for better performance - protocol is stateless (each message self-describes)
834        let dc_init = RTCDataChannelInit {
835            ordered: Some(false),
836            ..Default::default()
837        };
838        let dc = self
839            .pc
840            .create_data_channel("hashtree", Some(dc_init))
841            .await?;
842        println!(
843            "[Peer {}] Data channel created, setting up handlers...",
844            self.peer_id.short()
845        );
846        self.setup_data_channel(dc.clone()).await?;
847        println!(
848            "[Peer {}] Handlers set up, storing data channel...",
849            self.peer_id.short()
850        );
851        {
852            let mut dc_guard = self.data_channel.lock().await;
853            *dc_guard = Some(dc);
854        }
855        println!("[Peer {}] Data channel stored", self.peer_id.short());
856
857        // Create offer and wait for ICE gathering to complete
858        // This ensures all ICE candidates are embedded in the SDP
859        let offer = self.pc.create_offer(None).await?;
860        let mut gathering_complete = self.pc.gathering_complete_promise().await;
861        self.pc.set_local_description(offer).await?;
862
863        // Wait for ICE gathering to complete (with timeout)
864        let _ = tokio::time::timeout(
865            std::time::Duration::from_secs(10),
866            gathering_complete.recv(),
867        )
868        .await;
869
870        // Get the local description with ICE candidates embedded
871        let local_desc = self
872            .pc
873            .local_description()
874            .await
875            .ok_or_else(|| anyhow::anyhow!("No local description after gathering"))?;
876
877        debug!(
878            "Offer created, SDP len: {}, ice_gathering: {:?}",
879            local_desc.sdp.len(),
880            self.pc.ice_gathering_state()
881        );
882
883        // Return offer as JSON
884        let offer_json = serde_json::json!({
885            "type": local_desc.sdp_type.to_string().to_lowercase(),
886            "sdp": local_desc.sdp
887        });
888
889        Ok(offer_json)
890    }
891
892    /// Handle incoming offer and create answer
893    pub async fn handle_offer(&self, offer: serde_json::Value) -> Result<serde_json::Value> {
894        let sdp = offer
895            .get("sdp")
896            .and_then(|s| s.as_str())
897            .ok_or_else(|| anyhow::anyhow!("Missing SDP in offer"))?;
898
899        // Setup data channel handler BEFORE set_remote_description
900        // This ensures the handler is registered before any data channel events fire
901        let peer_id = self.peer_id.clone();
902        let message_tx = self.message_tx.clone();
903        let pending_requests = self.pending_requests.clone();
904        let pending_nostr_queries = self.pending_nostr_queries.clone();
905        let store = self.store.clone();
906        let data_channel_holder = self.data_channel.clone();
907        let nostr_relay = self.nostr_relay.clone();
908        let mesh_frame_tx = self.mesh_frame_tx.clone();
909        let cashu_quotes = self.cashu_quotes.clone();
910        let peer_pubkey = Some(self.peer_id.pubkey.clone());
911
912        self.pc
913            .on_data_channel(Box::new(move |dc: Arc<RTCDataChannel>| {
914                let peer_id = peer_id.clone();
915                let message_tx = message_tx.clone();
916                let pending_requests = pending_requests.clone();
917                let pending_nostr_queries = pending_nostr_queries.clone();
918                let store = store.clone();
919                let data_channel_holder = data_channel_holder.clone();
920                let nostr_relay = nostr_relay.clone();
921                let mesh_frame_tx = mesh_frame_tx.clone();
922                let cashu_quotes = cashu_quotes.clone();
923                let peer_pubkey = peer_pubkey.clone();
924
925                // Work MUST be inside the returned future
926                Box::pin(async move {
927                    info!(
928                        "Peer {} received data channel: {}",
929                        peer_id.short(),
930                        dc.label()
931                    );
932
933                    // Store the received data channel
934                    {
935                        let mut dc_guard = data_channel_holder.lock().await;
936                        *dc_guard = Some(dc.clone());
937                    }
938
939                    // Set up message handlers
940                    Self::setup_dc_handlers(
941                        dc.clone(),
942                        peer_id,
943                        message_tx,
944                        pending_requests,
945                        pending_nostr_queries.clone(),
946                        store,
947                        nostr_relay,
948                        mesh_frame_tx,
949                        cashu_quotes,
950                        peer_pubkey,
951                    )
952                    .await;
953                })
954            }));
955
956        // Set remote description after handler is registered
957        let offer_desc = RTCSessionDescription::offer(sdp.to_string())?;
958        self.pc.set_remote_description(offer_desc).await?;
959
960        // Create answer and wait for ICE gathering to complete
961        // This ensures all ICE candidates are embedded in the SDP
962        let answer = self.pc.create_answer(None).await?;
963        let mut gathering_complete = self.pc.gathering_complete_promise().await;
964        self.pc.set_local_description(answer).await?;
965
966        // Wait for ICE gathering to complete (with timeout)
967        let _ = tokio::time::timeout(
968            std::time::Duration::from_secs(10),
969            gathering_complete.recv(),
970        )
971        .await;
972
973        // Get the local description with ICE candidates embedded
974        let local_desc = self
975            .pc
976            .local_description()
977            .await
978            .ok_or_else(|| anyhow::anyhow!("No local description after gathering"))?;
979
980        debug!(
981            "Answer created, SDP len: {}, ice_gathering: {:?}",
982            local_desc.sdp.len(),
983            self.pc.ice_gathering_state()
984        );
985
986        let answer_json = serde_json::json!({
987            "type": local_desc.sdp_type.to_string().to_lowercase(),
988            "sdp": local_desc.sdp
989        });
990
991        Ok(answer_json)
992    }
993
994    /// Handle incoming answer
995    pub async fn handle_answer(&self, answer: serde_json::Value) -> Result<()> {
996        let sdp = answer
997            .get("sdp")
998            .and_then(|s| s.as_str())
999            .ok_or_else(|| anyhow::anyhow!("Missing SDP in answer"))?;
1000
1001        let answer_desc = RTCSessionDescription::answer(sdp.to_string())?;
1002        self.pc.set_remote_description(answer_desc).await?;
1003
1004        Ok(())
1005    }
1006
1007    /// Handle incoming ICE candidate
1008    pub async fn handle_candidate(&self, candidate: serde_json::Value) -> Result<()> {
1009        let candidate_str = candidate
1010            .get("candidate")
1011            .and_then(|c| c.as_str())
1012            .unwrap_or("");
1013
1014        let sdp_mid = candidate
1015            .get("sdpMid")
1016            .and_then(|m| m.as_str())
1017            .map(|s| s.to_string());
1018
1019        let sdp_mline_index = candidate
1020            .get("sdpMLineIndex")
1021            .and_then(|i| i.as_u64())
1022            .map(|i| i as u16);
1023
1024        if !candidate_str.is_empty() {
1025            use webrtc::ice_transport::ice_candidate::RTCIceCandidateInit;
1026            let init = RTCIceCandidateInit {
1027                candidate: candidate_str.to_string(),
1028                sdp_mid,
1029                sdp_mline_index,
1030                username_fragment: candidate
1031                    .get("usernameFragment")
1032                    .and_then(|u| u.as_str())
1033                    .map(|s| s.to_string()),
1034            };
1035            self.pc.add_ice_candidate(init).await?;
1036        }
1037
1038        Ok(())
1039    }
1040
1041    /// Setup data channel handlers
1042    async fn setup_data_channel(&self, dc: Arc<RTCDataChannel>) -> Result<()> {
1043        let peer_id = self.peer_id.clone();
1044        let message_tx = self.message_tx.clone();
1045        let pending_requests = self.pending_requests.clone();
1046        let store = self.store.clone();
1047        let nostr_relay = self.nostr_relay.clone();
1048        let mesh_frame_tx = self.mesh_frame_tx.clone();
1049        let cashu_quotes = self.cashu_quotes.clone();
1050        let peer_pubkey = Some(self.peer_id.pubkey.clone());
1051
1052        Self::setup_dc_handlers(
1053            dc,
1054            peer_id,
1055            message_tx,
1056            pending_requests,
1057            self.pending_nostr_queries.clone(),
1058            store,
1059            nostr_relay,
1060            mesh_frame_tx,
1061            cashu_quotes,
1062            peer_pubkey,
1063        )
1064        .await;
1065        Ok(())
1066    }
1067
1068    /// Setup handlers for a data channel (shared between outbound and inbound)
        ///
        /// Registers the `on_open` and `on_message` callbacks on `dc`, plus an
        /// `on_close` callback when a relay is configured. When `nostr_relay` is
        /// `Some`, the peer is also registered as a client of that relay and a
        /// background task is spawned that forwards the relay's outgoing text to the
        /// peer once the channel has been signalled open.
        ///
        /// Incoming text frames are tried, in order, as:
        /// 1. a `MeshNostrFrame` (forwarded to `mesh_frame_tx` when valid),
        /// 2. a Nostr relay message for a subscription in `pending_nostr_queries`,
        /// 3. a Nostr client message handed to the local relay.
        ///
        /// Incoming binary frames are decoded with `parse_message` and dispatched on
        /// the resulting `DataMessage` variant (request, response, quote, chunk,
        /// payment, payment ack).
1069    #[allow(clippy::too_many_arguments)]
1070    async fn setup_dc_handlers(
1071        dc: Arc<RTCDataChannel>,
1072        peer_id: PeerId,
1073        message_tx: mpsc::Sender<(DataMessage, Option<Vec<u8>>)>,
1074        pending_requests: Arc<Mutex<HashMap<String, PendingRequest>>>,
1075        pending_nostr_queries: Arc<
1076            Mutex<HashMap<String, mpsc::UnboundedSender<NostrRelayMessage>>>,
1077        >,
1078        store: Option<Arc<dyn ContentStore>>,
1079        nostr_relay: Option<Arc<NostrRelay>>,
1080        mesh_frame_tx: Option<mpsc::Sender<(PeerId, MeshNostrFrame)>>,
1081        cashu_quotes: Option<Arc<CashuQuoteState>>,
1082        peer_pubkey: Option<String>,
1083    ) {
1084        let label = dc.label().to_string();
1085        let peer_short = peer_id.short();
1086
1087        // Track pending binary data (request_id -> expected after response)
            // NOTE(review): this value is only cloned into the message handler below;
            // nothing in this function reads or writes it.
1088        let _pending_binary: Arc<Mutex<Option<u32>>> = Arc::new(Mutex::new(None));
1089
            // Only created when a relay is configured. It is signalled when the channel
            // is (or becomes) open, which releases the relay forwarding task below.
1090        let open_notify = nostr_relay.as_ref().map(|_| Arc::new(Notify::new()));
1091        if let Some(ref notify) = open_notify {
1092            if dc.ready_state() == RTCDataChannelState::Open {
1093                // `notify_one` stores a permit if no waiter is active yet.
1094                notify.notify_one();
1095            }
1096        }
1097
            // Register this peer as a client of the local relay and forward whatever
            // the relay queues for it over the data channel as text frames.
1098        let mut nostr_client_id: Option<u64> = None;
1099        if let Some(relay) = nostr_relay.clone() {
1100            let client_id = relay.next_client_id();
1101            let (nostr_tx, mut nostr_rx) = mpsc::unbounded_channel::<String>();
1102            relay
1103                .register_client(client_id, nostr_tx, peer_pubkey.clone())
1104                .await;
1105            nostr_client_id = Some(client_id);
1106
1107            if let Some(notify) = open_notify.clone() {
1108                let dc_for_send = dc.clone();
1109                tokio::spawn(async move {
                        // Hold back until the open signal, then drain the queue. The task
                        // ends on the first send error or once the sending side is closed.
1110                    notify.notified().await;
1111                    while let Some(text) = nostr_rx.recv().await {
1112                        if dc_for_send.send_text(text).await.is_err() {
1113                            break;
1114                        }
1115                    }
1116                });
1117            }
1118        }
1119
            // Unregister the relay client when the channel closes.
1120        if let (Some(relay), Some(client_id)) = (nostr_relay.clone(), nostr_client_id) {
1121            dc.on_close(Box::new(move || {
1122                let relay = relay.clone();
1123                Box::pin(async move {
1124                    relay.unregister_client(client_id).await;
1125                })
1126            }));
1127        }
1128
            // On open: log the event and signal the forwarding task, if there is one.
1129        let open_notify_clone = open_notify.clone();
1130        let peer_short_open = peer_short.clone();
1131        let label_clone = label.clone();
1132        dc.on_open(Box::new(move || {
1133            let peer_short_open = peer_short_open.clone();
1134            let label_clone = label_clone.clone();
1135            let open_notify = open_notify_clone.clone();
1136            // Work MUST be inside the returned future
1137            Box::pin(async move {
1138                info!(
1139                    "[Peer {}] Data channel '{}' open",
1140                    peer_short_open, label_clone
1141                );
1142                if let Some(notify) = open_notify {
1143                    notify.notify_one();
1144                }
1145            })
1146        }));
1147
            // Handles captured by the message callback; each invocation re-clones them
            // for the future it returns.
1148        let dc_for_msg = dc.clone();
1149        let peer_short_msg = peer_short.clone();
1150        let _pending_binary_clone = _pending_binary.clone();
1151        let store_clone = store.clone();
1152        let nostr_relay_for_msg = nostr_relay.clone();
1153        let nostr_client_id_for_msg = nostr_client_id;
1154        let pending_nostr_queries_for_msg = pending_nostr_queries.clone();
1155        let mesh_frame_tx_for_msg = mesh_frame_tx.clone();
1156        let peer_id_for_msg = peer_id.clone();
1157
1158        dc.on_message(Box::new(move |msg: DataChannelMessage| {
1159            let dc = dc_for_msg.clone();
1160            let peer_short = peer_short_msg.clone();
1161            let pending_requests = pending_requests.clone();
1162            let _pending_binary = _pending_binary_clone.clone();
                // NOTE(review): cloned for every message but not used inside the handler.
1163            let _message_tx = message_tx.clone();
1164            let store = store_clone.clone();
1165            let nostr_relay = nostr_relay_for_msg.clone();
1166            let nostr_client_id = nostr_client_id_for_msg;
1167            let pending_nostr_queries = pending_nostr_queries_for_msg.clone();
1168            let mesh_frame_tx = mesh_frame_tx_for_msg.clone();
1169            let cashu_quotes = cashu_quotes.clone();
1170            let peer_id = peer_id_for_msg.clone();
1171            let msg_data = msg.data.clone();
1172
1173            // Work MUST be inside the returned future
1174            Box::pin(async move {
                    // Text frames: mesh frames and Nostr relay/client messages.
1175                if msg.is_string {
1176                    if let Ok(text) = std::str::from_utf8(&msg_data) {
1177                        if let Ok(mesh_frame) = serde_json::from_str::<MeshNostrFrame>(text) {
1178                            match validate_mesh_frame(&mesh_frame) {
1179                                Ok(()) => {
1180                                    if let Some(tx) = mesh_frame_tx {
1181                                        let _ = tx.send((peer_id.clone(), mesh_frame)).await;
1182                                    }
1183                                    return;
1184                                }
1185                                Err(reason) => {
                                        // No early return here: the text still goes through
                                        // the Nostr parsing attempts below.
1186                                    debug!(
1187                                        "[Peer {}] Ignoring invalid mesh frame: {}",
1188                                        peer_short, reason
1189                                    );
1190                                }
1191                            }
1192                        }
1193
1194                        // First, route relay responses to pending local queries.
1195                        if let Ok(relay_msg) = NostrRelayMessage::from_json(text) {
1196                            if let Some(sub_id) = relay_subscription_id(&relay_msg) {
                                    // Clone the sender out so the lock is released before sending.
1197                                let sender = {
1198                                    let pending = pending_nostr_queries.lock().await;
1199                                    pending.get(&sub_id).cloned()
1200                                };
1201                                if let Some(tx) = sender {
1202                                    debug!(
1203                                        "[Peer {}] Routed Nostr relay message for subscription {}",
1204                                        peer_short, sub_id
1205                                    );
1206                                    let _ = tx.send(relay_msg);
1207                                    return;
1208                                } else {
1209                                    debug!(
1210                                        "[Peer {}] Dropping Nostr relay message for unknown subscription {}",
1211                                        peer_short, sub_id
1212                                    );
1213                                }
1214                            }
1215                        }
1216
1217                        // Otherwise treat it as a client message to be handled by local relay.
1218                        if let Some(relay) = nostr_relay {
1219                            if let Ok(nostr_msg) = NostrClientMessage::from_json(text) {
1220                                if let Some(client_id) = nostr_client_id {
1221                                    relay.handle_client_message(client_id, nostr_msg).await;
1222                                }
1223                            }
1224                        }
1225                    }
                        // Text frames never reach the binary parser below.
1226                    return;
1227                }
1228                // All messages are binary with type prefix + MessagePack body
1229                debug!(
1230                    "[Peer {}] Received {} bytes on data channel",
1231                    peer_short,
1232                    msg_data.len()
1233                );
1234                match parse_message(&msg_data) {
1235                    Ok(data_msg) => match data_msg {
1236                        DataMessage::Request(req) => {
1237                            let hash_hex = hash_to_hex(&req.h);
                                // Shortened hash used for log output only.
1238                            let hash_short = &hash_hex[..8.min(hash_hex.len())];
1239                            info!("[Peer {}] Received request for {}", peer_short, hash_short);
1240
                                // A refused request gets no reply at all.
1241                            if let Some(cashu_quotes) = cashu_quotes.as_ref() {
1242                                if cashu_quotes
1243                                    .should_refuse_requests_from_peer(&peer_id.to_string())
1244                                    .await
1245                                {
1246                                    info!(
1247                                        "[Peer {}] Refusing request from peer with unpaid defaults",
1248                                        peer_short
1249                                    );
1250                                    return;
1251                                }
1252                            }
1253
                                // A request carrying a quote id is only served when Cashu state
                                // is configured and `take_valid_quote` yields a settlement;
                                // otherwise it is ignored without a reply.
1254                            let quoted_settlement = if let Some(quote_id) = req.q {
1255                                let Some(cashu_quotes) = cashu_quotes.as_ref() else {
1256                                    info!(
1257                                        "[Peer {}] Ignoring quoted request without Cashu settlement state",
1258                                        peer_short
1259                                    );
1260                                    return;
1261                                };
1262                                match cashu_quotes
1263                                    .take_valid_quote(&peer_id.to_string(), &req.h, quote_id)
1264                                    .await
1265                                {
1266                                    Some(settlement) => Some((quote_id, settlement)),
1267                                    None => {
1268                                        info!(
1269                                            "[Peer {}] Ignoring request with invalid or expired quote {}",
1270                                            peer_short, quote_id
1271                                        );
1272                                        return;
1273                                    }
1274                                }
1275                            } else {
1276                                None
1277                            };
1278
1279                            // Handle request - look up in store
1280                            let data = if let Some(ref store) = store {
1281                                match store.get(&hash_hex) {
1282                                    Ok(Some(data)) => {
1283                                        info!(
1284                                            "[Peer {}] Found {} in store ({} bytes)",
1285                                            peer_short,
1286                                            hash_short,
1287                                            data.len()
1288                                        );
1289                                        Some(data)
1290                                    }
1291                                    Ok(None) => {
1292                                        info!(
1293                                            "[Peer {}] Hash {} not in store",
1294                                            peer_short, hash_short
1295                                        );
1296                                        None
1297                                    }
1298                                    Err(e) => {
1299                                        warn!("[Peer {}] Store error: {}", peer_short, e);
1300                                        None
1301                                    }
1302                                }
1303                            } else {
1304                                warn!(
1305                                    "[Peer {}] No store configured - cannot serve requests",
1306                                    peer_short
1307                                );
1308                                None
1309                            };
1310
1311                            // Send response only if we have data
1312                            if let Some(data) = data {
1313                                let data_len = data.len();
                                    // Quoted requests go through the quoted-transfer path; all
                                    // others get a single `DataResponse`.
1314                                if let (Some(cashu_quotes), Some((quote_id, settlement))) =
1315                                    (cashu_quotes.as_ref(), quoted_settlement)
1316                                {
1317                                    match cashu_quotes
1318                                        .prepare_quoted_transfer(
1319                                            &peer_id.to_string(),
1320                                            &req.h,
1321                                            quote_id,
1322                                            &settlement,
1323                                            data,
1324                                        )
1325                                        .await
1326                                    {
1327                                        Some((first_chunk, first_expected)) => {
1328                                            if send_quoted_chunk(
1329                                                &dc,
1330                                                &peer_id,
1331                                                &peer_short,
1332                                                cashu_quotes,
1333                                                first_chunk,
1334                                                first_expected,
1335                                            )
1336                                            .await
1337                                            {
1338                                                info!(
1339                                                    "[Peer {}] Started quoted chunked response for {} ({} bytes)",
1340                                                    peer_short, hash_short, data_len
1341                                                );
1342                                            }
1343                                        }
1344                                        None => {
1345                                            warn!(
1346                                                "[Peer {}] Failed to prepare quoted transfer for {}",
1347                                                peer_short, hash_short
1348                                            );
1349                                        }
1350                                    }
1351                                } else {
1352                                    let response = DataResponse {
1353                                        h: req.h,
1354                                        d: data,
1355                                        i: None,
1356                                        n: None,
1357                                    };
                                        // NOTE(review): an encoding failure is skipped without a log line.
1358                                    if let Ok(wire) = encode_response(&response) {
1359                                        if let Err(e) = dc.send(&Bytes::from(wire)).await {
1360                                            error!(
1361                                                "[Peer {}] Failed to send response: {}",
1362                                                peer_short, e
1363                                            );
1364                                        } else {
1365                                            info!(
1366                                                "[Peer {}] Sent response for {} ({} bytes)",
1367                                                peer_short, hash_short, data_len
1368                                            );
1369                                        }
1370                                    }
1371                                }
1372                            } else {
1373                                info!("[Peer {}] Content not found for {}", peer_short, hash_short);
1374                            }
1375                        }
1376                        DataMessage::Response(res) => {
1377                            let hash_hex = hash_to_hex(&res.h);
1378                            let hash_short = &hash_hex[..8.min(hash_hex.len())];
1379                            debug!(
1380                                "[Peer {}] Received response for {} ({} bytes)",
1381                                peer_short,
1382                                hash_short,
1383                                res.d.len()
1384                            );
1385
1386                            // Resolve the pending request by hash
                                // A response with no matching entry is dropped. The send result
                                // is ignored because the waiting side may have dropped its receiver.
1387                            let mut pending = pending_requests.lock().await;
1388                            if let Some(req) = pending.remove(&hash_hex) {
1389                                let _ = req.response_tx.send(Some(res.d));
1390                            }
1391                        }
1392                        DataMessage::QuoteRequest(req) => {
                                // Reply only when the helper produces a quote response.
1393                            let response = handle_quote_request_message(
1394                                &peer_short,
1395                                &peer_id,
1396                                &store,
1397                                cashu_quotes.as_ref(),
1398                                &req,
1399                            )
1400                            .await;
1401                            if let Some(response) = response {
1402                                if let Ok(wire) = encode_quote_response(&response) {
1403                                    if let Err(e) = dc.send(&Bytes::from(wire)).await {
1404                                        warn!(
1405                                            "[Peer {}] Failed to send quote response: {}",
1406                                            peer_short, e
1407                                        );
1408                                    }
1409                                }
1410                            }
1411                        }
1412                        DataMessage::QuoteResponse(res) => {
                                // Ignored when no Cashu state is configured; the handler's result
                                // is discarded.
1413                            if let Some(cashu_quotes) = cashu_quotes.as_ref() {
1414                                let _ = cashu_quotes
1415                                    .handle_quote_response(&peer_id.to_string(), res)
1416                                    .await;
1417                            }
1418                        }
1419                        DataMessage::Chunk(chunk) => {
1420                            process_chunk_message(
1421                                &peer_short,
1422                                &peer_id,
1423                                &dc,
1424                                &pending_requests,
1425                                cashu_quotes.as_ref(),
1426                                chunk,
1427                            )
1428                            .await;
1429                        }
1430                        DataMessage::Payment(req) => {
                                // Send the ack from the payment outcome first, then the next chunk
                                // if the outcome carries one.
1431                            let outcome =
1432                                handle_payment_message(&peer_id, cashu_quotes.as_ref(), &req).await;
1433                            if let Ok(wire) = encode_payment_ack(&outcome.ack) {
1434                                if let Err(e) = dc.send(&Bytes::from(wire)).await {
1435                                    warn!(
1436                                        "[Peer {}] Failed to send payment ack: {}",
1437                                        peer_short, e
1438                                    );
1439                                }
1440                            }
1441                            if let (Some(cashu_quotes), Some((next_chunk, next_expected))) =
1442                                (cashu_quotes.as_ref(), outcome.next_chunk)
1443                            {
1444                                let _ = send_quoted_chunk(
1445                                    &dc,
1446                                    &peer_id,
1447                                    &peer_short,
1448                                    cashu_quotes,
1449                                    next_chunk,
1450                                    next_expected,
1451                                )
1452                                .await;
1453                            }
1454                        }
1455                        DataMessage::PaymentAck(res) => {
1456                            handle_payment_ack_message(
1457                                &peer_short,
1458                                &peer_id,
1459                                &dc,
1460                                &pending_requests,
1461                                cashu_quotes.as_ref(),
1462                                res,
1463                            )
1464                            .await;
1465                        }
1466                    },
1467                    Err(e) => {
1468                        warn!("[Peer {}] Failed to parse message: {:?}", peer_short, e);
1469                        // Log hex dump of first 50 bytes for debugging
1470                        let hex_dump: String = msg_data
1471                            .iter()
1472                            .take(50)
1473                            .map(|b| format!("{:02x}", b))
1474                            .collect();
1475                        warn!("[Peer {}] Message hex: {}", peer_short, hex_dump);
1476                    }
1477                }
1478            })
1479        }));
1480    }
1481
1482    /// Check if data channel is ready
1483    pub fn has_data_channel(&self) -> bool {
1484        // Use try_lock for non-async context
1485        self.data_channel
1486            .try_lock()
1487            .map(|guard| {
1488                guard
1489                    .as_ref()
1490                    .map(|dc| dc.ready_state() == RTCDataChannelState::Open)
1491                    .unwrap_or(false)
1492            })
1493            .unwrap_or(false)
1494    }
1495
1496    /// Request content by hash from this peer
1497    pub async fn request(&self, hash_hex: &str) -> Result<Option<Vec<u8>>> {
1498        self.request_with_timeout(hash_hex, std::time::Duration::from_secs(10))
1499            .await
1500    }
1501
1502    /// Request content by hash from this peer with an explicit timeout.
1503    pub async fn request_with_timeout(
1504        &self,
1505        hash_hex: &str,
1506        timeout: std::time::Duration,
1507    ) -> Result<Option<Vec<u8>>> {
1508        let dc_guard = self.data_channel.lock().await;
1509        let dc = dc_guard
1510            .as_ref()
1511            .ok_or_else(|| anyhow::anyhow!("No data channel"))?
1512            .clone();
1513        drop(dc_guard); // Release lock before async operations
1514
1515        // Convert hex to binary hash
1516        let hash = hex::decode(hash_hex).map_err(|e| anyhow::anyhow!("Invalid hex hash: {}", e))?;
1517
1518        // Create response channel
1519        let (tx, rx) = oneshot::channel();
1520
1521        // Store pending request (keyed by hash hex)
1522        {
1523            let mut pending = self.pending_requests.lock().await;
1524            pending.insert(
1525                hash_hex.to_string(),
1526                PendingRequest::standard(hash.clone(), tx),
1527            );
1528        }
1529
1530        // Send request with blob-request default HTL (fresh request from us)
1531        let req = DataRequest {
1532            h: hash,
1533            htl: BLOB_REQUEST_POLICY.max_htl,
1534            q: None,
1535        };
1536        let wire = encode_request(&req)?;
1537        dc.send(&Bytes::from(wire)).await?;
1538
1539        debug!(
1540            "[Peer {}] Sent request for {}",
1541            self.peer_id.short(),
1542            &hash_hex[..8.min(hash_hex.len())]
1543        );
1544
1545        // Wait for response with timeout
1546        match tokio::time::timeout(timeout, rx).await {
1547            Ok(Ok(data)) => Ok(data),
1548            Ok(Err(_)) => {
1549                // Channel closed
1550                Ok(None)
1551            }
1552            Err(_) => {
1553                // Timeout - clean up pending request
1554                let mut pending = self.pending_requests.lock().await;
1555                pending.remove(hash_hex);
1556                Ok(None)
1557            }
1558        }
1559    }
1560
1561    /// Query a peer's embedded Nostr relay over the WebRTC data channel.
1562    /// Returns all events received before EOSE/timeout.
1563    pub async fn query_nostr_events(
1564        &self,
1565        filters: Vec<NostrFilter>,
1566        timeout: std::time::Duration,
1567    ) -> Result<Vec<nostr::Event>> {
1568        let dc_guard = self.data_channel.lock().await;
1569        let dc = dc_guard
1570            .as_ref()
1571            .ok_or_else(|| anyhow::anyhow!("No data channel"))?
1572            .clone();
1573        drop(dc_guard);
1574
1575        let subscription_id = NostrSubscriptionId::generate();
1576        let subscription_key = subscription_id.to_string();
1577        let (tx, mut rx) = mpsc::unbounded_channel::<NostrRelayMessage>();
1578
1579        {
1580            let mut pending = self.pending_nostr_queries.lock().await;
1581            pending.insert(subscription_key.clone(), tx);
1582        }
1583
1584        let req = NostrClientMessage::req(subscription_id.clone(), filters);
1585        if let Err(e) = dc.send_text(req.as_json()).await {
1586            let mut pending = self.pending_nostr_queries.lock().await;
1587            pending.remove(&subscription_key);
1588            return Err(e.into());
1589        }
1590        debug!(
1591            "[Peer {}] Sent Nostr REQ subscription {}",
1592            self.peer_id.short(),
1593            subscription_id
1594        );
1595
1596        let mut events = Vec::new();
1597        let deadline = tokio::time::Instant::now() + timeout;
1598
1599        loop {
1600            let now = tokio::time::Instant::now();
1601            if now >= deadline {
1602                break;
1603            }
1604            let remaining = deadline - now;
1605
1606            let next = tokio::time::timeout(remaining, rx.recv()).await;
1607            match next {
1608                Ok(Some(NostrRelayMessage::Event {
1609                    subscription_id: sid,
1610                    event,
1611                })) if sid == subscription_id => {
1612                    debug!(
1613                        "[Peer {}] Received Nostr EVENT for subscription {}",
1614                        self.peer_id.short(),
1615                        subscription_id
1616                    );
1617                    events.push(*event);
1618                }
1619                Ok(Some(NostrRelayMessage::EndOfStoredEvents(sid))) if sid == subscription_id => {
1620                    debug!(
1621                        "[Peer {}] Received Nostr EOSE for subscription {}",
1622                        self.peer_id.short(),
1623                        subscription_id
1624                    );
1625                    break;
1626                }
1627                Ok(Some(NostrRelayMessage::Closed {
1628                    subscription_id: sid,
1629                    message,
1630                })) if sid == subscription_id => {
1631                    warn!(
1632                        "[Peer {}] Nostr query closed for subscription {}: {}",
1633                        self.peer_id.short(),
1634                        subscription_id,
1635                        message
1636                    );
1637                    break;
1638                }
1639                Ok(Some(_)) => {}
1640                Ok(None) => break,
1641                Err(_) => {
1642                    warn!(
1643                        "[Peer {}] Nostr query timed out for subscription {}",
1644                        self.peer_id.short(),
1645                        subscription_id
1646                    );
1647                    break;
1648                }
1649            }
1650        }
1651
1652        let close = NostrClientMessage::close(subscription_id.clone());
1653        let _ = dc.send_text(close.as_json()).await;
1654
1655        let mut pending = self.pending_nostr_queries.lock().await;
1656        pending.remove(&subscription_key);
1657        debug!(
1658            "[Peer {}] Nostr query subscription {} collected {} event(s)",
1659            self.peer_id.short(),
1660            subscription_id,
1661            events.len()
1662        );
1663
1664        Ok(events)
1665    }
1666
1667    /// Send a mesh signaling frame as text over the data channel.
1668    pub async fn send_mesh_frame_text(&self, frame: &MeshNostrFrame) -> Result<()> {
1669        let dc_guard = self.data_channel.lock().await;
1670        let dc = dc_guard
1671            .as_ref()
1672            .ok_or_else(|| anyhow::anyhow!("No data channel"))?
1673            .clone();
1674        drop(dc_guard);
1675
1676        let text = serde_json::to_string(frame)?;
1677        dc.send_text(text).await?;
1678        Ok(())
1679    }
1680
1681    /// Send a message over the data channel
1682    pub async fn send_message(&self, msg: &DataMessage) -> Result<()> {
1683        let dc_guard = self.data_channel.lock().await;
1684        if let Some(ref dc) = *dc_guard {
1685            let wire = encode_message(msg)?;
1686            dc.send(&Bytes::from(wire)).await?;
1687        }
1688        Ok(())
1689    }
1690
1691    /// Close the connection
1692    pub async fn close(&self) -> Result<()> {
1693        {
1694            let dc_guard = self.data_channel.lock().await;
1695            if let Some(ref dc) = *dc_guard {
1696                dc.close().await?;
1697            }
1698        }
1699        self.pc.close().await?;
1700        Ok(())
1701    }
1702}
1703
1704fn relay_subscription_id(msg: &NostrRelayMessage) -> Option<String> {
1705    match msg {
1706        NostrRelayMessage::Event {
1707            subscription_id, ..
1708        } => Some(subscription_id.to_string()),
1709        NostrRelayMessage::EndOfStoredEvents(subscription_id) => Some(subscription_id.to_string()),
1710        NostrRelayMessage::Closed {
1711            subscription_id, ..
1712        } => Some(subscription_id.to_string()),
1713        NostrRelayMessage::Count {
1714            subscription_id, ..
1715        } => Some(subscription_id.to_string()),
1716        _ => None,
1717    }
1718}
1719
1720#[async_trait]
1721impl RoutedPeerLink for Peer {
1722    async fn send(&self, data: Vec<u8>) -> std::result::Result<(), RoutedTransportError> {
1723        let dc = self
1724            .data_channel
1725            .lock()
1726            .await
1727            .as_ref()
1728            .cloned()
1729            .ok_or(RoutedTransportError::NotConnected)?;
1730        dc.send(&Bytes::from(data))
1731            .await
1732            .map(|_| ())
1733            .map_err(|e| RoutedTransportError::SendFailed(e.to_string()))
1734    }
1735
1736    async fn recv(&self) -> Option<Vec<u8>> {
1737        None
1738    }
1739
1740    fn try_recv(&self) -> Option<Vec<u8>> {
1741        None
1742    }
1743
1744    fn is_open(&self) -> bool {
1745        self.has_data_channel()
1746    }
1747
1748    async fn close(&self) {
1749        let _ = Peer::close(self).await;
1750    }
1751}