1use anyhow::Result;
4use async_trait::async_trait;
5use bytes::Bytes;
6use std::collections::HashMap;
7use std::sync::Arc;
8use tokio::sync::{mpsc, oneshot, Mutex, Notify};
9use tracing::{debug, error, info, warn};
10use webrtc::api::interceptor_registry::register_default_interceptors;
11use webrtc::api::media_engine::MediaEngine;
12use webrtc::api::setting_engine::SettingEngine;
13use webrtc::api::APIBuilder;
14use webrtc::data_channel::data_channel_init::RTCDataChannelInit;
15use webrtc::data_channel::data_channel_message::DataChannelMessage;
16use webrtc::data_channel::data_channel_state::RTCDataChannelState;
17use webrtc::data_channel::RTCDataChannel;
18use webrtc::ice_transport::ice_candidate::RTCIceCandidate;
19use webrtc::ice_transport::ice_server::RTCIceServer;
20use webrtc::interceptor::registry::Registry;
21use webrtc::peer_connection::configuration::RTCConfiguration;
22use webrtc::peer_connection::peer_connection_state::RTCPeerConnectionState;
23use webrtc::peer_connection::sdp::session_description::RTCSessionDescription;
24use webrtc::peer_connection::RTCPeerConnection;
25
26use crate::cashu::CashuQuoteState;
27use crate::relay_bridge::SharedMeshRelayClient;
28use crate::runtime_control::PeerStateEvent;
29use crate::runtime_peer::PeerDirection;
30use crate::transport::{PeerLink as RoutedPeerLink, TransportError as RoutedTransportError};
31use crate::types::{
32 validate_mesh_frame, MeshNostrFrame, PeerHTLConfig, PeerId, SignalingMessage,
33 BLOB_REQUEST_POLICY, DATA_CHANNEL_LABEL,
34};
35use crate::{
36 encode_payment_ack, encode_quote_response, encode_request, encode_response, hash_to_key,
37 parse_message, DataChunk, DataMessage, DataRequest, DataResponse,
38};
39use nostr_sdk::nostr::{
40 ClientMessage as NostrClientMessage, Event, Filter as NostrFilter, JsonUtil as NostrJsonUtil,
41 RelayMessage as NostrRelayMessage, SubscriptionId as NostrSubscriptionId,
42};
43
44mod payments;
45
46use payments::{
47 handle_payment_ack_message, handle_payment_message, handle_quote_request_message,
48 process_chunk_message, send_quoted_chunk,
49};
50
51pub trait ContentStore: Send + Sync + 'static {
53 fn get(&self, hash_hex: &str) -> Result<Option<Vec<u8>>>;
54}
55
56pub struct PendingRequest {
58 pub hash: Vec<u8>,
59 pub response_tx: oneshot::Sender<Option<Vec<u8>>>,
60 pub quoted: Option<PendingQuotedRequest>,
61}
62
63pub struct PendingQuotedRequest {
64 pub quote_id: u64,
65 pub mint_url: String,
66 pub total_payment_sat: u64,
67 pub confirmed_payment_sat: u64,
68 pub next_chunk_index: u32,
69 pub total_chunks: Option<u32>,
70 pub assembled_data: Vec<u8>,
71 pub in_flight_payment: Option<PendingChunkPayment>,
72 pub buffered_chunk: Option<DataChunk>,
73}
74
75pub struct PendingChunkPayment {
76 pub chunk_index: u32,
77 pub amount_sat: u64,
78 pub mint_url: String,
79 pub operation_id: String,
80 pub final_chunk: bool,
81}
82
83impl PendingRequest {
84 pub fn standard(hash: Vec<u8>, response_tx: oneshot::Sender<Option<Vec<u8>>>) -> Self {
85 Self {
86 hash,
87 response_tx,
88 quoted: None,
89 }
90 }
91
92 pub fn quoted(
93 hash: Vec<u8>,
94 response_tx: oneshot::Sender<Option<Vec<u8>>>,
95 quote_id: u64,
96 mint_url: String,
97 total_payment_sat: u64,
98 ) -> Self {
99 Self {
100 hash,
101 response_tx,
102 quoted: Some(PendingQuotedRequest {
103 quote_id,
104 mint_url,
105 total_payment_sat,
106 confirmed_payment_sat: 0,
107 next_chunk_index: 0,
108 total_chunks: None,
109 assembled_data: Vec::new(),
110 in_flight_payment: None,
111 buffered_chunk: None,
112 }),
113 }
114 }
115}
116
117pub struct Peer {
119 pub peer_id: PeerId,
120 pub direction: PeerDirection,
121 pub created_at: std::time::Instant,
122 pub connected_at: Option<std::time::Instant>,
123
124 pc: Arc<RTCPeerConnection>,
125 pub data_channel: Arc<Mutex<Option<Arc<RTCDataChannel>>>>,
126 signaling_tx: mpsc::Sender<SignalingMessage>,
127 my_peer_id: PeerId,
128 store: Option<Arc<dyn ContentStore>>,
129 pub pending_requests: Arc<Mutex<HashMap<String, PendingRequest>>>,
130 pending_nostr_queries: Arc<Mutex<HashMap<String, mpsc::UnboundedSender<NostrRelayMessage>>>>,
131 #[allow(dead_code)]
132 message_tx: mpsc::Sender<(DataMessage, Option<Vec<u8>>)>,
133 #[allow(dead_code)]
134 message_rx: Option<mpsc::Receiver<(DataMessage, Option<Vec<u8>>)>>,
135 state_event_tx: Option<mpsc::Sender<PeerStateEvent>>,
136 nostr_relay: Option<SharedMeshRelayClient>,
137 mesh_frame_tx: Option<mpsc::Sender<(PeerId, MeshNostrFrame)>>,
138 cashu_quotes: Option<Arc<CashuQuoteState>>,
139 htl_config: PeerHTLConfig,
140}
141
142impl Peer {
143 pub async fn new(
144 peer_id: PeerId,
145 direction: PeerDirection,
146 my_peer_id: PeerId,
147 signaling_tx: mpsc::Sender<SignalingMessage>,
148 stun_servers: Vec<String>,
149 ) -> Result<Self> {
150 Self::new_with_store_and_events(
151 peer_id,
152 direction,
153 my_peer_id,
154 signaling_tx,
155 stun_servers,
156 None,
157 None,
158 None,
159 None,
160 None,
161 )
162 .await
163 }
164
165 pub async fn new_with_store(
166 peer_id: PeerId,
167 direction: PeerDirection,
168 my_peer_id: PeerId,
169 signaling_tx: mpsc::Sender<SignalingMessage>,
170 stun_servers: Vec<String>,
171 store: Option<Arc<dyn ContentStore>>,
172 ) -> Result<Self> {
173 Self::new_with_store_and_events(
174 peer_id,
175 direction,
176 my_peer_id,
177 signaling_tx,
178 stun_servers,
179 store,
180 None,
181 None,
182 None,
183 None,
184 )
185 .await
186 }
187
188 #[allow(clippy::too_many_arguments)]
189 pub async fn new_with_store_and_events(
190 peer_id: PeerId,
191 direction: PeerDirection,
192 my_peer_id: PeerId,
193 signaling_tx: mpsc::Sender<SignalingMessage>,
194 stun_servers: Vec<String>,
195 store: Option<Arc<dyn ContentStore>>,
196 state_event_tx: Option<mpsc::Sender<PeerStateEvent>>,
197 nostr_relay: Option<SharedMeshRelayClient>,
198 mesh_frame_tx: Option<mpsc::Sender<(PeerId, MeshNostrFrame)>>,
199 cashu_quotes: Option<Arc<CashuQuoteState>>,
200 ) -> Result<Self> {
201 let mut m = MediaEngine::default();
202 m.register_default_codecs()?;
203
204 let mut registry = Registry::new();
205 registry = register_default_interceptors(registry, &mut m)?;
206
207 let setting_engine = SettingEngine::default();
208 let api = APIBuilder::new()
209 .with_media_engine(m)
210 .with_interceptor_registry(registry)
211 .with_setting_engine(setting_engine)
212 .build();
213
214 let ice_servers: Vec<RTCIceServer> = stun_servers
215 .iter()
216 .map(|url| RTCIceServer {
217 urls: vec![url.clone()],
218 ..Default::default()
219 })
220 .collect();
221
222 let config = RTCConfiguration {
223 ice_servers,
224 ..Default::default()
225 };
226
227 let pc = Arc::new(api.new_peer_connection(config).await?);
228 let (message_tx, message_rx) = mpsc::channel(100);
229 Ok(Self {
230 peer_id,
231 direction,
232 created_at: std::time::Instant::now(),
233 connected_at: None,
234 pc,
235 data_channel: Arc::new(Mutex::new(None)),
236 signaling_tx,
237 my_peer_id,
238 store,
239 pending_requests: Arc::new(Mutex::new(HashMap::new())),
240 pending_nostr_queries: Arc::new(Mutex::new(HashMap::new())),
241 message_tx,
242 message_rx: Some(message_rx),
243 state_event_tx,
244 nostr_relay,
245 mesh_frame_tx,
246 cashu_quotes,
247 htl_config: PeerHTLConfig::random(),
248 })
249 }
250
251 pub fn set_store(&mut self, store: Arc<dyn ContentStore>) {
252 self.store = Some(store);
253 }
254
255 pub fn state(&self) -> RTCPeerConnectionState {
256 self.pc.connection_state()
257 }
258
259 pub fn signaling_state(&self) -> webrtc::peer_connection::signaling_state::RTCSignalingState {
260 self.pc.signaling_state()
261 }
262
263 pub fn is_connected(&self) -> bool {
264 self.pc.connection_state() == RTCPeerConnectionState::Connected
265 }
266
267 pub fn htl_config(&self) -> &PeerHTLConfig {
268 &self.htl_config
269 }
270
271 pub async fn setup_handlers(&self) -> Result<()> {
272 let peer_id = self.peer_id.clone();
273 let signaling_tx = self.signaling_tx.clone();
274 let my_peer_id_str = self.my_peer_id.to_string();
275 let target_peer_id = self.peer_id.to_string();
276
277 self.pc
278 .on_ice_candidate(Box::new(move |candidate: Option<RTCIceCandidate>| {
279 let signaling_tx = signaling_tx.clone();
280 let my_peer_id_str = my_peer_id_str.clone();
281 let target_peer_id = target_peer_id.clone();
282
283 Box::pin(async move {
284 if let Some(c) = candidate {
285 if let Ok(init) = c.to_json() {
286 info!(
287 "ICE candidate generated: {}",
288 &init.candidate[..init.candidate.len().min(60)]
289 );
290 let msg = SignalingMessage::Candidate {
291 peer_id: my_peer_id_str.clone(),
292 target_peer_id: target_peer_id.clone(),
293 candidate: init.candidate,
294 sdp_m_line_index: init.sdp_mline_index,
295 sdp_mid: init.sdp_mid,
296 };
297 if let Err(e) = signaling_tx.send(msg).await {
298 error!("Failed to send ICE candidate: {}", e);
299 }
300 }
301 }
302 })
303 }));
304
305 let peer_id_log = peer_id.clone();
306 let state_event_tx = self.state_event_tx.clone();
307 self.pc
308 .on_peer_connection_state_change(Box::new(move |state: RTCPeerConnectionState| {
309 let peer_id = peer_id_log.clone();
310 let state_event_tx = state_event_tx.clone();
311 Box::pin(async move {
312 info!("Peer {} connection state: {:?}", peer_id.short(), state);
313
314 if let Some(tx) = state_event_tx {
315 let event = match state {
316 RTCPeerConnectionState::Connected => {
317 Some(PeerStateEvent::Connected(peer_id))
318 }
319 RTCPeerConnectionState::Failed => Some(PeerStateEvent::Failed(peer_id)),
320 RTCPeerConnectionState::Disconnected
321 | RTCPeerConnectionState::Closed => {
322 Some(PeerStateEvent::Disconnected(peer_id))
323 }
324 _ => None,
325 };
326 if let Some(event) = event {
327 if let Err(e) = tx.send(event).await {
328 error!("Failed to send peer state event: {}", e);
329 }
330 }
331 }
332 })
333 }));
334
335 Ok(())
336 }
337
338 pub async fn connect(&self) -> Result<serde_json::Value> {
339 let dc_init = RTCDataChannelInit {
340 ordered: Some(false),
341 ..Default::default()
342 };
343 let dc = self
344 .pc
345 .create_data_channel(DATA_CHANNEL_LABEL, Some(dc_init))
346 .await?;
347 self.setup_data_channel(dc.clone()).await?;
348 {
349 let mut dc_guard = self.data_channel.lock().await;
350 *dc_guard = Some(dc);
351 }
352
353 let offer = self.pc.create_offer(None).await?;
354 let mut gathering_complete = self.pc.gathering_complete_promise().await;
355 self.pc.set_local_description(offer).await?;
356
357 let _ = tokio::time::timeout(
358 std::time::Duration::from_secs(10),
359 gathering_complete.recv(),
360 )
361 .await;
362
363 let local_desc = self
364 .pc
365 .local_description()
366 .await
367 .ok_or_else(|| anyhow::anyhow!("No local description after gathering"))?;
368
369 debug!(
370 "Offer created, SDP len: {}, ice_gathering: {:?}",
371 local_desc.sdp.len(),
372 self.pc.ice_gathering_state()
373 );
374
375 Ok(serde_json::json!({
376 "type": local_desc.sdp_type.to_string().to_lowercase(),
377 "sdp": local_desc.sdp
378 }))
379 }
380
381 pub async fn handle_offer(&self, offer: serde_json::Value) -> Result<serde_json::Value> {
382 let sdp = offer
383 .get("sdp")
384 .and_then(|s| s.as_str())
385 .ok_or_else(|| anyhow::anyhow!("Missing SDP in offer"))?;
386
387 let peer_id = self.peer_id.clone();
388 let message_tx = self.message_tx.clone();
389 let pending_requests = self.pending_requests.clone();
390 let pending_nostr_queries = self.pending_nostr_queries.clone();
391 let store = self.store.clone();
392 let data_channel_holder = self.data_channel.clone();
393 let nostr_relay = self.nostr_relay.clone();
394 let mesh_frame_tx = self.mesh_frame_tx.clone();
395 let cashu_quotes = self.cashu_quotes.clone();
396 let peer_pubkey = Some(self.peer_id.pubkey.clone());
397
398 self.pc
399 .on_data_channel(Box::new(move |dc: Arc<RTCDataChannel>| {
400 let peer_id = peer_id.clone();
401 let message_tx = message_tx.clone();
402 let pending_requests = pending_requests.clone();
403 let pending_nostr_queries = pending_nostr_queries.clone();
404 let store = store.clone();
405 let data_channel_holder = data_channel_holder.clone();
406 let nostr_relay = nostr_relay.clone();
407 let mesh_frame_tx = mesh_frame_tx.clone();
408 let cashu_quotes = cashu_quotes.clone();
409 let peer_pubkey = peer_pubkey.clone();
410
411 Box::pin(async move {
412 info!(
413 "Peer {} received data channel: {}",
414 peer_id.short(),
415 dc.label()
416 );
417
418 {
419 let mut dc_guard = data_channel_holder.lock().await;
420 *dc_guard = Some(dc.clone());
421 }
422
423 Self::setup_dc_handlers(
424 dc.clone(),
425 peer_id,
426 message_tx,
427 pending_requests,
428 pending_nostr_queries.clone(),
429 store,
430 nostr_relay,
431 mesh_frame_tx,
432 cashu_quotes,
433 peer_pubkey,
434 )
435 .await;
436 })
437 }));
438
439 let offer_desc = RTCSessionDescription::offer(sdp.to_string())?;
440 self.pc.set_remote_description(offer_desc).await?;
441
442 let answer = self.pc.create_answer(None).await?;
443 let mut gathering_complete = self.pc.gathering_complete_promise().await;
444 self.pc.set_local_description(answer).await?;
445
446 let _ = tokio::time::timeout(
447 std::time::Duration::from_secs(10),
448 gathering_complete.recv(),
449 )
450 .await;
451
452 let local_desc = self
453 .pc
454 .local_description()
455 .await
456 .ok_or_else(|| anyhow::anyhow!("No local description after gathering"))?;
457
458 debug!(
459 "Answer created, SDP len: {}, ice_gathering: {:?}",
460 local_desc.sdp.len(),
461 self.pc.ice_gathering_state()
462 );
463
464 Ok(serde_json::json!({
465 "type": local_desc.sdp_type.to_string().to_lowercase(),
466 "sdp": local_desc.sdp
467 }))
468 }
469
470 pub async fn handle_answer(&self, answer: serde_json::Value) -> Result<()> {
471 let sdp = answer
472 .get("sdp")
473 .and_then(|s| s.as_str())
474 .ok_or_else(|| anyhow::anyhow!("Missing SDP in answer"))?;
475
476 let answer_desc = RTCSessionDescription::answer(sdp.to_string())?;
477 self.pc.set_remote_description(answer_desc).await?;
478 Ok(())
479 }
480
481 pub async fn handle_candidate(&self, candidate: serde_json::Value) -> Result<()> {
482 let candidate_str = candidate
483 .get("candidate")
484 .and_then(|c| c.as_str())
485 .unwrap_or("");
486
487 let sdp_mid = candidate
488 .get("sdpMid")
489 .and_then(|m| m.as_str())
490 .map(|s| s.to_string());
491
492 let sdp_mline_index = candidate
493 .get("sdpMLineIndex")
494 .and_then(|i| i.as_u64())
495 .map(|i| i as u16);
496
497 if !candidate_str.is_empty() {
498 use webrtc::ice_transport::ice_candidate::RTCIceCandidateInit;
499 let init = RTCIceCandidateInit {
500 candidate: candidate_str.to_string(),
501 sdp_mid,
502 sdp_mline_index,
503 username_fragment: candidate
504 .get("usernameFragment")
505 .and_then(|u| u.as_str())
506 .map(|s| s.to_string()),
507 };
508 self.pc.add_ice_candidate(init).await?;
509 }
510
511 Ok(())
512 }
513
514 async fn setup_data_channel(&self, dc: Arc<RTCDataChannel>) -> Result<()> {
515 let peer_id = self.peer_id.clone();
516 let message_tx = self.message_tx.clone();
517 let pending_requests = self.pending_requests.clone();
518 let store = self.store.clone();
519 let nostr_relay = self.nostr_relay.clone();
520 let mesh_frame_tx = self.mesh_frame_tx.clone();
521 let cashu_quotes = self.cashu_quotes.clone();
522 let peer_pubkey = Some(self.peer_id.pubkey.clone());
523
524 Self::setup_dc_handlers(
525 dc,
526 peer_id,
527 message_tx,
528 pending_requests,
529 self.pending_nostr_queries.clone(),
530 store,
531 nostr_relay,
532 mesh_frame_tx,
533 cashu_quotes,
534 peer_pubkey,
535 )
536 .await;
537 Ok(())
538 }
539
540 #[allow(clippy::too_many_arguments)]
541 async fn setup_dc_handlers(
542 dc: Arc<RTCDataChannel>,
543 peer_id: PeerId,
544 message_tx: mpsc::Sender<(DataMessage, Option<Vec<u8>>)>,
545 pending_requests: Arc<Mutex<HashMap<String, PendingRequest>>>,
546 pending_nostr_queries: Arc<
547 Mutex<HashMap<String, mpsc::UnboundedSender<NostrRelayMessage>>>,
548 >,
549 store: Option<Arc<dyn ContentStore>>,
550 nostr_relay: Option<SharedMeshRelayClient>,
551 mesh_frame_tx: Option<mpsc::Sender<(PeerId, MeshNostrFrame)>>,
552 cashu_quotes: Option<Arc<CashuQuoteState>>,
553 peer_pubkey: Option<String>,
554 ) {
555 let label = dc.label().to_string();
556 let peer_short = peer_id.short();
557 let _pending_binary: Arc<Mutex<Option<u32>>> = Arc::new(Mutex::new(None));
558
559 let open_notify = nostr_relay.as_ref().map(|_| Arc::new(Notify::new()));
560 if let Some(ref notify) = open_notify {
561 if dc.ready_state() == RTCDataChannelState::Open {
562 notify.notify_one();
563 }
564 }
565
566 let mut nostr_client_id: Option<u64> = None;
567 if let Some(relay) = nostr_relay.clone() {
568 let client_id = relay.next_client_id();
569 let (nostr_tx, mut nostr_rx) = mpsc::unbounded_channel::<String>();
570 relay
571 .register_client(client_id, nostr_tx, peer_pubkey.clone())
572 .await;
573 nostr_client_id = Some(client_id);
574
575 if let Some(notify) = open_notify.clone() {
576 let dc_for_send = dc.clone();
577 tokio::spawn(async move {
578 notify.notified().await;
579 while let Some(text) = nostr_rx.recv().await {
580 if dc_for_send.send_text(text).await.is_err() {
581 break;
582 }
583 }
584 });
585 }
586 }
587
588 if let (Some(relay), Some(client_id)) = (nostr_relay.clone(), nostr_client_id) {
589 dc.on_close(Box::new(move || {
590 let relay = relay.clone();
591 Box::pin(async move {
592 relay.unregister_client(client_id).await;
593 })
594 }));
595 }
596
597 let open_notify_clone = open_notify.clone();
598 let peer_short_open = peer_short.clone();
599 let label_clone = label.clone();
600 dc.on_open(Box::new(move || {
601 let peer_short_open = peer_short_open.clone();
602 let label_clone = label_clone.clone();
603 let open_notify = open_notify_clone.clone();
604 Box::pin(async move {
605 info!(
606 "[Peer {}] Data channel '{}' open",
607 peer_short_open, label_clone
608 );
609 if let Some(notify) = open_notify {
610 notify.notify_one();
611 }
612 })
613 }));
614
615 let dc_for_msg = dc.clone();
616 let peer_short_msg = peer_short.clone();
617 let _pending_binary_clone = _pending_binary.clone();
618 let store_clone = store.clone();
619 let nostr_relay_for_msg = nostr_relay.clone();
620 let nostr_client_id_for_msg = nostr_client_id;
621 let pending_nostr_queries_for_msg = pending_nostr_queries.clone();
622 let mesh_frame_tx_for_msg = mesh_frame_tx.clone();
623 let peer_id_for_msg = peer_id.clone();
624
625 dc.on_message(Box::new(move |msg: DataChannelMessage| {
626 let dc = dc_for_msg.clone();
627 let peer_short = peer_short_msg.clone();
628 let pending_requests = pending_requests.clone();
629 let _pending_binary = _pending_binary_clone.clone();
630 let _message_tx = message_tx.clone();
631 let store = store_clone.clone();
632 let nostr_relay = nostr_relay_for_msg.clone();
633 let nostr_client_id = nostr_client_id_for_msg;
634 let pending_nostr_queries = pending_nostr_queries_for_msg.clone();
635 let mesh_frame_tx = mesh_frame_tx_for_msg.clone();
636 let cashu_quotes = cashu_quotes.clone();
637 let peer_id = peer_id_for_msg.clone();
638 let msg_data = msg.data.clone();
639
640 Box::pin(async move {
641 if msg.is_string {
642 if let Ok(text) = std::str::from_utf8(&msg_data) {
643 if let Ok(mesh_frame) = serde_json::from_str::<MeshNostrFrame>(text) {
644 match validate_mesh_frame(&mesh_frame) {
645 Ok(()) => {
646 if let Some(tx) = mesh_frame_tx {
647 let _ = tx.send((peer_id.clone(), mesh_frame)).await;
648 }
649 return;
650 }
651 Err(reason) => {
652 debug!(
653 "[Peer {}] Ignoring invalid mesh frame: {}",
654 peer_short, reason
655 );
656 }
657 }
658 }
659
660 if let Ok(relay_msg) = NostrRelayMessage::from_json(text) {
661 if let Some(sub_id) = relay_subscription_id(&relay_msg) {
662 let sender = {
663 let pending = pending_nostr_queries.lock().await;
664 pending.get(&sub_id).cloned()
665 };
666 if let Some(tx) = sender {
667 debug!(
668 "[Peer {}] Routed Nostr relay message for subscription {}",
669 peer_short, sub_id
670 );
671 let _ = tx.send(relay_msg);
672 return;
673 } else {
674 debug!(
675 "[Peer {}] Dropping Nostr relay message for unknown subscription {}",
676 peer_short, sub_id
677 );
678 }
679 }
680 }
681
682 if let Some(relay) = nostr_relay {
683 if let Ok(nostr_msg) = NostrClientMessage::from_json(text) {
684 if let Some(client_id) = nostr_client_id {
685 relay.handle_client_message(client_id, nostr_msg).await;
686 }
687 }
688 }
689 }
690 return;
691 }
692
693 debug!(
694 "[Peer {}] Received {} bytes on data channel",
695 peer_short,
696 msg_data.len()
697 );
698 match parse_message(&msg_data) {
699 Some(data_msg) => match data_msg {
700 DataMessage::Request(req) => {
701 let hash_hex = hash_to_hex(&req.h);
702 let hash_short = &hash_hex[..8.min(hash_hex.len())];
703 info!("[Peer {}] Received request for {}", peer_short, hash_short);
704
705 if let Some(cashu_quotes) = cashu_quotes.as_ref() {
706 if cashu_quotes
707 .should_refuse_requests_from_peer(&peer_id.to_string())
708 .await
709 {
710 info!(
711 "[Peer {}] Refusing request from peer with unpaid defaults",
712 peer_short
713 );
714 return;
715 }
716 }
717
718 let quoted_settlement = if let Some(quote_id) = req.q {
719 let Some(cashu_quotes) = cashu_quotes.as_ref() else {
720 info!(
721 "[Peer {}] Ignoring quoted request without Cashu settlement state",
722 peer_short
723 );
724 return;
725 };
726 match cashu_quotes
727 .take_valid_quote(&peer_id.to_string(), &req.h, quote_id)
728 .await
729 {
730 Some(settlement) => Some((quote_id, settlement)),
731 None => {
732 info!(
733 "[Peer {}] Ignoring request with invalid or expired quote {}",
734 peer_short, quote_id
735 );
736 return;
737 }
738 }
739 } else {
740 None
741 };
742
743 let data = if let Some(ref store) = store {
744 match store.get(&hash_hex) {
745 Ok(Some(data)) => {
746 info!(
747 "[Peer {}] Found {} in store ({} bytes)",
748 peer_short,
749 hash_short,
750 data.len()
751 );
752 Some(data)
753 }
754 Ok(None) => {
755 info!(
756 "[Peer {}] Hash {} not in store",
757 peer_short, hash_short
758 );
759 None
760 }
761 Err(e) => {
762 warn!("[Peer {}] Store error: {}", peer_short, e);
763 None
764 }
765 }
766 } else {
767 warn!(
768 "[Peer {}] No store configured - cannot serve requests",
769 peer_short
770 );
771 None
772 };
773
774 if let Some(data) = data {
775 let data_len = data.len();
776 if let (Some(cashu_quotes), Some((quote_id, settlement))) =
777 (cashu_quotes.as_ref(), quoted_settlement)
778 {
779 match cashu_quotes
780 .prepare_quoted_transfer(
781 &peer_id.to_string(),
782 &req.h,
783 quote_id,
784 &settlement,
785 data,
786 )
787 .await
788 {
789 Some((first_chunk, first_expected)) => {
790 if send_quoted_chunk(
791 &dc,
792 &peer_id,
793 &peer_short,
794 cashu_quotes,
795 first_chunk,
796 first_expected,
797 )
798 .await
799 {
800 info!(
801 "[Peer {}] Started quoted chunked response for {} ({} bytes)",
802 peer_short, hash_short, data_len
803 );
804 }
805 }
806 None => {
807 warn!(
808 "[Peer {}] Failed to prepare quoted transfer for {}",
809 peer_short, hash_short
810 );
811 }
812 }
813 } else {
814 let response = DataResponse {
815 h: req.h,
816 d: data,
817 i: None,
818 n: None,
819 };
820 let wire = encode_response(&response);
821 if let Err(e) = dc.send(&Bytes::from(wire)).await {
822 error!(
823 "[Peer {}] Failed to send response: {}",
824 peer_short, e
825 );
826 } else {
827 info!(
828 "[Peer {}] Sent response for {} ({} bytes)",
829 peer_short, hash_short, data_len
830 );
831 }
832 }
833 } else {
834 info!("[Peer {}] Content not found for {}", peer_short, hash_short);
835 }
836 }
837 DataMessage::Response(res) => {
838 let hash_hex = hash_to_hex(&res.h);
839 let hash_short = &hash_hex[..8.min(hash_hex.len())];
840 debug!(
841 "[Peer {}] Received response for {} ({} bytes)",
842 peer_short,
843 hash_short,
844 res.d.len()
845 );
846
847 let mut pending = pending_requests.lock().await;
848 if let Some(req) = pending.remove(&hash_hex) {
849 let _ = req.response_tx.send(Some(res.d));
850 }
851 }
852 DataMessage::QuoteRequest(req) => {
853 let response = handle_quote_request_message(
854 &peer_short,
855 &peer_id,
856 &store,
857 cashu_quotes.as_ref(),
858 &req,
859 )
860 .await;
861 if let Some(response) = response {
862 let wire = encode_quote_response(&response);
863 if let Err(e) = dc.send(&Bytes::from(wire)).await {
864 warn!(
865 "[Peer {}] Failed to send quote response: {}",
866 peer_short, e
867 );
868 }
869 }
870 }
871 DataMessage::QuoteResponse(res) => {
872 if let Some(cashu_quotes) = cashu_quotes.as_ref() {
873 let _ = cashu_quotes
874 .handle_quote_response(&peer_id.to_string(), res)
875 .await;
876 }
877 }
878 DataMessage::Chunk(chunk) => {
879 process_chunk_message(
880 &peer_short,
881 &peer_id,
882 &dc,
883 &pending_requests,
884 cashu_quotes.as_ref(),
885 chunk,
886 )
887 .await;
888 }
889 DataMessage::Payment(req) => {
890 let outcome =
891 handle_payment_message(&peer_id, cashu_quotes.as_ref(), &req).await;
892 let wire = encode_payment_ack(&outcome.ack);
893 if let Err(e) = dc.send(&Bytes::from(wire)).await {
894 warn!(
895 "[Peer {}] Failed to send payment ack: {}",
896 peer_short, e
897 );
898 }
899 if let (Some(cashu_quotes), Some((next_chunk, next_expected))) =
900 (cashu_quotes.as_ref(), outcome.next_chunk)
901 {
902 let _ = send_quoted_chunk(
903 &dc,
904 &peer_id,
905 &peer_short,
906 cashu_quotes,
907 next_chunk,
908 next_expected,
909 )
910 .await;
911 }
912 }
913 DataMessage::PaymentAck(res) => {
914 handle_payment_ack_message(
915 &peer_short,
916 &peer_id,
917 &dc,
918 &pending_requests,
919 cashu_quotes.as_ref(),
920 res,
921 )
922 .await;
923 }
924 },
925 None => {
926 warn!("[Peer {}] Failed to parse message", peer_short);
927 let hex_dump: String = msg_data
928 .iter()
929 .take(50)
930 .map(|b| format!("{:02x}", b))
931 .collect();
932 warn!("[Peer {}] Message hex: {}", peer_short, hex_dump);
933 }
934 }
935 })
936 }));
937 }
938
939 pub fn has_data_channel(&self) -> bool {
940 self.data_channel
941 .try_lock()
942 .map(|guard| {
943 guard
944 .as_ref()
945 .map(|dc| dc.ready_state() == RTCDataChannelState::Open)
946 .unwrap_or(false)
947 })
948 .unwrap_or(false)
949 }
950
951 pub async fn request(&self, hash_hex: &str) -> Result<Option<Vec<u8>>> {
952 self.request_with_timeout(hash_hex, std::time::Duration::from_secs(10))
953 .await
954 }
955
956 pub async fn request_with_timeout(
957 &self,
958 hash_hex: &str,
959 timeout: std::time::Duration,
960 ) -> Result<Option<Vec<u8>>> {
961 let dc_guard = self.data_channel.lock().await;
962 let dc = dc_guard
963 .as_ref()
964 .ok_or_else(|| anyhow::anyhow!("No data channel"))?
965 .clone();
966 drop(dc_guard);
967
968 let hash = hex::decode(hash_hex).map_err(|e| anyhow::anyhow!("Invalid hex hash: {}", e))?;
969 let (tx, rx) = oneshot::channel();
970
971 {
972 let mut pending = self.pending_requests.lock().await;
973 pending.insert(
974 hash_hex.to_string(),
975 PendingRequest::standard(hash.clone(), tx),
976 );
977 }
978
979 let req = DataRequest {
980 h: hash,
981 htl: BLOB_REQUEST_POLICY.max_htl,
982 q: None,
983 };
984 let wire = encode_request(&req);
985 dc.send(&Bytes::from(wire)).await?;
986
987 debug!(
988 "[Peer {}] Sent request for {}",
989 self.peer_id.short(),
990 &hash_hex[..8.min(hash_hex.len())]
991 );
992
993 match tokio::time::timeout(timeout, rx).await {
994 Ok(Ok(data)) => Ok(data),
995 Ok(Err(_)) => Ok(None),
996 Err(_) => {
997 let mut pending = self.pending_requests.lock().await;
998 pending.remove(hash_hex);
999 Ok(None)
1000 }
1001 }
1002 }
1003
1004 pub async fn query_nostr_events(
1005 &self,
1006 filters: Vec<NostrFilter>,
1007 timeout: std::time::Duration,
1008 ) -> Result<Vec<Event>> {
1009 let dc_guard = self.data_channel.lock().await;
1010 let dc = dc_guard
1011 .as_ref()
1012 .ok_or_else(|| anyhow::anyhow!("No data channel"))?
1013 .clone();
1014 drop(dc_guard);
1015
1016 let subscription_id = NostrSubscriptionId::generate();
1017 let subscription_key = subscription_id.to_string();
1018 let (tx, mut rx) = mpsc::unbounded_channel::<NostrRelayMessage>();
1019
1020 {
1021 let mut pending = self.pending_nostr_queries.lock().await;
1022 pending.insert(subscription_key.clone(), tx);
1023 }
1024
1025 let req = NostrClientMessage::req(subscription_id.clone(), filters);
1026 if let Err(e) = dc.send_text(req.as_json()).await {
1027 let mut pending = self.pending_nostr_queries.lock().await;
1028 pending.remove(&subscription_key);
1029 return Err(e.into());
1030 }
1031 debug!(
1032 "[Peer {}] Sent Nostr REQ subscription {}",
1033 self.peer_id.short(),
1034 subscription_id
1035 );
1036
1037 let mut events = Vec::new();
1038 let deadline = tokio::time::Instant::now() + timeout;
1039
1040 loop {
1041 let now = tokio::time::Instant::now();
1042 if now >= deadline {
1043 break;
1044 }
1045 let remaining = deadline - now;
1046
1047 let next = tokio::time::timeout(remaining, rx.recv()).await;
1048 match next {
1049 Ok(Some(NostrRelayMessage::Event {
1050 subscription_id: sid,
1051 event,
1052 })) if sid == subscription_id => {
1053 debug!(
1054 "[Peer {}] Received Nostr EVENT for subscription {}",
1055 self.peer_id.short(),
1056 subscription_id
1057 );
1058 events.push(*event);
1059 }
1060 Ok(Some(NostrRelayMessage::EndOfStoredEvents(sid))) if sid == subscription_id => {
1061 debug!(
1062 "[Peer {}] Received Nostr EOSE for subscription {}",
1063 self.peer_id.short(),
1064 subscription_id
1065 );
1066 break;
1067 }
1068 Ok(Some(NostrRelayMessage::Closed {
1069 subscription_id: sid,
1070 message,
1071 })) if sid == subscription_id => {
1072 warn!(
1073 "[Peer {}] Nostr query closed for subscription {}: {}",
1074 self.peer_id.short(),
1075 subscription_id,
1076 message
1077 );
1078 break;
1079 }
1080 Ok(Some(_)) => {}
1081 Ok(None) => break,
1082 Err(_) => {
1083 warn!(
1084 "[Peer {}] Nostr query timed out for subscription {}",
1085 self.peer_id.short(),
1086 subscription_id
1087 );
1088 break;
1089 }
1090 }
1091 }
1092
1093 let close = NostrClientMessage::close(subscription_id.clone());
1094 let _ = dc.send_text(close.as_json()).await;
1095
1096 let mut pending = self.pending_nostr_queries.lock().await;
1097 pending.remove(&subscription_key);
1098 debug!(
1099 "[Peer {}] Nostr query subscription {} collected {} event(s)",
1100 self.peer_id.short(),
1101 subscription_id,
1102 events.len()
1103 );
1104
1105 Ok(events)
1106 }
1107
1108 pub async fn send_mesh_frame_text(&self, frame: &MeshNostrFrame) -> Result<()> {
1109 let dc_guard = self.data_channel.lock().await;
1110 let dc = dc_guard
1111 .as_ref()
1112 .ok_or_else(|| anyhow::anyhow!("No data channel"))?
1113 .clone();
1114 drop(dc_guard);
1115
1116 let text = serde_json::to_string(frame)?;
1117 dc.send_text(text).await?;
1118 Ok(())
1119 }
1120
1121 pub async fn send_message(&self, msg: &DataMessage) -> Result<()> {
1122 let dc_guard = self.data_channel.lock().await;
1123 if let Some(ref dc) = *dc_guard {
1124 let wire = encode_data_message(msg);
1125 dc.send(&Bytes::from(wire)).await?;
1126 }
1127 Ok(())
1128 }
1129
1130 pub async fn close(&self) -> Result<()> {
1131 {
1132 let dc_guard = self.data_channel.lock().await;
1133 if let Some(ref dc) = *dc_guard {
1134 dc.close().await?;
1135 }
1136 }
1137 self.pc.close().await?;
1138 Ok(())
1139 }
1140}
1141
1142fn hash_to_hex(hash: &[u8]) -> String {
1143 hash_to_key(hash)
1144}
1145
1146fn encode_data_message(msg: &DataMessage) -> Vec<u8> {
1147 match msg {
1148 DataMessage::Request(req) => encode_request(req),
1149 DataMessage::Response(res) => encode_response(res),
1150 DataMessage::QuoteRequest(req) => crate::encode_quote_request(req),
1151 DataMessage::QuoteResponse(res) => encode_quote_response(res),
1152 DataMessage::Payment(req) => crate::encode_payment(req),
1153 DataMessage::PaymentAck(res) => crate::encode_payment_ack(res),
1154 DataMessage::Chunk(chunk) => crate::encode_chunk(chunk),
1155 }
1156}
1157
1158fn relay_subscription_id(msg: &NostrRelayMessage) -> Option<String> {
1159 match msg {
1160 NostrRelayMessage::Event {
1161 subscription_id, ..
1162 } => Some(subscription_id.to_string()),
1163 NostrRelayMessage::EndOfStoredEvents(subscription_id) => Some(subscription_id.to_string()),
1164 NostrRelayMessage::Closed {
1165 subscription_id, ..
1166 } => Some(subscription_id.to_string()),
1167 NostrRelayMessage::Count {
1168 subscription_id, ..
1169 } => Some(subscription_id.to_string()),
1170 _ => None,
1171 }
1172}
1173
1174#[async_trait]
1175impl RoutedPeerLink for Peer {
1176 async fn send(&self, data: Vec<u8>) -> std::result::Result<(), RoutedTransportError> {
1177 let dc = self
1178 .data_channel
1179 .lock()
1180 .await
1181 .as_ref()
1182 .cloned()
1183 .ok_or(RoutedTransportError::NotConnected)?;
1184 dc.send(&Bytes::from(data))
1185 .await
1186 .map(|_| ())
1187 .map_err(|e| RoutedTransportError::SendFailed(e.to_string()))
1188 }
1189
1190 async fn recv(&self) -> Option<Vec<u8>> {
1191 None
1192 }
1193
1194 fn try_recv(&self) -> Option<Vec<u8>> {
1195 None
1196 }
1197
1198 fn is_open(&self) -> bool {
1199 self.has_data_channel()
1200 }
1201
1202 async fn close(&self) {
1203 let _ = Peer::close(self).await;
1204 }
1205}