Skip to main content

hashtree_cli/webrtc/
peer.rs

1//! WebRTC peer connection for hashtree data exchange
2
3use anyhow::Result;
4use bytes::Bytes;
5use std::collections::HashMap;
6use std::sync::Arc;
7use tokio::sync::{mpsc, oneshot, Mutex, Notify};
8use tracing::{debug, error, info, warn};
9use webrtc::api::interceptor_registry::register_default_interceptors;
10use webrtc::api::media_engine::MediaEngine;
11use webrtc::api::setting_engine::SettingEngine;
12use webrtc::api::APIBuilder;
13use webrtc::data_channel::data_channel_init::RTCDataChannelInit;
14use webrtc::data_channel::data_channel_message::DataChannelMessage;
15use webrtc::data_channel::data_channel_state::RTCDataChannelState;
16use webrtc::data_channel::RTCDataChannel;
17use webrtc::ice_transport::ice_candidate::RTCIceCandidate;
18use webrtc::ice_transport::ice_server::RTCIceServer;
19use webrtc::interceptor::registry::Registry;
20use webrtc::peer_connection::configuration::RTCConfiguration;
21use webrtc::peer_connection::peer_connection_state::RTCPeerConnectionState;
22use webrtc::peer_connection::sdp::session_description::RTCSessionDescription;
23use webrtc::peer_connection::RTCPeerConnection;
24
25use super::cashu::{CashuQuoteState, ExpectedSettlement};
26use super::types::{
27    encode_chunk, encode_message, encode_payment, encode_payment_ack, encode_quote_response,
28    encode_request, encode_response, hash_to_hex, parse_message, validate_mesh_frame, DataChunk,
29    DataMessage, DataPayment, DataPaymentAck, DataQuoteRequest, DataRequest, DataResponse,
30    MeshNostrFrame, PeerDirection, PeerHTLConfig, PeerId, PeerStateEvent, SignalingMessage,
31    BLOB_REQUEST_POLICY,
32};
33use crate::nostr_relay::NostrRelay;
34use nostr::{
35    ClientMessage as NostrClientMessage, Filter as NostrFilter, JsonUtil as NostrJsonUtil,
36    RelayMessage as NostrRelayMessage, SubscriptionId as NostrSubscriptionId,
37};
38
/// Trait for content storage that can be used by WebRTC peers.
///
/// The `Send + Sync + 'static` bounds allow a store to be shared as
/// `Arc<dyn ContentStore>`, which is how `Peer` holds it.
pub trait ContentStore: Send + Sync + 'static {
    /// Get content by hex hash.
    ///
    /// Callers in this file treat `Ok(Some(_))` as "content is available",
    /// `Ok(None)` as "not stored" and `Err(_)` as a store failure.
    fn get(&self, hash_hex: &str) -> Result<Option<Vec<u8>>>;
}
44
/// Pending request tracking (keyed by hash hex).
///
/// One entry lives in `Peer::pending_requests` per outstanding request.
pub struct PendingRequest {
    /// Hash of the requested content (presumably the raw bytes whose hex form
    /// is the map key; confirm against the caller that inserts the entry).
    pub hash: Vec<u8>,
    /// Resolved with `Some(data)` when the transfer completes and with `None`
    /// when the request is failed.
    pub response_tx: oneshot::Sender<Option<Vec<u8>>>,
    /// Payment/chunk progress for quoted requests; `None` for standard ones.
    pub quoted: Option<PendingQuotedRequest>,
}
51
/// Progress of a quoted (paid, chunk-by-chunk) request.
pub struct PendingQuotedRequest {
    /// Quote identifier; incoming chunks and payment acks must carry the same id.
    pub quote_id: u64,
    /// Mint used when creating payment tokens for this request.
    pub mint_url: String,
    /// Total agreed for the transfer: running payments may never exceed it and
    /// the final chunk must bring the running total to exactly this amount.
    pub total_payment_sat: u64,
    /// Sum of the chunk payments the peer has acknowledged so far.
    pub confirmed_payment_sat: u64,
    /// Index of the next chunk that will be accepted for payment.
    pub next_chunk_index: u32,
    /// Chunk count announced by the first accepted chunk; later chunks must agree.
    pub total_chunks: Option<u32>,
    /// Payload bytes of all accepted chunks, appended in order.
    pub assembled_data: Vec<u8>,
    /// Payment that has been sent (or is being sent) and is awaiting its ack.
    pub in_flight_payment: Option<PendingChunkPayment>,
    /// At most one follow-up chunk held back while a payment is in flight.
    pub buffered_chunk: Option<DataChunk>,
}
63
/// A chunk payment that has been created but not yet acknowledged by the peer.
pub struct PendingChunkPayment {
    /// Index of the chunk this payment settles.
    pub chunk_index: u32,
    /// Amount paid for the chunk, in sats.
    pub amount_sat: u64,
    /// Mint the payment token was created at; passed back when revoking.
    pub mint_url: String,
    /// Identifier handed to `revoke_payment_token` if the payment must be undone.
    pub operation_id: String,
    /// Whether the paid chunk is the last one of the transfer.
    pub final_chunk: bool,
}
71
72impl PendingRequest {
73    pub fn standard(hash: Vec<u8>, response_tx: oneshot::Sender<Option<Vec<u8>>>) -> Self {
74        Self {
75            hash,
76            response_tx,
77            quoted: None,
78        }
79    }
80
81    pub fn quoted(
82        hash: Vec<u8>,
83        response_tx: oneshot::Sender<Option<Vec<u8>>>,
84        quote_id: u64,
85        mint_url: String,
86        total_payment_sat: u64,
87    ) -> Self {
88        Self {
89            hash,
90            response_tx,
91            quoted: Some(PendingQuotedRequest {
92                quote_id,
93                mint_url,
94                total_payment_sat,
95                confirmed_payment_sat: 0,
96                next_chunk_index: 0,
97                total_chunks: None,
98                assembled_data: Vec::new(),
99                in_flight_payment: None,
100                buffered_chunk: None,
101            }),
102        }
103    }
104}
105
106async fn handle_quote_request_message(
107    peer_short: &str,
108    peer_id: &PeerId,
109    store: &Option<Arc<dyn ContentStore>>,
110    cashu_quotes: Option<&Arc<CashuQuoteState>>,
111    req: &DataQuoteRequest,
112) -> Option<super::types::DataQuoteResponse> {
113    let Some(cashu_quotes) = cashu_quotes else {
114        debug!(
115            "[Peer {}] Ignoring quote request without Cashu policy",
116            peer_short
117        );
118        return None;
119    };
120
121    if cashu_quotes
122        .should_refuse_requests_from_peer(&peer_id.to_string())
123        .await
124    {
125        return Some(
126            cashu_quotes
127                .build_quote_response(&peer_id.to_string(), req, false)
128                .await,
129        );
130    }
131
132    let hash_hex = hash_to_hex(&req.h);
133    let can_serve = if let Some(store) = store {
134        match store.get(&hash_hex) {
135            Ok(Some(_)) => true,
136            Ok(None) => false,
137            Err(e) => {
138                warn!("[Peer {}] Store error during quote: {}", peer_short, e);
139                false
140            }
141        }
142    } else {
143        false
144    };
145
146    Some(
147        cashu_quotes
148            .build_quote_response(&peer_id.to_string(), req, can_serve)
149            .await,
150    )
151}
152
/// Result of handling an incoming payment message.
struct PaymentHandlingOutcome {
    /// Ack (or nack) to report back for the payment.
    ack: DataPaymentAck,
    /// Next chunk to send together with the settlement expected for it;
    /// `None` after a nack or once the final chunk has been paid.
    next_chunk: Option<(DataChunk, ExpectedSettlement)>,
}
157
158async fn send_quoted_chunk(
159    dc: &Arc<RTCDataChannel>,
160    peer_id: &PeerId,
161    peer_short: &str,
162    cashu_quotes: &Arc<CashuQuoteState>,
163    chunk: DataChunk,
164    expected: ExpectedSettlement,
165) -> bool {
166    let hash_hex = hash_to_hex(&chunk.h);
167    let wire = match encode_chunk(&chunk) {
168        Ok(wire) => wire,
169        Err(err) => {
170            warn!(
171                "[Peer {}] Failed to encode quoted chunk {} for quote {}: {}",
172                peer_short, chunk.c, chunk.q, err
173            );
174            return false;
175        }
176    };
177
178    if let Err(err) = dc.send(&Bytes::from(wire)).await {
179        warn!(
180            "[Peer {}] Failed to send quoted chunk {} for quote {}: {}",
181            peer_short, chunk.c, chunk.q, err
182        );
183        return false;
184    }
185
186    cashu_quotes
187        .register_expected_payment(peer_id.to_string(), hash_hex, chunk.q, expected)
188        .await;
189    true
190}
191
192async fn fail_pending_request(
193    pending_requests: &Arc<Mutex<HashMap<String, PendingRequest>>>,
194    cashu_quotes: Option<&Arc<CashuQuoteState>>,
195    hash_hex: &str,
196) {
197    let pending = pending_requests.lock().await.remove(hash_hex);
198    let Some(pending) = pending else {
199        return;
200    };
201
202    if let (Some(cashu_quotes), Some(quoted)) = (cashu_quotes, pending.quoted) {
203        if let Some(in_flight) = quoted.in_flight_payment {
204            let _ = cashu_quotes
205                .revoke_payment_token(&in_flight.mint_url, &in_flight.operation_id)
206                .await;
207        }
208    }
209    let _ = pending.response_tx.send(None);
210}
211
212async fn process_chunk_message(
213    peer_short: &str,
214    _peer_id: &PeerId,
215    dc: &Arc<RTCDataChannel>,
216    pending_requests: &Arc<Mutex<HashMap<String, PendingRequest>>>,
217    cashu_quotes: Option<&Arc<CashuQuoteState>>,
218    chunk: DataChunk,
219) {
220    let hash_hex = hash_to_hex(&chunk.h);
221    let Some(cashu_quotes) = cashu_quotes else {
222        fail_pending_request(pending_requests, None, &hash_hex).await;
223        return;
224    };
225
226    enum ChunkAction {
227        BufferOnly,
228        Fail,
229        Pay {
230            mint_url: String,
231            amount_sat: u64,
232            final_chunk: bool,
233        },
234    }
235
236    let action = {
237        let mut pending = pending_requests.lock().await;
238        let Some(request) = pending.get_mut(&hash_hex) else {
239            return;
240        };
241        match request.quoted.as_mut() {
242            None => ChunkAction::Fail,
243            Some(quoted) if quoted.quote_id != chunk.q || chunk.n == 0 => ChunkAction::Fail,
244            Some(quoted) => {
245                if let Some(in_flight) = quoted.in_flight_payment.as_ref() {
246                    let expected_buffer_index = in_flight.chunk_index + 1;
247                    if chunk.c == expected_buffer_index && quoted.buffered_chunk.is_none() {
248                        quoted.buffered_chunk = Some(chunk.clone());
249                        ChunkAction::BufferOnly
250                    } else {
251                        ChunkAction::Fail
252                    }
253                } else if chunk.c != quoted.next_chunk_index {
254                    ChunkAction::Fail
255                } else if let Some(total_chunks) = quoted.total_chunks {
256                    if total_chunks != chunk.n {
257                        ChunkAction::Fail
258                    } else {
259                        let next_total = quoted.confirmed_payment_sat.saturating_add(chunk.p);
260                        if next_total > quoted.total_payment_sat
261                            || (chunk.c + 1 == chunk.n && next_total != quoted.total_payment_sat)
262                        {
263                            ChunkAction::Fail
264                        } else {
265                            quoted.total_chunks = Some(chunk.n);
266                            quoted.assembled_data.extend_from_slice(&chunk.d);
267                            quoted.next_chunk_index += 1;
268                            ChunkAction::Pay {
269                                mint_url: quoted.mint_url.clone(),
270                                amount_sat: chunk.p,
271                                final_chunk: chunk.c + 1 == chunk.n,
272                            }
273                        }
274                    }
275                } else {
276                    let next_total = quoted.confirmed_payment_sat.saturating_add(chunk.p);
277                    if next_total > quoted.total_payment_sat
278                        || (chunk.c + 1 == chunk.n && next_total != quoted.total_payment_sat)
279                    {
280                        ChunkAction::Fail
281                    } else {
282                        quoted.total_chunks = Some(chunk.n);
283                        quoted.assembled_data.extend_from_slice(&chunk.d);
284                        quoted.next_chunk_index += 1;
285                        ChunkAction::Pay {
286                            mint_url: quoted.mint_url.clone(),
287                            amount_sat: chunk.p,
288                            final_chunk: chunk.c + 1 == chunk.n,
289                        }
290                    }
291                }
292            }
293        }
294    };
295
296    match action {
297        ChunkAction::BufferOnly => (),
298        ChunkAction::Fail => {
299            warn!(
300                "[Peer {}] Invalid quoted chunk {} for hash {}",
301                peer_short, chunk.c, hash_hex
302            );
303            fail_pending_request(pending_requests, Some(cashu_quotes), &hash_hex).await;
304        }
305        ChunkAction::Pay {
306            mint_url,
307            amount_sat,
308            final_chunk,
309        } => {
310            let payment = match cashu_quotes
311                .create_payment_token(&mint_url, amount_sat)
312                .await
313            {
314                Ok(payment) => payment,
315                Err(err) => {
316                    warn!(
317                        "[Peer {}] Failed to create payment token for chunk {} of {}: {}",
318                        peer_short, chunk.c, hash_hex, err
319                    );
320                    fail_pending_request(pending_requests, Some(cashu_quotes), &hash_hex).await;
321                    return;
322                }
323            };
324
325            {
326                let mut pending = pending_requests.lock().await;
327                let Some(request) = pending.get_mut(&hash_hex) else {
328                    let _ = cashu_quotes
329                        .revoke_payment_token(&payment.mint_url, &payment.operation_id)
330                        .await;
331                    return;
332                };
333                let Some(quoted) = request.quoted.as_mut() else {
334                    let _ = cashu_quotes
335                        .revoke_payment_token(&payment.mint_url, &payment.operation_id)
336                        .await;
337                    return;
338                };
339                quoted.in_flight_payment = Some(PendingChunkPayment {
340                    chunk_index: chunk.c,
341                    amount_sat,
342                    mint_url: payment.mint_url.clone(),
343                    operation_id: payment.operation_id.clone(),
344                    final_chunk,
345                });
346            }
347
348            let payment_msg = DataPayment {
349                h: chunk.h,
350                q: chunk.q,
351                c: chunk.c,
352                p: amount_sat,
353                m: Some(payment.mint_url.clone()),
354                tok: payment.token,
355            };
356            let wire = match encode_payment(&payment_msg) {
357                Ok(wire) => wire,
358                Err(err) => {
359                    warn!(
360                        "[Peer {}] Failed to encode payment for chunk {} of {}: {}",
361                        peer_short, chunk.c, hash_hex, err
362                    );
363                    fail_pending_request(pending_requests, Some(cashu_quotes), &hash_hex).await;
364                    return;
365                }
366            };
367            if let Err(err) = dc.send(&Bytes::from(wire)).await {
368                warn!(
369                    "[Peer {}] Failed to send payment for chunk {} of {}: {}",
370                    peer_short, chunk.c, hash_hex, err
371                );
372                fail_pending_request(pending_requests, Some(cashu_quotes), &hash_hex).await;
373            }
374        }
375    }
376}
377
378async fn handle_payment_ack_message(
379    peer_short: &str,
380    peer_id: &PeerId,
381    dc: &Arc<RTCDataChannel>,
382    pending_requests: &Arc<Mutex<HashMap<String, PendingRequest>>>,
383    cashu_quotes: Option<&Arc<CashuQuoteState>>,
384    ack: DataPaymentAck,
385) {
386    let Some(cashu_quotes) = cashu_quotes else {
387        return;
388    };
389    let hash_hex = hash_to_hex(&ack.h);
390    let mut buffered_next = None;
391    let mut completed = None;
392    let mut failed = None;
393    let mut confirmed_amount = None;
394    let mut completed_data = None;
395
396    {
397        let mut pending = pending_requests.lock().await;
398        let Some(request) = pending.get_mut(&hash_hex) else {
399            return;
400        };
401        let Some(quoted) = request.quoted.as_mut() else {
402            return;
403        };
404        let Some(in_flight) = quoted.in_flight_payment.take() else {
405            return;
406        };
407        if ack.q != quoted.quote_id || ack.c != in_flight.chunk_index {
408            quoted.in_flight_payment = Some(in_flight);
409            return;
410        }
411
412        if !ack.a {
413            failed = Some(in_flight);
414        } else {
415            quoted.confirmed_payment_sat = quoted
416                .confirmed_payment_sat
417                .saturating_add(in_flight.amount_sat);
418            confirmed_amount = Some(in_flight.amount_sat);
419            if in_flight.final_chunk {
420                completed_data = Some(quoted.assembled_data.clone());
421            } else if let Some(next_chunk) = quoted.buffered_chunk.take() {
422                buffered_next = Some(next_chunk);
423            }
424        }
425
426        if let Some(data) = completed_data.take() {
427            let finished = pending
428                .remove(&hash_hex)
429                .expect("pending request must exist");
430            completed = Some((finished.response_tx, data));
431        }
432    }
433
434    if let Some(amount_sat) = confirmed_amount {
435        cashu_quotes
436            .record_paid_peer(&peer_id.to_string(), amount_sat)
437            .await;
438    }
439
440    if let Some(in_flight) = failed {
441        warn!(
442            "[Peer {}] Payment ack rejected chunk {} for {}: {}",
443            peer_short,
444            ack.c,
445            hash_hex,
446            ack.e.as_deref().unwrap_or("payment rejected")
447        );
448        let _ = cashu_quotes
449            .revoke_payment_token(&in_flight.mint_url, &in_flight.operation_id)
450            .await;
451        let removed = pending_requests.lock().await.remove(&hash_hex);
452        if let Some(removed) = removed {
453            let _ = removed.response_tx.send(None);
454        }
455        return;
456    }
457
458    if let Some((tx, data)) = completed {
459        let _ = tx.send(Some(data));
460        return;
461    }
462
463    if let Some(next_chunk) = buffered_next {
464        process_chunk_message(
465            peer_short,
466            peer_id,
467            dc,
468            pending_requests,
469            Some(cashu_quotes),
470            next_chunk,
471        )
472        .await;
473    }
474}
475
476async fn handle_payment_message(
477    peer_id: &PeerId,
478    cashu_quotes: Option<&Arc<CashuQuoteState>>,
479    req: &DataPayment,
480) -> PaymentHandlingOutcome {
481    let nack = |err: String| PaymentHandlingOutcome {
482        ack: DataPaymentAck {
483            h: req.h.clone(),
484            q: req.q,
485            c: req.c,
486            a: false,
487            e: Some(err),
488        },
489        next_chunk: None,
490    };
491
492    let Some(cashu_quotes) = cashu_quotes else {
493        return nack("Cashu settlement unavailable".to_string());
494    };
495
496    let expected = match cashu_quotes
497        .claim_expected_payment(
498            &peer_id.to_string(),
499            &req.h,
500            req.q,
501            req.c,
502            req.p,
503            req.m.as_deref(),
504        )
505        .await
506    {
507        Ok(expected) => expected,
508        Err(err) => {
509            cashu_quotes
510                .record_payment_default_from_peer(&peer_id.to_string())
511                .await;
512            return nack(err.to_string());
513        }
514    };
515
516    match cashu_quotes.receive_payment_token(&req.tok).await {
517        Ok(received) if received.amount_sat >= expected.payment_sat => {
518            if expected.mint_url.as_deref() != Some(received.mint_url.as_str()) {
519                cashu_quotes
520                    .record_payment_default_from_peer(&peer_id.to_string())
521                    .await;
522                return nack("Received payment mint did not match quoted mint".to_string());
523            }
524            if let Err(err) = cashu_quotes
525                .record_receipt_from_peer(
526                    &peer_id.to_string(),
527                    &received.mint_url,
528                    received.amount_sat,
529                )
530                .await
531            {
532                warn!(
533                    "[Peer {}] Failed to persist Cashu mint success for {}: {}",
534                    peer_id.short(),
535                    received.mint_url,
536                    err
537                );
538            }
539
540            let next_chunk = if expected.final_chunk {
541                None
542            } else {
543                cashu_quotes
544                    .next_outgoing_chunk(&peer_id.to_string(), &req.h, req.q)
545                    .await
546            };
547
548            PaymentHandlingOutcome {
549                ack: DataPaymentAck {
550                    h: req.h.clone(),
551                    q: req.q,
552                    c: req.c,
553                    a: true,
554                    e: None,
555                },
556                next_chunk,
557            }
558        }
559        Ok(_) => {
560            cashu_quotes
561                .record_payment_default_from_peer(&peer_id.to_string())
562                .await;
563            nack("Received payment amount was below the quoted amount".to_string())
564        }
565        Err(err) => {
566            if let Some(mint_url) = expected.mint_url.as_deref().or(req.m.as_deref()) {
567                let _ = cashu_quotes.record_mint_receive_failure(mint_url).await;
568            }
569            nack(err.to_string())
570        }
571    }
572}
573
/// WebRTC peer connection with data channel protocol.
///
/// Bundles the underlying `RTCPeerConnection`, the (optional) data channel and
/// the per-peer bookkeeping used by the data-channel handlers.
pub struct Peer {
    /// Identity of the remote peer.
    pub peer_id: PeerId,
    /// Connection direction as supplied to the constructor.
    pub direction: PeerDirection,
    /// Time at which this `Peer` value was constructed.
    pub created_at: std::time::Instant,
    /// Initialised to `None` by the constructor; presumably set once the
    /// connection is established (the code doing so is not in this part of the file).
    pub connected_at: Option<std::time::Instant>,

