Skip to main content

hashtree_network/
peer.rs

1//! WebRTC peer connection for hashtree data exchange
2
3use anyhow::Result;
4use async_trait::async_trait;
5use bytes::Bytes;
6use std::collections::HashMap;
7use std::sync::Arc;
8use tokio::sync::{mpsc, oneshot, Mutex, Notify};
9use tracing::{debug, error, info, warn};
10use webrtc::api::interceptor_registry::register_default_interceptors;
11use webrtc::api::media_engine::MediaEngine;
12use webrtc::api::setting_engine::SettingEngine;
13use webrtc::api::APIBuilder;
14use webrtc::data_channel::data_channel_init::RTCDataChannelInit;
15use webrtc::data_channel::data_channel_message::DataChannelMessage;
16use webrtc::data_channel::data_channel_state::RTCDataChannelState;
17use webrtc::data_channel::RTCDataChannel;
18use webrtc::ice_transport::ice_candidate::RTCIceCandidate;
19use webrtc::ice_transport::ice_server::RTCIceServer;
20use webrtc::interceptor::registry::Registry;
21use webrtc::peer_connection::configuration::RTCConfiguration;
22use webrtc::peer_connection::peer_connection_state::RTCPeerConnectionState;
23use webrtc::peer_connection::sdp::session_description::RTCSessionDescription;
24use webrtc::peer_connection::RTCPeerConnection;
25
26use crate::cashu::CashuQuoteState;
27use crate::relay_bridge::SharedMeshRelayClient;
28use crate::runtime_control::PeerStateEvent;
29use crate::runtime_peer::PeerDirection;
30use crate::transport::{PeerLink as RoutedPeerLink, TransportError as RoutedTransportError};
31use crate::types::{
32    validate_mesh_frame, MeshNostrFrame, PeerHTLConfig, PeerId, SignalingMessage,
33    BLOB_REQUEST_POLICY, DATA_CHANNEL_LABEL,
34};
35use crate::{
36    encode_payment_ack, encode_quote_response, encode_request, encode_response, hash_to_key,
37    parse_message, DataChunk, DataMessage, DataRequest, DataResponse,
38};
39use nostr_sdk::nostr::{
40    ClientMessage as NostrClientMessage, Event, Filter as NostrFilter, JsonUtil as NostrJsonUtil,
41    RelayMessage as NostrRelayMessage, SubscriptionId as NostrSubscriptionId,
42};
43
44mod payments;
45
46use payments::{
47    handle_payment_ack_message, handle_payment_message, handle_quote_request_message,
48    process_chunk_message, send_quoted_chunk,
49};
50
51/// Trait for content storage that can be used by WebRTC peers
52pub trait ContentStore: Send + Sync + 'static {
53    fn get(&self, hash_hex: &str) -> Result<Option<Vec<u8>>>;
54}
55
56/// Pending request tracking (keyed by hash hex)
57pub struct PendingRequest {
58    pub hash: Vec<u8>,
59    pub response_tx: oneshot::Sender<Option<Vec<u8>>>,
60    pub quoted: Option<PendingQuotedRequest>,
61}
62
63pub struct PendingQuotedRequest {
64    pub quote_id: u64,
65    pub mint_url: String,
66    pub total_payment_sat: u64,
67    pub confirmed_payment_sat: u64,
68    pub next_chunk_index: u32,
69    pub total_chunks: Option<u32>,
70    pub assembled_data: Vec<u8>,
71    pub in_flight_payment: Option<PendingChunkPayment>,
72    pub buffered_chunk: Option<DataChunk>,
73}
74
/// Details of a payment associated with one chunk of a quoted transfer.
///
/// NOTE(review): field meanings are inferred from their names; confirm against
/// the payment handling code.
pub struct PendingChunkPayment {
    /// Index of the chunk this payment belongs to.
    pub chunk_index: u32,
    /// Payment amount, in sats.
    pub amount_sat: u64,
    /// Mint URL the payment is made against.
    pub mint_url: String,
    /// Identifier of the payment operation.
    pub operation_id: String,
    /// Whether this payment is for the final chunk.
    pub final_chunk: bool,
}
82
83impl PendingRequest {
84    pub fn standard(hash: Vec<u8>, response_tx: oneshot::Sender<Option<Vec<u8>>>) -> Self {
85        Self {
86            hash,
87            response_tx,
88            quoted: None,
89        }
90    }
91
92    pub fn quoted(
93        hash: Vec<u8>,
94        response_tx: oneshot::Sender<Option<Vec<u8>>>,
95        quote_id: u64,
96        mint_url: String,
97        total_payment_sat: u64,
98    ) -> Self {
99        Self {
100            hash,
101            response_tx,
102            quoted: Some(PendingQuotedRequest {
103                quote_id,
104                mint_url,
105                total_payment_sat,
106                confirmed_payment_sat: 0,
107                next_chunk_index: 0,
108                total_chunks: None,
109                assembled_data: Vec::new(),
110                in_flight_payment: None,
111                buffered_chunk: None,
112            }),
113        }
114    }
115}
116
117/// WebRTC peer connection with data channel protocol
118pub struct Peer {
119    pub peer_id: PeerId,
120    pub direction: PeerDirection,
121    pub created_at: std::time::Instant,
122    pub connected_at: Option<std::time::Instant>,
123
124    pc: Arc<RTCPeerConnection>,
125    pub data_channel: Arc<Mutex<Option<Arc<RTCDataChannel>>>>,
126    signaling_tx: mpsc::Sender<SignalingMessage>,
127    my_peer_id: PeerId,
128    store: Option<Arc<dyn ContentStore>>,
129    pub pending_requests: Arc<Mutex<HashMap<String, PendingRequest>>>,
130    pending_nostr_queries: Arc<Mutex<HashMap<String, mpsc::UnboundedSender<NostrRelayMessage>>>>,
131    #[allow(dead_code)]
132    message_tx: mpsc::Sender<(DataMessage, Option<Vec<u8>>)>,
133    #[allow(dead_code)]
134    message_rx: Option<mpsc::Receiver<(DataMessage, Option<Vec<u8>>)>>,
135    state_event_tx: Option<mpsc::Sender<PeerStateEvent>>,
136    nostr_relay: Option<SharedMeshRelayClient>,
137    mesh_frame_tx: Option<mpsc::Sender<(PeerId, MeshNostrFrame)>>,
138    cashu_quotes: Option<Arc<CashuQuoteState>>,
139    htl_config: PeerHTLConfig,
140}
141
142impl Peer {
143    pub async fn new(
144        peer_id: PeerId,
145        direction: PeerDirection,
146        my_peer_id: PeerId,
147        signaling_tx: mpsc::Sender<SignalingMessage>,
148        stun_servers: Vec<String>,
149    ) -> Result<Self> {
150        Self::new_with_store_and_events(
151            peer_id,
152            direction,
153            my_peer_id,
154            signaling_tx,
155            stun_servers,
156            None,
157            None,
158            None,
159            None,
160            None,
161        )
162        .await
163    }
164
165    pub async fn new_with_store(
166        peer_id: PeerId,
167        direction: PeerDirection,
168        my_peer_id: PeerId,
169        signaling_tx: mpsc::Sender<SignalingMessage>,
170        stun_servers: Vec<String>,
171        store: Option<Arc<dyn ContentStore>>,
172    ) -> Result<Self> {
173        Self::new_with_store_and_events(
174            peer_id,
175            direction,
176            my_peer_id,
177            signaling_tx,
178            stun_servers,
179            store,
180            None,
181            None,
182            None,
183            None,
184        )
185        .await
186    }
187
188    #[allow(clippy::too_many_arguments)]
189    pub async fn new_with_store_and_events(
190        peer_id: PeerId,
191        direction: PeerDirection,
192        my_peer_id: PeerId,
193        signaling_tx: mpsc::Sender<SignalingMessage>,
194        stun_servers: Vec<String>,
195        store: Option<Arc<dyn ContentStore>>,
196        state_event_tx: Option<mpsc::Sender<PeerStateEvent>>,
197        nostr_relay: Option<SharedMeshRelayClient>,
198        mesh_frame_tx: Option<mpsc::Sender<(PeerId, MeshNostrFrame)>>,
199        cashu_quotes: Option<Arc<CashuQuoteState>>,
200    ) -> Result<Self> {
201        let mut m = MediaEngine::default();
202        m.register_default_codecs()?;
203
204        let mut registry = Registry::new();
205        registry = register_default_interceptors(registry, &mut m)?;
206
207        let setting_engine = SettingEngine::default();
208        let api = APIBuilder::new()
209            .with_media_engine(m)
210            .with_interceptor_registry(registry)
211            .with_setting_engine(setting_engine)
212            .build();
213
214        let ice_servers: Vec<RTCIceServer> = stun_servers
215            .iter()
216            .map(|url| RTCIceServer {
217                urls: vec![url.clone()],
218                ..Default::default()
219            })
220            .collect();
221
222        let config = RTCConfiguration {
223            ice_servers,
224            ..Default::default()
225        };
226
227        let pc = Arc::new(api.new_peer_connection(config).await?);
228        let (message_tx, message_rx) = mpsc::channel(100);
229        Ok(Self {
230            peer_id,
231            direction,
232            created_at: std::time::Instant::now(),
233            connected_at: None,
234            pc,
235            data_channel: Arc::new(Mutex::new(None)),
236            signaling_tx,
237            my_peer_id,
238            store,
239            pending_requests: Arc::new(Mutex::new(HashMap::new())),
240            pending_nostr_queries: Arc::new(Mutex::new(HashMap::new())),
241            message_tx,
242            message_rx: Some(message_rx),
243            state_event_tx,
244            nostr_relay,
245            mesh_frame_tx,
246            cashu_quotes,
247            htl_config: PeerHTLConfig::random(),
248        })
249    }
250
251    pub fn set_store(&mut self, store: Arc<dyn ContentStore>) {
252        self.store = Some(store);
253    }
254
255    pub fn state(&self) -> RTCPeerConnectionState {
256        self.pc.connection_state()
257    }
258
259    pub fn signaling_state(&self) -> webrtc::peer_connection::signaling_state::RTCSignalingState {
260        self.pc.signaling_state()
261    }
262
263    pub fn is_connected(&self) -> bool {
264        self.pc.connection_state() == RTCPeerConnectionState::Connected
265    }
266
267    pub fn htl_config(&self) -> &PeerHTLConfig {
268        &self.htl_config
269    }
270
271    pub async fn setup_handlers(&self) -> Result<()> {
272        let peer_id = self.peer_id.clone();
273        let signaling_tx = self.signaling_tx.clone();
274        let my_peer_id_str = self.my_peer_id.to_string();
275        let target_peer_id = self.peer_id.to_string();
276
277        self.pc
278            .on_ice_candidate(Box::new(move |candidate: Option<RTCIceCandidate>| {
279                let signaling_tx = signaling_tx.clone();
280                let my_peer_id_str = my_peer_id_str.clone();
281                let target_peer_id = target_peer_id.clone();
282
283                Box::pin(async move {
284                    if let Some(c) = candidate {
285                        if let Ok(init) = c.to_json() {
286                            info!(
287                                "ICE candidate generated: {}",
288                                &init.candidate[..init.candidate.len().min(60)]
289                            );
290                            let msg = SignalingMessage::Candidate {
291                                peer_id: my_peer_id_str.clone(),
292                                target_peer_id: target_peer_id.clone(),
293                                candidate: init.candidate,
294                                sdp_m_line_index: init.sdp_mline_index,
295                                sdp_mid: init.sdp_mid,
296                            };
297                            if let Err(e) = signaling_tx.send(msg).await {
298                                error!("Failed to send ICE candidate: {}", e);
299                            }
300                        }
301                    }
302                })
303            }));
304
305        let peer_id_log = peer_id.clone();
306        let state_event_tx = self.state_event_tx.clone();
307        self.pc
308            .on_peer_connection_state_change(Box::new(move |state: RTCPeerConnectionState| {
309                let peer_id = peer_id_log.clone();
310                let state_event_tx = state_event_tx.clone();
311                Box::pin(async move {
312                    info!("Peer {} connection state: {:?}", peer_id.short(), state);
313
314                    if let Some(tx) = state_event_tx {
315                        let event = match state {
316                            RTCPeerConnectionState::Connected => {
317                                Some(PeerStateEvent::Connected(peer_id))
318                            }
319                            RTCPeerConnectionState::Failed => Some(PeerStateEvent::Failed(peer_id)),
320                            RTCPeerConnectionState::Disconnected
321                            | RTCPeerConnectionState::Closed => {
322                                Some(PeerStateEvent::Disconnected(peer_id))
323                            }
324                            _ => None,
325                        };
326                        if let Some(event) = event {
327                            if let Err(e) = tx.send(event).await {
328                                error!("Failed to send peer state event: {}", e);
329                            }
330                        }
331                    }
332                })
333            }));
334
335        Ok(())
336    }
337
338    pub async fn connect(&self) -> Result<serde_json::Value> {
339        let dc_init = RTCDataChannelInit {
340            ordered: Some(false),
341            ..Default::default()
342        };
343        let dc = self
344            .pc
345            .create_data_channel(DATA_CHANNEL_LABEL, Some(dc_init))
346            .await?;
347        self.setup_data_channel(dc.clone()).await?;
348        {
349            let mut dc_guard = self.data_channel.lock().await;
350            *dc_guard = Some(dc);
351        }
352
353        let offer = self.pc.create_offer(None).await?;
354        let mut gathering_complete = self.pc.gathering_complete_promise().await;
355        self.pc.set_local_description(offer).await?;
356
357        let _ = tokio::time::timeout(
358            std::time::Duration::from_secs(10),
359            gathering_complete.recv(),
360        )
361        .await;
362
363        let local_desc = self
364            .pc
365            .local_description()
366            .await
367            .ok_or_else(|| anyhow::anyhow!("No local description after gathering"))?;
368
369        debug!(
370            "Offer created, SDP len: {}, ice_gathering: {:?}",
371            local_desc.sdp.len(),
372            self.pc.ice_gathering_state()
373        );
374
375        Ok(serde_json::json!({
376            "type": local_desc.sdp_type.to_string().to_lowercase(),
377            "sdp": local_desc.sdp
378        }))
379    }
380
381    pub async fn handle_offer(&self, offer: serde_json::Value) -> Result<serde_json::Value> {
382        let sdp = offer
383            .get("sdp")
384            .and_then(|s| s.as_str())
385            .ok_or_else(|| anyhow::anyhow!("Missing SDP in offer"))?;
386
387        let peer_id = self.peer_id.clone();
388        let message_tx = self.message_tx.clone();
389        let pending_requests = self.pending_requests.clone();
390        let pending_nostr_queries = self.pending_nostr_queries.clone();
391        let store = self.store.clone();
392        let data_channel_holder = self.data_channel.clone();
393        let nostr_relay = self.nostr_relay.clone();
394        let mesh_frame_tx = self.mesh_frame_tx.clone();
395        let cashu_quotes = self.cashu_quotes.clone();
396        let peer_pubkey = Some(self.peer_id.pubkey.clone());
397
398        self.pc
399            .on_data_channel(Box::new(move |dc: Arc<RTCDataChannel>| {
400                let peer_id = peer_id.clone();
401                let message_tx = message_tx.clone();
402                let pending_requests = pending_requests.clone();
403                let pending_nostr_queries = pending_nostr_queries.clone();
404                let store = store.clone();
405                let data_channel_holder = data_channel_holder.clone();
406                let nostr_relay = nostr_relay.clone();
407                let mesh_frame_tx = mesh_frame_tx.clone();
408                let cashu_quotes = cashu_quotes.clone();
409                let peer_pubkey = peer_pubkey.clone();
410
411                Box::pin(async move {
412                    info!(
413                        "Peer {} received data channel: {}",
414                        peer_id.short(),
415                        dc.label()
416                    );
417
418                    {
419                        let mut dc_guard = data_channel_holder.lock().await;
420                        *dc_guard = Some(dc.clone());
421                    }
422
423                    Self::setup_dc_handlers(
424                        dc.clone(),
425                        peer_id,
426                        message_tx,
427                        pending_requests,
428                        pending_nostr_queries.clone(),
429                        store,
430                        nostr_relay,
431                        mesh_frame_tx,
432                        cashu_quotes,
433                        peer_pubkey,
434                    )
435                    .await;
436                })
437            }));
438
439        let offer_desc = RTCSessionDescription::offer(sdp.to_string())?;
440        self.pc.set_remote_description(offer_desc).await?;
441
442        let answer = self.pc.create_answer(None).await?;
443        let mut gathering_complete = self.pc.gathering_complete_promise().await;
444        self.pc.set_local_description(answer).await?;
445
446        let _ = tokio::time::timeout(
447            std::time::Duration::from_secs(10),
448            gathering_complete.recv(),
449        )
450        .await;
451
452        let local_desc = self
453            .pc
454            .local_description()
455            .await
456            .ok_or_else(|| anyhow::anyhow!("No local description after gathering"))?;
457
458        debug!(
459            "Answer created, SDP len: {}, ice_gathering: {:?}",
460            local_desc.sdp.len(),
461            self.pc.ice_gathering_state()
462        );
463
464        Ok(serde_json::json!({
465            "type": local_desc.sdp_type.to_string().to_lowercase(),
466            "sdp": local_desc.sdp
467        }))
468    }
469
470    pub async fn handle_answer(&self, answer: serde_json::Value) -> Result<()> {
471        let sdp = answer
472            .get("sdp")
473            .and_then(|s| s.as_str())
474            .ok_or_else(|| anyhow::anyhow!("Missing SDP in answer"))?;
475
476        let answer_desc = RTCSessionDescription::answer(sdp.to_string())?;
477        self.pc.set_remote_description(answer_desc).await?;
478        Ok(())
479    }
480
481    pub async fn handle_candidate(&self, candidate: serde_json::Value) -> Result<()> {
482        let candidate_str = candidate
483            .get("candidate")
484            .and_then(|c| c.as_str())
485            .unwrap_or("");
486
487        let sdp_mid = candidate
488            .get("sdpMid")
489            .and_then(|m| m.as_str())
490            .map(|s| s.to_string());
491
492        let sdp_mline_index = candidate
493            .get("sdpMLineIndex")
494            .and_then(|i| i.as_u64())
495            .map(|i| i as u16);
496
497        if !candidate_str.is_empty() {
498            use webrtc::ice_transport::ice_candidate::RTCIceCandidateInit;
499            let init = RTCIceCandidateInit {
500                candidate: candidate_str.to_string(),
501                sdp_mid,
502                sdp_mline_index,
503                username_fragment: candidate
504                    .get("usernameFragment")
505                    .and_then(|u| u.as_str())
506                    .map(|s| s.to_string()),
507            };
508            self.pc.add_ice_candidate(init).await?;
509        }
510
511        Ok(())
512    }
513
514    async fn setup_data_channel(&self, dc: Arc<RTCDataChannel>) -> Result<()> {
515        let peer_id = self.peer_id.clone();
516        let message_tx = self.message_tx.clone();
517        let pending_requests = self.pending_requests.clone();
518        let store = self.store.clone();
519        let nostr_relay = self.nostr_relay.clone();
520        let mesh_frame_tx = self.mesh_frame_tx.clone();
521        let cashu_quotes = self.cashu_quotes.clone();
522        let peer_pubkey = Some(self.peer_id.pubkey.clone());
523
524        Self::setup_dc_handlers(
525            dc,
526            peer_id,
527            message_tx,
528            pending_requests,
529            self.pending_nostr_queries.clone(),
530            store,
531            nostr_relay,
532            mesh_frame_tx,
533            cashu_quotes,
534            peer_pubkey,
535        )
536        .await;
537        Ok(())
538    }
539
540    #[allow(clippy::too_many_arguments)]
541    async fn setup_dc_handlers(
542        dc: Arc<RTCDataChannel>,
543        peer_id: PeerId,
544        message_tx: mpsc::Sender<(DataMessage, Option<Vec<u8>>)>,
545        pending_requests: Arc<Mutex<HashMap<String, PendingRequest>>>,
546        pending_nostr_queries: Arc<
547            Mutex<HashMap<String, mpsc::UnboundedSender<NostrRelayMessage>>>,
548        >,
549        store: Option<Arc<dyn ContentStore>>,
550        nostr_relay: Option<SharedMeshRelayClient>,
551        mesh_frame_tx: Option<mpsc::Sender<(PeerId, MeshNostrFrame)>>,
552        cashu_quotes: Option<Arc<CashuQuoteState>>,
553        peer_pubkey: Option<String>,
554    ) {
555        let label = dc.label().to_string();
556        let peer_short = peer_id.short();
557        let _pending_binary: Arc<Mutex<Option<u32>>> = Arc::new(Mutex::new(None));
558
559        let open_notify = nostr_relay.as_ref().map(|_| Arc::new(Notify::new()));
560        if let Some(ref notify) = open_notify {
561            if dc.ready_state() == RTCDataChannelState::Open {
562                notify.notify_one();
563            }
564        }
565
566        let mut nostr_client_id: Option<u64> = None;
567        if let Some(relay) = nostr_relay.clone() {
568            let client_id = relay.next_client_id();
569            let (nostr_tx, mut nostr_rx) = mpsc::unbounded_channel::<String>();
570            relay
571                .register_client(client_id, nostr_tx, peer_pubkey.clone())
572                .await;
573            nostr_client_id = Some(client_id);
574
575            if let Some(notify) = open_notify.clone() {
576                let dc_for_send = dc.clone();
577                tokio::spawn(async move {
578                    notify.notified().await;
579                    while let Some(text) = nostr_rx.recv().await {
580                        if dc_for_send.send_text(text).await.is_err() {
581                            break;
582                        }
583                    }
584                });
585            }
586        }
587
588        if let (Some(relay), Some(client_id)) = (nostr_relay.clone(), nostr_client_id) {
589            dc.on_close(Box::new(move || {
590                let relay = relay.clone();
591                Box::pin(async move {
592                    relay.unregister_client(client_id).await;
593                })
594            }));
595        }
596
597        let open_notify_clone = open_notify.clone();
598        let peer_short_open = peer_short.clone();
599        let label_clone = label.clone();
600        dc.on_open(Box::new(move || {
601            let peer_short_open = peer_short_open.clone();
602            let label_clone = label_clone.clone();
603            let open_notify = open_notify_clone.clone();
604            Box::pin(async move {
605                info!(
606                    "[Peer {}] Data channel '{}' open",
607                    peer_short_open, label_clone
608                );
609                if let Some(notify) = open_notify {
610                    notify.notify_one();
611                }
612            })
613        }));
614
615        let dc_for_msg = dc.clone();
616        let peer_short_msg = peer_short.clone();
617        let _pending_binary_clone = _pending_binary.clone();
618        let store_clone = store.clone();
619        let nostr_relay_for_msg = nostr_relay.clone();
620        let nostr_client_id_for_msg = nostr_client_id;
621        let pending_nostr_queries_for_msg = pending_nostr_queries.clone();
622        let mesh_frame_tx_for_msg = mesh_frame_tx.clone();
623        let peer_id_for_msg = peer_id.clone();
624
625        dc.on_message(Box::new(move |msg: DataChannelMessage| {
626            let dc = dc_for_msg.clone();
627            let peer_short = peer_short_msg.clone();
628            let pending_requests = pending_requests.clone();
629            let _pending_binary = _pending_binary_clone.clone();
630            let _message_tx = message_tx.clone();
631            let store = store_clone.clone();
632            let nostr_relay = nostr_relay_for_msg.clone();
633            let nostr_client_id = nostr_client_id_for_msg;
634            let pending_nostr_queries = pending_nostr_queries_for_msg.clone();
635            let mesh_frame_tx = mesh_frame_tx_for_msg.clone();
636            let cashu_quotes = cashu_quotes.clone();
637            let peer_id = peer_id_for_msg.clone();
638            let msg_data = msg.data.clone();
639
640            Box::pin(async move {
641                if msg.is_string {
642                    if let Ok(text) = std::str::from_utf8(&msg_data) {
643                        if let Ok(mesh_frame) = serde_json::from_str::<MeshNostrFrame>(text) {
644                            match validate_mesh_frame(&mesh_frame) {
645                                Ok(()) => {
646                                    if let Some(tx) = mesh_frame_tx {
647                                        let _ = tx.send((peer_id.clone(), mesh_frame)).await;
648                                    }
649                                    return;
650                                }
651                                Err(reason) => {
652                                    debug!(
653                                        "[Peer {}] Ignoring invalid mesh frame: {}",
654                                        peer_short, reason
655                                    );
656                                }
657                            }
658                        }
659
660                        if let Ok(relay_msg) = NostrRelayMessage::from_json(text) {
661                            if let Some(sub_id) = relay_subscription_id(&relay_msg) {
662                                let sender = {
663                                    let pending = pending_nostr_queries.lock().await;
664                                    pending.get(&sub_id).cloned()
665                                };
666                                if let Some(tx) = sender {
667                                    debug!(
668                                        "[Peer {}] Routed Nostr relay message for subscription {}",
669                                        peer_short, sub_id
670                                    );
671                                    let _ = tx.send(relay_msg);
672                                    return;
673                                } else {
674                                    debug!(
675                                        "[Peer {}] Dropping Nostr relay message for unknown subscription {}",
676                                        peer_short, sub_id
677                                    );
678                                }
679                            }
680                        }
681
682                        if let Some(relay) = nostr_relay {
683                            if let Ok(nostr_msg) = NostrClientMessage::from_json(text) {
684                                if let Some(client_id) = nostr_client_id {
685                                    relay.handle_client_message(client_id, nostr_msg).await;
686                                }
687                            }
688                        }
689                    }
690                    return;
691                }
692
693                debug!(
694                    "[Peer {}] Received {} bytes on data channel",
695                    peer_short,
696                    msg_data.len()
697                );
698                match parse_message(&msg_data) {
699                    Some(data_msg) => match data_msg {
700                        DataMessage::Request(req) => {
701                            let hash_hex = hash_to_hex(&req.h);
702                            let hash_short = &hash_hex[..8.min(hash_hex.len())];
703                            info!("[Peer {}] Received request for {}", peer_short, hash_short);
704
705                            if let Some(cashu_quotes) = cashu_quotes.as_ref() {
706                                if cashu_quotes
707                                    .should_refuse_requests_from_peer(&peer_id.to_string())
708                                    .await
709                                {
710                                    info!(
711                                        "[Peer {}] Refusing request from peer with unpaid defaults",
712                                        peer_short
713                                    );
714                                    return;
715                                }
716                            }
717
718                            let quoted_settlement = if let Some(quote_id) = req.q {
719                                let Some(cashu_quotes) = cashu_quotes.as_ref() else {
720                                    info!(
721                                        "[Peer {}] Ignoring quoted request without Cashu settlement state",
722                                        peer_short
723                                    );
724                                    return;
725                                };
726                                match cashu_quotes
727                                    .take_valid_quote(&peer_id.to_string(), &req.h, quote_id)
728                                    .await
729                                {
730                                    Some(settlement) => Some((quote_id, settlement)),
731                                    None => {
732                                        info!(
733                                            "[Peer {}] Ignoring request with invalid or expired quote {}",
734                                            peer_short, quote_id
735                                        );
736                                        return;
737                                    }
738                                }
739                            } else {
740                                None
741                            };
742
743                            let data = if let Some(ref store) = store {
744                                match store.get(&hash_hex) {
745                                    Ok(Some(data)) => {
746                                        info!(
747                                            "[Peer {}] Found {} in store ({} bytes)",
748                                            peer_short,
749                                            hash_short,
750                                            data.len()
751                                        );
752                                        Some(data)
753                                    }
754                                    Ok(None) => {
755                                        info!(
756                                            "[Peer {}] Hash {} not in store",
757                                            peer_short, hash_short
758                                        );
759                                        None
760                                    }
761                                    Err(e) => {
762                                        warn!("[Peer {}] Store error: {}", peer_short, e);
763                                        None
764                                    }
765                                }
766                            } else {
767                                warn!(
768                                    "[Peer {}] No store configured - cannot serve requests",
769                                    peer_short
770                                );
771                                None
772                            };
773
774                            if let Some(data) = data {
775                                let data_len = data.len();
776                                if let (Some(cashu_quotes), Some((quote_id, settlement))) =
777                                    (cashu_quotes.as_ref(), quoted_settlement)
778                                {
779                                    match cashu_quotes
780                                        .prepare_quoted_transfer(
781                                            &peer_id.to_string(),
782                                            &req.h,
783                                            quote_id,
784                                            &settlement,
785                                            data,
786                                        )
787                                        .await
788                                    {
789                                        Some((first_chunk, first_expected)) => {
790                                            if send_quoted_chunk(
791                                                &dc,
792                                                &peer_id,
793                                                &peer_short,
794                                                cashu_quotes,
795                                                first_chunk,
796                                                first_expected,
797                                            )
798                                            .await
799                                            {
800                                                info!(
801                                                    "[Peer {}] Started quoted chunked response for {} ({} bytes)",
802                                                    peer_short, hash_short, data_len
803                                                );
804                                            }
805                                        }
806                                        None => {
807                                            warn!(
808                                                "[Peer {}] Failed to prepare quoted transfer for {}",
809                                                peer_short, hash_short
810                                            );
811                                        }
812                                    }
813                                } else {
814                                    let response = DataResponse {
815                                        h: req.h,
816                                        d: data,
817                                        i: None,
818                                        n: None,
819                                    };
820                                    let wire = encode_response(&response);
821                                    if let Err(e) = dc.send(&Bytes::from(wire)).await {
822                                        error!(
823                                            "[Peer {}] Failed to send response: {}",
824                                            peer_short, e
825                                        );
826                                    } else {
827                                        info!(
828                                            "[Peer {}] Sent response for {} ({} bytes)",
829                                            peer_short, hash_short, data_len
830                                        );
831                                    }
832                                }
833                            } else {
834                                info!("[Peer {}] Content not found for {}", peer_short, hash_short);
835                            }
836                        }
837                        DataMessage::Response(res) => {
838                            let hash_hex = hash_to_hex(&res.h);
839                            let hash_short = &hash_hex[..8.min(hash_hex.len())];
840                            debug!(
841                                "[Peer {}] Received response for {} ({} bytes)",
842                                peer_short,
843                                hash_short,
844                                res.d.len()
845                            );
846
847                            let mut pending = pending_requests.lock().await;
848                            if let Some(req) = pending.remove(&hash_hex) {
849                                let _ = req.response_tx.send(Some(res.d));
850                            }
851                        }
852                        DataMessage::QuoteRequest(req) => {
853                            let response = handle_quote_request_message(
854                                &peer_short,
855                                &peer_id,
856                                &store,
857                                cashu_quotes.as_ref(),
858                                &req,
859                            )
860                            .await;
861                            if let Some(response) = response {
862                                let wire = encode_quote_response(&response);
863                                if let Err(e) = dc.send(&Bytes::from(wire)).await {
864                                    warn!(
865                                        "[Peer {}] Failed to send quote response: {}",
866                                        peer_short, e
867                                    );
868                                }
869                            }
870                        }
871                        DataMessage::QuoteResponse(res) => {
872                            if let Some(cashu_quotes) = cashu_quotes.as_ref() {
873                                let _ = cashu_quotes
874                                    .handle_quote_response(&peer_id.to_string(), res)
875                                    .await;
876                            }
877                        }
878                        DataMessage::Chunk(chunk) => {
879                            process_chunk_message(
880                                &peer_short,
881                                &peer_id,
882                                &dc,
883                                &pending_requests,
884                                cashu_quotes.as_ref(),
885                                chunk,
886                            )
887                            .await;
888                        }
889                        DataMessage::Payment(req) => {
890                            let outcome =
891                                handle_payment_message(&peer_id, cashu_quotes.as_ref(), &req).await;
892                            let wire = encode_payment_ack(&outcome.ack);
893                            if let Err(e) = dc.send(&Bytes::from(wire)).await {
894                                warn!(
895                                    "[Peer {}] Failed to send payment ack: {}",
896                                    peer_short, e
897                                );
898                            }
899                            if let (Some(cashu_quotes), Some((next_chunk, next_expected))) =
900                                (cashu_quotes.as_ref(), outcome.next_chunk)
901                            {
902                                let _ = send_quoted_chunk(
903                                    &dc,
904                                    &peer_id,
905                                    &peer_short,
906                                    cashu_quotes,
907                                    next_chunk,
908                                    next_expected,
909                                )
910                                .await;
911                            }
912                        }
913                        DataMessage::PaymentAck(res) => {
914                            handle_payment_ack_message(
915                                &peer_short,
916                                &peer_id,
917                                &dc,
918                                &pending_requests,
919                                cashu_quotes.as_ref(),
920                                res,
921                            )
922                            .await;
923                        }
924                    },
925                    None => {
926                        warn!("[Peer {}] Failed to parse message", peer_short);
927                        let hex_dump: String = msg_data
928                            .iter()
929                            .take(50)
930                            .map(|b| format!("{:02x}", b))
931                            .collect();
932                        warn!("[Peer {}] Message hex: {}", peer_short, hex_dump);
933                    }
934                }
935            })
936        }));
937    }
938
939    pub fn has_data_channel(&self) -> bool {
940        self.data_channel
941            .try_lock()
942            .map(|guard| {
943                guard
944                    .as_ref()
945                    .map(|dc| dc.ready_state() == RTCDataChannelState::Open)
946                    .unwrap_or(false)
947            })
948            .unwrap_or(false)
949    }
950
951    pub async fn request(&self, hash_hex: &str) -> Result<Option<Vec<u8>>> {
952        self.request_with_timeout(hash_hex, std::time::Duration::from_secs(10))
953            .await
954    }
955
956    pub async fn request_with_timeout(
957        &self,
958        hash_hex: &str,
959        timeout: std::time::Duration,
960    ) -> Result<Option<Vec<u8>>> {
961        let dc_guard = self.data_channel.lock().await;
962        let dc = dc_guard
963            .as_ref()
964            .ok_or_else(|| anyhow::anyhow!("No data channel"))?
965            .clone();
966        drop(dc_guard);
967
968        let hash = hex::decode(hash_hex).map_err(|e| anyhow::anyhow!("Invalid hex hash: {}", e))?;
969        let (tx, rx) = oneshot::channel();
970
971        {
972            let mut pending = self.pending_requests.lock().await;
973            pending.insert(
974                hash_hex.to_string(),
975                PendingRequest::standard(hash.clone(), tx),
976            );
977        }
978
979        let req = DataRequest {
980            h: hash,
981            htl: BLOB_REQUEST_POLICY.max_htl,
982            q: None,
983        };
984        let wire = encode_request(&req);
985        dc.send(&Bytes::from(wire)).await?;
986
987        debug!(
988            "[Peer {}] Sent request for {}",
989            self.peer_id.short(),
990            &hash_hex[..8.min(hash_hex.len())]
991        );
992
993        match tokio::time::timeout(timeout, rx).await {
994            Ok(Ok(data)) => Ok(data),
995            Ok(Err(_)) => Ok(None),
996            Err(_) => {
997                let mut pending = self.pending_requests.lock().await;
998                pending.remove(hash_hex);
999                Ok(None)
1000            }
1001        }
1002    }
1003
1004    pub async fn query_nostr_events(
1005        &self,
1006        filters: Vec<NostrFilter>,
1007        timeout: std::time::Duration,
1008    ) -> Result<Vec<Event>> {
1009        let dc_guard = self.data_channel.lock().await;
1010        let dc = dc_guard
1011            .as_ref()
1012            .ok_or_else(|| anyhow::anyhow!("No data channel"))?
1013            .clone();
1014        drop(dc_guard);
1015
1016        let subscription_id = NostrSubscriptionId::generate();
1017        let subscription_key = subscription_id.to_string();
1018        let (tx, mut rx) = mpsc::unbounded_channel::<NostrRelayMessage>();
1019
1020        {
1021            let mut pending = self.pending_nostr_queries.lock().await;
1022            pending.insert(subscription_key.clone(), tx);
1023        }
1024
1025        let req = NostrClientMessage::req(subscription_id.clone(), filters);
1026        if let Err(e) = dc.send_text(req.as_json()).await {
1027            let mut pending = self.pending_nostr_queries.lock().await;
1028            pending.remove(&subscription_key);
1029            return Err(e.into());
1030        }
1031        debug!(
1032            "[Peer {}] Sent Nostr REQ subscription {}",
1033            self.peer_id.short(),
1034            subscription_id
1035        );
1036
1037        let mut events = Vec::new();
1038        let deadline = tokio::time::Instant::now() + timeout;
1039
1040        loop {
1041            let now = tokio::time::Instant::now();
1042            if now >= deadline {
1043                break;
1044            }
1045            let remaining = deadline - now;
1046
1047            let next = tokio::time::timeout(remaining, rx.recv()).await;
1048            match next {
1049                Ok(Some(NostrRelayMessage::Event {
1050                    subscription_id: sid,
1051                    event,
1052                })) if sid == subscription_id => {
1053                    debug!(
1054                        "[Peer {}] Received Nostr EVENT for subscription {}",
1055                        self.peer_id.short(),
1056                        subscription_id
1057                    );
1058                    events.push(*event);
1059                }
1060                Ok(Some(NostrRelayMessage::EndOfStoredEvents(sid))) if sid == subscription_id => {
1061                    debug!(
1062                        "[Peer {}] Received Nostr EOSE for subscription {}",
1063                        self.peer_id.short(),
1064                        subscription_id
1065                    );
1066                    break;
1067                }
1068                Ok(Some(NostrRelayMessage::Closed {
1069                    subscription_id: sid,
1070                    message,
1071                })) if sid == subscription_id => {
1072                    warn!(
1073                        "[Peer {}] Nostr query closed for subscription {}: {}",
1074                        self.peer_id.short(),
1075                        subscription_id,
1076                        message
1077                    );
1078                    break;
1079                }
1080                Ok(Some(_)) => {}
1081                Ok(None) => break,
1082                Err(_) => {
1083                    warn!(
1084                        "[Peer {}] Nostr query timed out for subscription {}",
1085                        self.peer_id.short(),
1086                        subscription_id
1087                    );
1088                    break;
1089                }
1090            }
1091        }
1092
1093        let close = NostrClientMessage::close(subscription_id.clone());
1094        let _ = dc.send_text(close.as_json()).await;
1095
1096        let mut pending = self.pending_nostr_queries.lock().await;
1097        pending.remove(&subscription_key);
1098        debug!(
1099            "[Peer {}] Nostr query subscription {} collected {} event(s)",
1100            self.peer_id.short(),
1101            subscription_id,
1102            events.len()
1103        );
1104
1105        Ok(events)
1106    }
1107
1108    pub async fn send_mesh_frame_text(&self, frame: &MeshNostrFrame) -> Result<()> {
1109        let dc_guard = self.data_channel.lock().await;
1110        let dc = dc_guard
1111            .as_ref()
1112            .ok_or_else(|| anyhow::anyhow!("No data channel"))?
1113            .clone();
1114        drop(dc_guard);
1115
1116        let text = serde_json::to_string(frame)?;
1117        dc.send_text(text).await?;
1118        Ok(())
1119    }
1120
1121    pub async fn send_message(&self, msg: &DataMessage) -> Result<()> {
1122        let dc_guard = self.data_channel.lock().await;
1123        if let Some(ref dc) = *dc_guard {
1124            let wire = encode_data_message(msg);
1125            dc.send(&Bytes::from(wire)).await?;
1126        }
1127        Ok(())
1128    }
1129
1130    pub async fn close(&self) -> Result<()> {
1131        {
1132            let dc_guard = self.data_channel.lock().await;
1133            if let Some(ref dc) = *dc_guard {
1134                dc.close().await?;
1135            }
1136        }
1137        self.pc.close().await?;
1138        Ok(())
1139    }
1140}
1141
1142fn hash_to_hex(hash: &[u8]) -> String {
1143    hash_to_key(hash)
1144}
1145
1146fn encode_data_message(msg: &DataMessage) -> Vec<u8> {
1147    match msg {
1148        DataMessage::Request(req) => encode_request(req),
1149        DataMessage::Response(res) => encode_response(res),
1150        DataMessage::QuoteRequest(req) => crate::encode_quote_request(req),
1151        DataMessage::QuoteResponse(res) => encode_quote_response(res),
1152        DataMessage::Payment(req) => crate::encode_payment(req),
1153        DataMessage::PaymentAck(res) => crate::encode_payment_ack(res),
1154        DataMessage::Chunk(chunk) => crate::encode_chunk(chunk),
1155    }
1156}
1157
1158fn relay_subscription_id(msg: &NostrRelayMessage) -> Option<String> {
1159    match msg {
1160        NostrRelayMessage::Event {
1161            subscription_id, ..
1162        } => Some(subscription_id.to_string()),
1163        NostrRelayMessage::EndOfStoredEvents(subscription_id) => Some(subscription_id.to_string()),
1164        NostrRelayMessage::Closed {
1165            subscription_id, ..
1166        } => Some(subscription_id.to_string()),
1167        NostrRelayMessage::Count {
1168            subscription_id, ..
1169        } => Some(subscription_id.to_string()),
1170        _ => None,
1171    }
1172}
1173
1174#[async_trait]
1175impl RoutedPeerLink for Peer {
1176    async fn send(&self, data: Vec<u8>) -> std::result::Result<(), RoutedTransportError> {
1177        let dc = self
1178            .data_channel
1179            .lock()
1180            .await
1181            .as_ref()
1182            .cloned()
1183            .ok_or(RoutedTransportError::NotConnected)?;
1184        dc.send(&Bytes::from(data))
1185            .await
1186            .map(|_| ())
1187            .map_err(|e| RoutedTransportError::SendFailed(e.to_string()))
1188    }
1189
1190    async fn recv(&self) -> Option<Vec<u8>> {
1191        None
1192    }
1193
1194    fn try_recv(&self) -> Option<Vec<u8>> {
1195        None
1196    }
1197
1198    fn is_open(&self) -> bool {
1199        self.has_data_channel()
1200    }
1201
1202    async fn close(&self) {
1203        let _ = Peer::close(self).await;
1204    }
1205}