Skip to main content

hashtree_cli/webrtc/
peer.rs

1//! WebRTC peer connection for hashtree data exchange
2
3use anyhow::Result;
4use bytes::Bytes;
5use std::collections::HashMap;
6use std::sync::Arc;
7use tokio::sync::{mpsc, oneshot, Mutex, Notify};
8use tracing::{debug, error, info, warn};
9use webrtc::api::interceptor_registry::register_default_interceptors;
10use webrtc::api::media_engine::MediaEngine;
11use webrtc::api::setting_engine::SettingEngine;
12use webrtc::api::APIBuilder;
13use webrtc::data_channel::data_channel_init::RTCDataChannelInit;
14use webrtc::data_channel::data_channel_message::DataChannelMessage;
15use webrtc::data_channel::data_channel_state::RTCDataChannelState;
16use webrtc::data_channel::RTCDataChannel;
17use webrtc::ice_transport::ice_candidate::RTCIceCandidate;
18use webrtc::ice_transport::ice_server::RTCIceServer;
19use webrtc::interceptor::registry::Registry;
20use webrtc::peer_connection::configuration::RTCConfiguration;
21use webrtc::peer_connection::peer_connection_state::RTCPeerConnectionState;
22use webrtc::peer_connection::sdp::session_description::RTCSessionDescription;
23use webrtc::peer_connection::RTCPeerConnection;
24
25use super::cashu::{CashuQuoteState, ExpectedSettlement};
26use super::types::{
27    encode_chunk, encode_message, encode_payment, encode_payment_ack, encode_quote_response,
28    encode_request, encode_response, hash_to_hex, parse_message, validate_mesh_frame, DataChunk,
29    DataMessage, DataPayment, DataPaymentAck, DataQuoteRequest, DataRequest, DataResponse,
30    MeshNostrFrame, PeerDirection, PeerHTLConfig, PeerId, PeerStateEvent, SignalingMessage,
31    BLOB_REQUEST_POLICY,
32};
33use crate::nostr_relay::NostrRelay;
34use nostr::{
35    ClientMessage as NostrClientMessage, Filter as NostrFilter, JsonUtil as NostrJsonUtil,
36    RelayMessage as NostrRelayMessage, SubscriptionId as NostrSubscriptionId,
37};
38
39/// Trait for content storage that can be used by WebRTC peers
40pub trait ContentStore: Send + Sync + 'static {
41    /// Get content by hex hash
42    fn get(&self, hash_hex: &str) -> Result<Option<Vec<u8>>>;
43}
44
45/// Pending request tracking (keyed by hash hex)
46pub struct PendingRequest {
47    pub hash: Vec<u8>,
48    pub response_tx: oneshot::Sender<Option<Vec<u8>>>,
49    pub quoted: Option<PendingQuotedRequest>,
50}
51
52pub struct PendingQuotedRequest {
53    pub quote_id: u64,
54    pub mint_url: String,
55    pub total_payment_sat: u64,
56    pub confirmed_payment_sat: u64,
57    pub next_chunk_index: u32,
58    pub total_chunks: Option<u32>,
59    pub assembled_data: Vec<u8>,
60    pub in_flight_payment: Option<PendingChunkPayment>,
61    pub buffered_chunk: Option<DataChunk>,
62}
63
/// A payment that has been sent for one chunk and is awaiting the peer's ack.
///
/// All fields are plain data, so the common traits are derived to make the
/// type loggable, comparable and cheap to copy around.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PendingChunkPayment {
    /// Index of the chunk this payment pays for.
    pub chunk_index: u32,
    /// Amount paid, in sats.
    pub amount_sat: u64,
    /// Mint URL the payment token was created against.
    pub mint_url: String,
    /// Identifier used to revoke the payment token if the transfer fails.
    pub operation_id: String,
    /// Whether the paid chunk is the last chunk of the transfer.
    pub final_chunk: bool,
}
71
72impl PendingRequest {
73    pub fn standard(hash: Vec<u8>, response_tx: oneshot::Sender<Option<Vec<u8>>>) -> Self {
74        Self {
75            hash,
76            response_tx,
77            quoted: None,
78        }
79    }
80
81    pub fn quoted(
82        hash: Vec<u8>,
83        response_tx: oneshot::Sender<Option<Vec<u8>>>,
84        quote_id: u64,
85        mint_url: String,
86        total_payment_sat: u64,
87    ) -> Self {
88        Self {
89            hash,
90            response_tx,
91            quoted: Some(PendingQuotedRequest {
92                quote_id,
93                mint_url,
94                total_payment_sat,
95                confirmed_payment_sat: 0,
96                next_chunk_index: 0,
97                total_chunks: None,
98                assembled_data: Vec::new(),
99                in_flight_payment: None,
100                buffered_chunk: None,
101            }),
102        }
103    }
104}
105
106async fn handle_quote_request_message(
107    peer_short: &str,
108    peer_id: &PeerId,
109    store: &Option<Arc<dyn ContentStore>>,
110    cashu_quotes: Option<&Arc<CashuQuoteState>>,
111    req: &DataQuoteRequest,
112) -> Option<super::types::DataQuoteResponse> {
113    let Some(cashu_quotes) = cashu_quotes else {
114        debug!(
115            "[Peer {}] Ignoring quote request without Cashu policy",
116            peer_short
117        );
118        return None;
119    };
120
121    if cashu_quotes
122        .should_refuse_requests_from_peer(&peer_id.to_string())
123        .await
124    {
125        return Some(
126            cashu_quotes
127                .build_quote_response(&peer_id.to_string(), req, false)
128                .await,
129        );
130    }
131
132    let hash_hex = hash_to_hex(&req.h);
133    let can_serve = if let Some(store) = store {
134        match store.get(&hash_hex) {
135            Ok(Some(_)) => true,
136            Ok(None) => false,
137            Err(e) => {
138                warn!("[Peer {}] Store error during quote: {}", peer_short, e);
139                false
140            }
141        }
142    } else {
143        false
144    };
145
146    Some(
147        cashu_quotes
148            .build_quote_response(&peer_id.to_string(), req, can_serve)
149            .await,
150    )
151}
152
153struct PaymentHandlingOutcome {
154    ack: DataPaymentAck,
155    next_chunk: Option<(DataChunk, ExpectedSettlement)>,
156}
157
158async fn send_quoted_chunk(
159    dc: &Arc<RTCDataChannel>,
160    peer_id: &PeerId,
161    peer_short: &str,
162    cashu_quotes: &Arc<CashuQuoteState>,
163    chunk: DataChunk,
164    expected: ExpectedSettlement,
165) -> bool {
166    let hash_hex = hash_to_hex(&chunk.h);
167    let wire = match encode_chunk(&chunk) {
168        Ok(wire) => wire,
169        Err(err) => {
170            warn!(
171                "[Peer {}] Failed to encode quoted chunk {} for quote {}: {}",
172                peer_short, chunk.c, chunk.q, err
173            );
174            return false;
175        }
176    };
177
178    if let Err(err) = dc.send(&Bytes::from(wire)).await {
179        warn!(
180            "[Peer {}] Failed to send quoted chunk {} for quote {}: {}",
181            peer_short, chunk.c, chunk.q, err
182        );
183        return false;
184    }
185
186    cashu_quotes
187        .register_expected_payment(peer_id.to_string(), hash_hex, chunk.q, expected)
188        .await;
189    true
190}
191
192async fn fail_pending_request(
193    pending_requests: &Arc<Mutex<HashMap<String, PendingRequest>>>,
194    cashu_quotes: Option<&Arc<CashuQuoteState>>,
195    hash_hex: &str,
196) {
197    let pending = pending_requests.lock().await.remove(hash_hex);
198    let Some(pending) = pending else {
199        return;
200    };
201
202    if let (Some(cashu_quotes), Some(quoted)) = (cashu_quotes, pending.quoted) {
203        if let Some(in_flight) = quoted.in_flight_payment {
204            let _ = cashu_quotes
205                .revoke_payment_token(&in_flight.mint_url, &in_flight.operation_id)
206                .await;
207        }
208    }
209    let _ = pending.response_tx.send(None);
210}
211
212async fn process_chunk_message(
213    peer_short: &str,
214    _peer_id: &PeerId,
215    dc: &Arc<RTCDataChannel>,
216    pending_requests: &Arc<Mutex<HashMap<String, PendingRequest>>>,
217    cashu_quotes: Option<&Arc<CashuQuoteState>>,
218    chunk: DataChunk,
219) {
220    let hash_hex = hash_to_hex(&chunk.h);
221    let Some(cashu_quotes) = cashu_quotes else {
222        fail_pending_request(pending_requests, None, &hash_hex).await;
223        return;
224    };
225
226    enum ChunkAction {
227        BufferOnly,
228        Fail,
229        Pay {
230            mint_url: String,
231            amount_sat: u64,
232            final_chunk: bool,
233        },
234    }
235
236    let action = {
237        let mut pending = pending_requests.lock().await;
238        let Some(request) = pending.get_mut(&hash_hex) else {
239            return;
240        };
241        match request.quoted.as_mut() {
242            None => ChunkAction::Fail,
243            Some(quoted) if quoted.quote_id != chunk.q || chunk.n == 0 => ChunkAction::Fail,
244            Some(quoted) => {
245                if let Some(in_flight) = quoted.in_flight_payment.as_ref() {
246                    let expected_buffer_index = in_flight.chunk_index + 1;
247                    if chunk.c == expected_buffer_index && quoted.buffered_chunk.is_none() {
248                        quoted.buffered_chunk = Some(chunk.clone());
249                        ChunkAction::BufferOnly
250                    } else {
251                        ChunkAction::Fail
252                    }
253                } else if chunk.c != quoted.next_chunk_index {
254                    ChunkAction::Fail
255                } else if let Some(total_chunks) = quoted.total_chunks {
256                    if total_chunks != chunk.n {
257                        ChunkAction::Fail
258                    } else {
259                        let next_total = quoted.confirmed_payment_sat.saturating_add(chunk.p);
260                        if next_total > quoted.total_payment_sat
261                            || (chunk.c + 1 == chunk.n && next_total != quoted.total_payment_sat)
262                        {
263                            ChunkAction::Fail
264                        } else {
265                            quoted.total_chunks = Some(chunk.n);
266                            quoted.assembled_data.extend_from_slice(&chunk.d);
267                            quoted.next_chunk_index += 1;
268                            ChunkAction::Pay {
269                                mint_url: quoted.mint_url.clone(),
270                                amount_sat: chunk.p,
271                                final_chunk: chunk.c + 1 == chunk.n,
272                            }
273                        }
274                    }
275                } else {
276                    let next_total = quoted.confirmed_payment_sat.saturating_add(chunk.p);
277                    if next_total > quoted.total_payment_sat
278                        || (chunk.c + 1 == chunk.n && next_total != quoted.total_payment_sat)
279                    {
280                        ChunkAction::Fail
281                    } else {
282                        quoted.total_chunks = Some(chunk.n);
283                        quoted.assembled_data.extend_from_slice(&chunk.d);
284                        quoted.next_chunk_index += 1;
285                        ChunkAction::Pay {
286                            mint_url: quoted.mint_url.clone(),
287                            amount_sat: chunk.p,
288                            final_chunk: chunk.c + 1 == chunk.n,
289                        }
290                    }
291                }
292            }
293        }
294    };
295
296    match action {
297        ChunkAction::BufferOnly => (),
298        ChunkAction::Fail => {
299            warn!(
300                "[Peer {}] Invalid quoted chunk {} for hash {}",
301                peer_short, chunk.c, hash_hex
302            );
303            fail_pending_request(pending_requests, Some(cashu_quotes), &hash_hex).await;
304        }
305        ChunkAction::Pay {
306            mint_url,
307            amount_sat,
308            final_chunk,
309        } => {
310            let payment = match cashu_quotes
311                .create_payment_token(&mint_url, amount_sat)
312                .await
313            {
314                Ok(payment) => payment,
315                Err(err) => {
316                    warn!(
317                        "[Peer {}] Failed to create payment token for chunk {} of {}: {}",
318                        peer_short, chunk.c, hash_hex, err
319                    );
320                    fail_pending_request(pending_requests, Some(cashu_quotes), &hash_hex).await;
321                    return;
322                }
323            };
324
325            {
326                let mut pending = pending_requests.lock().await;
327                let Some(request) = pending.get_mut(&hash_hex) else {
328                    let _ = cashu_quotes
329                        .revoke_payment_token(&payment.mint_url, &payment.operation_id)
330                        .await;
331                    return;
332                };
333                let Some(quoted) = request.quoted.as_mut() else {
334                    let _ = cashu_quotes
335                        .revoke_payment_token(&payment.mint_url, &payment.operation_id)
336                        .await;
337                    return;
338                };
339                quoted.in_flight_payment = Some(PendingChunkPayment {
340                    chunk_index: chunk.c,
341                    amount_sat,
342                    mint_url: payment.mint_url.clone(),
343                    operation_id: payment.operation_id.clone(),
344                    final_chunk,
345                });
346            }
347
348            let payment_msg = DataPayment {
349                h: chunk.h,
350                q: chunk.q,
351                c: chunk.c,
352                p: amount_sat,
353                m: Some(payment.mint_url.clone()),
354                tok: payment.token,
355            };
356            let wire = match encode_payment(&payment_msg) {
357                Ok(wire) => wire,
358                Err(err) => {
359                    warn!(
360                        "[Peer {}] Failed to encode payment for chunk {} of {}: {}",
361                        peer_short, chunk.c, hash_hex, err
362                    );
363                    fail_pending_request(pending_requests, Some(cashu_quotes), &hash_hex).await;
364                    return;
365                }
366            };
367            if let Err(err) = dc.send(&Bytes::from(wire)).await {
368                warn!(
369                    "[Peer {}] Failed to send payment for chunk {} of {}: {}",
370                    peer_short, chunk.c, hash_hex, err
371                );
372                fail_pending_request(pending_requests, Some(cashu_quotes), &hash_hex).await;
373            }
374        }
375    }
376}
377
378async fn handle_payment_ack_message(
379    peer_short: &str,
380    peer_id: &PeerId,
381    dc: &Arc<RTCDataChannel>,
382    pending_requests: &Arc<Mutex<HashMap<String, PendingRequest>>>,
383    cashu_quotes: Option<&Arc<CashuQuoteState>>,
384    ack: DataPaymentAck,
385) {
386    let Some(cashu_quotes) = cashu_quotes else {
387        return;
388    };
389    let hash_hex = hash_to_hex(&ack.h);
390    let mut buffered_next = None;
391    let mut completed = None;
392    let mut failed = None;
393    let mut confirmed_amount = None;
394    let mut completed_data = None;
395
396    {
397        let mut pending = pending_requests.lock().await;
398        let Some(request) = pending.get_mut(&hash_hex) else {
399            return;
400        };
401        let Some(quoted) = request.quoted.as_mut() else {
402            return;
403        };
404        let Some(in_flight) = quoted.in_flight_payment.take() else {
405            return;
406        };
407        if ack.q != quoted.quote_id || ack.c != in_flight.chunk_index {
408            quoted.in_flight_payment = Some(in_flight);
409            return;
410        }
411
412        if !ack.a {
413            failed = Some(in_flight);
414        } else {
415            quoted.confirmed_payment_sat = quoted
416                .confirmed_payment_sat
417                .saturating_add(in_flight.amount_sat);
418            confirmed_amount = Some(in_flight.amount_sat);
419            if in_flight.final_chunk {
420                completed_data = Some(quoted.assembled_data.clone());
421            } else if let Some(next_chunk) = quoted.buffered_chunk.take() {
422                buffered_next = Some(next_chunk);
423            }
424        }
425
426        if let Some(data) = completed_data.take() {
427            let finished = pending
428                .remove(&hash_hex)
429                .expect("pending request must exist");
430            completed = Some((finished.response_tx, data));
431        }
432    }
433
434    if let Some(amount_sat) = confirmed_amount {
435        cashu_quotes
436            .record_paid_peer(&peer_id.to_string(), amount_sat)
437            .await;
438    }
439
440    if let Some(in_flight) = failed {
441        warn!(
442            "[Peer {}] Payment ack rejected chunk {} for {}: {}",
443            peer_short,
444            ack.c,
445            hash_hex,
446            ack.e.as_deref().unwrap_or("payment rejected")
447        );
448        let _ = cashu_quotes
449            .revoke_payment_token(&in_flight.mint_url, &in_flight.operation_id)
450            .await;
451        let removed = pending_requests.lock().await.remove(&hash_hex);
452        if let Some(removed) = removed {
453            let _ = removed.response_tx.send(None);
454        }
455        return;
456    }
457
458    if let Some((tx, data)) = completed {
459        let _ = tx.send(Some(data));
460        return;
461    }
462
463    if let Some(next_chunk) = buffered_next {
464        process_chunk_message(
465            peer_short,
466            peer_id,
467            dc,
468            pending_requests,
469            Some(cashu_quotes),
470            next_chunk,
471        )
472        .await;
473    }
474}
475
476async fn handle_payment_message(
477    peer_id: &PeerId,
478    cashu_quotes: Option<&Arc<CashuQuoteState>>,
479    req: &DataPayment,
480) -> PaymentHandlingOutcome {
481    let nack = |err: String| PaymentHandlingOutcome {
482        ack: DataPaymentAck {
483            h: req.h.clone(),
484            q: req.q,
485            c: req.c,
486            a: false,
487            e: Some(err),
488        },
489        next_chunk: None,
490    };
491
492    let Some(cashu_quotes) = cashu_quotes else {
493        return nack("Cashu settlement unavailable".to_string());
494    };
495
496    let expected = match cashu_quotes
497        .claim_expected_payment(
498            &peer_id.to_string(),
499            &req.h,
500            req.q,
501            req.c,
502            req.p,
503            req.m.as_deref(),
504        )
505        .await
506    {
507        Ok(expected) => expected,
508        Err(err) => {
509            cashu_quotes
510                .record_payment_default_from_peer(&peer_id.to_string())
511                .await;
512            return nack(err.to_string());
513        }
514    };
515
516    match cashu_quotes.receive_payment_token(&req.tok).await {
517        Ok(received) if received.amount_sat >= expected.payment_sat => {
518            if expected.mint_url.as_deref() != Some(received.mint_url.as_str()) {
519                cashu_quotes
520                    .record_payment_default_from_peer(&peer_id.to_string())
521                    .await;
522                return nack("Received payment mint did not match quoted mint".to_string());
523            }
524            if let Err(err) = cashu_quotes
525                .record_receipt_from_peer(
526                    &peer_id.to_string(),
527                    &received.mint_url,
528                    received.amount_sat,
529                )
530                .await
531            {
532                warn!(
533                    "[Peer {}] Failed to persist Cashu mint success for {}: {}",
534                    peer_id.short(),
535                    received.mint_url,
536                    err
537                );
538            }
539
540            let next_chunk = if expected.final_chunk {
541                None
542            } else {
543                cashu_quotes
544                    .next_outgoing_chunk(&peer_id.to_string(), &req.h, req.q)
545                    .await
546            };
547
548            PaymentHandlingOutcome {
549                ack: DataPaymentAck {
550                    h: req.h.clone(),
551                    q: req.q,
552                    c: req.c,
553                    a: true,
554                    e: None,
555                },
556                next_chunk,
557            }
558        }
559        Ok(_) => {
560            cashu_quotes
561                .record_payment_default_from_peer(&peer_id.to_string())
562                .await;
563            nack("Received payment amount was below the quoted amount".to_string())
564        }
565        Err(err) => {
566            if let Some(mint_url) = expected.mint_url.as_deref().or(req.m.as_deref()) {
567                let _ = cashu_quotes.record_mint_receive_failure(mint_url).await;
568            }
569            nack(err.to_string())
570        }
571    }
572}
573
574/// WebRTC peer connection with data channel protocol
575pub struct Peer {
576    pub peer_id: PeerId,
577    pub direction: PeerDirection,
578    pub created_at: std::time::Instant,
579    pub connected_at: Option<std::time::Instant>,
580
581    pc: Arc<RTCPeerConnection>,
582    /// Data channel - can be set from callback when receiving channel from peer
583    pub data_channel: Arc<Mutex<Option<Arc<RTCDataChannel>>>>,
584    signaling_tx: mpsc::Sender<SignalingMessage>,
585    my_peer_id: PeerId,
586
587    // Content store for serving requests
588    store: Option<Arc<dyn ContentStore>>,
589
590    // Track pending outgoing requests (keyed by hash hex)
591    pub pending_requests: Arc<Mutex<HashMap<String, PendingRequest>>>,
592    // Track pending Nostr relay queries over data channel (keyed by subscription id)
593    pending_nostr_queries: Arc<Mutex<HashMap<String, mpsc::UnboundedSender<NostrRelayMessage>>>>,
594
595    // Channel for incoming data messages
596    #[allow(dead_code)]
597    message_tx: mpsc::Sender<(DataMessage, Option<Vec<u8>>)>,
598    #[allow(dead_code)]
599    message_rx: Option<mpsc::Receiver<(DataMessage, Option<Vec<u8>>)>>,
600
601    // Optional channel to notify signaling layer of state changes
602    state_event_tx: Option<mpsc::Sender<PeerStateEvent>>,
603
604    // Optional Nostr relay for text messages over data channel
605    nostr_relay: Option<Arc<NostrRelay>>,
606    // Optional channel for inbound relayless signaling mesh frames
607    mesh_frame_tx: Option<mpsc::Sender<(PeerId, MeshNostrFrame)>>,
608    // Optional Cashu quote negotiation state shared with signaling.
609    cashu_quotes: Option<Arc<CashuQuoteState>>,
610    // Per-peer HTL randomness profile (reused across traffic classes)
611    htl_config: PeerHTLConfig,
612}
613
614impl Peer {
615    /// Create a new peer connection
616    pub async fn new(
617        peer_id: PeerId,
618        direction: PeerDirection,
619        my_peer_id: PeerId,
620        signaling_tx: mpsc::Sender<SignalingMessage>,
621        stun_servers: Vec<String>,
622    ) -> Result<Self> {
623        Self::new_with_store_and_events(
624            peer_id,
625            direction,
626            my_peer_id,
627            signaling_tx,
628            stun_servers,
629            None,
630            None,
631            None,
632            None,
633            None,
634        )
635        .await
636    }
637
638    /// Create a new peer connection with content store
639    pub async fn new_with_store(
640        peer_id: PeerId,
641        direction: PeerDirection,
642        my_peer_id: PeerId,
643        signaling_tx: mpsc::Sender<SignalingMessage>,
644        stun_servers: Vec<String>,
645        store: Option<Arc<dyn ContentStore>>,
646    ) -> Result<Self> {
647        Self::new_with_store_and_events(
648            peer_id,
649            direction,
650            my_peer_id,
651            signaling_tx,
652            stun_servers,
653            store,
654            None,
655            None,
656            None,
657            None,
658        )
659        .await
660    }
661
662    /// Create a new peer connection with content store and state event channel
663    #[allow(clippy::too_many_arguments)]
664    pub(crate) async fn new_with_store_and_events(
665        peer_id: PeerId,
666        direction: PeerDirection,
667        my_peer_id: PeerId,
668        signaling_tx: mpsc::Sender<SignalingMessage>,
669        stun_servers: Vec<String>,
670        store: Option<Arc<dyn ContentStore>>,
671        state_event_tx: Option<mpsc::Sender<PeerStateEvent>>,
672        nostr_relay: Option<Arc<NostrRelay>>,
673        mesh_frame_tx: Option<mpsc::Sender<(PeerId, MeshNostrFrame)>>,
674        cashu_quotes: Option<Arc<CashuQuoteState>>,
675    ) -> Result<Self> {
676        // Create WebRTC API
677        let mut m = MediaEngine::default();
678        m.register_default_codecs()?;
679
680        let mut registry = Registry::new();
681        registry = register_default_interceptors(registry, &mut m)?;
682
683        // Enable mDNS temporarily for debugging
684        // Previously disabled due to https://github.com/webrtc-rs/webrtc/issues/616
685        let setting_engine = SettingEngine::default();
686        // Note: mDNS enabled by default
687
688        let api = APIBuilder::new()
689            .with_media_engine(m)
690            .with_interceptor_registry(registry)
691            .with_setting_engine(setting_engine)
692            .build();
693
694        // Configure ICE servers
695        let ice_servers: Vec<RTCIceServer> = stun_servers
696            .iter()
697            .map(|url| RTCIceServer {
698                urls: vec![url.clone()],
699                ..Default::default()
700            })
701            .collect();
702
703        let config = RTCConfiguration {
704            ice_servers,
705            ..Default::default()
706        };
707
708        let pc = Arc::new(api.new_peer_connection(config).await?);
709        let (message_tx, message_rx) = mpsc::channel(100);
710        Ok(Self {
711            peer_id,
712            direction,
713            created_at: std::time::Instant::now(),
714            connected_at: None,
715            pc,
716            data_channel: Arc::new(Mutex::new(None)),
717            signaling_tx,
718            my_peer_id,
719            store,
720            pending_requests: Arc::new(Mutex::new(HashMap::new())),
721            pending_nostr_queries: Arc::new(Mutex::new(HashMap::new())),
722            message_tx,
723            message_rx: Some(message_rx),
724            state_event_tx,
725            nostr_relay,
726            mesh_frame_tx,
727            cashu_quotes,
728            htl_config: PeerHTLConfig::random(),
729        })
730    }
731
732    /// Set content store
733    pub fn set_store(&mut self, store: Arc<dyn ContentStore>) {
734        self.store = Some(store);
735    }
736
737    /// Get connection state
738    pub fn state(&self) -> RTCPeerConnectionState {
739        self.pc.connection_state()
740    }
741
742    /// Get signaling state
743    pub fn signaling_state(&self) -> webrtc::peer_connection::signaling_state::RTCSignalingState {
744        self.pc.signaling_state()
745    }
746
747    /// Check if connected
748    pub fn is_connected(&self) -> bool {
749        self.pc.connection_state() == RTCPeerConnectionState::Connected
750    }
751
752    pub fn htl_config(&self) -> &PeerHTLConfig {
753        &self.htl_config
754    }
755
756    /// Setup event handlers for the peer connection
757    pub async fn setup_handlers(&self) -> Result<()> {
758        let peer_id = self.peer_id.clone();
759        let signaling_tx = self.signaling_tx.clone();
760        let my_peer_id_str = self.my_peer_id.to_string();
761        let recipient = self.peer_id.to_string();
762
763        // Handle ICE candidates - work MUST be inside the returned future
764        self.pc
765            .on_ice_candidate(Box::new(move |candidate: Option<RTCIceCandidate>| {
766                let signaling_tx = signaling_tx.clone();
767                let my_peer_id_str = my_peer_id_str.clone();
768                let recipient = recipient.clone();
769
770                Box::pin(async move {
771                    if let Some(c) = candidate {
772                        if let Ok(init) = c.to_json() {
773                            info!(
774                                "ICE candidate generated: {}",
775                                &init.candidate[..init.candidate.len().min(60)]
776                            );
777                            let msg = SignalingMessage::candidate(
778                                serde_json::to_value(&init).unwrap_or_default(),
779                                &recipient,
780                                &my_peer_id_str,
781                            );
782                            if let Err(e) = signaling_tx.send(msg).await {
783                                error!("Failed to send ICE candidate: {}", e);
784                            }
785                        }
786                    }
787                })
788            }));
789
790        // Handle connection state changes - work MUST be inside the returned future
791        let peer_id_log = peer_id.clone();
792        let state_event_tx = self.state_event_tx.clone();
793        self.pc
794            .on_peer_connection_state_change(Box::new(move |state: RTCPeerConnectionState| {
795                let peer_id = peer_id_log.clone();
796                let state_event_tx = state_event_tx.clone();
797                Box::pin(async move {
798                    info!("Peer {} connection state: {:?}", peer_id.short(), state);
799
800                    // Notify signaling layer of state changes
801                    if let Some(tx) = state_event_tx {
802                        let event = match state {
803                            RTCPeerConnectionState::Connected => {
804                                Some(PeerStateEvent::Connected(peer_id))
805                            }
806                            RTCPeerConnectionState::Failed => Some(PeerStateEvent::Failed(peer_id)),
807                            RTCPeerConnectionState::Disconnected
808                            | RTCPeerConnectionState::Closed => {
809                                Some(PeerStateEvent::Disconnected(peer_id))
810                            }
811                            _ => None,
812                        };
813                        if let Some(event) = event {
814                            if let Err(e) = tx.send(event).await {
815                                error!("Failed to send peer state event: {}", e);
816                            }
817                        }
818                    }
819                })
820            }));
821
822        Ok(())
823    }
824
825    /// Initiate connection (create offer) - for outbound connections
826    pub async fn connect(&self) -> Result<serde_json::Value> {
827        println!("[Peer {}] Creating data channel...", self.peer_id.short());
828        // Create data channel first
829        // Use unordered for better performance - protocol is stateless (each message self-describes)
830        let dc_init = RTCDataChannelInit {
831            ordered: Some(false),
832            ..Default::default()
833        };
834        let dc = self
835            .pc
836            .create_data_channel("hashtree", Some(dc_init))
837            .await?;
838        println!(
839            "[Peer {}] Data channel created, setting up handlers...",
840            self.peer_id.short()
841        );
842        self.setup_data_channel(dc.clone()).await?;
843        println!(
844            "[Peer {}] Handlers set up, storing data channel...",
845            self.peer_id.short()
846        );
847        {
848            let mut dc_guard = self.data_channel.lock().await;
849            *dc_guard = Some(dc);
850        }
851        println!("[Peer {}] Data channel stored", self.peer_id.short());
852
853        // Create offer and wait for ICE gathering to complete
854        // This ensures all ICE candidates are embedded in the SDP
855        let offer = self.pc.create_offer(None).await?;
856        let mut gathering_complete = self.pc.gathering_complete_promise().await;
857        self.pc.set_local_description(offer).await?;
858
859        // Wait for ICE gathering to complete (with timeout)
860        let _ = tokio::time::timeout(
861            std::time::Duration::from_secs(10),
862            gathering_complete.recv(),
863        )
864        .await;
865
866        // Get the local description with ICE candidates embedded
867        let local_desc = self
868            .pc
869            .local_description()
870            .await
871            .ok_or_else(|| anyhow::anyhow!("No local description after gathering"))?;
872
873        debug!(
874            "Offer created, SDP len: {}, ice_gathering: {:?}",
875            local_desc.sdp.len(),
876            self.pc.ice_gathering_state()
877        );
878
879        // Return offer as JSON
880        let offer_json = serde_json::json!({
881            "type": local_desc.sdp_type.to_string().to_lowercase(),
882            "sdp": local_desc.sdp
883        });
884
885        Ok(offer_json)
886    }
887
888    /// Handle incoming offer and create answer
889    pub async fn handle_offer(&self, offer: serde_json::Value) -> Result<serde_json::Value> {
890        let sdp = offer
891            .get("sdp")
892            .and_then(|s| s.as_str())
893            .ok_or_else(|| anyhow::anyhow!("Missing SDP in offer"))?;
894
895        // Setup data channel handler BEFORE set_remote_description
896        // This ensures the handler is registered before any data channel events fire
897        let peer_id = self.peer_id.clone();
898        let message_tx = self.message_tx.clone();
899        let pending_requests = self.pending_requests.clone();
900        let pending_nostr_queries = self.pending_nostr_queries.clone();
901        let store = self.store.clone();
902        let data_channel_holder = self.data_channel.clone();
903        let nostr_relay = self.nostr_relay.clone();
904        let mesh_frame_tx = self.mesh_frame_tx.clone();
905        let cashu_quotes = self.cashu_quotes.clone();
906        let peer_pubkey = Some(self.peer_id.pubkey.clone());
907
908        self.pc
909            .on_data_channel(Box::new(move |dc: Arc<RTCDataChannel>| {
910                let peer_id = peer_id.clone();
911                let message_tx = message_tx.clone();
912                let pending_requests = pending_requests.clone();
913                let pending_nostr_queries = pending_nostr_queries.clone();
914                let store = store.clone();
915                let data_channel_holder = data_channel_holder.clone();
916                let nostr_relay = nostr_relay.clone();
917                let mesh_frame_tx = mesh_frame_tx.clone();
918                let cashu_quotes = cashu_quotes.clone();
919                let peer_pubkey = peer_pubkey.clone();
920
921                // Work MUST be inside the returned future
922                Box::pin(async move {
923                    info!(
924                        "Peer {} received data channel: {}",
925                        peer_id.short(),
926                        dc.label()
927                    );
928
929                    // Store the received data channel
930                    {
931                        let mut dc_guard = data_channel_holder.lock().await;
932                        *dc_guard = Some(dc.clone());
933                    }
934
935                    // Set up message handlers
936                    Self::setup_dc_handlers(
937                        dc.clone(),
938                        peer_id,
939                        message_tx,
940                        pending_requests,
941                        pending_nostr_queries.clone(),
942                        store,
943                        nostr_relay,
944                        mesh_frame_tx,
945                        cashu_quotes,
946                        peer_pubkey,
947                    )
948                    .await;
949                })
950            }));
951
952        // Set remote description after handler is registered
953        let offer_desc = RTCSessionDescription::offer(sdp.to_string())?;
954        self.pc.set_remote_description(offer_desc).await?;
955
956        // Create answer and wait for ICE gathering to complete
957        // This ensures all ICE candidates are embedded in the SDP
958        let answer = self.pc.create_answer(None).await?;
959        let mut gathering_complete = self.pc.gathering_complete_promise().await;
960        self.pc.set_local_description(answer).await?;
961
962        // Wait for ICE gathering to complete (with timeout)
963        let _ = tokio::time::timeout(
964            std::time::Duration::from_secs(10),
965            gathering_complete.recv(),
966        )
967        .await;
968
969        // Get the local description with ICE candidates embedded
970        let local_desc = self
971            .pc
972            .local_description()
973            .await
974            .ok_or_else(|| anyhow::anyhow!("No local description after gathering"))?;
975
976        debug!(
977            "Answer created, SDP len: {}, ice_gathering: {:?}",
978            local_desc.sdp.len(),
979            self.pc.ice_gathering_state()
980        );
981
982        let answer_json = serde_json::json!({
983            "type": local_desc.sdp_type.to_string().to_lowercase(),
984            "sdp": local_desc.sdp
985        });
986
987        Ok(answer_json)
988    }
989
990    /// Handle incoming answer
991    pub async fn handle_answer(&self, answer: serde_json::Value) -> Result<()> {
992        let sdp = answer
993            .get("sdp")
994            .and_then(|s| s.as_str())
995            .ok_or_else(|| anyhow::anyhow!("Missing SDP in answer"))?;
996
997        let answer_desc = RTCSessionDescription::answer(sdp.to_string())?;
998        self.pc.set_remote_description(answer_desc).await?;
999
1000        Ok(())
1001    }
1002
1003    /// Handle incoming ICE candidate
1004    pub async fn handle_candidate(&self, candidate: serde_json::Value) -> Result<()> {
1005        let candidate_str = candidate
1006            .get("candidate")
1007            .and_then(|c| c.as_str())
1008            .unwrap_or("");
1009
1010        let sdp_mid = candidate
1011            .get("sdpMid")
1012            .and_then(|m| m.as_str())
1013            .map(|s| s.to_string());
1014
1015        let sdp_mline_index = candidate
1016            .get("sdpMLineIndex")
1017            .and_then(|i| i.as_u64())
1018            .map(|i| i as u16);
1019
1020        if !candidate_str.is_empty() {
1021            use webrtc::ice_transport::ice_candidate::RTCIceCandidateInit;
1022            let init = RTCIceCandidateInit {
1023                candidate: candidate_str.to_string(),
1024                sdp_mid,
1025                sdp_mline_index,
1026                username_fragment: candidate
1027                    .get("usernameFragment")
1028                    .and_then(|u| u.as_str())
1029                    .map(|s| s.to_string()),
1030            };
1031            self.pc.add_ice_candidate(init).await?;
1032        }
1033
1034        Ok(())
1035    }
1036
1037    /// Setup data channel handlers
1038    async fn setup_data_channel(&self, dc: Arc<RTCDataChannel>) -> Result<()> {
1039        let peer_id = self.peer_id.clone();
1040        let message_tx = self.message_tx.clone();
1041        let pending_requests = self.pending_requests.clone();
1042        let store = self.store.clone();
1043        let nostr_relay = self.nostr_relay.clone();
1044        let mesh_frame_tx = self.mesh_frame_tx.clone();
1045        let cashu_quotes = self.cashu_quotes.clone();
1046        let peer_pubkey = Some(self.peer_id.pubkey.clone());
1047
1048        Self::setup_dc_handlers(
1049            dc,
1050            peer_id,
1051            message_tx,
1052            pending_requests,
1053            self.pending_nostr_queries.clone(),
1054            store,
1055            nostr_relay,
1056            mesh_frame_tx,
1057            cashu_quotes,
1058            peer_pubkey,
1059        )
1060        .await;
1061        Ok(())
1062    }
1063
1064    /// Setup handlers for a data channel (shared between outbound and inbound)
1065    #[allow(clippy::too_many_arguments)]
1066    async fn setup_dc_handlers(
1067        dc: Arc<RTCDataChannel>,
1068        peer_id: PeerId,
1069        message_tx: mpsc::Sender<(DataMessage, Option<Vec<u8>>)>,
1070        pending_requests: Arc<Mutex<HashMap<String, PendingRequest>>>,
1071        pending_nostr_queries: Arc<
1072            Mutex<HashMap<String, mpsc::UnboundedSender<NostrRelayMessage>>>,
1073        >,
1074        store: Option<Arc<dyn ContentStore>>,
1075        nostr_relay: Option<Arc<NostrRelay>>,
1076        mesh_frame_tx: Option<mpsc::Sender<(PeerId, MeshNostrFrame)>>,
1077        cashu_quotes: Option<Arc<CashuQuoteState>>,
1078        peer_pubkey: Option<String>,
1079    ) {
1080        let label = dc.label().to_string();
1081        let peer_short = peer_id.short();
1082
1083        // Track pending binary data (request_id -> expected after response)
1084        let _pending_binary: Arc<Mutex<Option<u32>>> = Arc::new(Mutex::new(None));
1085
1086        let open_notify = nostr_relay.as_ref().map(|_| Arc::new(Notify::new()));
1087        if let Some(ref notify) = open_notify {
1088            if dc.ready_state() == RTCDataChannelState::Open {
1089                // `notify_one` stores a permit if no waiter is active yet.
1090                notify.notify_one();
1091            }
1092        }
1093
1094        let mut nostr_client_id: Option<u64> = None;
1095        if let Some(relay) = nostr_relay.clone() {
1096            let client_id = relay.next_client_id();
1097            let (nostr_tx, mut nostr_rx) = mpsc::unbounded_channel::<String>();
1098            relay
1099                .register_client(client_id, nostr_tx, peer_pubkey.clone())
1100                .await;
1101            nostr_client_id = Some(client_id);
1102
1103            if let Some(notify) = open_notify.clone() {
1104                let dc_for_send = dc.clone();
1105                tokio::spawn(async move {
1106                    notify.notified().await;
1107                    while let Some(text) = nostr_rx.recv().await {
1108                        if dc_for_send.send_text(text).await.is_err() {
1109                            break;
1110                        }
1111                    }
1112                });
1113            }
1114        }
1115
1116        if let (Some(relay), Some(client_id)) = (nostr_relay.clone(), nostr_client_id) {
1117            dc.on_close(Box::new(move || {
1118                let relay = relay.clone();
1119                Box::pin(async move {
1120                    relay.unregister_client(client_id).await;
1121                })
1122            }));
1123        }
1124
1125        let open_notify_clone = open_notify.clone();
1126        let peer_short_open = peer_short.clone();
1127        let label_clone = label.clone();
1128        dc.on_open(Box::new(move || {
1129            let peer_short_open = peer_short_open.clone();
1130            let label_clone = label_clone.clone();
1131            let open_notify = open_notify_clone.clone();
1132            // Work MUST be inside the returned future
1133            Box::pin(async move {
1134                info!(
1135                    "[Peer {}] Data channel '{}' open",
1136                    peer_short_open, label_clone
1137                );
1138                if let Some(notify) = open_notify {
1139                    notify.notify_one();
1140                }
1141            })
1142        }));
1143
1144        let dc_for_msg = dc.clone();
1145        let peer_short_msg = peer_short.clone();
1146        let _pending_binary_clone = _pending_binary.clone();
1147        let store_clone = store.clone();
1148        let nostr_relay_for_msg = nostr_relay.clone();
1149        let nostr_client_id_for_msg = nostr_client_id;
1150        let pending_nostr_queries_for_msg = pending_nostr_queries.clone();
1151        let mesh_frame_tx_for_msg = mesh_frame_tx.clone();
1152        let peer_id_for_msg = peer_id.clone();
1153
1154        dc.on_message(Box::new(move |msg: DataChannelMessage| {
1155            let dc = dc_for_msg.clone();
1156            let peer_short = peer_short_msg.clone();
1157            let pending_requests = pending_requests.clone();
1158            let _pending_binary = _pending_binary_clone.clone();
1159            let _message_tx = message_tx.clone();
1160            let store = store_clone.clone();
1161            let nostr_relay = nostr_relay_for_msg.clone();
1162            let nostr_client_id = nostr_client_id_for_msg;
1163            let pending_nostr_queries = pending_nostr_queries_for_msg.clone();
1164            let mesh_frame_tx = mesh_frame_tx_for_msg.clone();
1165            let cashu_quotes = cashu_quotes.clone();
1166            let peer_id = peer_id_for_msg.clone();
1167            let msg_data = msg.data.clone();
1168
1169            // Work MUST be inside the returned future
1170            Box::pin(async move {
1171                if msg.is_string {
1172                    if let Ok(text) = std::str::from_utf8(&msg_data) {
1173                        if let Ok(mesh_frame) = serde_json::from_str::<MeshNostrFrame>(text) {
1174                            match validate_mesh_frame(&mesh_frame) {
1175                                Ok(()) => {
1176                                    if let Some(tx) = mesh_frame_tx {
1177                                        let _ = tx.send((peer_id.clone(), mesh_frame)).await;
1178                                    }
1179                                    return;
1180                                }
1181                                Err(reason) => {
1182                                    debug!(
1183                                        "[Peer {}] Ignoring invalid mesh frame: {}",
1184                                        peer_short, reason
1185                                    );
1186                                }
1187                            }
1188                        }
1189
1190                        // First, route relay responses to pending local queries.
1191                        if let Ok(relay_msg) = NostrRelayMessage::from_json(text) {
1192                            if let Some(sub_id) = relay_subscription_id(&relay_msg) {
1193                                let sender = {
1194                                    let pending = pending_nostr_queries.lock().await;
1195                                    pending.get(&sub_id).cloned()
1196                                };
1197                                if let Some(tx) = sender {
1198                                    debug!(
1199                                        "[Peer {}] Routed Nostr relay message for subscription {}",
1200                                        peer_short, sub_id
1201                                    );
1202                                    let _ = tx.send(relay_msg);
1203                                    return;
1204                                } else {
1205                                    debug!(
1206                                        "[Peer {}] Dropping Nostr relay message for unknown subscription {}",
1207                                        peer_short, sub_id
1208                                    );
1209                                }
1210                            }
1211                        }
1212
1213                        // Otherwise treat it as a client message to be handled by local relay.
1214                        if let Some(relay) = nostr_relay {
1215                            if let Ok(nostr_msg) = NostrClientMessage::from_json(text) {
1216                                if let Some(client_id) = nostr_client_id {
1217                                    relay.handle_client_message(client_id, nostr_msg).await;
1218                                }
1219                            }
1220                        }
1221                    }
1222                    return;
1223                }
1224                // All messages are binary with type prefix + MessagePack body
1225                debug!(
1226                    "[Peer {}] Received {} bytes on data channel",
1227                    peer_short,
1228                    msg_data.len()
1229                );
1230                match parse_message(&msg_data) {
1231                    Ok(data_msg) => match data_msg {
1232                        DataMessage::Request(req) => {
1233                            let hash_hex = hash_to_hex(&req.h);
1234                            let hash_short = &hash_hex[..8.min(hash_hex.len())];
1235                            info!("[Peer {}] Received request for {}", peer_short, hash_short);
1236
1237                            if let Some(cashu_quotes) = cashu_quotes.as_ref() {
1238                                if cashu_quotes
1239                                    .should_refuse_requests_from_peer(&peer_id.to_string())
1240                                    .await
1241                                {
1242                                    info!(
1243                                        "[Peer {}] Refusing request from peer with unpaid defaults",
1244                                        peer_short
1245                                    );
1246                                    return;
1247                                }
1248                            }
1249
1250                            let quoted_settlement = if let Some(quote_id) = req.q {
1251                                let Some(cashu_quotes) = cashu_quotes.as_ref() else {
1252                                    info!(
1253                                        "[Peer {}] Ignoring quoted request without Cashu settlement state",
1254                                        peer_short
1255                                    );
1256                                    return;
1257                                };
1258                                match cashu_quotes
1259                                    .take_valid_quote(&peer_id.to_string(), &req.h, quote_id)
1260                                    .await
1261                                {
1262                                    Some(settlement) => Some((quote_id, settlement)),
1263                                    None => {
1264                                        info!(
1265                                            "[Peer {}] Ignoring request with invalid or expired quote {}",
1266                                            peer_short, quote_id
1267                                        );
1268                                        return;
1269                                    }
1270                                }
1271                            } else {
1272                                None
1273                            };
1274
1275                            // Handle request - look up in store
1276                            let data = if let Some(ref store) = store {
1277                                match store.get(&hash_hex) {
1278                                    Ok(Some(data)) => {
1279                                        info!(
1280                                            "[Peer {}] Found {} in store ({} bytes)",
1281                                            peer_short,
1282                                            hash_short,
1283                                            data.len()
1284                                        );
1285                                        Some(data)
1286                                    }
1287                                    Ok(None) => {
1288                                        info!(
1289                                            "[Peer {}] Hash {} not in store",
1290                                            peer_short, hash_short
1291                                        );
1292                                        None
1293                                    }
1294                                    Err(e) => {
1295                                        warn!("[Peer {}] Store error: {}", peer_short, e);
1296                                        None
1297                                    }
1298                                }
1299                            } else {
1300                                warn!(
1301                                    "[Peer {}] No store configured - cannot serve requests",
1302                                    peer_short
1303                                );
1304                                None
1305                            };
1306
1307                            // Send response only if we have data
1308                            if let Some(data) = data {
1309                                let data_len = data.len();
1310                                if let (Some(cashu_quotes), Some((quote_id, settlement))) =
1311                                    (cashu_quotes.as_ref(), quoted_settlement)
1312                                {
1313                                    match cashu_quotes
1314                                        .prepare_quoted_transfer(
1315                                            &peer_id.to_string(),
1316                                            &req.h,
1317                                            quote_id,
1318                                            &settlement,
1319                                            data,
1320                                        )
1321                                        .await
1322                                    {
1323                                        Some((first_chunk, first_expected)) => {
1324                                            if send_quoted_chunk(
1325                                                &dc,
1326                                                &peer_id,
1327                                                &peer_short,
1328                                                cashu_quotes,
1329                                                first_chunk,
1330                                                first_expected,
1331                                            )
1332                                            .await
1333                                            {
1334                                                info!(
1335                                                    "[Peer {}] Started quoted chunked response for {} ({} bytes)",
1336                                                    peer_short, hash_short, data_len
1337                                                );
1338                                            }
1339                                        }
1340                                        None => {
1341                                            warn!(
1342                                                "[Peer {}] Failed to prepare quoted transfer for {}",
1343                                                peer_short, hash_short
1344                                            );
1345                                        }
1346                                    }
1347                                } else {
1348                                    let response = DataResponse { h: req.h, d: data };
1349                                    if let Ok(wire) = encode_response(&response) {
1350                                        if let Err(e) = dc.send(&Bytes::from(wire)).await {
1351                                            error!(
1352                                                "[Peer {}] Failed to send response: {}",
1353                                                peer_short, e
1354                                            );
1355                                        } else {
1356                                            info!(
1357                                                "[Peer {}] Sent response for {} ({} bytes)",
1358                                                peer_short, hash_short, data_len
1359                                            );
1360                                        }
1361                                    }
1362                                }
1363                            } else {
1364                                info!("[Peer {}] Content not found for {}", peer_short, hash_short);
1365                            }
1366                        }
1367                        DataMessage::Response(res) => {
1368                            let hash_hex = hash_to_hex(&res.h);
1369                            let hash_short = &hash_hex[..8.min(hash_hex.len())];
1370                            debug!(
1371                                "[Peer {}] Received response for {} ({} bytes)",
1372                                peer_short,
1373                                hash_short,
1374                                res.d.len()
1375                            );
1376
1377                            // Resolve the pending request by hash
1378                            let mut pending = pending_requests.lock().await;
1379                            if let Some(req) = pending.remove(&hash_hex) {
1380                                let _ = req.response_tx.send(Some(res.d));
1381                            }
1382                        }
1383                        DataMessage::QuoteRequest(req) => {
1384                            let response = handle_quote_request_message(
1385                                &peer_short,
1386                                &peer_id,
1387                                &store,
1388                                cashu_quotes.as_ref(),
1389                                &req,
1390                            )
1391                            .await;
1392                            if let Some(response) = response {
1393                                if let Ok(wire) = encode_quote_response(&response) {
1394                                    if let Err(e) = dc.send(&Bytes::from(wire)).await {
1395                                        warn!(
1396                                            "[Peer {}] Failed to send quote response: {}",
1397                                            peer_short, e
1398                                        );
1399                                    }
1400                                }
1401                            }
1402                        }
1403                        DataMessage::QuoteResponse(res) => {
1404                            if let Some(cashu_quotes) = cashu_quotes.as_ref() {
1405                                let _ = cashu_quotes
1406                                    .handle_quote_response(&peer_id.to_string(), res)
1407                                    .await;
1408                            }
1409                        }
1410                        DataMessage::Chunk(chunk) => {
1411                            process_chunk_message(
1412                                &peer_short,
1413                                &peer_id,
1414                                &dc,
1415                                &pending_requests,
1416                                cashu_quotes.as_ref(),
1417                                chunk,
1418                            )
1419                            .await;
1420                        }
1421                        DataMessage::Payment(req) => {
1422                            let outcome =
1423                                handle_payment_message(&peer_id, cashu_quotes.as_ref(), &req).await;
1424                            if let Ok(wire) = encode_payment_ack(&outcome.ack) {
1425                                if let Err(e) = dc.send(&Bytes::from(wire)).await {
1426                                    warn!(
1427                                        "[Peer {}] Failed to send payment ack: {}",
1428                                        peer_short, e
1429                                    );
1430                                }
1431                            }
1432                            if let (Some(cashu_quotes), Some((next_chunk, next_expected))) =
1433                                (cashu_quotes.as_ref(), outcome.next_chunk)
1434                            {
1435                                let _ = send_quoted_chunk(
1436                                    &dc,
1437                                    &peer_id,
1438                                    &peer_short,
1439                                    cashu_quotes,
1440                                    next_chunk,
1441                                    next_expected,
1442                                )
1443                                .await;
1444                            }
1445                        }
1446                        DataMessage::PaymentAck(res) => {
1447                            handle_payment_ack_message(
1448                                &peer_short,
1449                                &peer_id,
1450                                &dc,
1451                                &pending_requests,
1452                                cashu_quotes.as_ref(),
1453                                res,
1454                            )
1455                            .await;
1456                        }
1457                    },
1458                    Err(e) => {
1459                        warn!("[Peer {}] Failed to parse message: {:?}", peer_short, e);
1460                        // Log hex dump of first 50 bytes for debugging
1461                        let hex_dump: String = msg_data
1462                            .iter()
1463                            .take(50)
1464                            .map(|b| format!("{:02x}", b))
1465                            .collect();
1466                        warn!("[Peer {}] Message hex: {}", peer_short, hex_dump);
1467                    }
1468                }
1469            })
1470        }));
1471    }
1472
1473    /// Check if data channel is ready
1474    pub fn has_data_channel(&self) -> bool {
1475        // Use try_lock for non-async context
1476        self.data_channel
1477            .try_lock()
1478            .map(|guard| {
1479                guard
1480                    .as_ref()
1481                    .map(|dc| dc.ready_state() == RTCDataChannelState::Open)
1482                    .unwrap_or(false)
1483            })
1484            .unwrap_or(false)
1485    }
1486
1487    /// Request content by hash from this peer
1488    pub async fn request(&self, hash_hex: &str) -> Result<Option<Vec<u8>>> {
1489        self.request_with_timeout(hash_hex, std::time::Duration::from_secs(10))
1490            .await
1491    }
1492
1493    /// Request content by hash from this peer with an explicit timeout.
1494    pub async fn request_with_timeout(
1495        &self,
1496        hash_hex: &str,
1497        timeout: std::time::Duration,
1498    ) -> Result<Option<Vec<u8>>> {
1499        let dc_guard = self.data_channel.lock().await;
1500        let dc = dc_guard
1501            .as_ref()
1502            .ok_or_else(|| anyhow::anyhow!("No data channel"))?
1503            .clone();
1504        drop(dc_guard); // Release lock before async operations
1505
1506        // Convert hex to binary hash
1507        let hash = hex::decode(hash_hex).map_err(|e| anyhow::anyhow!("Invalid hex hash: {}", e))?;
1508
1509        // Create response channel
1510        let (tx, rx) = oneshot::channel();
1511
1512        // Store pending request (keyed by hash hex)
1513        {
1514            let mut pending = self.pending_requests.lock().await;
1515            pending.insert(
1516                hash_hex.to_string(),
1517                PendingRequest::standard(hash.clone(), tx),
1518            );
1519        }
1520
1521        // Send request with blob-request default HTL (fresh request from us)
1522        let req = DataRequest {
1523            h: hash,
1524            htl: BLOB_REQUEST_POLICY.max_htl,
1525            q: None,
1526        };
1527        let wire = encode_request(&req)?;
1528        dc.send(&Bytes::from(wire)).await?;
1529
1530        debug!(
1531            "[Peer {}] Sent request for {}",
1532            self.peer_id.short(),
1533            &hash_hex[..8.min(hash_hex.len())]
1534        );
1535
1536        // Wait for response with timeout
1537        match tokio::time::timeout(timeout, rx).await {
1538            Ok(Ok(data)) => Ok(data),
1539            Ok(Err(_)) => {
1540                // Channel closed
1541                Ok(None)
1542            }
1543            Err(_) => {
1544                // Timeout - clean up pending request
1545                let mut pending = self.pending_requests.lock().await;
1546                pending.remove(hash_hex);
1547                Ok(None)
1548            }
1549        }
1550    }
1551
1552    /// Query a peer's embedded Nostr relay over the WebRTC data channel.
1553    /// Returns all events received before EOSE/timeout.
1554    pub async fn query_nostr_events(
1555        &self,
1556        filters: Vec<NostrFilter>,
1557        timeout: std::time::Duration,
1558    ) -> Result<Vec<nostr::Event>> {
1559        let dc_guard = self.data_channel.lock().await;
1560        let dc = dc_guard
1561            .as_ref()
1562            .ok_or_else(|| anyhow::anyhow!("No data channel"))?
1563            .clone();
1564        drop(dc_guard);
1565
1566        let subscription_id = NostrSubscriptionId::generate();
1567        let subscription_key = subscription_id.to_string();
1568        let (tx, mut rx) = mpsc::unbounded_channel::<NostrRelayMessage>();
1569
1570        {
1571            let mut pending = self.pending_nostr_queries.lock().await;
1572            pending.insert(subscription_key.clone(), tx);
1573        }
1574
1575        let req = NostrClientMessage::req(subscription_id.clone(), filters);
1576        if let Err(e) = dc.send_text(req.as_json()).await {
1577            let mut pending = self.pending_nostr_queries.lock().await;
1578            pending.remove(&subscription_key);
1579            return Err(e.into());
1580        }
1581        debug!(
1582            "[Peer {}] Sent Nostr REQ subscription {}",
1583            self.peer_id.short(),
1584            subscription_id
1585        );
1586
1587        let mut events = Vec::new();
1588        let deadline = tokio::time::Instant::now() + timeout;
1589
1590        loop {
1591            let now = tokio::time::Instant::now();
1592            if now >= deadline {
1593                break;
1594            }
1595            let remaining = deadline - now;
1596
1597            let next = tokio::time::timeout(remaining, rx.recv()).await;
1598            match next {
1599                Ok(Some(NostrRelayMessage::Event {
1600                    subscription_id: sid,
1601                    event,
1602                })) if sid == subscription_id => {
1603                    debug!(
1604                        "[Peer {}] Received Nostr EVENT for subscription {}",
1605                        self.peer_id.short(),
1606                        subscription_id
1607                    );
1608                    events.push(*event);
1609                }
1610                Ok(Some(NostrRelayMessage::EndOfStoredEvents(sid))) if sid == subscription_id => {
1611                    debug!(
1612                        "[Peer {}] Received Nostr EOSE for subscription {}",
1613                        self.peer_id.short(),
1614                        subscription_id
1615                    );
1616                    break;
1617                }
1618                Ok(Some(NostrRelayMessage::Closed {
1619                    subscription_id: sid,
1620                    message,
1621                })) if sid == subscription_id => {
1622                    warn!(
1623                        "[Peer {}] Nostr query closed for subscription {}: {}",
1624                        self.peer_id.short(),
1625                        subscription_id,
1626                        message
1627                    );
1628                    break;
1629                }
1630                Ok(Some(_)) => {}
1631                Ok(None) => break,
1632                Err(_) => {
1633                    warn!(
1634                        "[Peer {}] Nostr query timed out for subscription {}",
1635                        self.peer_id.short(),
1636                        subscription_id
1637                    );
1638                    break;
1639                }
1640            }
1641        }
1642
1643        let close = NostrClientMessage::close(subscription_id.clone());
1644        let _ = dc.send_text(close.as_json()).await;
1645
1646        let mut pending = self.pending_nostr_queries.lock().await;
1647        pending.remove(&subscription_key);
1648        debug!(
1649            "[Peer {}] Nostr query subscription {} collected {} event(s)",
1650            self.peer_id.short(),
1651            subscription_id,
1652            events.len()
1653        );
1654
1655        Ok(events)
1656    }
1657
1658    /// Send a mesh signaling frame as text over the data channel.
1659    pub async fn send_mesh_frame_text(&self, frame: &MeshNostrFrame) -> Result<()> {
1660        let dc_guard = self.data_channel.lock().await;
1661        let dc = dc_guard
1662            .as_ref()
1663            .ok_or_else(|| anyhow::anyhow!("No data channel"))?
1664            .clone();
1665        drop(dc_guard);
1666
1667        let text = serde_json::to_string(frame)?;
1668        dc.send_text(text).await?;
1669        Ok(())
1670    }
1671
1672    /// Send a message over the data channel
1673    pub async fn send_message(&self, msg: &DataMessage) -> Result<()> {
1674        let dc_guard = self.data_channel.lock().await;
1675        if let Some(ref dc) = *dc_guard {
1676            let wire = encode_message(msg)?;
1677            dc.send(&Bytes::from(wire)).await?;
1678        }
1679        Ok(())
1680    }
1681
1682    /// Close the connection
1683    pub async fn close(&self) -> Result<()> {
1684        {
1685            let dc_guard = self.data_channel.lock().await;
1686            if let Some(ref dc) = *dc_guard {
1687                dc.close().await?;
1688            }
1689        }
1690        self.pc.close().await?;
1691        Ok(())
1692    }
1693}
1694
1695fn relay_subscription_id(msg: &NostrRelayMessage) -> Option<String> {
1696    match msg {
1697        NostrRelayMessage::Event {
1698            subscription_id, ..
1699        } => Some(subscription_id.to_string()),
1700        NostrRelayMessage::EndOfStoredEvents(subscription_id) => Some(subscription_id.to_string()),
1701        NostrRelayMessage::Closed {
1702            subscription_id, ..
1703        } => Some(subscription_id.to_string()),
1704        NostrRelayMessage::Count {
1705            subscription_id, ..
1706        } => Some(subscription_id.to_string()),
1707        _ => None,
1708    }
1709}