    /// Underlying WebRTC peer connection.
    pc: Arc<RTCPeerConnection>,
    /// Data channel - can be set from callback when receiving channel from peer
    pub data_channel: Arc<Mutex<Option<Arc<RTCDataChannel>>>>,
    /// Outgoing channel to the signaling layer (ICE candidates are sent here).
    signaling_tx: mpsc::Sender<SignalingMessage>,
    /// Our own peer id, used as the sender id in signaling messages.
    my_peer_id: PeerId,

    /// Content store for serving requests
    store: Option<Arc<dyn ContentStore>>,

    /// Track pending outgoing requests (keyed by hash hex)
    pub pending_requests: Arc<Mutex<HashMap<String, PendingRequest>>>,
    /// Track pending Nostr relay queries over data channel (keyed by subscription id)
    pending_nostr_queries: Arc<Mutex<HashMap<String, mpsc::UnboundedSender<NostrRelayMessage>>>>,

    /// Channel for incoming data messages (sender half)
    #[allow(dead_code)]
    message_tx: mpsc::Sender<(DataMessage, Option<Vec<u8>>)>,
    /// Channel for incoming data messages (receiver half)
    #[allow(dead_code)]
    message_rx: Option<mpsc::Receiver<(DataMessage, Option<Vec<u8>>)>>,

