Skip to main content

hashtree_network/
peer.rs

1//! WebRTC peer connection for hashtree data exchange
2
3use anyhow::Result;
4use async_trait::async_trait;
5use bytes::Bytes;
6use std::collections::HashMap;
7use std::sync::Arc;
8use tokio::sync::{mpsc, oneshot, Mutex, Notify};
9use tracing::{debug, error, info, warn};
10use webrtc::api::interceptor_registry::register_default_interceptors;
11use webrtc::api::media_engine::MediaEngine;
12use webrtc::api::setting_engine::SettingEngine;
13use webrtc::api::APIBuilder;
14use webrtc::data_channel::data_channel_init::RTCDataChannelInit;
15use webrtc::data_channel::data_channel_message::DataChannelMessage;
16use webrtc::data_channel::data_channel_state::RTCDataChannelState;
17use webrtc::data_channel::RTCDataChannel;
18use webrtc::ice_transport::ice_candidate::RTCIceCandidate;
19use webrtc::ice_transport::ice_server::RTCIceServer;
20use webrtc::interceptor::registry::Registry;
21use webrtc::peer_connection::configuration::RTCConfiguration;
22use webrtc::peer_connection::peer_connection_state::RTCPeerConnectionState;
23use webrtc::peer_connection::sdp::session_description::RTCSessionDescription;
24use webrtc::peer_connection::RTCPeerConnection;
25
26use crate::cashu::CashuQuoteState;
27use crate::relay_bridge::SharedMeshRelayClient;
28use crate::runtime_control::PeerStateEvent;
29use crate::runtime_peer::PeerDirection;
30use crate::transport::{PeerLink as RoutedPeerLink, TransportError as RoutedTransportError};
31use crate::types::{
32    validate_mesh_frame, MeshNostrFrame, PeerHTLConfig, PeerId, SignalingMessage,
33    BLOB_REQUEST_POLICY, DATA_CHANNEL_LABEL,
34};
35use crate::{
36    encode_payment_ack, encode_peer_hints, encode_quote_response, encode_request, encode_response,
37    hash_to_key, parse_message, DataChunk, DataMessage, DataRequest, DataResponse, PeerHints,
38};
39use nostr_sdk::nostr::{
40    ClientMessage as NostrClientMessage, Event, Filter as NostrFilter, JsonUtil as NostrJsonUtil,
41    RelayMessage as NostrRelayMessage, SubscriptionId as NostrSubscriptionId,
42};
43
44mod payments;
45
46use payments::{
47    handle_payment_ack_message, handle_payment_message, handle_quote_request_message,
48    process_chunk_message, send_quoted_chunk,
49};
50
51/// Trait for content storage that can be used by WebRTC peers
52pub trait ContentStore: Send + Sync + 'static {
53    fn get(&self, hash_hex: &str) -> Result<Option<Vec<u8>>>;
54}
55
56/// Pending request tracking (keyed by hash hex)
57pub struct PendingRequest {
58    pub hash: Vec<u8>,
59    pub response_tx: oneshot::Sender<Option<Vec<u8>>>,
60    pub quoted: Option<PendingQuotedRequest>,
61}
62
63pub struct PendingQuotedRequest {
64    pub quote_id: u64,
65    pub mint_url: String,
66    pub total_payment_sat: u64,
67    pub confirmed_payment_sat: u64,
68    pub next_chunk_index: u32,
69    pub total_chunks: Option<u32>,
70    pub assembled_data: Vec<u8>,
71    pub in_flight_payment: Option<PendingChunkPayment>,
72    pub buffered_chunk: Option<DataChunk>,
73}
74
75pub struct PendingChunkPayment {
76    pub chunk_index: u32,
77    pub amount_sat: u64,
78    pub mint_url: String,
79    pub operation_id: String,
80    pub final_chunk: bool,
81}
82
83impl PendingRequest {
84    pub fn standard(hash: Vec<u8>, response_tx: oneshot::Sender<Option<Vec<u8>>>) -> Self {
85        Self {
86            hash,
87            response_tx,
88            quoted: None,
89        }
90    }
91
92    pub fn quoted(
93        hash: Vec<u8>,
94        response_tx: oneshot::Sender<Option<Vec<u8>>>,
95        quote_id: u64,
96        mint_url: String,
97        total_payment_sat: u64,
98    ) -> Self {
99        Self {
100            hash,
101            response_tx,
102            quoted: Some(PendingQuotedRequest {
103                quote_id,
104                mint_url,
105                total_payment_sat,
106                confirmed_payment_sat: 0,
107                next_chunk_index: 0,
108                total_chunks: None,
109                assembled_data: Vec::new(),
110                in_flight_payment: None,
111                buffered_chunk: None,
112            }),
113        }
114    }
115}
116
117/// WebRTC peer connection with data channel protocol
118pub struct Peer {
119    pub peer_id: PeerId,
120    pub direction: PeerDirection,
121    pub created_at: std::time::Instant,
122    pub connected_at: Option<std::time::Instant>,
123
124    pc: Arc<RTCPeerConnection>,
125    pub data_channel: Arc<Mutex<Option<Arc<RTCDataChannel>>>>,
126    signaling_tx: mpsc::Sender<SignalingMessage>,
127    my_peer_id: PeerId,
128    store: Option<Arc<dyn ContentStore>>,
129    pub pending_requests: Arc<Mutex<HashMap<String, PendingRequest>>>,
130    pending_nostr_queries: Arc<Mutex<HashMap<String, mpsc::UnboundedSender<NostrRelayMessage>>>>,
131    #[allow(dead_code)]
132    message_tx: mpsc::Sender<(DataMessage, Option<Vec<u8>>)>,
133    #[allow(dead_code)]
134    message_rx: Option<mpsc::Receiver<(DataMessage, Option<Vec<u8>>)>>,
135    state_event_tx: Option<mpsc::Sender<PeerStateEvent>>,
136    nostr_relay: Option<SharedMeshRelayClient>,
137    mesh_frame_tx: Option<mpsc::Sender<(PeerId, MeshNostrFrame)>>,
138    cashu_quotes: Option<Arc<CashuQuoteState>>,
139    htl_config: PeerHTLConfig,
140    signal_urls: Vec<String>,
141}
142
143impl Peer {
144    pub async fn new(
145        peer_id: PeerId,
146        direction: PeerDirection,
147        my_peer_id: PeerId,
148        signaling_tx: mpsc::Sender<SignalingMessage>,
149        stun_servers: Vec<String>,
150    ) -> Result<Self> {
151        Self::new_with_store_and_events(
152            peer_id,
153            direction,
154            my_peer_id,
155            signaling_tx,
156            stun_servers,
157            None,
158            None,
159            None,
160            None,
161            None,
162        )
163        .await
164    }
165
166    pub async fn new_with_store(
167        peer_id: PeerId,
168        direction: PeerDirection,
169        my_peer_id: PeerId,
170        signaling_tx: mpsc::Sender<SignalingMessage>,
171        stun_servers: Vec<String>,
172        store: Option<Arc<dyn ContentStore>>,
173    ) -> Result<Self> {
174        Self::new_with_store_and_events(
175            peer_id,
176            direction,
177            my_peer_id,
178            signaling_tx,
179            stun_servers,
180            store,
181            None,
182            None,
183            None,
184            None,
185        )
186        .await
187    }
188
189    #[allow(clippy::too_many_arguments)]
190    pub async fn new_with_store_and_events(
191        peer_id: PeerId,
192        direction: PeerDirection,
193        my_peer_id: PeerId,
194        signaling_tx: mpsc::Sender<SignalingMessage>,
195        stun_servers: Vec<String>,
196        store: Option<Arc<dyn ContentStore>>,
197        state_event_tx: Option<mpsc::Sender<PeerStateEvent>>,
198        nostr_relay: Option<SharedMeshRelayClient>,
199        mesh_frame_tx: Option<mpsc::Sender<(PeerId, MeshNostrFrame)>>,
200        cashu_quotes: Option<Arc<CashuQuoteState>>,
201    ) -> Result<Self> {
202        let mut m = MediaEngine::default();
203        m.register_default_codecs()?;
204
205        let mut registry = Registry::new();
206        registry = register_default_interceptors(registry, &mut m)?;
207
208        let setting_engine = SettingEngine::default();
209        let api = APIBuilder::new()
210            .with_media_engine(m)
211            .with_interceptor_registry(registry)
212            .with_setting_engine(setting_engine)
213            .build();
214
215        let ice_servers: Vec<RTCIceServer> = stun_servers
216            .iter()
217            .map(|url| RTCIceServer {
218                urls: vec![url.clone()],
219                ..Default::default()
220            })
221            .collect();
222
223        let config = RTCConfiguration {
224            ice_servers,
225            ..Default::default()
226        };
227
228        let pc = Arc::new(api.new_peer_connection(config).await?);
229        let (message_tx, message_rx) = mpsc::channel(100);
230        Ok(Self {
231            peer_id,
232            direction,
233            created_at: std::time::Instant::now(),
234            connected_at: None,
235            pc,
236            data_channel: Arc::new(Mutex::new(None)),
237            signaling_tx,
238            my_peer_id,
239            store,
240            pending_requests: Arc::new(Mutex::new(HashMap::new())),
241            pending_nostr_queries: Arc::new(Mutex::new(HashMap::new())),
242            message_tx,
243            message_rx: Some(message_rx),
244            state_event_tx,
245            nostr_relay,
246            mesh_frame_tx,
247            cashu_quotes,
248            htl_config: PeerHTLConfig::random(),
249            signal_urls: Vec::new(),
250        })
251    }
252
253    pub fn set_signal_urls(&mut self, signal_urls: Vec<String>) {
254        self.signal_urls = signal_urls;
255    }
256
257    pub fn set_store(&mut self, store: Arc<dyn ContentStore>) {
258        self.store = Some(store);
259    }
260
261    pub fn state(&self) -> RTCPeerConnectionState {
262        self.pc.connection_state()
263    }
264
265    pub fn signaling_state(&self) -> webrtc::peer_connection::signaling_state::RTCSignalingState {
266        self.pc.signaling_state()
267    }
268
269    pub fn is_connected(&self) -> bool {
270        self.pc.connection_state() == RTCPeerConnectionState::Connected
271    }
272
273    pub fn htl_config(&self) -> &PeerHTLConfig {
274        &self.htl_config
275    }
276
277    pub async fn setup_handlers(&self) -> Result<()> {
278        let peer_id = self.peer_id.clone();
279        let signaling_tx = self.signaling_tx.clone();
280        let my_peer_id_str = self.my_peer_id.to_string();
281        let target_peer_id = self.peer_id.to_string();
282
283        self.pc
284            .on_ice_candidate(Box::new(move |candidate: Option<RTCIceCandidate>| {
285                let signaling_tx = signaling_tx.clone();
286                let my_peer_id_str = my_peer_id_str.clone();
287                let target_peer_id = target_peer_id.clone();
288
289                Box::pin(async move {
290                    if let Some(c) = candidate {
291                        if let Ok(init) = c.to_json() {
292                            info!(
293                                "ICE candidate generated: {}",
294                                &init.candidate[..init.candidate.len().min(60)]
295                            );
296                            let msg = SignalingMessage::Candidate {
297                                peer_id: my_peer_id_str.clone(),
298                                target_peer_id: target_peer_id.clone(),
299                                candidate: init.candidate,
300                                sdp_m_line_index: init.sdp_mline_index,
301                                sdp_mid: init.sdp_mid,
302                            };
303                            if let Err(e) = signaling_tx.send(msg).await {
304                                error!("Failed to send ICE candidate: {}", e);
305                            }
306                        }
307                    }
308                })
309            }));
310
311        let peer_id_log = peer_id.clone();
312        let state_event_tx = self.state_event_tx.clone();
313        self.pc
314            .on_peer_connection_state_change(Box::new(move |state: RTCPeerConnectionState| {
315                let peer_id = peer_id_log.clone();
316                let state_event_tx = state_event_tx.clone();
317                Box::pin(async move {
318                    info!("Peer {} connection state: {:?}", peer_id.short(), state);
319
320                    if let Some(tx) = state_event_tx {
321                        let event = match state {
322                            RTCPeerConnectionState::Connected => {
323                                Some(PeerStateEvent::Connected(peer_id))
324                            }
325                            RTCPeerConnectionState::Failed => Some(PeerStateEvent::Failed(peer_id)),
326                            RTCPeerConnectionState::Disconnected
327                            | RTCPeerConnectionState::Closed => {
328                                Some(PeerStateEvent::Disconnected(peer_id))
329                            }
330                            _ => None,
331                        };
332                        if let Some(event) = event {
333                            if let Err(e) = tx.send(event).await {
334                                error!("Failed to send peer state event: {}", e);
335                            }
336                        }
337                    }
338                })
339            }));
340
341        Ok(())
342    }
343
344    pub async fn connect(&self) -> Result<serde_json::Value> {
345        let dc_init = RTCDataChannelInit {
346            ordered: Some(false),
347            ..Default::default()
348        };
349        let dc = self
350            .pc
351            .create_data_channel(DATA_CHANNEL_LABEL, Some(dc_init))
352            .await?;
353        self.setup_data_channel(dc.clone()).await?;
354        {
355            let mut dc_guard = self.data_channel.lock().await;
356            *dc_guard = Some(dc);
357        }
358
359        let offer = self.pc.create_offer(None).await?;
360        let mut gathering_complete = self.pc.gathering_complete_promise().await;
361        self.pc.set_local_description(offer).await?;
362
363        let _ = tokio::time::timeout(
364            std::time::Duration::from_secs(10),
365            gathering_complete.recv(),
366        )
367        .await;
368
369        let local_desc = self
370            .pc
371            .local_description()
372            .await
373            .ok_or_else(|| anyhow::anyhow!("No local description after gathering"))?;
374
375        debug!(
376            "Offer created, SDP len: {}, ice_gathering: {:?}",
377            local_desc.sdp.len(),
378            self.pc.ice_gathering_state()
379        );
380
381        Ok(serde_json::json!({
382            "type": local_desc.sdp_type.to_string().to_lowercase(),
383            "sdp": local_desc.sdp
384        }))
385    }
386
387    pub async fn handle_offer(&self, offer: serde_json::Value) -> Result<serde_json::Value> {
388        let sdp = offer
389            .get("sdp")
390            .and_then(|s| s.as_str())
391            .ok_or_else(|| anyhow::anyhow!("Missing SDP in offer"))?;
392
393        let peer_id = self.peer_id.clone();
394        let message_tx = self.message_tx.clone();
395        let pending_requests = self.pending_requests.clone();
396        let pending_nostr_queries = self.pending_nostr_queries.clone();
397        let store = self.store.clone();
398        let data_channel_holder = self.data_channel.clone();
399        let nostr_relay = self.nostr_relay.clone();
400        let mesh_frame_tx = self.mesh_frame_tx.clone();
401        let cashu_quotes = self.cashu_quotes.clone();
402        let peer_pubkey = Some(self.peer_id.pubkey.clone());
403        let state_event_tx = self.state_event_tx.clone();
404        let signal_urls = self.signal_urls.clone();
405
406        self.pc
407            .on_data_channel(Box::new(move |dc: Arc<RTCDataChannel>| {
408                let peer_id = peer_id.clone();
409                let message_tx = message_tx.clone();
410                let pending_requests = pending_requests.clone();
411                let pending_nostr_queries = pending_nostr_queries.clone();
412                let store = store.clone();
413                let data_channel_holder = data_channel_holder.clone();
414                let nostr_relay = nostr_relay.clone();
415                let mesh_frame_tx = mesh_frame_tx.clone();
416                let cashu_quotes = cashu_quotes.clone();
417                let peer_pubkey = peer_pubkey.clone();
418                let state_event_tx = state_event_tx.clone();
419                let signal_urls = signal_urls.clone();
420
421                Box::pin(async move {
422                    info!(
423                        "Peer {} received data channel: {}",
424                        peer_id.short(),
425                        dc.label()
426                    );
427
428                    {
429                        let mut dc_guard = data_channel_holder.lock().await;
430                        *dc_guard = Some(dc.clone());
431                    }
432
433                    Self::setup_dc_handlers(
434                        dc.clone(),
435                        peer_id,
436                        message_tx,
437                        pending_requests,
438                        pending_nostr_queries.clone(),
439                        store,
440                        nostr_relay,
441                        mesh_frame_tx,
442                        cashu_quotes,
443                        peer_pubkey,
444                        state_event_tx,
445                        signal_urls,
446                    )
447                    .await;
448                })
449            }));
450
451        let offer_desc = RTCSessionDescription::offer(sdp.to_string())?;
452        self.pc.set_remote_description(offer_desc).await?;
453
454        let answer = self.pc.create_answer(None).await?;
455        let mut gathering_complete = self.pc.gathering_complete_promise().await;
456        self.pc.set_local_description(answer).await?;
457
458        let _ = tokio::time::timeout(
459            std::time::Duration::from_secs(10),
460            gathering_complete.recv(),
461        )
462        .await;
463
464        let local_desc = self
465            .pc
466            .local_description()
467            .await
468            .ok_or_else(|| anyhow::anyhow!("No local description after gathering"))?;
469
470        debug!(
471            "Answer created, SDP len: {}, ice_gathering: {:?}",
472            local_desc.sdp.len(),
473            self.pc.ice_gathering_state()
474        );
475
476        Ok(serde_json::json!({
477            "type": local_desc.sdp_type.to_string().to_lowercase(),
478            "sdp": local_desc.sdp
479        }))
480    }
481
482    pub async fn handle_answer(&self, answer: serde_json::Value) -> Result<()> {
483        let sdp = answer
484            .get("sdp")
485            .and_then(|s| s.as_str())
486            .ok_or_else(|| anyhow::anyhow!("Missing SDP in answer"))?;
487
488        let answer_desc = RTCSessionDescription::answer(sdp.to_string())?;
489        self.pc.set_remote_description(answer_desc).await?;
490        Ok(())
491    }
492
493    pub async fn handle_candidate(&self, candidate: serde_json::Value) -> Result<()> {
494        let candidate_str = candidate
495            .get("candidate")
496            .and_then(|c| c.as_str())
497            .unwrap_or("");
498
499        let sdp_mid = candidate
500            .get("sdpMid")
501            .and_then(|m| m.as_str())
502            .map(|s| s.to_string());
503
504        let sdp_mline_index = candidate
505            .get("sdpMLineIndex")
506            .and_then(|i| i.as_u64())
507            .map(|i| i as u16);
508
509        if !candidate_str.is_empty() {
510            use webrtc::ice_transport::ice_candidate::RTCIceCandidateInit;
511            let init = RTCIceCandidateInit {
512                candidate: candidate_str.to_string(),
513                sdp_mid,
514                sdp_mline_index,
515                username_fragment: candidate
516                    .get("usernameFragment")
517                    .and_then(|u| u.as_str())
518                    .map(|s| s.to_string()),
519            };
520            self.pc.add_ice_candidate(init).await?;
521        }
522
523        Ok(())
524    }
525
526    async fn setup_data_channel(&self, dc: Arc<RTCDataChannel>) -> Result<()> {
527        let peer_id = self.peer_id.clone();
528        let message_tx = self.message_tx.clone();
529        let pending_requests = self.pending_requests.clone();
530        let store = self.store.clone();
531        let nostr_relay = self.nostr_relay.clone();
532        let mesh_frame_tx = self.mesh_frame_tx.clone();
533        let cashu_quotes = self.cashu_quotes.clone();
534        let peer_pubkey = Some(self.peer_id.pubkey.clone());
535        let state_event_tx = self.state_event_tx.clone();
536        let signal_urls = self.signal_urls.clone();
537
538        Self::setup_dc_handlers(
539            dc,
540            peer_id,
541            message_tx,
542            pending_requests,
543            self.pending_nostr_queries.clone(),
544            store,
545            nostr_relay,
546            mesh_frame_tx,
547            cashu_quotes,
548            peer_pubkey,
549            state_event_tx,
550            signal_urls,
551        )
552        .await;
553        Ok(())
554    }
555
556    #[allow(clippy::too_many_arguments)]
557    async fn setup_dc_handlers(
558        dc: Arc<RTCDataChannel>,
559        peer_id: PeerId,
560        message_tx: mpsc::Sender<(DataMessage, Option<Vec<u8>>)>,
561        pending_requests: Arc<Mutex<HashMap<String, PendingRequest>>>,
562        pending_nostr_queries: Arc<
563            Mutex<HashMap<String, mpsc::UnboundedSender<NostrRelayMessage>>>,
564        >,
565        store: Option<Arc<dyn ContentStore>>,
566        nostr_relay: Option<SharedMeshRelayClient>,
567        mesh_frame_tx: Option<mpsc::Sender<(PeerId, MeshNostrFrame)>>,
568        cashu_quotes: Option<Arc<CashuQuoteState>>,
569        peer_pubkey: Option<String>,
570        state_event_tx: Option<mpsc::Sender<PeerStateEvent>>,
571        signal_urls: Vec<String>,
572    ) {
573        let label = dc.label().to_string();
574        let peer_short = peer_id.short();
575        let _pending_binary: Arc<Mutex<Option<u32>>> = Arc::new(Mutex::new(None));
576
577        let open_notify = nostr_relay.as_ref().map(|_| Arc::new(Notify::new()));
578        if let Some(ref notify) = open_notify {
579            if dc.ready_state() == RTCDataChannelState::Open {
580                notify.notify_one();
581            }
582        }
583
584        let mut nostr_client_id: Option<u64> = None;
585        if let Some(relay) = nostr_relay.clone() {
586            let client_id = relay.next_client_id();
587            let (nostr_tx, mut nostr_rx) = mpsc::unbounded_channel::<String>();
588            relay
589                .register_client(client_id, nostr_tx, peer_pubkey.clone())
590                .await;
591            nostr_client_id = Some(client_id);
592
593            if let Some(notify) = open_notify.clone() {
594                let dc_for_send = dc.clone();
595                tokio::spawn(async move {
596                    notify.notified().await;
597                    while let Some(text) = nostr_rx.recv().await {
598                        if dc_for_send.send_text(text).await.is_err() {
599                            break;
600                        }
601                    }
602                });
603            }
604        }
605
606        if let (Some(relay), Some(client_id)) = (nostr_relay.clone(), nostr_client_id) {
607            dc.on_close(Box::new(move || {
608                let relay = relay.clone();
609                Box::pin(async move {
610                    relay.unregister_client(client_id).await;
611                })
612            }));
613        }
614
615        let open_notify_clone = open_notify.clone();
616        let dc_for_hints = dc.clone();
617        let signal_urls_for_open = signal_urls.clone();
618        let peer_short_open = peer_short.clone();
619        let label_clone = label.clone();
620        dc.on_open(Box::new(move || {
621            let peer_short_open = peer_short_open.clone();
622            let label_clone = label_clone.clone();
623            let open_notify = open_notify_clone.clone();
624            let dc_for_hints = dc_for_hints.clone();
625            let signal_urls = signal_urls_for_open.clone();
626            Box::pin(async move {
627                info!(
628                    "[Peer {}] Data channel '{}' open",
629                    peer_short_open, label_clone
630                );
631                if let Some(notify) = open_notify {
632                    notify.notify_one();
633                }
634                if !signal_urls.is_empty() {
635                    let hints = PeerHints { signal_urls };
636                    let _ = dc_for_hints
637                        .send(&Bytes::from(encode_peer_hints(&hints)))
638                        .await;
639                }
640            })
641        }));
642
643        if dc.ready_state() == RTCDataChannelState::Open && !signal_urls.is_empty() {
644            let hints = PeerHints { signal_urls };
645            let _ = dc.send(&Bytes::from(encode_peer_hints(&hints))).await;
646        }
647
648        let dc_for_msg = dc.clone();
649        let peer_short_msg = peer_short.clone();
650        let _pending_binary_clone = _pending_binary.clone();
651        let store_clone = store.clone();
652        let nostr_relay_for_msg = nostr_relay.clone();
653        let nostr_client_id_for_msg = nostr_client_id;
654        let pending_nostr_queries_for_msg = pending_nostr_queries.clone();
655        let mesh_frame_tx_for_msg = mesh_frame_tx.clone();
656        let peer_id_for_msg = peer_id.clone();
657
658        dc.on_message(Box::new(move |msg: DataChannelMessage| {
659            let dc = dc_for_msg.clone();
660            let peer_short = peer_short_msg.clone();
661            let pending_requests = pending_requests.clone();
662            let _pending_binary = _pending_binary_clone.clone();
663            let _message_tx = message_tx.clone();
664            let store = store_clone.clone();
665            let nostr_relay = nostr_relay_for_msg.clone();
666            let nostr_client_id = nostr_client_id_for_msg;
667            let pending_nostr_queries = pending_nostr_queries_for_msg.clone();
668            let mesh_frame_tx = mesh_frame_tx_for_msg.clone();
669            let cashu_quotes = cashu_quotes.clone();
670            let state_event_tx = state_event_tx.clone();
671            let peer_id = peer_id_for_msg.clone();
672            let msg_data = msg.data.clone();
673
674            Box::pin(async move {
675                if msg.is_string {
676                    if let Ok(text) = std::str::from_utf8(&msg_data) {
677                        if let Ok(mesh_frame) = serde_json::from_str::<MeshNostrFrame>(text) {
678                            match validate_mesh_frame(&mesh_frame) {
679                                Ok(()) => {
680                                    if let Some(tx) = mesh_frame_tx {
681                                        let _ = tx.send((peer_id.clone(), mesh_frame)).await;
682                                    }
683                                    return;
684                                }
685                                Err(reason) => {
686                                    debug!(
687                                        "[Peer {}] Ignoring invalid mesh frame: {}",
688                                        peer_short, reason
689                                    );
690                                }
691                            }
692                        }
693
694                        if let Ok(relay_msg) = NostrRelayMessage::from_json(text) {
695                            if let Some(sub_id) = relay_subscription_id(&relay_msg) {
696                                let sender = {
697                                    let pending = pending_nostr_queries.lock().await;
698                                    pending.get(&sub_id).cloned()
699                                };
700                                if let Some(tx) = sender {
701                                    debug!(
702                                        "[Peer {}] Routed Nostr relay message for subscription {}",
703                                        peer_short, sub_id
704                                    );
705                                    let _ = tx.send(relay_msg);
706                                    return;
707                                } else {
708                                    debug!(
709                                        "[Peer {}] Dropping Nostr relay message for unknown subscription {}",
710                                        peer_short, sub_id
711                                    );
712                                }
713                            }
714                        }
715
716                        if let Some(relay) = nostr_relay {
717                            if let Ok(nostr_msg) = NostrClientMessage::from_json(text) {
718                                if let Some(client_id) = nostr_client_id {
719                                    relay.handle_client_message(client_id, nostr_msg).await;
720                                }
721                            }
722                        }
723                    }
724                    return;
725                }
726
727                debug!(
728                    "[Peer {}] Received {} bytes on data channel",
729                    peer_short,
730                    msg_data.len()
731                );
732                match parse_message(&msg_data) {
733                    Some(data_msg) => match data_msg {
734                        DataMessage::Request(req) => {
735                            let hash_hex = hash_to_hex(&req.h);
736                            let hash_short = &hash_hex[..8.min(hash_hex.len())];
737                            info!("[Peer {}] Received request for {}", peer_short, hash_short);
738
739                            if let Some(cashu_quotes) = cashu_quotes.as_ref() {
740                                if cashu_quotes
741                                    .should_refuse_requests_from_peer(&peer_id.to_string())
742                                    .await
743                                {
744                                    info!(
745                                        "[Peer {}] Refusing request from peer with unpaid defaults",
746                                        peer_short
747                                    );
748                                    return;
749                                }
750                            }
751
752                            let quoted_settlement = if let Some(quote_id) = req.q {
753                                let Some(cashu_quotes) = cashu_quotes.as_ref() else {
754                                    info!(
755                                        "[Peer {}] Ignoring quoted request without Cashu settlement state",
756                                        peer_short
757                                    );
758                                    return;
759                                };
760                                match cashu_quotes
761                                    .take_valid_quote(&peer_id.to_string(), &req.h, quote_id)
762                                    .await
763                                {
764                                    Some(settlement) => Some((quote_id, settlement)),
765                                    None => {
766                                        info!(
767                                            "[Peer {}] Ignoring request with invalid or expired quote {}",
768                                            peer_short, quote_id
769                                        );
770                                        return;
771                                    }
772                                }
773                            } else {
774                                None
775                            };
776
777                            let data = if let Some(ref store) = store {
778                                match store.get(&hash_hex) {
779                                    Ok(Some(data)) => {
780                                        info!(
781                                            "[Peer {}] Found {} in store ({} bytes)",
782                                            peer_short,
783                                            hash_short,
784                                            data.len()
785                                        );
786                                        Some(data)
787                                    }
788                                    Ok(None) => {
789                                        info!(
790                                            "[Peer {}] Hash {} not in store",
791                                            peer_short, hash_short
792                                        );
793                                        None
794                                    }
795                                    Err(e) => {
796                                        warn!("[Peer {}] Store error: {}", peer_short, e);
797                                        None
798                                    }
799                                }
800                            } else {
801                                warn!(
802                                    "[Peer {}] No store configured - cannot serve requests",
803                                    peer_short
804                                );
805                                None
806                            };
807
808                            if let Some(data) = data {
809                                let data_len = data.len();
810                                if let (Some(cashu_quotes), Some((quote_id, settlement))) =
811                                    (cashu_quotes.as_ref(), quoted_settlement)
812                                {
813                                    match cashu_quotes
814                                        .prepare_quoted_transfer(
815                                            &peer_id.to_string(),
816                                            &req.h,
817                                            quote_id,
818                                            &settlement,
819                                            data,
820                                        )
821                                        .await
822                                    {
823                                        Some((first_chunk, first_expected)) => {
824                                            if send_quoted_chunk(
825                                                &dc,
826                                                &peer_id,
827                                                &peer_short,
828                                                cashu_quotes,
829                                                first_chunk,
830                                                first_expected,
831                                            )
832                                            .await
833                                            {
834                                                info!(
835                                                    "[Peer {}] Started quoted chunked response for {} ({} bytes)",
836                                                    peer_short, hash_short, data_len
837                                                );
838                                            }
839                                        }
840                                        None => {
841                                            warn!(
842                                                "[Peer {}] Failed to prepare quoted transfer for {}",
843                                                peer_short, hash_short
844                                            );
845                                        }
846                                    }
847                                } else {
848                                    let response = DataResponse {
849                                        h: req.h,
850                                        d: data,
851                                        i: None,
852                                        n: None,
853                                    };
854                                    let wire = encode_response(&response);
855                                    if let Err(e) = dc.send(&Bytes::from(wire)).await {
856                                        error!(
857                                            "[Peer {}] Failed to send response: {}",
858                                            peer_short, e
859                                        );
860                                    } else {
861                                        info!(
862                                            "[Peer {}] Sent response for {} ({} bytes)",
863                                            peer_short, hash_short, data_len
864                                        );
865                                    }
866                                }
867                            } else {
868                                info!("[Peer {}] Content not found for {}", peer_short, hash_short);
869                            }
870                        }
871                        DataMessage::Response(res) => {
872                            let hash_hex = hash_to_hex(&res.h);
873                            let hash_short = &hash_hex[..8.min(hash_hex.len())];
874                            debug!(
875                                "[Peer {}] Received response for {} ({} bytes)",
876                                peer_short,
877                                hash_short,
878                                res.d.len()
879                            );
880
881                            let mut pending = pending_requests.lock().await;
882                            if let Some(req) = pending.remove(&hash_hex) {
883                                let _ = req.response_tx.send(Some(res.d));
884                            }
885                        }
886                        DataMessage::QuoteRequest(req) => {
887                            let response = handle_quote_request_message(
888                                &peer_short,
889                                &peer_id,
890                                &store,
891                                cashu_quotes.as_ref(),
892                                &req,
893                            )
894                            .await;
895                            if let Some(response) = response {
896                                let wire = encode_quote_response(&response);
897                                if let Err(e) = dc.send(&Bytes::from(wire)).await {
898                                    warn!(
899                                        "[Peer {}] Failed to send quote response: {}",
900                                        peer_short, e
901                                    );
902                                }
903                            }
904                        }
905                        DataMessage::QuoteResponse(res) => {
906                            if let Some(cashu_quotes) = cashu_quotes.as_ref() {
907                                let _ = cashu_quotes
908                                    .handle_quote_response(&peer_id.to_string(), res)
909                                    .await;
910                            }
911                        }
912                        DataMessage::Chunk(chunk) => {
913                            process_chunk_message(
914                                &peer_short,
915                                &peer_id,
916                                &dc,
917                                &pending_requests,
918                                cashu_quotes.as_ref(),
919                                chunk,
920                            )
921                            .await;
922                        }
923                        DataMessage::Payment(req) => {
924                            let outcome =
925                                handle_payment_message(&peer_id, cashu_quotes.as_ref(), &req).await;
926                            let wire = encode_payment_ack(&outcome.ack);
927                            if let Err(e) = dc.send(&Bytes::from(wire)).await {
928                                warn!(
929                                    "[Peer {}] Failed to send payment ack: {}",
930                                    peer_short, e
931                                );
932                            }
933                            if let (Some(cashu_quotes), Some((next_chunk, next_expected))) =
934                                (cashu_quotes.as_ref(), outcome.next_chunk)
935                            {
936                                let _ = send_quoted_chunk(
937                                    &dc,
938                                    &peer_id,
939                                    &peer_short,
940                                    cashu_quotes,
941                                    next_chunk,
942                                    next_expected,
943                                )
944                                .await;
945                            }
946                        }
947                        DataMessage::PaymentAck(res) => {
948                            handle_payment_ack_message(
949                                &peer_short,
950                                &peer_id,
951                                &dc,
952                                &pending_requests,
953                                cashu_quotes.as_ref(),
954                                res,
955                            )
956                            .await;
957                        }
958                        DataMessage::PeerHints(hints) => {
959                            if let Some(tx) = state_event_tx {
960                                let _ = tx
961                                    .send(PeerStateEvent::SignalHints(
962                                        peer_id.clone(),
963                                        hints.signal_urls,
964                                    ))
965                                    .await;
966                            }
967                        }
968                    },
969                    None => {
970                        warn!("[Peer {}] Failed to parse message", peer_short);
971                        let hex_dump: String = msg_data
972                            .iter()
973                            .take(50)
974                            .map(|b| format!("{:02x}", b))
975                            .collect();
976                        warn!("[Peer {}] Message hex: {}", peer_short, hex_dump);
977                    }
978                }
979            })
980        }));
981    }
982
983    pub fn has_data_channel(&self) -> bool {
984        self.data_channel
985            .try_lock()
986            .map(|guard| {
987                guard
988                    .as_ref()
989                    .map(|dc| dc.ready_state() == RTCDataChannelState::Open)
990                    .unwrap_or(false)
991            })
992            .unwrap_or(false)
993    }
994
995    pub async fn request(&self, hash_hex: &str) -> Result<Option<Vec<u8>>> {
996        self.request_with_timeout(hash_hex, std::time::Duration::from_secs(10))
997            .await
998    }
999
1000    pub async fn request_with_timeout(
1001        &self,
1002        hash_hex: &str,
1003        timeout: std::time::Duration,
1004    ) -> Result<Option<Vec<u8>>> {
1005        let dc_guard = self.data_channel.lock().await;
1006        let dc = dc_guard
1007            .as_ref()
1008            .ok_or_else(|| anyhow::anyhow!("No data channel"))?
1009            .clone();
1010        drop(dc_guard);
1011
1012        let hash = hex::decode(hash_hex).map_err(|e| anyhow::anyhow!("Invalid hex hash: {}", e))?;
1013        let (tx, rx) = oneshot::channel();
1014
1015        {
1016            let mut pending = self.pending_requests.lock().await;
1017            pending.insert(
1018                hash_hex.to_string(),
1019                PendingRequest::standard(hash.clone(), tx),
1020            );
1021        }
1022
1023        let req = DataRequest {
1024            h: hash,
1025            htl: BLOB_REQUEST_POLICY.max_htl,
1026            q: None,
1027        };
1028        let wire = encode_request(&req);
1029        dc.send(&Bytes::from(wire)).await?;
1030
1031        debug!(
1032            "[Peer {}] Sent request for {}",
1033            self.peer_id.short(),
1034            &hash_hex[..8.min(hash_hex.len())]
1035        );
1036
1037        match tokio::time::timeout(timeout, rx).await {
1038            Ok(Ok(data)) => Ok(data),
1039            Ok(Err(_)) => Ok(None),
1040            Err(_) => {
1041                let mut pending = self.pending_requests.lock().await;
1042                pending.remove(hash_hex);
1043                Ok(None)
1044            }
1045        }
1046    }
1047
1048    pub async fn query_nostr_events(
1049        &self,
1050        filters: Vec<NostrFilter>,
1051        timeout: std::time::Duration,
1052    ) -> Result<Vec<Event>> {
1053        let dc_guard = self.data_channel.lock().await;
1054        let dc = dc_guard
1055            .as_ref()
1056            .ok_or_else(|| anyhow::anyhow!("No data channel"))?
1057            .clone();
1058        drop(dc_guard);
1059
1060        let subscription_id = NostrSubscriptionId::generate();
1061        let subscription_key = subscription_id.to_string();
1062        let (tx, mut rx) = mpsc::unbounded_channel::<NostrRelayMessage>();
1063
1064        {
1065            let mut pending = self.pending_nostr_queries.lock().await;
1066            pending.insert(subscription_key.clone(), tx);
1067        }
1068
1069        let req = NostrClientMessage::req(subscription_id.clone(), filters);
1070        if let Err(e) = dc.send_text(req.as_json()).await {
1071            let mut pending = self.pending_nostr_queries.lock().await;
1072            pending.remove(&subscription_key);
1073            return Err(e.into());
1074        }
1075        debug!(
1076            "[Peer {}] Sent Nostr REQ subscription {}",
1077            self.peer_id.short(),
1078            subscription_id
1079        );
1080
1081        let mut events = Vec::new();
1082        let deadline = tokio::time::Instant::now() + timeout;
1083
1084        loop {
1085            let now = tokio::time::Instant::now();
1086            if now >= deadline {
1087                break;
1088            }
1089            let remaining = deadline - now;
1090
1091            let next = tokio::time::timeout(remaining, rx.recv()).await;
1092            match next {
1093                Ok(Some(NostrRelayMessage::Event {
1094                    subscription_id: sid,
1095                    event,
1096                })) if sid == subscription_id => {
1097                    debug!(
1098                        "[Peer {}] Received Nostr EVENT for subscription {}",
1099                        self.peer_id.short(),
1100                        subscription_id
1101                    );
1102                    events.push(*event);
1103                }
1104                Ok(Some(NostrRelayMessage::EndOfStoredEvents(sid))) if sid == subscription_id => {
1105                    debug!(
1106                        "[Peer {}] Received Nostr EOSE for subscription {}",
1107                        self.peer_id.short(),
1108                        subscription_id
1109                    );
1110                    break;
1111                }
1112                Ok(Some(NostrRelayMessage::Closed {
1113                    subscription_id: sid,
1114                    message,
1115                })) if sid == subscription_id => {
1116                    warn!(
1117                        "[Peer {}] Nostr query closed for subscription {}: {}",
1118                        self.peer_id.short(),
1119                        subscription_id,
1120                        message
1121                    );
1122                    break;
1123                }
1124                Ok(Some(_)) => {}
1125                Ok(None) => break,
1126                Err(_) => {
1127                    warn!(
1128                        "[Peer {}] Nostr query timed out for subscription {}",
1129                        self.peer_id.short(),
1130                        subscription_id
1131                    );
1132                    break;
1133                }
1134            }
1135        }
1136
1137        let close = NostrClientMessage::close(subscription_id.clone());
1138        let _ = dc.send_text(close.as_json()).await;
1139
1140        let mut pending = self.pending_nostr_queries.lock().await;
1141        pending.remove(&subscription_key);
1142        debug!(
1143            "[Peer {}] Nostr query subscription {} collected {} event(s)",
1144            self.peer_id.short(),
1145            subscription_id,
1146            events.len()
1147        );
1148
1149        Ok(events)
1150    }
1151
1152    pub async fn send_mesh_frame_text(&self, frame: &MeshNostrFrame) -> Result<()> {
1153        let dc_guard = self.data_channel.lock().await;
1154        let dc = dc_guard
1155            .as_ref()
1156            .ok_or_else(|| anyhow::anyhow!("No data channel"))?
1157            .clone();
1158        drop(dc_guard);
1159
1160        let text = serde_json::to_string(frame)?;
1161        dc.send_text(text).await?;
1162        Ok(())
1163    }
1164
1165    pub async fn send_message(&self, msg: &DataMessage) -> Result<()> {
1166        let dc_guard = self.data_channel.lock().await;
1167        if let Some(ref dc) = *dc_guard {
1168            let wire = encode_data_message(msg);
1169            dc.send(&Bytes::from(wire)).await?;
1170        }
1171        Ok(())
1172    }
1173
1174    pub async fn close(&self) -> Result<()> {
1175        {
1176            let dc_guard = self.data_channel.lock().await;
1177            if let Some(ref dc) = *dc_guard {
1178                dc.close().await?;
1179            }
1180        }
1181        self.pc.close().await?;
1182        Ok(())
1183    }
1184}
1185
1186fn hash_to_hex(hash: &[u8]) -> String {
1187    hash_to_key(hash)
1188}
1189
1190fn encode_data_message(msg: &DataMessage) -> Vec<u8> {
1191    match msg {
1192        DataMessage::Request(req) => encode_request(req),
1193        DataMessage::Response(res) => encode_response(res),
1194        DataMessage::QuoteRequest(req) => crate::encode_quote_request(req),
1195        DataMessage::QuoteResponse(res) => encode_quote_response(res),
1196        DataMessage::Payment(req) => crate::encode_payment(req),
1197        DataMessage::PaymentAck(res) => crate::encode_payment_ack(res),
1198        DataMessage::Chunk(chunk) => crate::encode_chunk(chunk),
1199        DataMessage::PeerHints(hints) => crate::encode_peer_hints(hints),
1200    }
1201}
1202
1203fn relay_subscription_id(msg: &NostrRelayMessage) -> Option<String> {
1204    match msg {
1205        NostrRelayMessage::Event {
1206            subscription_id, ..
1207        } => Some(subscription_id.to_string()),
1208        NostrRelayMessage::EndOfStoredEvents(subscription_id) => Some(subscription_id.to_string()),
1209        NostrRelayMessage::Closed {
1210            subscription_id, ..
1211        } => Some(subscription_id.to_string()),
1212        NostrRelayMessage::Count {
1213            subscription_id, ..
1214        } => Some(subscription_id.to_string()),
1215        _ => None,
1216    }
1217}
1218
1219#[async_trait]
1220impl RoutedPeerLink for Peer {
1221    async fn send(&self, data: Vec<u8>) -> std::result::Result<(), RoutedTransportError> {
1222        let dc = self
1223            .data_channel
1224            .lock()
1225            .await
1226            .as_ref()
1227            .cloned()
1228            .ok_or(RoutedTransportError::NotConnected)?;
1229        dc.send(&Bytes::from(data))
1230            .await
1231            .map(|_| ())
1232            .map_err(|e| RoutedTransportError::SendFailed(e.to_string()))
1233    }
1234
1235    async fn recv(&self) -> Option<Vec<u8>> {
1236        None
1237    }
1238
1239    fn try_recv(&self) -> Option<Vec<u8>> {
1240        None
1241    }
1242
1243    fn is_open(&self) -> bool {
1244        self.has_data_channel()
1245    }
1246
1247    async fn close(&self) {
1248        let _ = Peer::close(self).await;
1249    }
1250}