1use anyhow::Result;
4use async_trait::async_trait;
5use bytes::Bytes;
6use std::collections::HashMap;
7use std::sync::Arc;
8use tokio::sync::{mpsc, oneshot, Mutex, Notify};
9use tracing::{debug, error, info, warn};
10use webrtc::api::interceptor_registry::register_default_interceptors;
11use webrtc::api::media_engine::MediaEngine;
12use webrtc::api::setting_engine::SettingEngine;
13use webrtc::api::APIBuilder;
14use webrtc::data_channel::data_channel_init::RTCDataChannelInit;
15use webrtc::data_channel::data_channel_message::DataChannelMessage;
16use webrtc::data_channel::data_channel_state::RTCDataChannelState;
17use webrtc::data_channel::RTCDataChannel;
18use webrtc::ice_transport::ice_candidate::RTCIceCandidate;
19use webrtc::ice_transport::ice_server::RTCIceServer;
20use webrtc::interceptor::registry::Registry;
21use webrtc::peer_connection::configuration::RTCConfiguration;
22use webrtc::peer_connection::peer_connection_state::RTCPeerConnectionState;
23use webrtc::peer_connection::sdp::session_description::RTCSessionDescription;
24use webrtc::peer_connection::RTCPeerConnection;
25
26use crate::cashu::CashuQuoteState;
27use crate::relay_bridge::SharedMeshRelayClient;
28use crate::runtime_control::PeerStateEvent;
29use crate::runtime_peer::PeerDirection;
30use crate::transport::{PeerLink as RoutedPeerLink, TransportError as RoutedTransportError};
31use crate::types::{
32 validate_mesh_frame, MeshNostrFrame, PeerHTLConfig, PeerId, SignalingMessage,
33 BLOB_REQUEST_POLICY, DATA_CHANNEL_LABEL,
34};
35use crate::{
36 encode_payment_ack, encode_peer_hints, encode_quote_response, encode_request, encode_response,
37 hash_to_key, parse_message, DataChunk, DataMessage, DataRequest, DataResponse, PeerHints,
38};
39use nostr_sdk::nostr::{
40 ClientMessage as NostrClientMessage, Event, Filter as NostrFilter, JsonUtil as NostrJsonUtil,
41 RelayMessage as NostrRelayMessage, SubscriptionId as NostrSubscriptionId,
42};
43
44mod payments;
45
46use payments::{
47 handle_payment_ack_message, handle_payment_message, handle_quote_request_message,
48 process_chunk_message, send_quoted_chunk,
49};
50
51pub trait ContentStore: Send + Sync + 'static {
53 fn get(&self, hash_hex: &str) -> Result<Option<Vec<u8>>>;
54}
55
56pub struct PendingRequest {
58 pub hash: Vec<u8>,
59 pub response_tx: oneshot::Sender<Option<Vec<u8>>>,
60 pub quoted: Option<PendingQuotedRequest>,
61}
62
63pub struct PendingQuotedRequest {
64 pub quote_id: u64,
65 pub mint_url: String,
66 pub total_payment_sat: u64,
67 pub confirmed_payment_sat: u64,
68 pub next_chunk_index: u32,
69 pub total_chunks: Option<u32>,
70 pub assembled_data: Vec<u8>,
71 pub in_flight_payment: Option<PendingChunkPayment>,
72 pub buffered_chunk: Option<DataChunk>,
73}
74
75pub struct PendingChunkPayment {
76 pub chunk_index: u32,
77 pub amount_sat: u64,
78 pub mint_url: String,
79 pub operation_id: String,
80 pub final_chunk: bool,
81}
82
83impl PendingRequest {
84 pub fn standard(hash: Vec<u8>, response_tx: oneshot::Sender<Option<Vec<u8>>>) -> Self {
85 Self {
86 hash,
87 response_tx,
88 quoted: None,
89 }
90 }
91
92 pub fn quoted(
93 hash: Vec<u8>,
94 response_tx: oneshot::Sender<Option<Vec<u8>>>,
95 quote_id: u64,
96 mint_url: String,
97 total_payment_sat: u64,
98 ) -> Self {
99 Self {
100 hash,
101 response_tx,
102 quoted: Some(PendingQuotedRequest {
103 quote_id,
104 mint_url,
105 total_payment_sat,
106 confirmed_payment_sat: 0,
107 next_chunk_index: 0,
108 total_chunks: None,
109 assembled_data: Vec::new(),
110 in_flight_payment: None,
111 buffered_chunk: None,
112 }),
113 }
114 }
115}
116
117pub struct Peer {
119 pub peer_id: PeerId,
120 pub direction: PeerDirection,
121 pub created_at: std::time::Instant,
122 pub connected_at: Option<std::time::Instant>,
123
124 pc: Arc<RTCPeerConnection>,
125 pub data_channel: Arc<Mutex<Option<Arc<RTCDataChannel>>>>,
126 signaling_tx: mpsc::Sender<SignalingMessage>,
127 my_peer_id: PeerId,
128 store: Option<Arc<dyn ContentStore>>,
129 pub pending_requests: Arc<Mutex<HashMap<String, PendingRequest>>>,
130 pending_nostr_queries: Arc<Mutex<HashMap<String, mpsc::UnboundedSender<NostrRelayMessage>>>>,
131 #[allow(dead_code)]
132 message_tx: mpsc::Sender<(DataMessage, Option<Vec<u8>>)>,
133 #[allow(dead_code)]
134 message_rx: Option<mpsc::Receiver<(DataMessage, Option<Vec<u8>>)>>,
135 state_event_tx: Option<mpsc::Sender<PeerStateEvent>>,
136 nostr_relay: Option<SharedMeshRelayClient>,
137 mesh_frame_tx: Option<mpsc::Sender<(PeerId, MeshNostrFrame)>>,
138 cashu_quotes: Option<Arc<CashuQuoteState>>,
139 htl_config: PeerHTLConfig,
140 signal_urls: Vec<String>,
141}
142
143impl Peer {
144 pub async fn new(
145 peer_id: PeerId,
146 direction: PeerDirection,
147 my_peer_id: PeerId,
148 signaling_tx: mpsc::Sender<SignalingMessage>,
149 stun_servers: Vec<String>,
150 ) -> Result<Self> {
151 Self::new_with_store_and_events(
152 peer_id,
153 direction,
154 my_peer_id,
155 signaling_tx,
156 stun_servers,
157 None,
158 None,
159 None,
160 None,
161 None,
162 )
163 .await
164 }
165
166 pub async fn new_with_store(
167 peer_id: PeerId,
168 direction: PeerDirection,
169 my_peer_id: PeerId,
170 signaling_tx: mpsc::Sender<SignalingMessage>,
171 stun_servers: Vec<String>,
172 store: Option<Arc<dyn ContentStore>>,
173 ) -> Result<Self> {
174 Self::new_with_store_and_events(
175 peer_id,
176 direction,
177 my_peer_id,
178 signaling_tx,
179 stun_servers,
180 store,
181 None,
182 None,
183 None,
184 None,
185 )
186 .await
187 }
188
189 #[allow(clippy::too_many_arguments)]
190 pub async fn new_with_store_and_events(
191 peer_id: PeerId,
192 direction: PeerDirection,
193 my_peer_id: PeerId,
194 signaling_tx: mpsc::Sender<SignalingMessage>,
195 stun_servers: Vec<String>,
196 store: Option<Arc<dyn ContentStore>>,
197 state_event_tx: Option<mpsc::Sender<PeerStateEvent>>,
198 nostr_relay: Option<SharedMeshRelayClient>,
199 mesh_frame_tx: Option<mpsc::Sender<(PeerId, MeshNostrFrame)>>,
200 cashu_quotes: Option<Arc<CashuQuoteState>>,
201 ) -> Result<Self> {
202 let mut m = MediaEngine::default();
203 m.register_default_codecs()?;
204
205 let mut registry = Registry::new();
206 registry = register_default_interceptors(registry, &mut m)?;
207
208 let setting_engine = SettingEngine::default();
209 let api = APIBuilder::new()
210 .with_media_engine(m)
211 .with_interceptor_registry(registry)
212 .with_setting_engine(setting_engine)
213 .build();
214
215 let ice_servers: Vec<RTCIceServer> = stun_servers
216 .iter()
217 .map(|url| RTCIceServer {
218 urls: vec![url.clone()],
219 ..Default::default()
220 })
221 .collect();
222
223 let config = RTCConfiguration {
224 ice_servers,
225 ..Default::default()
226 };
227
228 let pc = Arc::new(api.new_peer_connection(config).await?);
229 let (message_tx, message_rx) = mpsc::channel(100);
230 Ok(Self {
231 peer_id,
232 direction,
233 created_at: std::time::Instant::now(),
234 connected_at: None,
235 pc,
236 data_channel: Arc::new(Mutex::new(None)),
237 signaling_tx,
238 my_peer_id,
239 store,
240 pending_requests: Arc::new(Mutex::new(HashMap::new())),
241 pending_nostr_queries: Arc::new(Mutex::new(HashMap::new())),
242 message_tx,
243 message_rx: Some(message_rx),
244 state_event_tx,
245 nostr_relay,
246 mesh_frame_tx,
247 cashu_quotes,
248 htl_config: PeerHTLConfig::random(),
249 signal_urls: Vec::new(),
250 })
251 }
252
253 pub fn set_signal_urls(&mut self, signal_urls: Vec<String>) {
254 self.signal_urls = signal_urls;
255 }
256
257 pub fn set_store(&mut self, store: Arc<dyn ContentStore>) {
258 self.store = Some(store);
259 }
260
261 pub fn state(&self) -> RTCPeerConnectionState {
262 self.pc.connection_state()
263 }
264
265 pub fn signaling_state(&self) -> webrtc::peer_connection::signaling_state::RTCSignalingState {
266 self.pc.signaling_state()
267 }
268
269 pub fn is_connected(&self) -> bool {
270 self.pc.connection_state() == RTCPeerConnectionState::Connected
271 }
272
273 pub fn htl_config(&self) -> &PeerHTLConfig {
274 &self.htl_config
275 }
276
277 pub async fn setup_handlers(&self) -> Result<()> {
278 let peer_id = self.peer_id.clone();
279 let signaling_tx = self.signaling_tx.clone();
280 let my_peer_id_str = self.my_peer_id.to_string();
281 let target_peer_id = self.peer_id.to_string();
282
283 self.pc
284 .on_ice_candidate(Box::new(move |candidate: Option<RTCIceCandidate>| {
285 let signaling_tx = signaling_tx.clone();
286 let my_peer_id_str = my_peer_id_str.clone();
287 let target_peer_id = target_peer_id.clone();
288
289 Box::pin(async move {
290 if let Some(c) = candidate {
291 if let Ok(init) = c.to_json() {
292 info!(
293 "ICE candidate generated: {}",
294 &init.candidate[..init.candidate.len().min(60)]
295 );
296 let msg = SignalingMessage::Candidate {
297 peer_id: my_peer_id_str.clone(),
298 target_peer_id: target_peer_id.clone(),
299 candidate: init.candidate,
300 sdp_m_line_index: init.sdp_mline_index,
301 sdp_mid: init.sdp_mid,
302 };
303 if let Err(e) = signaling_tx.send(msg).await {
304 error!("Failed to send ICE candidate: {}", e);
305 }
306 }
307 }
308 })
309 }));
310
311 let peer_id_log = peer_id.clone();
312 let state_event_tx = self.state_event_tx.clone();
313 self.pc
314 .on_peer_connection_state_change(Box::new(move |state: RTCPeerConnectionState| {
315 let peer_id = peer_id_log.clone();
316 let state_event_tx = state_event_tx.clone();
317 Box::pin(async move {
318 info!("Peer {} connection state: {:?}", peer_id.short(), state);
319
320 if let Some(tx) = state_event_tx {
321 let event = match state {
322 RTCPeerConnectionState::Connected => {
323 Some(PeerStateEvent::Connected(peer_id))
324 }
325 RTCPeerConnectionState::Failed => Some(PeerStateEvent::Failed(peer_id)),
326 RTCPeerConnectionState::Disconnected
327 | RTCPeerConnectionState::Closed => {
328 Some(PeerStateEvent::Disconnected(peer_id))
329 }
330 _ => None,
331 };
332 if let Some(event) = event {
333 if let Err(e) = tx.send(event).await {
334 error!("Failed to send peer state event: {}", e);
335 }
336 }
337 }
338 })
339 }));
340
341 Ok(())
342 }
343
344 pub async fn connect(&self) -> Result<serde_json::Value> {
345 let dc_init = RTCDataChannelInit {
346 ordered: Some(false),
347 ..Default::default()
348 };
349 let dc = self
350 .pc
351 .create_data_channel(DATA_CHANNEL_LABEL, Some(dc_init))
352 .await?;
353 self.setup_data_channel(dc.clone()).await?;
354 {
355 let mut dc_guard = self.data_channel.lock().await;
356 *dc_guard = Some(dc);
357 }
358
359 let offer = self.pc.create_offer(None).await?;
360 let mut gathering_complete = self.pc.gathering_complete_promise().await;
361 self.pc.set_local_description(offer).await?;
362
363 let _ = tokio::time::timeout(
364 std::time::Duration::from_secs(10),
365 gathering_complete.recv(),
366 )
367 .await;
368
369 let local_desc = self
370 .pc
371 .local_description()
372 .await
373 .ok_or_else(|| anyhow::anyhow!("No local description after gathering"))?;
374
375 debug!(
376 "Offer created, SDP len: {}, ice_gathering: {:?}",
377 local_desc.sdp.len(),
378 self.pc.ice_gathering_state()
379 );
380
381 Ok(serde_json::json!({
382 "type": local_desc.sdp_type.to_string().to_lowercase(),
383 "sdp": local_desc.sdp
384 }))
385 }
386
387 pub async fn handle_offer(&self, offer: serde_json::Value) -> Result<serde_json::Value> {
388 let sdp = offer
389 .get("sdp")
390 .and_then(|s| s.as_str())
391 .ok_or_else(|| anyhow::anyhow!("Missing SDP in offer"))?;
392
393 let peer_id = self.peer_id.clone();
394 let message_tx = self.message_tx.clone();
395 let pending_requests = self.pending_requests.clone();
396 let pending_nostr_queries = self.pending_nostr_queries.clone();
397 let store = self.store.clone();
398 let data_channel_holder = self.data_channel.clone();
399 let nostr_relay = self.nostr_relay.clone();
400 let mesh_frame_tx = self.mesh_frame_tx.clone();
401 let cashu_quotes = self.cashu_quotes.clone();
402 let peer_pubkey = Some(self.peer_id.pubkey.clone());
403 let state_event_tx = self.state_event_tx.clone();
404 let signal_urls = self.signal_urls.clone();
405
406 self.pc
407 .on_data_channel(Box::new(move |dc: Arc<RTCDataChannel>| {
408 let peer_id = peer_id.clone();
409 let message_tx = message_tx.clone();
410 let pending_requests = pending_requests.clone();
411 let pending_nostr_queries = pending_nostr_queries.clone();
412 let store = store.clone();
413 let data_channel_holder = data_channel_holder.clone();
414 let nostr_relay = nostr_relay.clone();
415 let mesh_frame_tx = mesh_frame_tx.clone();
416 let cashu_quotes = cashu_quotes.clone();
417 let peer_pubkey = peer_pubkey.clone();
418 let state_event_tx = state_event_tx.clone();
419 let signal_urls = signal_urls.clone();
420
421 Box::pin(async move {
422 info!(
423 "Peer {} received data channel: {}",
424 peer_id.short(),
425 dc.label()
426 );
427
428 {
429 let mut dc_guard = data_channel_holder.lock().await;
430 *dc_guard = Some(dc.clone());
431 }
432
433 Self::setup_dc_handlers(
434 dc.clone(),
435 peer_id,
436 message_tx,
437 pending_requests,
438 pending_nostr_queries.clone(),
439 store,
440 nostr_relay,
441 mesh_frame_tx,
442 cashu_quotes,
443 peer_pubkey,
444 state_event_tx,
445 signal_urls,
446 )
447 .await;
448 })
449 }));
450
451 let offer_desc = RTCSessionDescription::offer(sdp.to_string())?;
452 self.pc.set_remote_description(offer_desc).await?;
453
454 let answer = self.pc.create_answer(None).await?;
455 let mut gathering_complete = self.pc.gathering_complete_promise().await;
456 self.pc.set_local_description(answer).await?;
457
458 let _ = tokio::time::timeout(
459 std::time::Duration::from_secs(10),
460 gathering_complete.recv(),
461 )
462 .await;
463
464 let local_desc = self
465 .pc
466 .local_description()
467 .await
468 .ok_or_else(|| anyhow::anyhow!("No local description after gathering"))?;
469
470 debug!(
471 "Answer created, SDP len: {}, ice_gathering: {:?}",
472 local_desc.sdp.len(),
473 self.pc.ice_gathering_state()
474 );
475
476 Ok(serde_json::json!({
477 "type": local_desc.sdp_type.to_string().to_lowercase(),
478 "sdp": local_desc.sdp
479 }))
480 }
481
482 pub async fn handle_answer(&self, answer: serde_json::Value) -> Result<()> {
483 let sdp = answer
484 .get("sdp")
485 .and_then(|s| s.as_str())
486 .ok_or_else(|| anyhow::anyhow!("Missing SDP in answer"))?;
487
488 let answer_desc = RTCSessionDescription::answer(sdp.to_string())?;
489 self.pc.set_remote_description(answer_desc).await?;
490 Ok(())
491 }
492
493 pub async fn handle_candidate(&self, candidate: serde_json::Value) -> Result<()> {
494 let candidate_str = candidate
495 .get("candidate")
496 .and_then(|c| c.as_str())
497 .unwrap_or("");
498
499 let sdp_mid = candidate
500 .get("sdpMid")
501 .and_then(|m| m.as_str())
502 .map(|s| s.to_string());
503
504 let sdp_mline_index = candidate
505 .get("sdpMLineIndex")
506 .and_then(|i| i.as_u64())
507 .map(|i| i as u16);
508
509 if !candidate_str.is_empty() {
510 use webrtc::ice_transport::ice_candidate::RTCIceCandidateInit;
511 let init = RTCIceCandidateInit {
512 candidate: candidate_str.to_string(),
513 sdp_mid,
514 sdp_mline_index,
515 username_fragment: candidate
516 .get("usernameFragment")
517 .and_then(|u| u.as_str())
518 .map(|s| s.to_string()),
519 };
520 self.pc.add_ice_candidate(init).await?;
521 }
522
523 Ok(())
524 }
525
526 async fn setup_data_channel(&self, dc: Arc<RTCDataChannel>) -> Result<()> {
527 let peer_id = self.peer_id.clone();
528 let message_tx = self.message_tx.clone();
529 let pending_requests = self.pending_requests.clone();
530 let store = self.store.clone();
531 let nostr_relay = self.nostr_relay.clone();
532 let mesh_frame_tx = self.mesh_frame_tx.clone();
533 let cashu_quotes = self.cashu_quotes.clone();
534 let peer_pubkey = Some(self.peer_id.pubkey.clone());
535 let state_event_tx = self.state_event_tx.clone();
536 let signal_urls = self.signal_urls.clone();
537
538 Self::setup_dc_handlers(
539 dc,
540 peer_id,
541 message_tx,
542 pending_requests,
543 self.pending_nostr_queries.clone(),
544 store,
545 nostr_relay,
546 mesh_frame_tx,
547 cashu_quotes,
548 peer_pubkey,
549 state_event_tx,
550 signal_urls,
551 )
552 .await;
553 Ok(())
554 }
555
556 #[allow(clippy::too_many_arguments)]
557 async fn setup_dc_handlers(
558 dc: Arc<RTCDataChannel>,
559 peer_id: PeerId,
560 message_tx: mpsc::Sender<(DataMessage, Option<Vec<u8>>)>,
561 pending_requests: Arc<Mutex<HashMap<String, PendingRequest>>>,
562 pending_nostr_queries: Arc<
563 Mutex<HashMap<String, mpsc::UnboundedSender<NostrRelayMessage>>>,
564 >,
565 store: Option<Arc<dyn ContentStore>>,
566 nostr_relay: Option<SharedMeshRelayClient>,
567 mesh_frame_tx: Option<mpsc::Sender<(PeerId, MeshNostrFrame)>>,
568 cashu_quotes: Option<Arc<CashuQuoteState>>,
569 peer_pubkey: Option<String>,
570 state_event_tx: Option<mpsc::Sender<PeerStateEvent>>,
571 signal_urls: Vec<String>,
572 ) {
573 let label = dc.label().to_string();
574 let peer_short = peer_id.short();
575 let _pending_binary: Arc<Mutex<Option<u32>>> = Arc::new(Mutex::new(None));
576
577 let open_notify = nostr_relay.as_ref().map(|_| Arc::new(Notify::new()));
578 if let Some(ref notify) = open_notify {
579 if dc.ready_state() == RTCDataChannelState::Open {
580 notify.notify_one();
581 }
582 }
583
584 let mut nostr_client_id: Option<u64> = None;
585 if let Some(relay) = nostr_relay.clone() {
586 let client_id = relay.next_client_id();
587 let (nostr_tx, mut nostr_rx) = mpsc::unbounded_channel::<String>();
588 relay
589 .register_client(client_id, nostr_tx, peer_pubkey.clone())
590 .await;
591 nostr_client_id = Some(client_id);
592
593 if let Some(notify) = open_notify.clone() {
594 let dc_for_send = dc.clone();
595 tokio::spawn(async move {
596 notify.notified().await;
597 while let Some(text) = nostr_rx.recv().await {
598 if dc_for_send.send_text(text).await.is_err() {
599 break;
600 }
601 }
602 });
603 }
604 }
605
606 if let (Some(relay), Some(client_id)) = (nostr_relay.clone(), nostr_client_id) {
607 dc.on_close(Box::new(move || {
608 let relay = relay.clone();
609 Box::pin(async move {
610 relay.unregister_client(client_id).await;
611 })
612 }));
613 }
614
615 let open_notify_clone = open_notify.clone();
616 let dc_for_hints = dc.clone();
617 let signal_urls_for_open = signal_urls.clone();
618 let peer_short_open = peer_short.clone();
619 let label_clone = label.clone();
620 dc.on_open(Box::new(move || {
621 let peer_short_open = peer_short_open.clone();
622 let label_clone = label_clone.clone();
623 let open_notify = open_notify_clone.clone();
624 let dc_for_hints = dc_for_hints.clone();
625 let signal_urls = signal_urls_for_open.clone();
626 Box::pin(async move {
627 info!(
628 "[Peer {}] Data channel '{}' open",
629 peer_short_open, label_clone
630 );
631 if let Some(notify) = open_notify {
632 notify.notify_one();
633 }
634 if !signal_urls.is_empty() {
635 let hints = PeerHints { signal_urls };
636 let _ = dc_for_hints
637 .send(&Bytes::from(encode_peer_hints(&hints)))
638 .await;
639 }
640 })
641 }));
642
643 if dc.ready_state() == RTCDataChannelState::Open && !signal_urls.is_empty() {
644 let hints = PeerHints { signal_urls };
645 let _ = dc.send(&Bytes::from(encode_peer_hints(&hints))).await;
646 }
647
648 let dc_for_msg = dc.clone();
649 let peer_short_msg = peer_short.clone();
650 let _pending_binary_clone = _pending_binary.clone();
651 let store_clone = store.clone();
652 let nostr_relay_for_msg = nostr_relay.clone();
653 let nostr_client_id_for_msg = nostr_client_id;
654 let pending_nostr_queries_for_msg = pending_nostr_queries.clone();
655 let mesh_frame_tx_for_msg = mesh_frame_tx.clone();
656 let peer_id_for_msg = peer_id.clone();
657
658 dc.on_message(Box::new(move |msg: DataChannelMessage| {
659 let dc = dc_for_msg.clone();
660 let peer_short = peer_short_msg.clone();
661 let pending_requests = pending_requests.clone();
662 let _pending_binary = _pending_binary_clone.clone();
663 let _message_tx = message_tx.clone();
664 let store = store_clone.clone();
665 let nostr_relay = nostr_relay_for_msg.clone();
666 let nostr_client_id = nostr_client_id_for_msg;
667 let pending_nostr_queries = pending_nostr_queries_for_msg.clone();
668 let mesh_frame_tx = mesh_frame_tx_for_msg.clone();
669 let cashu_quotes = cashu_quotes.clone();
670 let state_event_tx = state_event_tx.clone();
671 let peer_id = peer_id_for_msg.clone();
672 let msg_data = msg.data.clone();
673
674 Box::pin(async move {
675 if msg.is_string {
676 if let Ok(text) = std::str::from_utf8(&msg_data) {
677 if let Ok(mesh_frame) = serde_json::from_str::<MeshNostrFrame>(text) {
678 match validate_mesh_frame(&mesh_frame) {
679 Ok(()) => {
680 if let Some(tx) = mesh_frame_tx {
681 let _ = tx.send((peer_id.clone(), mesh_frame)).await;
682 }
683 return;
684 }
685 Err(reason) => {
686 debug!(
687 "[Peer {}] Ignoring invalid mesh frame: {}",
688 peer_short, reason
689 );
690 }
691 }
692 }
693
694 if let Ok(relay_msg) = NostrRelayMessage::from_json(text) {
695 if let Some(sub_id) = relay_subscription_id(&relay_msg) {
696 let sender = {
697 let pending = pending_nostr_queries.lock().await;
698 pending.get(&sub_id).cloned()
699 };
700 if let Some(tx) = sender {
701 debug!(
702 "[Peer {}] Routed Nostr relay message for subscription {}",
703 peer_short, sub_id
704 );
705 let _ = tx.send(relay_msg);
706 return;
707 } else {
708 debug!(
709 "[Peer {}] Dropping Nostr relay message for unknown subscription {}",
710 peer_short, sub_id
711 );
712 }
713 }
714 }
715
716 if let Some(relay) = nostr_relay {
717 if let Ok(nostr_msg) = NostrClientMessage::from_json(text) {
718 if let Some(client_id) = nostr_client_id {
719 relay.handle_client_message(client_id, nostr_msg).await;
720 }
721 }
722 }
723 }
724 return;
725 }
726
727 debug!(
728 "[Peer {}] Received {} bytes on data channel",
729 peer_short,
730 msg_data.len()
731 );
732 match parse_message(&msg_data) {
733 Some(data_msg) => match data_msg {
734 DataMessage::Request(req) => {
735 let hash_hex = hash_to_hex(&req.h);
736 let hash_short = &hash_hex[..8.min(hash_hex.len())];
737 info!("[Peer {}] Received request for {}", peer_short, hash_short);
738
739 if let Some(cashu_quotes) = cashu_quotes.as_ref() {
740 if cashu_quotes
741 .should_refuse_requests_from_peer(&peer_id.to_string())
742 .await
743 {
744 info!(
745 "[Peer {}] Refusing request from peer with unpaid defaults",
746 peer_short
747 );
748 return;
749 }
750 }
751
752 let quoted_settlement = if let Some(quote_id) = req.q {
753 let Some(cashu_quotes) = cashu_quotes.as_ref() else {
754 info!(
755 "[Peer {}] Ignoring quoted request without Cashu settlement state",
756 peer_short
757 );
758 return;
759 };
760 match cashu_quotes
761 .take_valid_quote(&peer_id.to_string(), &req.h, quote_id)
762 .await
763 {
764 Some(settlement) => Some((quote_id, settlement)),
765 None => {
766 info!(
767 "[Peer {}] Ignoring request with invalid or expired quote {}",
768 peer_short, quote_id
769 );
770 return;
771 }
772 }
773 } else {
774 None
775 };
776
777 let data = if let Some(ref store) = store {
778 match store.get(&hash_hex) {
779 Ok(Some(data)) => {
780 info!(
781 "[Peer {}] Found {} in store ({} bytes)",
782 peer_short,
783 hash_short,
784 data.len()
785 );
786 Some(data)
787 }
788 Ok(None) => {
789 info!(
790 "[Peer {}] Hash {} not in store",
791 peer_short, hash_short
792 );
793 None
794 }
795 Err(e) => {
796 warn!("[Peer {}] Store error: {}", peer_short, e);
797 None
798 }
799 }
800 } else {
801 warn!(
802 "[Peer {}] No store configured - cannot serve requests",
803 peer_short
804 );
805 None
806 };
807
808 if let Some(data) = data {
809 let data_len = data.len();
810 if let (Some(cashu_quotes), Some((quote_id, settlement))) =
811 (cashu_quotes.as_ref(), quoted_settlement)
812 {
813 match cashu_quotes
814 .prepare_quoted_transfer(
815 &peer_id.to_string(),
816 &req.h,
817 quote_id,
818 &settlement,
819 data,
820 )
821 .await
822 {
823 Some((first_chunk, first_expected)) => {
824 if send_quoted_chunk(
825 &dc,
826 &peer_id,
827 &peer_short,
828 cashu_quotes,
829 first_chunk,
830 first_expected,
831 )
832 .await
833 {
834 info!(
835 "[Peer {}] Started quoted chunked response for {} ({} bytes)",
836 peer_short, hash_short, data_len
837 );
838 }
839 }
840 None => {
841 warn!(
842 "[Peer {}] Failed to prepare quoted transfer for {}",
843 peer_short, hash_short
844 );
845 }
846 }
847 } else {
848 let response = DataResponse {
849 h: req.h,
850 d: data,
851 i: None,
852 n: None,
853 };
854 let wire = encode_response(&response);
855 if let Err(e) = dc.send(&Bytes::from(wire)).await {
856 error!(
857 "[Peer {}] Failed to send response: {}",
858 peer_short, e
859 );
860 } else {
861 info!(
862 "[Peer {}] Sent response for {} ({} bytes)",
863 peer_short, hash_short, data_len
864 );
865 }
866 }
867 } else {
868 info!("[Peer {}] Content not found for {}", peer_short, hash_short);
869 }
870 }
871 DataMessage::Response(res) => {
872 let hash_hex = hash_to_hex(&res.h);
873 let hash_short = &hash_hex[..8.min(hash_hex.len())];
874 debug!(
875 "[Peer {}] Received response for {} ({} bytes)",
876 peer_short,
877 hash_short,
878 res.d.len()
879 );
880
881 let mut pending = pending_requests.lock().await;
882 if let Some(req) = pending.remove(&hash_hex) {
883 let _ = req.response_tx.send(Some(res.d));
884 }
885 }
886 DataMessage::QuoteRequest(req) => {
887 let response = handle_quote_request_message(
888 &peer_short,
889 &peer_id,
890 &store,
891 cashu_quotes.as_ref(),
892 &req,
893 )
894 .await;
895 if let Some(response) = response {
896 let wire = encode_quote_response(&response);
897 if let Err(e) = dc.send(&Bytes::from(wire)).await {
898 warn!(
899 "[Peer {}] Failed to send quote response: {}",
900 peer_short, e
901 );
902 }
903 }
904 }
905 DataMessage::QuoteResponse(res) => {
906 if let Some(cashu_quotes) = cashu_quotes.as_ref() {
907 let _ = cashu_quotes
908 .handle_quote_response(&peer_id.to_string(), res)
909 .await;
910 }
911 }
912 DataMessage::Chunk(chunk) => {
913 process_chunk_message(
914 &peer_short,
915 &peer_id,
916 &dc,
917 &pending_requests,
918 cashu_quotes.as_ref(),
919 chunk,
920 )
921 .await;
922 }
923 DataMessage::Payment(req) => {
924 let outcome =
925 handle_payment_message(&peer_id, cashu_quotes.as_ref(), &req).await;
926 let wire = encode_payment_ack(&outcome.ack);
927 if let Err(e) = dc.send(&Bytes::from(wire)).await {
928 warn!(
929 "[Peer {}] Failed to send payment ack: {}",
930 peer_short, e
931 );
932 }
933 if let (Some(cashu_quotes), Some((next_chunk, next_expected))) =
934 (cashu_quotes.as_ref(), outcome.next_chunk)
935 {
936 let _ = send_quoted_chunk(
937 &dc,
938 &peer_id,
939 &peer_short,
940 cashu_quotes,
941 next_chunk,
942 next_expected,
943 )
944 .await;
945 }
946 }
947 DataMessage::PaymentAck(res) => {
948 handle_payment_ack_message(
949 &peer_short,
950 &peer_id,
951 &dc,
952 &pending_requests,
953 cashu_quotes.as_ref(),
954 res,
955 )
956 .await;
957 }
958 DataMessage::PeerHints(hints) => {
959 if let Some(tx) = state_event_tx {
960 let _ = tx
961 .send(PeerStateEvent::SignalHints(
962 peer_id.clone(),
963 hints.signal_urls,
964 ))
965 .await;
966 }
967 }
968 DataMessage::PubsubInterest(_)
969 | DataMessage::PubsubFrame(_)
970 | DataMessage::PubsubInventory(_)
971 | DataMessage::PubsubWant(_) => {}
972 },
973 None => {
974 warn!("[Peer {}] Failed to parse message", peer_short);
975 let hex_dump: String = msg_data
976 .iter()
977 .take(50)
978 .map(|b| format!("{:02x}", b))
979 .collect();
980 warn!("[Peer {}] Message hex: {}", peer_short, hex_dump);
981 }
982 }
983 })
984 }));
985 }
986
987 pub fn has_data_channel(&self) -> bool {
988 self.data_channel
989 .try_lock()
990 .map(|guard| {
991 guard
992 .as_ref()
993 .map(|dc| dc.ready_state() == RTCDataChannelState::Open)
994 .unwrap_or(false)
995 })
996 .unwrap_or(false)
997 }
998
999 pub async fn request(&self, hash_hex: &str) -> Result<Option<Vec<u8>>> {
1000 self.request_with_timeout(hash_hex, std::time::Duration::from_secs(10))
1001 .await
1002 }
1003
1004 pub async fn request_with_timeout(
1005 &self,
1006 hash_hex: &str,
1007 timeout: std::time::Duration,
1008 ) -> Result<Option<Vec<u8>>> {
1009 let dc_guard = self.data_channel.lock().await;
1010 let dc = dc_guard
1011 .as_ref()
1012 .ok_or_else(|| anyhow::anyhow!("No data channel"))?
1013 .clone();
1014 drop(dc_guard);
1015
1016 let hash = hex::decode(hash_hex).map_err(|e| anyhow::anyhow!("Invalid hex hash: {}", e))?;
1017 let (tx, rx) = oneshot::channel();
1018
1019 {
1020 let mut pending = self.pending_requests.lock().await;
1021 pending.insert(
1022 hash_hex.to_string(),
1023 PendingRequest::standard(hash.clone(), tx),
1024 );
1025 }
1026
1027 let req = DataRequest {
1028 h: hash,
1029 htl: BLOB_REQUEST_POLICY.max_htl,
1030 q: None,
1031 };
1032 let wire = encode_request(&req);
1033 dc.send(&Bytes::from(wire)).await?;
1034
1035 debug!(
1036 "[Peer {}] Sent request for {}",
1037 self.peer_id.short(),
1038 &hash_hex[..8.min(hash_hex.len())]
1039 );
1040
1041 match tokio::time::timeout(timeout, rx).await {
1042 Ok(Ok(data)) => Ok(data),
1043 Ok(Err(_)) => Ok(None),
1044 Err(_) => {
1045 let mut pending = self.pending_requests.lock().await;
1046 pending.remove(hash_hex);
1047 Ok(None)
1048 }
1049 }
1050 }
1051
1052 pub async fn query_nostr_events(
1053 &self,
1054 filters: Vec<NostrFilter>,
1055 timeout: std::time::Duration,
1056 ) -> Result<Vec<Event>> {
1057 let dc_guard = self.data_channel.lock().await;
1058 let dc = dc_guard
1059 .as_ref()
1060 .ok_or_else(|| anyhow::anyhow!("No data channel"))?
1061 .clone();
1062 drop(dc_guard);
1063
1064 let subscription_id = NostrSubscriptionId::generate();
1065 let subscription_key = subscription_id.to_string();
1066 let (tx, mut rx) = mpsc::unbounded_channel::<NostrRelayMessage>();
1067
1068 {
1069 let mut pending = self.pending_nostr_queries.lock().await;
1070 pending.insert(subscription_key.clone(), tx);
1071 }
1072
1073 let req = NostrClientMessage::req(subscription_id.clone(), filters);
1074 if let Err(e) = dc.send_text(req.as_json()).await {
1075 let mut pending = self.pending_nostr_queries.lock().await;
1076 pending.remove(&subscription_key);
1077 return Err(e.into());
1078 }
1079 debug!(
1080 "[Peer {}] Sent Nostr REQ subscription {}",
1081 self.peer_id.short(),
1082 subscription_id
1083 );
1084
1085 let mut events = Vec::new();
1086 let deadline = tokio::time::Instant::now() + timeout;
1087
1088 loop {
1089 let now = tokio::time::Instant::now();
1090 if now >= deadline {
1091 break;
1092 }
1093 let remaining = deadline - now;
1094
1095 let next = tokio::time::timeout(remaining, rx.recv()).await;
1096 match next {
1097 Ok(Some(NostrRelayMessage::Event {
1098 subscription_id: sid,
1099 event,
1100 })) if sid == subscription_id => {
1101 debug!(
1102 "[Peer {}] Received Nostr EVENT for subscription {}",
1103 self.peer_id.short(),
1104 subscription_id
1105 );
1106 events.push(*event);
1107 }
1108 Ok(Some(NostrRelayMessage::EndOfStoredEvents(sid))) if sid == subscription_id => {
1109 debug!(
1110 "[Peer {}] Received Nostr EOSE for subscription {}",
1111 self.peer_id.short(),
1112 subscription_id
1113 );
1114 break;
1115 }
1116 Ok(Some(NostrRelayMessage::Closed {
1117 subscription_id: sid,
1118 message,
1119 })) if sid == subscription_id => {
1120 warn!(
1121 "[Peer {}] Nostr query closed for subscription {}: {}",
1122 self.peer_id.short(),
1123 subscription_id,
1124 message
1125 );
1126 break;
1127 }
1128 Ok(Some(_)) => {}
1129 Ok(None) => break,
1130 Err(_) => {
1131 warn!(
1132 "[Peer {}] Nostr query timed out for subscription {}",
1133 self.peer_id.short(),
1134 subscription_id
1135 );
1136 break;
1137 }
1138 }
1139 }
1140
1141 let close = NostrClientMessage::close(subscription_id.clone());
1142 let _ = dc.send_text(close.as_json()).await;
1143
1144 let mut pending = self.pending_nostr_queries.lock().await;
1145 pending.remove(&subscription_key);
1146 debug!(
1147 "[Peer {}] Nostr query subscription {} collected {} event(s)",
1148 self.peer_id.short(),
1149 subscription_id,
1150 events.len()
1151 );
1152
1153 Ok(events)
1154 }
1155
1156 pub async fn send_mesh_frame_text(&self, frame: &MeshNostrFrame) -> Result<()> {
1157 let dc_guard = self.data_channel.lock().await;
1158 let dc = dc_guard
1159 .as_ref()
1160 .ok_or_else(|| anyhow::anyhow!("No data channel"))?
1161 .clone();
1162 drop(dc_guard);
1163
1164 let text = serde_json::to_string(frame)?;
1165 dc.send_text(text).await?;
1166 Ok(())
1167 }
1168
1169 pub async fn send_message(&self, msg: &DataMessage) -> Result<()> {
1170 let dc_guard = self.data_channel.lock().await;
1171 if let Some(ref dc) = *dc_guard {
1172 let wire = encode_data_message(msg);
1173 dc.send(&Bytes::from(wire)).await?;
1174 }
1175 Ok(())
1176 }
1177
1178 pub async fn close(&self) -> Result<()> {
1179 {
1180 let dc_guard = self.data_channel.lock().await;
1181 if let Some(ref dc) = *dc_guard {
1182 dc.close().await?;
1183 }
1184 }
1185 self.pc.close().await?;
1186 Ok(())
1187 }
1188}
1189
1190fn hash_to_hex(hash: &[u8]) -> String {
1191 hash_to_key(hash)
1192}
1193
1194fn encode_data_message(msg: &DataMessage) -> Vec<u8> {
1195 match msg {
1196 DataMessage::Request(req) => encode_request(req),
1197 DataMessage::Response(res) => encode_response(res),
1198 DataMessage::QuoteRequest(req) => crate::encode_quote_request(req),
1199 DataMessage::QuoteResponse(res) => encode_quote_response(res),
1200 DataMessage::Payment(req) => crate::encode_payment(req),
1201 DataMessage::PaymentAck(res) => crate::encode_payment_ack(res),
1202 DataMessage::Chunk(chunk) => crate::encode_chunk(chunk),
1203 DataMessage::PeerHints(hints) => crate::encode_peer_hints(hints),
1204 DataMessage::PubsubInterest(interest) => crate::encode_pubsub_interest(interest),
1205 DataMessage::PubsubFrame(frame) => crate::encode_pubsub_frame(frame),
1206 DataMessage::PubsubInventory(inv) => crate::encode_pubsub_inventory(inv),
1207 DataMessage::PubsubWant(want) => crate::encode_pubsub_want(want),
1208 }
1209}
1210
1211fn relay_subscription_id(msg: &NostrRelayMessage) -> Option<String> {
1212 match msg {
1213 NostrRelayMessage::Event {
1214 subscription_id, ..
1215 } => Some(subscription_id.to_string()),
1216 NostrRelayMessage::EndOfStoredEvents(subscription_id) => Some(subscription_id.to_string()),
1217 NostrRelayMessage::Closed {
1218 subscription_id, ..
1219 } => Some(subscription_id.to_string()),
1220 NostrRelayMessage::Count {
1221 subscription_id, ..
1222 } => Some(subscription_id.to_string()),
1223 _ => None,
1224 }
1225}
1226
1227#[async_trait]
1228impl RoutedPeerLink for Peer {
1229 async fn send(&self, data: Vec<u8>) -> std::result::Result<(), RoutedTransportError> {
1230 let dc = self
1231 .data_channel
1232 .lock()
1233 .await
1234 .as_ref()
1235 .cloned()
1236 .ok_or(RoutedTransportError::NotConnected)?;
1237 dc.send(&Bytes::from(data))
1238 .await
1239 .map(|_| ())
1240 .map_err(|e| RoutedTransportError::SendFailed(e.to_string()))
1241 }
1242
1243 async fn recv(&self) -> Option<Vec<u8>> {
1244 None
1245 }
1246
1247 fn try_recv(&self) -> Option<Vec<u8>> {
1248 None
1249 }
1250
1251 fn is_open(&self) -> bool {
1252 self.has_data_channel()
1253 }
1254
1255 async fn close(&self) {
1256 let _ = Peer::close(self).await;
1257 }
1258}