    /// Optional channel to notify signaling layer of state changes
    state_event_tx: Option<mpsc::Sender<PeerStateEvent>>,

    /// Optional Nostr relay for text messages over data channel
    nostr_relay: Option<Arc<NostrRelay>>,
    /// Optional channel for inbound relayless signaling mesh frames
    mesh_frame_tx: Option<mpsc::Sender<(PeerId, MeshNostrFrame)>>,
    /// Optional Cashu quote negotiation state shared with signaling.
    cashu_quotes: Option<Arc<CashuQuoteState>>,
    /// Per-peer HTL randomness profile (reused across traffic classes)
    htl_config: PeerHTLConfig,
}
613
614impl Peer {
615    /// Create a new peer connection
616    pub async fn new(
617        peer_id: PeerId,
618        direction: PeerDirection,
619        my_peer_id: PeerId,
620        signaling_tx: mpsc::Sender<SignalingMessage>,
621        stun_servers: Vec<String>,
622    ) -> Result<Self> {
623        Self::new_with_store_and_events(
624            peer_id,
625            direction,
626            my_peer_id,
627            signaling_tx,
628            stun_servers,
629            None,
630            None,
631            None,
632            None,
633            None,
634        )
635        .await
636    }
637
638    /// Create a new peer connection with content store
639    pub async fn new_with_store(
640        peer_id: PeerId,
641        direction: PeerDirection,
642        my_peer_id: PeerId,
643        signaling_tx: mpsc::Sender<SignalingMessage>,
644        stun_servers: Vec<String>,
645        store: Option<Arc<dyn ContentStore>>,
646    ) -> Result<Self> {
647        Self::new_with_store_and_events(
648            peer_id,
649            direction,
650            my_peer_id,
651            signaling_tx,
652            stun_servers,
653            store,
654            None,
655            None,
656            None,
657            None,
658        )
659        .await
660    }
661
662    /// Create a new peer connection with content store and state event channel
663    #[allow(clippy::too_many_arguments)]
664    pub(crate) async fn new_with_store_and_events(
665        peer_id: PeerId,
666        direction: PeerDirection,
667        my_peer_id: PeerId,
668        signaling_tx: mpsc::Sender<SignalingMessage>,
669        stun_servers: Vec<String>,
670        store: Option<Arc<dyn ContentStore>>,
671        state_event_tx: Option<mpsc::Sender<PeerStateEvent>>,
672        nostr_relay: Option<Arc<NostrRelay>>,
673        mesh_frame_tx: Option<mpsc::Sender<(PeerId, MeshNostrFrame)>>,
674        cashu_quotes: Option<Arc<CashuQuoteState>>,
675    ) -> Result<Self> {
676        // Create WebRTC API
677        let mut m = MediaEngine::default();
678        m.register_default_codecs()?;
679
680        let mut registry = Registry::new();
681        registry = register_default_interceptors(registry, &mut m)?;
682
683        // Enable mDNS temporarily for debugging
684        // Previously disabled due to https://github.com/webrtc-rs/webrtc/issues/616
685        let setting_engine = SettingEngine::default();
686        // Note: mDNS enabled by default
687
688        let api = APIBuilder::new()
689            .with_media_engine(m)
690            .with_interceptor_registry(registry)
691            .with_setting_engine(setting_engine)
692            .build();
693
694        // Configure ICE servers
695        let ice_servers: Vec<RTCIceServer> = stun_servers
696            .iter()
697            .map(|url| RTCIceServer {
698                urls: vec![url.clone()],
699                ..Default::default()
700            })
701            .collect();
702
703        let config = RTCConfiguration {
704            ice_servers,
705            ..Default::default()
706        };
707
708        let pc = Arc::new(api.new_peer_connection(config).await?);
709        let (message_tx, message_rx) = mpsc::channel(100);
710        Ok(Self {
711            peer_id,
712            direction,
713            created_at: std::time::Instant::now(),
714            connected_at: None,
715            pc,
716            data_channel: Arc::new(Mutex::new(None)),
717            signaling_tx,
718            my_peer_id,
719            store,
720            pending_requests: Arc::new(Mutex::new(HashMap::new())),
721            pending_nostr_queries: Arc::new(Mutex::new(HashMap::new())),
722            message_tx,
723            message_rx: Some(message_rx),
724            state_event_tx,
725            nostr_relay,
726            mesh_frame_tx,
727            cashu_quotes,
728            htl_config: PeerHTLConfig::random(),
729        })
730    }
731
/// Set content store, replacing any store configured earlier.
pub fn set_store(&mut self, store: Arc<dyn ContentStore>) {
    self.store = Some(store);
}
736
/// Get connection state as currently reported by the underlying peer connection.
pub fn state(&self) -> RTCPeerConnectionState {
    self.pc.connection_state()
}
741
/// Get signaling state as currently reported by the underlying peer connection.
pub fn signaling_state(&self) -> webrtc::peer_connection::signaling_state::RTCSignalingState {
    self.pc.signaling_state()
}
746
747    /// Check if connected
748    pub fn is_connected(&self) -> bool {
749        self.pc.connection_state() == RTCPeerConnectionState::Connected
750    }
751
/// Borrow this peer's HTL configuration (randomly generated when the peer was created).
pub fn htl_config(&self) -> &PeerHTLConfig {
    &self.htl_config
}
755
756    /// Setup event handlers for the peer connection
757    pub async fn setup_handlers(&self) -> Result<()> {
758        let peer_id = self.peer_id.clone();
759        let signaling_tx = self.signaling_tx.clone();
760        let my_peer_id_str = self.my_peer_id.to_string();
761        let target_peer_id = self.peer_id.to_string();
762
763        // Handle ICE candidates - work MUST be inside the returned future
764        self.pc
765            .on_ice_candidate(Box::new(move |candidate: Option<RTCIceCandidate>| {
766                let signaling_tx = signaling_tx.clone();
767                let my_peer_id_str = my_peer_id_str.clone();
768                let target_peer_id = target_peer_id.clone();
769
770                Box::pin(async move {
771                    if let Some(c) = candidate {
772                        if let Ok(init) = c.to_json() {
773                            info!(
774                                "ICE candidate generated: {}",
775                                &init.candidate[..init.candidate.len().min(60)]
776                            );
777                            let msg = SignalingMessage::Candidate {
778                                peer_id: my_peer_id_str.clone(),
779                                target_peer_id: target_peer_id.clone(),
780                                candidate: init.candidate,
781                                sdp_m_line_index: init.sdp_mline_index,
782                                sdp_mid: init.sdp_mid,
783                            };
784                            if let Err(e) = signaling_tx.send(msg).await {
785                                error!("Failed to send ICE candidate: {}", e);
786                            }
787                        }
788                    }
789                })
790            }));
791
792        // Handle connection state changes - work MUST be inside the returned future
793        let peer_id_log = peer_id.clone();
794        let state_event_tx = self.state_event_tx.clone();
795        self.pc
796            .on_peer_connection_state_change(Box::new(move |state: RTCPeerConnectionState| {
797                let peer_id = peer_id_log.clone();
798                let state_event_tx = state_event_tx.clone();
799                Box::pin(async move {
800                    info!("Peer {} connection state: {:?}", peer_id.short(), state);
801
802                    // Notify signaling layer of state changes
803                    if let Some(tx) = state_event_tx {
804                        let event = match state {
805                            RTCPeerConnectionState::Connected => {
806                                Some(PeerStateEvent::Connected(peer_id))
807                            }
808                            RTCPeerConnectionState::Failed => Some(PeerStateEvent::Failed(peer_id)),
809                            RTCPeerConnectionState::Disconnected
810                            | RTCPeerConnectionState::Closed => {
811                                Some(PeerStateEvent::Disconnected(peer_id))
812                            }
813                            _ => None,
814                        };
815                        if let Some(event) = event {
816                            if let Err(e) = tx.send(event).await {
817                                error!("Failed to send peer state event: {}", e);
818                            }
819                        }
820                    }
821                })
822            }));
823
824        Ok(())
825    }
826
827    /// Initiate connection (create offer) - for outbound connections
828    pub async fn connect(&self) -> Result<serde_json::Value> {
829        println!("[Peer {}] Creating data channel...", self.peer_id.short());
830        // Create data channel first
831        // Use unordered for better performance - protocol is stateless (each message self-describes)
832        let dc_init = RTCDataChannelInit {
833            ordered: Some(false),
834            ..Default::default()
835        };
836        let dc = self
837            .pc
838            .create_data_channel("hashtree", Some(dc_init))
839            .await?;
840        println!(
841            "[Peer {}] Data channel created, setting up handlers...",
842            self.peer_id.short()
843        );
844        self.setup_data_channel(dc.clone()).await?;
845        println!(
846            "[Peer {}] Handlers set up, storing data channel...",
847            self.peer_id.short()
848        );
849        {
850            let mut dc_guard = self.data_channel.lock().await;
851            *dc_guard = Some(dc);
852        }
853        println!("[Peer {}] Data channel stored", self.peer_id.short());
854
855        // Create offer and wait for ICE gathering to complete
856        // This ensures all ICE candidates are embedded in the SDP
857        let offer = self.pc.create_offer(None).await?;
858        let mut gathering_complete = self.pc.gathering_complete_promise().await;
859        self.pc.set_local_description(offer).await?;
860
861        // Wait for ICE gathering to complete (with timeout)
862        let _ = tokio::time::timeout(
863            std::time::Duration::from_secs(10),
864            gathering_complete.recv(),
865        )
866        .await;
867
868        // Get the local description with ICE candidates embedded
869        let local_desc = self
870            .pc
871            .local_description()
872            .await
873            .ok_or_else(|| anyhow::anyhow!("No local description after gathering"))?;
874
875        debug!(
876            "Offer created, SDP len: {}, ice_gathering: {:?}",
877            local_desc.sdp.len(),
878            self.pc.ice_gathering_state()
879        );
880
881        // Return offer as JSON
882        let offer_json = serde_json::json!({
883            "type": local_desc.sdp_type.to_string().to_lowercase(),
884            "sdp": local_desc.sdp
885        });
886
887        Ok(offer_json)
888    }
889
890    /// Handle incoming offer and create answer
891    pub async fn handle_offer(&self, offer: serde_json::Value) -> Result<serde_json::Value> {
892        let sdp = offer
893            .get("sdp")
894            .and_then(|s| s.as_str())
895            .ok_or_else(|| anyhow::anyhow!("Missing SDP in offer"))?;
896
897        // Setup data channel handler BEFORE set_remote_description
898        // This ensures the handler is registered before any data channel events fire
899        let peer_id = self.peer_id.clone();
900        let message_tx = self.message_tx.clone();
901        let pending_requests = self.pending_requests.clone();
902        let pending_nostr_queries = self.pending_nostr_queries.clone();
903        let store = self.store.clone();
904        let data_channel_holder = self.data_channel.clone();
905        let nostr_relay = self.nostr_relay.clone();
906        let mesh_frame_tx = self.mesh_frame_tx.clone();
907        let cashu_quotes = self.cashu_quotes.clone();
908        let peer_pubkey = Some(self.peer_id.pubkey.clone());
909
910        self.pc
911            .on_data_channel(Box::new(move |dc: Arc<RTCDataChannel>| {
912                let peer_id = peer_id.clone();
913                let message_tx = message_tx.clone();
914                let pending_requests = pending_requests.clone();
915                let pending_nostr_queries = pending_nostr_queries.clone();
916                let store = store.clone();
917                let data_channel_holder = data_channel_holder.clone();
918                let nostr_relay = nostr_relay.clone();
919                let mesh_frame_tx = mesh_frame_tx.clone();
920                let cashu_quotes = cashu_quotes.clone();
921                let peer_pubkey = peer_pubkey.clone();
922
923                // Work MUST be inside the returned future
924                Box::pin(async move {
925                    info!(
926                        "Peer {} received data channel: {}",
927                        peer_id.short(),
928                        dc.label()
929                    );
930
931                    // Store the received data channel
932                    {
933                        let mut dc_guard = data_channel_holder.lock().await;
934                        *dc_guard = Some(dc.clone());
935                    }
936
937                    // Set up message handlers
938                    Self::setup_dc_handlers(
939                        dc.clone(),
940                        peer_id,
941                        message_tx,
942                        pending_requests,
943                        pending_nostr_queries.clone(),
944                        store,
945                        nostr_relay,
946                        mesh_frame_tx,
947                        cashu_quotes,
948                        peer_pubkey,
949                    )
950                    .await;
951                })
952            }));
953
954        // Set remote description after handler is registered
955        let offer_desc = RTCSessionDescription::offer(sdp.to_string())?;
956        self.pc.set_remote_description(offer_desc).await?;
957
958        // Create answer and wait for ICE gathering to complete
959        // This ensures all ICE candidates are embedded in the SDP
960        let answer = self.pc.create_answer(None).await?;
961        let mut gathering_complete = self.pc.gathering_complete_promise().await;
962        self.pc.set_local_description(answer).await?;
963
964        // Wait for ICE gathering to complete (with timeout)
965        let _ = tokio::time::timeout(
966            std::time::Duration::from_secs(10),
967            gathering_complete.recv(),
968        )
969        .await;
970
971        // Get the local description with ICE candidates embedded
972        let local_desc = self
973            .pc
974            .local_description()
975            .await
976            .ok_or_else(|| anyhow::anyhow!("No local description after gathering"))?;
977
978        debug!(
979            "Answer created, SDP len: {}, ice_gathering: {:?}",
980            local_desc.sdp.len(),
981            self.pc.ice_gathering_state()
982        );
983
984        let answer_json = serde_json::json!({
985            "type": local_desc.sdp_type.to_string().to_lowercase(),
986            "sdp": local_desc.sdp
987        });
988
989        Ok(answer_json)
990    }
991
992    /// Handle incoming answer
993    pub async fn handle_answer(&self, answer: serde_json::Value) -> Result<()> {
994        let sdp = answer
995            .get("sdp")
996            .and_then(|s| s.as_str())
997            .ok_or_else(|| anyhow::anyhow!("Missing SDP in answer"))?;
998
999        let answer_desc = RTCSessionDescription::answer(sdp.to_string())?;
1000        self.pc.set_remote_description(answer_desc).await?;
1001
1002        Ok(())
1003    }
1004
1005    /// Handle incoming ICE candidate
1006    pub async fn handle_candidate(&self, candidate: serde_json::Value) -> Result<()> {
1007        let candidate_str = candidate
1008            .get("candidate")
1009            .and_then(|c| c.as_str())
1010            .unwrap_or("");
1011
1012        let sdp_mid = candidate
1013            .get("sdpMid")
1014            .and_then(|m| m.as_str())
1015            .map(|s| s.to_string());
1016
1017        let sdp_mline_index = candidate
1018            .get("sdpMLineIndex")
1019            .and_then(|i| i.as_u64())
1020            .map(|i| i as u16);
1021
1022        if !candidate_str.is_empty() {
1023            use webrtc::ice_transport::ice_candidate::RTCIceCandidateInit;
1024            let init = RTCIceCandidateInit {
1025                candidate: candidate_str.to_string(),
1026                sdp_mid,
1027                sdp_mline_index,
1028                username_fragment: candidate
1029                    .get("usernameFragment")
1030                    .and_then(|u| u.as_str())
1031                    .map(|s| s.to_string()),
1032            };
1033            self.pc.add_ice_candidate(init).await?;
1034        }
1035
1036        Ok(())
1037    }
1038
1039    /// Setup data channel handlers
1040    async fn setup_data_channel(&self, dc: Arc<RTCDataChannel>) -> Result<()> {
1041        let peer_id = self.peer_id.clone();
1042        let message_tx = self.message_tx.clone();
1043        let pending_requests = self.pending_requests.clone();
1044        let store = self.store.clone();
1045        let nostr_relay = self.nostr_relay.clone();
1046        let mesh_frame_tx = self.mesh_frame_tx.clone();
1047        let cashu_quotes = self.cashu_quotes.clone();
1048        let peer_pubkey = Some(self.peer_id.pubkey.clone());
1049
1050        Self::setup_dc_handlers(
1051            dc,
1052            peer_id,
1053            message_tx,
1054            pending_requests,
1055            self.pending_nostr_queries.clone(),
1056            store,
1057            nostr_relay,
1058            mesh_frame_tx,
1059            cashu_quotes,
1060            peer_pubkey,
1061        )
1062        .await;
1063        Ok(())
1064    }
1065
1066    /// Setup handlers for a data channel (shared between outbound and inbound)
1067    #[allow(clippy::too_many_arguments)]
1068    async fn setup_dc_handlers(
1069        dc: Arc<RTCDataChannel>,
1070        peer_id: PeerId,
1071        message_tx: mpsc::Sender<(DataMessage, Option<Vec<u8>>)>,
1072        pending_requests: Arc<Mutex<HashMap<String, PendingRequest>>>,
1073        pending_nostr_queries: Arc<
1074            Mutex<HashMap<String, mpsc::UnboundedSender<NostrRelayMessage>>>,
1075        >,
1076        store: Option<Arc<dyn ContentStore>>,
1077        nostr_relay: Option<Arc<NostrRelay>>,
1078        mesh_frame_tx: Option<mpsc::Sender<(PeerId, MeshNostrFrame)>>,
1079        cashu_quotes: Option<Arc<CashuQuoteState>>,
1080        peer_pubkey: Option<String>,
1081    ) {
1082        let label = dc.label().to_string();
1083        let peer_short = peer_id.short();
1084
1085        // Track pending binary data (request_id -> expected after response)
1086        let _pending_binary: Arc<Mutex<Option<u32>>> = Arc::new(Mutex::new(None));
1087
1088        let open_notify = nostr_relay.as_ref().map(|_| Arc::new(Notify::new()));
1089        if let Some(ref notify) = open_notify {
1090            if dc.ready_state() == RTCDataChannelState::Open {
1091                // `notify_one` stores a permit if no waiter is active yet.
1092                notify.notify_one();
1093            }
1094        }
1095
1096        let mut nostr_client_id: Option<u64> = None;
1097        if let Some(relay) = nostr_relay.clone() {
1098            let client_id = relay.next_client_id();
1099            let (nostr_tx, mut nostr_rx) = mpsc::unbounded_channel::<String>();
1100            relay
1101                .register_client(client_id, nostr_tx, peer_pubkey.clone())
1102                .await;
1103            nostr_client_id = Some(client_id);
1104
1105            if let Some(notify) = open_notify.clone() {
1106                let dc_for_send = dc.clone();
1107                tokio::spawn(async move {
1108                    notify.notified().await;
1109                    while let Some(text) = nostr_rx.recv().await {
1110                        if dc_for_send.send_text(text).await.is_err() {
1111                            break;
1112                        }
1113                    }
1114                });
1115            }
1116        }
1117
1118        if let (Some(relay), Some(client_id)) = (nostr_relay.clone(), nostr_client_id) {
1119            dc.on_close(Box::new(move || {
1120                let relay = relay.clone();
1121                Box::pin(async move {
1122                    relay.unregister_client(client_id).await;
1123                })
1124            }));
1125        }
1126
1127        let open_notify_clone = open_notify.clone();
1128        let peer_short_open = peer_short.clone();
1129        let label_clone = label.clone();
1130        dc.on_open(Box::new(move || {
1131            let peer_short_open = peer_short_open.clone();
1132            let label_clone = label_clone.clone();
1133            let open_notify = open_notify_clone.clone();
1134            // Work MUST be inside the returned future
1135            Box::pin(async move {
1136                info!(
1137                    "[Peer {}] Data channel '{}' open",
1138                    peer_short_open, label_clone
1139                );
1140                if let Some(notify) = open_notify {
1141                    notify.notify_one();
1142                }
1143            })
1144        }));
1145
1146        let dc_for_msg = dc.clone();
1147        let peer_short_msg = peer_short.clone();
1148        let _pending_binary_clone = _pending_binary.clone();
1149        let store_clone = store.clone();
1150        let nostr_relay_for_msg = nostr_relay.clone();
1151        let nostr_client_id_for_msg = nostr_client_id;
1152        let pending_nostr_queries_for_msg = pending_nostr_queries.clone();
1153        let mesh_frame_tx_for_msg = mesh_frame_tx.clone();
1154        let peer_id_for_msg = peer_id.clone();
1155
1156        dc.on_message(Box::new(move |msg: DataChannelMessage| {
1157            let dc = dc_for_msg.clone();
1158            let peer_short = peer_short_msg.clone();
1159            let pending_requests = pending_requests.clone();
1160            let _pending_binary = _pending_binary_clone.clone();
1161            let _message_tx = message_tx.clone();
1162            let store = store_clone.clone();
1163            let nostr_relay = nostr_relay_for_msg.clone();
1164            let nostr_client_id = nostr_client_id_for_msg;
1165            let pending_nostr_queries = pending_nostr_queries_for_msg.clone();
1166            let mesh_frame_tx = mesh_frame_tx_for_msg.clone();
1167            let cashu_quotes = cashu_quotes.clone();
1168            let peer_id = peer_id_for_msg.clone();
1169            let msg_data = msg.data.clone();
1170
1171            // Work MUST be inside the returned future
1172            Box::pin(async move {
1173                if msg.is_string {
1174                    if let Ok(text) = std::str::from_utf8(&msg_data) {
1175                        if let Ok(mesh_frame) = serde_json::from_str::<MeshNostrFrame>(text) {
1176                            match validate_mesh_frame(&mesh_frame) {
1177                                Ok(()) => {
1178                                    if let Some(tx) = mesh_frame_tx {
1179                                        let _ = tx.send((peer_id.clone(), mesh_frame)).await;
1180                                    }
1181                                    return;
1182                                }
1183                                Err(reason) => {
1184                                    debug!(
1185                                        "[Peer {}] Ignoring invalid mesh frame: {}",
1186                                        peer_short, reason
1187                                    );
1188                                }
1189                            }
1190                        }
1191
1192                        // First, route relay responses to pending local queries.
1193                        if let Ok(relay_msg) = NostrRelayMessage::from_json(text) {
1194                            if let Some(sub_id) = relay_subscription_id(&relay_msg) {
1195                                let sender = {
1196                                    let pending = pending_nostr_queries.lock().await;
1197                                    pending.get(&sub_id).cloned()
1198                                };
1199                                if let Some(tx) = sender {
1200                                    debug!(
1201                                        "[Peer {}] Routed Nostr relay message for subscription {}",
1202                                        peer_short, sub_id
1203                                    );
1204                                    let _ = tx.send(relay_msg);
1205                                    return;
1206                                } else {
1207                                    debug!(
1208                                        "[Peer {}] Dropping Nostr relay message for unknown subscription {}",
1209                                        peer_short, sub_id
1210                                    );
1211                                }
1212                            }
1213                        }
1214
1215                        // Otherwise treat it as a client message to be handled by local relay.
1216                        if let Some(relay) = nostr_relay {
1217                            if let Ok(nostr_msg) = NostrClientMessage::from_json(text) {
1218                                if let Some(client_id) = nostr_client_id {
1219                                    relay.handle_client_message(client_id, nostr_msg).await;
1220                                }
1221                            }
1222                        }
1223                    }
1224                    return;
1225                }
1226                // All messages are binary with type prefix + MessagePack body
1227                debug!(
1228                    "[Peer {}] Received {} bytes on data channel",
1229                    peer_short,
1230                    msg_data.len()
1231                );
1232                match parse_message(&msg_data) {
1233                    Ok(data_msg) => match data_msg {
1234                        DataMessage::Request(req) => {
1235                            let hash_hex = hash_to_hex(&req.h);
1236                            let hash_short = &hash_hex[..8.min(hash_hex.len())];
1237                            info!("[Peer {}] Received request for {}", peer_short, hash_short);
1238
1239                            if let Some(cashu_quotes) = cashu_quotes.as_ref() {
1240                                if cashu_quotes
1241                                    .should_refuse_requests_from_peer(&peer_id.to_string())
1242                                    .await
1243                                {
1244                                    info!(
1245                                        "[Peer {}] Refusing request from peer with unpaid defaults",
1246                                        peer_short
1247                                    );
1248                                    return;
1249                                }
1250                            }
1251
1252                            let quoted_settlement = if let Some(quote_id) = req.q {
1253                                let Some(cashu_quotes) = cashu_quotes.as_ref() else {
1254                                    info!(
1255                                        "[Peer {}] Ignoring quoted request without Cashu settlement state",
1256                                        peer_short
1257                                    );
1258                                    return;
1259                                };
1260                                match cashu_quotes
1261                                    .take_valid_quote(&peer_id.to_string(), &req.h, quote_id)
1262                                    .await
1263                                {
1264                                    Some(settlement) => Some((quote_id, settlement)),
1265                                    None => {
1266                                        info!(
1267                                            "[Peer {}] Ignoring request with invalid or expired quote {}",
1268                                            peer_short, quote_id
1269                                        );
1270                                        return;
1271                                    }
1272                                }
1273                            } else {
1274                                None
1275                            };
1276
1277                            // Handle request - look up in store
1278                            let data = if let Some(ref store) = store {
1279                                match store.get(&hash_hex) {
1280                                    Ok(Some(data)) => {
1281                                        info!(
1282                                            "[Peer {}] Found {} in store ({} bytes)",
1283                                            peer_short,
1284                                            hash_short,
1285                                            data.len()
1286                                        );
1287                                        Some(data)
1288                                    }
1289                                    Ok(None) => {
1290                                        info!(
1291                                            "[Peer {}] Hash {} not in store",
1292                                            peer_short, hash_short
1293                                        );
1294                                        None
1295                                    }
1296                                    Err(e) => {
1297                                        warn!("[Peer {}] Store error: {}", peer_short, e);
1298                                        None
1299                                    }
1300                                }
1301                            } else {
1302                                warn!(
1303                                    "[Peer {}] No store configured - cannot serve requests",
1304                                    peer_short
1305                                );
1306                                None
1307                            };
1308
1309                            // Send response only if we have data
1310                            if let Some(data) = data {
1311                                let data_len = data.len();
1312                                if let (Some(cashu_quotes), Some((quote_id, settlement))) =
1313                                    (cashu_quotes.as_ref(), quoted_settlement)
1314                                {
1315                                    match cashu_quotes
1316                                        .prepare_quoted_transfer(
1317                                            &peer_id.to_string(),
1318                                            &req.h,
1319                                            quote_id,
1320                                            &settlement,
1321                                            data,
1322                                        )
1323                                        .await
1324                                    {
1325                                        Some((first_chunk, first_expected)) => {
1326                                            if send_quoted_chunk(
1327                                                &dc,
1328                                                &peer_id,
1329                                                &peer_short,
1330                                                cashu_quotes,
1331                                                first_chunk,
1332                                                first_expected,
1333                                            )
1334                                            .await
1335                                            {
1336                                                info!(
1337                                                    "[Peer {}] Started quoted chunked response for {} ({} bytes)",
1338                                                    peer_short, hash_short, data_len
1339                                                );
1340                                            }
1341                                        }
1342                                        None => {
1343                                            warn!(
1344                                                "[Peer {}] Failed to prepare quoted transfer for {}",
1345                                                peer_short, hash_short
1346                                            );
1347                                        }
1348                                    }
1349                                } else {
1350                                    let response = DataResponse { h: req.h, d: data };
1351                                    if let Ok(wire) = encode_response(&response) {
1352                                        if let Err(e) = dc.send(&Bytes::from(wire)).await {
1353                                            error!(
1354                                                "[Peer {}] Failed to send response: {}",
1355                                                peer_short, e
1356                                            );
1357                                        } else {
1358                                            info!(
1359                                                "[Peer {}] Sent response for {} ({} bytes)",
1360                                                peer_short, hash_short, data_len
1361                                            );
1362                                        }
1363                                    }
1364                                }
1365                            } else {
1366                                info!("[Peer {}] Content not found for {}", peer_short, hash_short);
1367                            }
1368                        }
1369                        DataMessage::Response(res) => {
1370                            let hash_hex = hash_to_hex(&res.h);
1371                            let hash_short = &hash_hex[..8.min(hash_hex.len())];
1372                            debug!(
1373                                "[Peer {}] Received response for {} ({} bytes)",
1374                                peer_short,
1375                                hash_short,
1376                                res.d.len()
1377                            );
1378
1379                            // Resolve the pending request by hash
1380                            let mut pending = pending_requests.lock().await;
1381                            if let Some(req) = pending.remove(&hash_hex) {
1382                                let _ = req.response_tx.send(Some(res.d));
1383                            }
1384                        }
1385                        DataMessage::QuoteRequest(req) => {
1386                            let response = handle_quote_request_message(
1387                                &peer_short,
1388                                &peer_id,
1389                                &store,
1390                                cashu_quotes.as_ref(),
1391                                &req,
1392                            )
1393                            .await;
1394                            if let Some(response) = response {
1395                                if let Ok(wire) = encode_quote_response(&response) {
1396                                    if let Err(e) = dc.send(&Bytes::from(wire)).await {
1397                                        warn!(
1398                                            "[Peer {}] Failed to send quote response: {}",
1399                                            peer_short, e
1400                                        );
1401                                    }
1402                                }
1403                            }
1404                        }
1405                        DataMessage::QuoteResponse(res) => {
1406                            if let Some(cashu_quotes) = cashu_quotes.as_ref() {
1407                                let _ = cashu_quotes
1408                                    .handle_quote_response(&peer_id.to_string(), res)
1409                                    .await;
1410                            }
1411                        }
1412                        DataMessage::Chunk(chunk) => {
1413                            process_chunk_message(
1414                                &peer_short,
1415                                &peer_id,
1416                                &dc,
1417                                &pending_requests,
1418                                cashu_quotes.as_ref(),
1419                                chunk,
1420                            )
1421                            .await;
1422                        }
1423                        DataMessage::Payment(req) => {
1424                            let outcome =
1425                                handle_payment_message(&peer_id, cashu_quotes.as_ref(), &req).await;
1426                            if let Ok(wire) = encode_payment_ack(&outcome.ack) {
1427                                if let Err(e) = dc.send(&Bytes::from(wire)).await {
1428                                    warn!(
1429                                        "[Peer {}] Failed to send payment ack: {}",
1430                                        peer_short, e
1431                                    );
1432                                }
1433                            }
1434                            if let (Some(cashu_quotes), Some((next_chunk, next_expected))) =
1435                                (cashu_quotes.as_ref(), outcome.next_chunk)
1436                            {
1437                                let _ = send_quoted_chunk(
1438                                    &dc,
1439                                    &peer_id,
1440                                    &peer_short,
1441                                    cashu_quotes,
1442                                    next_chunk,
1443                                    next_expected,
1444                                )
1445                                .await;
1446                            }
1447                        }
1448                        DataMessage::PaymentAck(res) => {
1449                            handle_payment_ack_message(
1450                                &peer_short,
1451                                &peer_id,
1452                                &dc,
1453                                &pending_requests,
1454                                cashu_quotes.as_ref(),
1455                                res,
1456                            )
1457                            .await;
1458                        }
1459                    },
1460                    Err(e) => {
1461                        warn!("[Peer {}] Failed to parse message: {:?}", peer_short, e);
1462                        // Log hex dump of first 50 bytes for debugging
1463                        let hex_dump: String = msg_data
1464                            .iter()
1465                            .take(50)
1466                            .map(|b| format!("{:02x}", b))
1467                            .collect();
1468                        warn!("[Peer {}] Message hex: {}", peer_short, hex_dump);
1469                    }
1470                }
1471            })
1472        }));
1473    }
1474
1475    /// Check if data channel is ready
1476    pub fn has_data_channel(&self) -> bool {
1477        // Use try_lock for non-async context
1478        self.data_channel
1479            .try_lock()
1480            .map(|guard| {
1481                guard
1482                    .as_ref()
1483                    .map(|dc| dc.ready_state() == RTCDataChannelState::Open)
1484                    .unwrap_or(false)
1485            })
1486            .unwrap_or(false)
1487    }
1488
1489    /// Request content by hash from this peer
1490    pub async fn request(&self, hash_hex: &str) -> Result<Option<Vec<u8>>> {
1491        self.request_with_timeout(hash_hex, std::time::Duration::from_secs(10))
1492            .await
1493    }
1494
1495    /// Request content by hash from this peer with an explicit timeout.
1496    pub async fn request_with_timeout(
1497        &self,
1498        hash_hex: &str,
1499        timeout: std::time::Duration,
1500    ) -> Result<Option<Vec<u8>>> {
1501        let dc_guard = self.data_channel.lock().await;
1502        let dc = dc_guard
1503            .as_ref()
1504            .ok_or_else(|| anyhow::anyhow!("No data channel"))?
1505            .clone();
1506        drop(dc_guard); // Release lock before async operations
1507
1508        // Convert hex to binary hash
1509        let hash = hex::decode(hash_hex).map_err(|e| anyhow::anyhow!("Invalid hex hash: {}", e))?;
1510
1511        // Create response channel
1512        let (tx, rx) = oneshot::channel();
1513
1514        // Store pending request (keyed by hash hex)
1515        {
1516            let mut pending = self.pending_requests.lock().await;
1517            pending.insert(
1518                hash_hex.to_string(),
1519                PendingRequest::standard(hash.clone(), tx),
1520            );
1521        }
1522
1523        // Send request with blob-request default HTL (fresh request from us)
1524        let req = DataRequest {
1525            h: hash,
1526            htl: BLOB_REQUEST_POLICY.max_htl,
1527            q: None,
1528        };
1529        let wire = encode_request(&req)?;
1530        dc.send(&Bytes::from(wire)).await?;
1531
1532        debug!(
1533            "[Peer {}] Sent request for {}",
1534            self.peer_id.short(),
1535            &hash_hex[..8.min(hash_hex.len())]
1536        );
1537
1538        // Wait for response with timeout
1539        match tokio::time::timeout(timeout, rx).await {
1540            Ok(Ok(data)) => Ok(data),
1541            Ok(Err(_)) => {
1542                // Channel closed
1543                Ok(None)
1544            }
1545            Err(_) => {
1546                // Timeout - clean up pending request
1547                let mut pending = self.pending_requests.lock().await;
1548                pending.remove(hash_hex);
1549                Ok(None)
1550            }
1551        }
1552    }
1553
1554    /// Query a peer's embedded Nostr relay over the WebRTC data channel.
1555    /// Returns all events received before EOSE/timeout.
1556    pub async fn query_nostr_events(
1557        &self,
1558        filters: Vec<NostrFilter>,
1559        timeout: std::time::Duration,
1560    ) -> Result<Vec<nostr::Event>> {
1561        let dc_guard = self.data_channel.lock().await;
1562        let dc = dc_guard
1563            .as_ref()
1564            .ok_or_else(|| anyhow::anyhow!("No data channel"))?
1565            .clone();
1566        drop(dc_guard);
1567
1568        let subscription_id = NostrSubscriptionId::generate();
1569        let subscription_key = subscription_id.to_string();
1570        let (tx, mut rx) = mpsc::unbounded_channel::<NostrRelayMessage>();
1571
1572        {
1573            let mut pending = self.pending_nostr_queries.lock().await;
1574            pending.insert(subscription_key.clone(), tx);
1575        }
1576
1577        let req = NostrClientMessage::req(subscription_id.clone(), filters);
1578        if let Err(e) = dc.send_text(req.as_json()).await {
1579            let mut pending = self.pending_nostr_queries.lock().await;
1580            pending.remove(&subscription_key);
1581            return Err(e.into());
1582        }
1583        debug!(
1584            "[Peer {}] Sent Nostr REQ subscription {}",
1585            self.peer_id.short(),
1586            subscription_id
1587        );
1588
1589        let mut events = Vec::new();
1590        let deadline = tokio::time::Instant::now() + timeout;
1591
1592        loop {
1593            let now = tokio::time::Instant::now();
1594            if now >= deadline {
1595                break;
1596            }
1597            let remaining = deadline - now;
1598
1599            let next = tokio::time::timeout(remaining, rx.recv()).await;
1600            match next {
1601                Ok(Some(NostrRelayMessage::Event {
1602                    subscription_id: sid,
1603                    event,
1604                })) if sid == subscription_id => {
1605                    debug!(
1606                        "[Peer {}] Received Nostr EVENT for subscription {}",
1607                        self.peer_id.short(),
1608                        subscription_id
1609                    );
1610                    events.push(*event);
1611                }
1612                Ok(Some(NostrRelayMessage::EndOfStoredEvents(sid))) if sid == subscription_id => {
1613                    debug!(
1614                        "[Peer {}] Received Nostr EOSE for subscription {}",
1615                        self.peer_id.short(),
1616                        subscription_id
1617                    );
1618                    break;
1619                }
1620                Ok(Some(NostrRelayMessage::Closed {
1621                    subscription_id: sid,
1622                    message,
1623                })) if sid == subscription_id => {
1624                    warn!(
1625                        "[Peer {}] Nostr query closed for subscription {}: {}",
1626                        self.peer_id.short(),
1627                        subscription_id,
1628                        message
1629                    );
1630                    break;
1631                }
1632                Ok(Some(_)) => {}
1633                Ok(None) => break,
1634                Err(_) => {
1635                    warn!(
1636                        "[Peer {}] Nostr query timed out for subscription {}",
1637                        self.peer_id.short(),
1638                        subscription_id
1639                    );
1640                    break;
1641                }
1642            }
1643        }
1644
1645        let close = NostrClientMessage::close(subscription_id.clone());
1646        let _ = dc.send_text(close.as_json()).await;
1647
1648        let mut pending = self.pending_nostr_queries.lock().await;
1649        pending.remove(&subscription_key);
1650        debug!(
1651            "[Peer {}] Nostr query subscription {} collected {} event(s)",
1652            self.peer_id.short(),
1653            subscription_id,
1654            events.len()
1655        );
1656
1657        Ok(events)
1658    }
1659
1660    /// Send a mesh signaling frame as text over the data channel.
1661    pub async fn send_mesh_frame_text(&self, frame: &MeshNostrFrame) -> Result<()> {
1662        let dc_guard = self.data_channel.lock().await;
1663        let dc = dc_guard
1664            .as_ref()
1665            .ok_or_else(|| anyhow::anyhow!("No data channel"))?
1666            .clone();
1667        drop(dc_guard);
1668
1669        let text = serde_json::to_string(frame)?;
1670        dc.send_text(text).await?;
1671        Ok(())
1672    }
1673
1674    /// Send a message over the data channel
1675    pub async fn send_message(&self, msg: &DataMessage) -> Result<()> {
1676        let dc_guard = self.data_channel.lock().await;
1677        if let Some(ref dc) = *dc_guard {
1678            let wire = encode_message(msg)?;
1679            dc.send(&Bytes::from(wire)).await?;
1680        }
1681        Ok(())
1682    }
1683
1684    /// Close the connection
1685    pub async fn close(&self) -> Result<()> {
1686        {
1687            let dc_guard = self.data_channel.lock().await;
1688            if let Some(ref dc) = *dc_guard {
1689                dc.close().await?;
1690            }
1691        }
1692        self.pc.close().await?;
1693        Ok(())
1694    }
1695}
1696
1697fn relay_subscription_id(msg: &NostrRelayMessage) -> Option<String> {
1698    match msg {
1699        NostrRelayMessage::Event {
1700            subscription_id, ..
1701        } => Some(subscription_id.to_string()),
1702        NostrRelayMessage::EndOfStoredEvents(subscription_id) => Some(subscription_id.to_string()),
1703        NostrRelayMessage::Closed {
1704            subscription_id, ..
1705        } => Some(subscription_id.to_string()),
1706        NostrRelayMessage::Count {
1707            subscription_id, ..
1708        } => Some(subscription_id.to_string()),
1709        _ => None,
1710    }
1711}