1use anyhow::Result;
4use async_trait::async_trait;
5use bytes::Bytes;
6use hashtree_network::{PeerLink as RoutedPeerLink, TransportError as RoutedTransportError};
7use std::collections::HashMap;
8use std::sync::Arc;
9use tokio::sync::{mpsc, oneshot, Mutex, Notify};
10use tracing::{debug, error, info, warn};
11use webrtc::api::interceptor_registry::register_default_interceptors;
12use webrtc::api::media_engine::MediaEngine;
13use webrtc::api::setting_engine::SettingEngine;
14use webrtc::api::APIBuilder;
15use webrtc::data_channel::data_channel_init::RTCDataChannelInit;
16use webrtc::data_channel::data_channel_message::DataChannelMessage;
17use webrtc::data_channel::data_channel_state::RTCDataChannelState;
18use webrtc::data_channel::RTCDataChannel;
19use webrtc::ice_transport::ice_candidate::RTCIceCandidate;
20use webrtc::ice_transport::ice_server::RTCIceServer;
21use webrtc::interceptor::registry::Registry;
22use webrtc::peer_connection::configuration::RTCConfiguration;
23use webrtc::peer_connection::peer_connection_state::RTCPeerConnectionState;
24use webrtc::peer_connection::sdp::session_description::RTCSessionDescription;
25use webrtc::peer_connection::RTCPeerConnection;
26
27use super::cashu::{CashuQuoteState, ExpectedSettlement};
28use super::types::{
29 encode_chunk, encode_message, encode_payment, encode_payment_ack, encode_quote_response,
30 encode_request, encode_response, hash_to_hex, parse_message, validate_mesh_frame, DataChunk,
31 DataMessage, DataPayment, DataPaymentAck, DataQuoteRequest, DataRequest, DataResponse,
32 MeshNostrFrame, PeerDirection, PeerHTLConfig, PeerId, PeerStateEvent, SignalingMessage,
33 BLOB_REQUEST_POLICY,
34};
35use crate::nostr_relay::NostrRelay;
36use nostr::{
37 ClientMessage as NostrClientMessage, Filter as NostrFilter, JsonUtil as NostrJsonUtil,
38 RelayMessage as NostrRelayMessage, SubscriptionId as NostrSubscriptionId,
39};
40
/// Read-only lookup of locally available content, keyed by hex-encoded hash.
///
/// The `Send + Sync + 'static` bounds let a store be shared between tasks as
/// an `Arc<dyn ContentStore>`, which is how the peer code in this file holds it.
pub trait ContentStore: Send + Sync + 'static {
    /// Fetches the bytes stored under `hash_hex`.
    ///
    /// Callers in this file treat `Ok(Some(_))` as "content is available",
    /// `Ok(None)` as "not available", and `Err(_)` as a store failure.
    fn get(&self, hash_hex: &str) -> Result<Option<Vec<u8>>>;
}
46
/// An outstanding content request that is waiting for the remote peer's answer.
pub struct PendingRequest {
    /// Raw hash of the requested content.
    pub hash: Vec<u8>,
    /// Completion channel: receives `Some(bytes)` on success and `None` when
    /// the request is failed.
    pub response_tx: oneshot::Sender<Option<Vec<u8>>>,
    /// Per-quote payment/chunk state; `None` for a standard (unquoted) request.
    pub quoted: Option<PendingQuotedRequest>,
}
53
/// Progress tracking for a quoted (paid, chunk-by-chunk) request.
pub struct PendingQuotedRequest {
    /// Quote identifier; incoming chunks and acks must carry the same id.
    pub quote_id: u64,
    /// Mint URL used when creating payment tokens for this quote.
    pub mint_url: String,
    /// Total amount (sat) agreed for the whole transfer.
    pub total_payment_sat: u64,
    /// Sum (sat) of chunk payments the peer has acknowledged so far.
    pub confirmed_payment_sat: u64,
    /// Index the next accepted chunk must have.
    pub next_chunk_index: u32,
    /// Chunk count announced by the peer; set when the first chunk is accepted.
    pub total_chunks: Option<u32>,
    /// Payload bytes of all accepted chunks, concatenated in order.
    pub assembled_data: Vec<u8>,
    /// Payment that has been created for a chunk but not yet acknowledged.
    pub in_flight_payment: Option<PendingChunkPayment>,
    /// At most one chunk that arrived while a payment was still in flight.
    pub buffered_chunk: Option<DataChunk>,
}
65
/// A chunk payment that has been issued and is awaiting the peer's ack.
pub struct PendingChunkPayment {
    /// Index of the chunk this payment pays for.
    pub chunk_index: u32,
    /// Amount (sat) of this payment.
    pub amount_sat: u64,
    /// Mint URL of the issued payment; passed to `revoke_payment_token` on failure.
    pub mint_url: String,
    /// Operation id of the issued payment; passed to `revoke_payment_token` on failure.
    pub operation_id: String,
    /// Whether the paid chunk is the last one of the transfer.
    pub final_chunk: bool,
}
73
74impl PendingRequest {
75 pub fn standard(hash: Vec<u8>, response_tx: oneshot::Sender<Option<Vec<u8>>>) -> Self {
76 Self {
77 hash,
78 response_tx,
79 quoted: None,
80 }
81 }
82
83 pub fn quoted(
84 hash: Vec<u8>,
85 response_tx: oneshot::Sender<Option<Vec<u8>>>,
86 quote_id: u64,
87 mint_url: String,
88 total_payment_sat: u64,
89 ) -> Self {
90 Self {
91 hash,
92 response_tx,
93 quoted: Some(PendingQuotedRequest {
94 quote_id,
95 mint_url,
96 total_payment_sat,
97 confirmed_payment_sat: 0,
98 next_chunk_index: 0,
99 total_chunks: None,
100 assembled_data: Vec::new(),
101 in_flight_payment: None,
102 buffered_chunk: None,
103 }),
104 }
105 }
106}
107
108async fn handle_quote_request_message(
109 peer_short: &str,
110 peer_id: &PeerId,
111 store: &Option<Arc<dyn ContentStore>>,
112 cashu_quotes: Option<&Arc<CashuQuoteState>>,
113 req: &DataQuoteRequest,
114) -> Option<super::types::DataQuoteResponse> {
115 let Some(cashu_quotes) = cashu_quotes else {
116 debug!(
117 "[Peer {}] Ignoring quote request without Cashu policy",
118 peer_short
119 );
120 return None;
121 };
122
123 if cashu_quotes
124 .should_refuse_requests_from_peer(&peer_id.to_string())
125 .await
126 {
127 return Some(
128 cashu_quotes
129 .build_quote_response(&peer_id.to_string(), req, false)
130 .await,
131 );
132 }
133
134 let hash_hex = hash_to_hex(&req.h);
135 let can_serve = if let Some(store) = store {
136 match store.get(&hash_hex) {
137 Ok(Some(_)) => true,
138 Ok(None) => false,
139 Err(e) => {
140 warn!("[Peer {}] Store error during quote: {}", peer_short, e);
141 false
142 }
143 }
144 } else {
145 false
146 };
147
148 Some(
149 cashu_quotes
150 .build_quote_response(&peer_id.to_string(), req, can_serve)
151 .await,
152 )
153}
154
/// Result of handling an incoming payment message.
struct PaymentHandlingOutcome {
    /// Acknowledgement (accept or reject) to send back to the payer.
    ack: DataPaymentAck,
    /// Next chunk to send together with the settlement expected for it, if any.
    next_chunk: Option<(DataChunk, ExpectedSettlement)>,
}
159
160async fn send_quoted_chunk(
161 dc: &Arc<RTCDataChannel>,
162 peer_id: &PeerId,
163 peer_short: &str,
164 cashu_quotes: &Arc<CashuQuoteState>,
165 chunk: DataChunk,
166 expected: ExpectedSettlement,
167) -> bool {
168 let hash_hex = hash_to_hex(&chunk.h);
169 let wire = match encode_chunk(&chunk) {
170 Ok(wire) => wire,
171 Err(err) => {
172 warn!(
173 "[Peer {}] Failed to encode quoted chunk {} for quote {}: {}",
174 peer_short, chunk.c, chunk.q, err
175 );
176 return false;
177 }
178 };
179
180 if let Err(err) = dc.send(&Bytes::from(wire)).await {
181 warn!(
182 "[Peer {}] Failed to send quoted chunk {} for quote {}: {}",
183 peer_short, chunk.c, chunk.q, err
184 );
185 return false;
186 }
187
188 cashu_quotes
189 .register_expected_payment(peer_id.to_string(), hash_hex, chunk.q, expected)
190 .await;
191 true
192}
193
194async fn fail_pending_request(
195 pending_requests: &Arc<Mutex<HashMap<String, PendingRequest>>>,
196 cashu_quotes: Option<&Arc<CashuQuoteState>>,
197 hash_hex: &str,
198) {
199 let pending = pending_requests.lock().await.remove(hash_hex);
200 let Some(pending) = pending else {
201 return;
202 };
203
204 if let (Some(cashu_quotes), Some(quoted)) = (cashu_quotes, pending.quoted) {
205 if let Some(in_flight) = quoted.in_flight_payment {
206 let _ = cashu_quotes
207 .revoke_payment_token(&in_flight.mint_url, &in_flight.operation_id)
208 .await;
209 }
210 }
211 let _ = pending.response_tx.send(None);
212}
213
214async fn process_chunk_message(
215 peer_short: &str,
216 _peer_id: &PeerId,
217 dc: &Arc<RTCDataChannel>,
218 pending_requests: &Arc<Mutex<HashMap<String, PendingRequest>>>,
219 cashu_quotes: Option<&Arc<CashuQuoteState>>,
220 chunk: DataChunk,
221) {
222 let hash_hex = hash_to_hex(&chunk.h);
223 let Some(cashu_quotes) = cashu_quotes else {
224 fail_pending_request(pending_requests, None, &hash_hex).await;
225 return;
226 };
227
228 enum ChunkAction {
229 BufferOnly,
230 Fail,
231 Pay {
232 mint_url: String,
233 amount_sat: u64,
234 final_chunk: bool,
235 },
236 }
237
238 let action = {
239 let mut pending = pending_requests.lock().await;
240 let Some(request) = pending.get_mut(&hash_hex) else {
241 return;
242 };
243 match request.quoted.as_mut() {
244 None => ChunkAction::Fail,
245 Some(quoted) if quoted.quote_id != chunk.q || chunk.n == 0 => ChunkAction::Fail,
246 Some(quoted) => {
247 if let Some(in_flight) = quoted.in_flight_payment.as_ref() {
248 let expected_buffer_index = in_flight.chunk_index + 1;
249 if chunk.c == expected_buffer_index && quoted.buffered_chunk.is_none() {
250 quoted.buffered_chunk = Some(chunk.clone());
251 ChunkAction::BufferOnly
252 } else {
253 ChunkAction::Fail
254 }
255 } else if chunk.c != quoted.next_chunk_index {
256 ChunkAction::Fail
257 } else if let Some(total_chunks) = quoted.total_chunks {
258 if total_chunks != chunk.n {
259 ChunkAction::Fail
260 } else {
261 let next_total = quoted.confirmed_payment_sat.saturating_add(chunk.p);
262 if next_total > quoted.total_payment_sat
263 || (chunk.c + 1 == chunk.n && next_total != quoted.total_payment_sat)
264 {
265 ChunkAction::Fail
266 } else {
267 quoted.total_chunks = Some(chunk.n);
268 quoted.assembled_data.extend_from_slice(&chunk.d);
269 quoted.next_chunk_index += 1;
270 ChunkAction::Pay {
271 mint_url: quoted.mint_url.clone(),
272 amount_sat: chunk.p,
273 final_chunk: chunk.c + 1 == chunk.n,
274 }
275 }
276 }
277 } else {
278 let next_total = quoted.confirmed_payment_sat.saturating_add(chunk.p);
279 if next_total > quoted.total_payment_sat
280 || (chunk.c + 1 == chunk.n && next_total != quoted.total_payment_sat)
281 {
282 ChunkAction::Fail
283 } else {
284 quoted.total_chunks = Some(chunk.n);
285 quoted.assembled_data.extend_from_slice(&chunk.d);
286 quoted.next_chunk_index += 1;
287 ChunkAction::Pay {
288 mint_url: quoted.mint_url.clone(),
289 amount_sat: chunk.p,
290 final_chunk: chunk.c + 1 == chunk.n,
291 }
292 }
293 }
294 }
295 }
296 };
297
298 match action {
299 ChunkAction::BufferOnly => (),
300 ChunkAction::Fail => {
301 warn!(
302 "[Peer {}] Invalid quoted chunk {} for hash {}",
303 peer_short, chunk.c, hash_hex
304 );
305 fail_pending_request(pending_requests, Some(cashu_quotes), &hash_hex).await;
306 }
307 ChunkAction::Pay {
308 mint_url,
309 amount_sat,
310 final_chunk,
311 } => {
312 let payment = match cashu_quotes
313 .create_payment_token(&mint_url, amount_sat)
314 .await
315 {
316 Ok(payment) => payment,
317 Err(err) => {
318 warn!(
319 "[Peer {}] Failed to create payment token for chunk {} of {}: {}",
320 peer_short, chunk.c, hash_hex, err
321 );
322 fail_pending_request(pending_requests, Some(cashu_quotes), &hash_hex).await;
323 return;
324 }
325 };
326
327 {
328 let mut pending = pending_requests.lock().await;
329 let Some(request) = pending.get_mut(&hash_hex) else {
330 let _ = cashu_quotes
331 .revoke_payment_token(&payment.mint_url, &payment.operation_id)
332 .await;
333 return;
334 };
335 let Some(quoted) = request.quoted.as_mut() else {
336 let _ = cashu_quotes
337 .revoke_payment_token(&payment.mint_url, &payment.operation_id)
338 .await;
339 return;
340 };
341 quoted.in_flight_payment = Some(PendingChunkPayment {
342 chunk_index: chunk.c,
343 amount_sat,
344 mint_url: payment.mint_url.clone(),
345 operation_id: payment.operation_id.clone(),
346 final_chunk,
347 });
348 }
349
350 let payment_msg = DataPayment {
351 h: chunk.h,
352 q: chunk.q,
353 c: chunk.c,
354 p: amount_sat,
355 m: Some(payment.mint_url.clone()),
356 tok: payment.token,
357 };
358 let wire = match encode_payment(&payment_msg) {
359 Ok(wire) => wire,
360 Err(err) => {
361 warn!(
362 "[Peer {}] Failed to encode payment for chunk {} of {}: {}",
363 peer_short, chunk.c, hash_hex, err
364 );
365 fail_pending_request(pending_requests, Some(cashu_quotes), &hash_hex).await;
366 return;
367 }
368 };
369 if let Err(err) = dc.send(&Bytes::from(wire)).await {
370 warn!(
371 "[Peer {}] Failed to send payment for chunk {} of {}: {}",
372 peer_short, chunk.c, hash_hex, err
373 );
374 fail_pending_request(pending_requests, Some(cashu_quotes), &hash_hex).await;
375 }
376 }
377 }
378}
379
380async fn handle_payment_ack_message(
381 peer_short: &str,
382 peer_id: &PeerId,
383 dc: &Arc<RTCDataChannel>,
384 pending_requests: &Arc<Mutex<HashMap<String, PendingRequest>>>,
385 cashu_quotes: Option<&Arc<CashuQuoteState>>,
386 ack: DataPaymentAck,
387) {
388 let Some(cashu_quotes) = cashu_quotes else {
389 return;
390 };
391 let hash_hex = hash_to_hex(&ack.h);
392 let mut buffered_next = None;
393 let mut completed = None;
394 let mut failed = None;
395 let mut confirmed_amount = None;
396 let mut completed_data = None;
397
398 {
399 let mut pending = pending_requests.lock().await;
400 let Some(request) = pending.get_mut(&hash_hex) else {
401 return;
402 };
403 let Some(quoted) = request.quoted.as_mut() else {
404 return;
405 };
406 let Some(in_flight) = quoted.in_flight_payment.take() else {
407 return;
408 };
409 if ack.q != quoted.quote_id || ack.c != in_flight.chunk_index {
410 quoted.in_flight_payment = Some(in_flight);
411 return;
412 }
413
414 if !ack.a {
415 failed = Some(in_flight);
416 } else {
417 quoted.confirmed_payment_sat = quoted
418 .confirmed_payment_sat
419 .saturating_add(in_flight.amount_sat);
420 confirmed_amount = Some(in_flight.amount_sat);
421 if in_flight.final_chunk {
422 completed_data = Some(quoted.assembled_data.clone());
423 } else if let Some(next_chunk) = quoted.buffered_chunk.take() {
424 buffered_next = Some(next_chunk);
425 }
426 }
427
428 if let Some(data) = completed_data.take() {
429 let finished = pending
430 .remove(&hash_hex)
431 .expect("pending request must exist");
432 completed = Some((finished.response_tx, data));
433 }
434 }
435
436 if let Some(amount_sat) = confirmed_amount {
437 cashu_quotes
438 .record_paid_peer(&peer_id.to_string(), amount_sat)
439 .await;
440 }
441
442 if let Some(in_flight) = failed {
443 warn!(
444 "[Peer {}] Payment ack rejected chunk {} for {}: {}",
445 peer_short,
446 ack.c,
447 hash_hex,
448 ack.e.as_deref().unwrap_or("payment rejected")
449 );
450 let _ = cashu_quotes
451 .revoke_payment_token(&in_flight.mint_url, &in_flight.operation_id)
452 .await;
453 let removed = pending_requests.lock().await.remove(&hash_hex);
454 if let Some(removed) = removed {
455 let _ = removed.response_tx.send(None);
456 }
457 return;
458 }
459
460 if let Some((tx, data)) = completed {
461 let _ = tx.send(Some(data));
462 return;
463 }
464
465 if let Some(next_chunk) = buffered_next {
466 process_chunk_message(
467 peer_short,
468 peer_id,
469 dc,
470 pending_requests,
471 Some(cashu_quotes),
472 next_chunk,
473 )
474 .await;
475 }
476}
477
478async fn handle_payment_message(
479 peer_id: &PeerId,
480 cashu_quotes: Option<&Arc<CashuQuoteState>>,
481 req: &DataPayment,
482) -> PaymentHandlingOutcome {
483 let nack = |err: String| PaymentHandlingOutcome {
484 ack: DataPaymentAck {
485 h: req.h.clone(),
486 q: req.q,
487 c: req.c,
488 a: false,
489 e: Some(err),
490 },
491 next_chunk: None,
492 };
493
494 let Some(cashu_quotes) = cashu_quotes else {
495 return nack("Cashu settlement unavailable".to_string());
496 };
497
498 let expected = match cashu_quotes
499 .claim_expected_payment(
500 &peer_id.to_string(),
501 &req.h,
502 req.q,
503 req.c,
504 req.p,
505 req.m.as_deref(),
506 )
507 .await
508 {
509 Ok(expected) => expected,
510 Err(err) => {
511 cashu_quotes
512 .record_payment_default_from_peer(&peer_id.to_string())
513 .await;
514 return nack(err.to_string());
515 }
516 };
517
518 match cashu_quotes.receive_payment_token(&req.tok).await {
519 Ok(received) if received.amount_sat >= expected.payment_sat => {
520 if expected.mint_url.as_deref() != Some(received.mint_url.as_str()) {
521 cashu_quotes
522 .record_payment_default_from_peer(&peer_id.to_string())
523 .await;
524 return nack("Received payment mint did not match quoted mint".to_string());
525 }
526 if let Err(err) = cashu_quotes
527 .record_receipt_from_peer(
528 &peer_id.to_string(),
529 &received.mint_url,
530 received.amount_sat,
531 )
532 .await
533 {
534 warn!(
535 "[Peer {}] Failed to persist Cashu mint success for {}: {}",
536 peer_id.short(),
537 received.mint_url,
538 err
539 );
540 }
541
542 let next_chunk = if expected.final_chunk {
543 None
544 } else {
545 cashu_quotes
546 .next_outgoing_chunk(&peer_id.to_string(), &req.h, req.q)
547 .await
548 };
549
550 PaymentHandlingOutcome {
551 ack: DataPaymentAck {
552 h: req.h.clone(),
553 q: req.q,
554 c: req.c,
555 a: true,
556 e: None,
557 },
558 next_chunk,
559 }
560 }
561 Ok(_) => {
562 cashu_quotes
563 .record_payment_default_from_peer(&peer_id.to_string())
564 .await;
565 nack("Received payment amount was below the quoted amount".to_string())
566 }
567 Err(err) => {
568 if let Some(mint_url) = expected.mint_url.as_deref().or(req.m.as_deref()) {
569 let _ = cashu_quotes.record_mint_receive_failure(mint_url).await;
570 }
571 nack(err.to_string())
572 }
573 }
574}
575
/// One WebRTC connection to a remote peer, plus the state shared with its
/// data channel handlers.
pub struct Peer {
    /// Identity of the remote peer.
    pub peer_id: PeerId,
    /// Connection direction supplied by the caller at construction time.
    pub direction: PeerDirection,
    /// Instant at which this `Peer` value was constructed.
    pub created_at: std::time::Instant,
    /// Initialised to `None`; not modified by the code visible in this section.
    pub connected_at: Option<std::time::Instant>,

    /// Underlying WebRTC peer connection.
    pc: Arc<RTCPeerConnection>,
    /// The data channel, once created locally (`connect`) or received from the
    /// remote side (`handle_offer`).
    pub data_channel: Arc<Mutex<Option<Arc<RTCDataChannel>>>>,
    /// Outgoing signaling messages (used for locally gathered ICE candidates).
    signaling_tx: mpsc::Sender<SignalingMessage>,
    /// Identity of the local peer; used as the sender id in signaling messages.
    my_peer_id: PeerId,

    /// Optional local content store handed to the data channel handlers.
    store: Option<Arc<dyn ContentStore>>,

    /// Outstanding content requests, keyed by hex-encoded hash.
    pub pending_requests: Arc<Mutex<HashMap<String, PendingRequest>>>,
    /// Senders for relay messages of outstanding Nostr queries; the message
    /// handler looks entries up by subscription id.
    pending_nostr_queries: Arc<Mutex<HashMap<String, mpsc::UnboundedSender<NostrRelayMessage>>>>,

    /// Sending half of the bounded (capacity 100) data message channel.
    #[allow(dead_code)]
    message_tx: mpsc::Sender<(DataMessage, Option<Vec<u8>>)>,
    /// Receiving half of the data message channel.
    #[allow(dead_code)]
    message_rx: Option<mpsc::Receiver<(DataMessage, Option<Vec<u8>>)>>,

    /// Optional sink for connected / failed / disconnected notifications.
    state_event_tx: Option<mpsc::Sender<PeerStateEvent>>,

    /// Optional Nostr relay that the data channel is registered with as a client.
    nostr_relay: Option<Arc<NostrRelay>>,
    /// Optional sink for validated mesh frames received from this peer.
    mesh_frame_tx: Option<mpsc::Sender<(PeerId, MeshNostrFrame)>>,
    /// Optional Cashu quote/settlement state; quoted transfers require it.
    cashu_quotes: Option<Arc<CashuQuoteState>>,
    /// Per-peer HTL configuration, generated with `PeerHTLConfig::random()`.
    htl_config: PeerHTLConfig,
}
615
616impl Peer {
617 pub async fn new(
619 peer_id: PeerId,
620 direction: PeerDirection,
621 my_peer_id: PeerId,
622 signaling_tx: mpsc::Sender<SignalingMessage>,
623 stun_servers: Vec<String>,
624 ) -> Result<Self> {
625 Self::new_with_store_and_events(
626 peer_id,
627 direction,
628 my_peer_id,
629 signaling_tx,
630 stun_servers,
631 None,
632 None,
633 None,
634 None,
635 None,
636 )
637 .await
638 }
639
640 pub async fn new_with_store(
642 peer_id: PeerId,
643 direction: PeerDirection,
644 my_peer_id: PeerId,
645 signaling_tx: mpsc::Sender<SignalingMessage>,
646 stun_servers: Vec<String>,
647 store: Option<Arc<dyn ContentStore>>,
648 ) -> Result<Self> {
649 Self::new_with_store_and_events(
650 peer_id,
651 direction,
652 my_peer_id,
653 signaling_tx,
654 stun_servers,
655 store,
656 None,
657 None,
658 None,
659 None,
660 )
661 .await
662 }
663
664 #[allow(clippy::too_many_arguments)]
666 pub(crate) async fn new_with_store_and_events(
667 peer_id: PeerId,
668 direction: PeerDirection,
669 my_peer_id: PeerId,
670 signaling_tx: mpsc::Sender<SignalingMessage>,
671 stun_servers: Vec<String>,
672 store: Option<Arc<dyn ContentStore>>,
673 state_event_tx: Option<mpsc::Sender<PeerStateEvent>>,
674 nostr_relay: Option<Arc<NostrRelay>>,
675 mesh_frame_tx: Option<mpsc::Sender<(PeerId, MeshNostrFrame)>>,
676 cashu_quotes: Option<Arc<CashuQuoteState>>,
677 ) -> Result<Self> {
678 let mut m = MediaEngine::default();
680 m.register_default_codecs()?;
681
682 let mut registry = Registry::new();
683 registry = register_default_interceptors(registry, &mut m)?;
684
685 let setting_engine = SettingEngine::default();
688 let api = APIBuilder::new()
691 .with_media_engine(m)
692 .with_interceptor_registry(registry)
693 .with_setting_engine(setting_engine)
694 .build();
695
696 let ice_servers: Vec<RTCIceServer> = stun_servers
698 .iter()
699 .map(|url| RTCIceServer {
700 urls: vec![url.clone()],
701 ..Default::default()
702 })
703 .collect();
704
705 let config = RTCConfiguration {
706 ice_servers,
707 ..Default::default()
708 };
709
710 let pc = Arc::new(api.new_peer_connection(config).await?);
711 let (message_tx, message_rx) = mpsc::channel(100);
712 Ok(Self {
713 peer_id,
714 direction,
715 created_at: std::time::Instant::now(),
716 connected_at: None,
717 pc,
718 data_channel: Arc::new(Mutex::new(None)),
719 signaling_tx,
720 my_peer_id,
721 store,
722 pending_requests: Arc::new(Mutex::new(HashMap::new())),
723 pending_nostr_queries: Arc::new(Mutex::new(HashMap::new())),
724 message_tx,
725 message_rx: Some(message_rx),
726 state_event_tx,
727 nostr_relay,
728 mesh_frame_tx,
729 cashu_quotes,
730 htl_config: PeerHTLConfig::random(),
731 })
732 }
733
    /// Installs (or replaces) the content store stored on this peer.
    pub fn set_store(&mut self, store: Arc<dyn ContentStore>) {
        self.store = Some(store);
    }
738
    /// Returns the current connection state of the underlying peer connection.
    pub fn state(&self) -> RTCPeerConnectionState {
        self.pc.connection_state()
    }
743
    /// Returns the current signaling state of the underlying peer connection.
    pub fn signaling_state(&self) -> webrtc::peer_connection::signaling_state::RTCSignalingState {
        self.pc.signaling_state()
    }
748
749 pub fn is_connected(&self) -> bool {
751 self.pc.connection_state() == RTCPeerConnectionState::Connected
752 }
753
    /// Returns this peer's HTL configuration.
    pub fn htl_config(&self) -> &PeerHTLConfig {
        &self.htl_config
    }
757
758 pub async fn setup_handlers(&self) -> Result<()> {
760 let peer_id = self.peer_id.clone();
761 let signaling_tx = self.signaling_tx.clone();
762 let my_peer_id_str = self.my_peer_id.to_string();
763 let target_peer_id = self.peer_id.to_string();
764
765 self.pc
767 .on_ice_candidate(Box::new(move |candidate: Option<RTCIceCandidate>| {
768 let signaling_tx = signaling_tx.clone();
769 let my_peer_id_str = my_peer_id_str.clone();
770 let target_peer_id = target_peer_id.clone();
771
772 Box::pin(async move {
773 if let Some(c) = candidate {
774 if let Ok(init) = c.to_json() {
775 info!(
776 "ICE candidate generated: {}",
777 &init.candidate[..init.candidate.len().min(60)]
778 );
779 let msg = SignalingMessage::Candidate {
780 peer_id: my_peer_id_str.clone(),
781 target_peer_id: target_peer_id.clone(),
782 candidate: init.candidate,
783 sdp_m_line_index: init.sdp_mline_index,
784 sdp_mid: init.sdp_mid,
785 };
786 if let Err(e) = signaling_tx.send(msg).await {
787 error!("Failed to send ICE candidate: {}", e);
788 }
789 }
790 }
791 })
792 }));
793
794 let peer_id_log = peer_id.clone();
796 let state_event_tx = self.state_event_tx.clone();
797 self.pc
798 .on_peer_connection_state_change(Box::new(move |state: RTCPeerConnectionState| {
799 let peer_id = peer_id_log.clone();
800 let state_event_tx = state_event_tx.clone();
801 Box::pin(async move {
802 info!("Peer {} connection state: {:?}", peer_id.short(), state);
803
804 if let Some(tx) = state_event_tx {
806 let event = match state {
807 RTCPeerConnectionState::Connected => {
808 Some(PeerStateEvent::Connected(peer_id))
809 }
810 RTCPeerConnectionState::Failed => Some(PeerStateEvent::Failed(peer_id)),
811 RTCPeerConnectionState::Disconnected
812 | RTCPeerConnectionState::Closed => {
813 Some(PeerStateEvent::Disconnected(peer_id))
814 }
815 _ => None,
816 };
817 if let Some(event) = event {
818 if let Err(e) = tx.send(event).await {
819 error!("Failed to send peer state event: {}", e);
820 }
821 }
822 }
823 })
824 }));
825
826 Ok(())
827 }
828
829 pub async fn connect(&self) -> Result<serde_json::Value> {
831 println!("[Peer {}] Creating data channel...", self.peer_id.short());
832 let dc_init = RTCDataChannelInit {
835 ordered: Some(false),
836 ..Default::default()
837 };
838 let dc = self
839 .pc
840 .create_data_channel("hashtree", Some(dc_init))
841 .await?;
842 println!(
843 "[Peer {}] Data channel created, setting up handlers...",
844 self.peer_id.short()
845 );
846 self.setup_data_channel(dc.clone()).await?;
847 println!(
848 "[Peer {}] Handlers set up, storing data channel...",
849 self.peer_id.short()
850 );
851 {
852 let mut dc_guard = self.data_channel.lock().await;
853 *dc_guard = Some(dc);
854 }
855 println!("[Peer {}] Data channel stored", self.peer_id.short());
856
857 let offer = self.pc.create_offer(None).await?;
860 let mut gathering_complete = self.pc.gathering_complete_promise().await;
861 self.pc.set_local_description(offer).await?;
862
863 let _ = tokio::time::timeout(
865 std::time::Duration::from_secs(10),
866 gathering_complete.recv(),
867 )
868 .await;
869
870 let local_desc = self
872 .pc
873 .local_description()
874 .await
875 .ok_or_else(|| anyhow::anyhow!("No local description after gathering"))?;
876
877 debug!(
878 "Offer created, SDP len: {}, ice_gathering: {:?}",
879 local_desc.sdp.len(),
880 self.pc.ice_gathering_state()
881 );
882
883 let offer_json = serde_json::json!({
885 "type": local_desc.sdp_type.to_string().to_lowercase(),
886 "sdp": local_desc.sdp
887 });
888
889 Ok(offer_json)
890 }
891
892 pub async fn handle_offer(&self, offer: serde_json::Value) -> Result<serde_json::Value> {
894 let sdp = offer
895 .get("sdp")
896 .and_then(|s| s.as_str())
897 .ok_or_else(|| anyhow::anyhow!("Missing SDP in offer"))?;
898
899 let peer_id = self.peer_id.clone();
902 let message_tx = self.message_tx.clone();
903 let pending_requests = self.pending_requests.clone();
904 let pending_nostr_queries = self.pending_nostr_queries.clone();
905 let store = self.store.clone();
906 let data_channel_holder = self.data_channel.clone();
907 let nostr_relay = self.nostr_relay.clone();
908 let mesh_frame_tx = self.mesh_frame_tx.clone();
909 let cashu_quotes = self.cashu_quotes.clone();
910 let peer_pubkey = Some(self.peer_id.pubkey.clone());
911
912 self.pc
913 .on_data_channel(Box::new(move |dc: Arc<RTCDataChannel>| {
914 let peer_id = peer_id.clone();
915 let message_tx = message_tx.clone();
916 let pending_requests = pending_requests.clone();
917 let pending_nostr_queries = pending_nostr_queries.clone();
918 let store = store.clone();
919 let data_channel_holder = data_channel_holder.clone();
920 let nostr_relay = nostr_relay.clone();
921 let mesh_frame_tx = mesh_frame_tx.clone();
922 let cashu_quotes = cashu_quotes.clone();
923 let peer_pubkey = peer_pubkey.clone();
924
925 Box::pin(async move {
927 info!(
928 "Peer {} received data channel: {}",
929 peer_id.short(),
930 dc.label()
931 );
932
933 {
935 let mut dc_guard = data_channel_holder.lock().await;
936 *dc_guard = Some(dc.clone());
937 }
938
939 Self::setup_dc_handlers(
941 dc.clone(),
942 peer_id,
943 message_tx,
944 pending_requests,
945 pending_nostr_queries.clone(),
946 store,
947 nostr_relay,
948 mesh_frame_tx,
949 cashu_quotes,
950 peer_pubkey,
951 )
952 .await;
953 })
954 }));
955
956 let offer_desc = RTCSessionDescription::offer(sdp.to_string())?;
958 self.pc.set_remote_description(offer_desc).await?;
959
960 let answer = self.pc.create_answer(None).await?;
963 let mut gathering_complete = self.pc.gathering_complete_promise().await;
964 self.pc.set_local_description(answer).await?;
965
966 let _ = tokio::time::timeout(
968 std::time::Duration::from_secs(10),
969 gathering_complete.recv(),
970 )
971 .await;
972
973 let local_desc = self
975 .pc
976 .local_description()
977 .await
978 .ok_or_else(|| anyhow::anyhow!("No local description after gathering"))?;
979
980 debug!(
981 "Answer created, SDP len: {}, ice_gathering: {:?}",
982 local_desc.sdp.len(),
983 self.pc.ice_gathering_state()
984 );
985
986 let answer_json = serde_json::json!({
987 "type": local_desc.sdp_type.to_string().to_lowercase(),
988 "sdp": local_desc.sdp
989 });
990
991 Ok(answer_json)
992 }
993
994 pub async fn handle_answer(&self, answer: serde_json::Value) -> Result<()> {
996 let sdp = answer
997 .get("sdp")
998 .and_then(|s| s.as_str())
999 .ok_or_else(|| anyhow::anyhow!("Missing SDP in answer"))?;
1000
1001 let answer_desc = RTCSessionDescription::answer(sdp.to_string())?;
1002 self.pc.set_remote_description(answer_desc).await?;
1003
1004 Ok(())
1005 }
1006
1007 pub async fn handle_candidate(&self, candidate: serde_json::Value) -> Result<()> {
1009 let candidate_str = candidate
1010 .get("candidate")
1011 .and_then(|c| c.as_str())
1012 .unwrap_or("");
1013
1014 let sdp_mid = candidate
1015 .get("sdpMid")
1016 .and_then(|m| m.as_str())
1017 .map(|s| s.to_string());
1018
1019 let sdp_mline_index = candidate
1020 .get("sdpMLineIndex")
1021 .and_then(|i| i.as_u64())
1022 .map(|i| i as u16);
1023
1024 if !candidate_str.is_empty() {
1025 use webrtc::ice_transport::ice_candidate::RTCIceCandidateInit;
1026 let init = RTCIceCandidateInit {
1027 candidate: candidate_str.to_string(),
1028 sdp_mid,
1029 sdp_mline_index,
1030 username_fragment: candidate
1031 .get("usernameFragment")
1032 .and_then(|u| u.as_str())
1033 .map(|s| s.to_string()),
1034 };
1035 self.pc.add_ice_candidate(init).await?;
1036 }
1037
1038 Ok(())
1039 }
1040
1041 async fn setup_data_channel(&self, dc: Arc<RTCDataChannel>) -> Result<()> {
1043 let peer_id = self.peer_id.clone();
1044 let message_tx = self.message_tx.clone();
1045 let pending_requests = self.pending_requests.clone();
1046 let store = self.store.clone();
1047 let nostr_relay = self.nostr_relay.clone();
1048 let mesh_frame_tx = self.mesh_frame_tx.clone();
1049 let cashu_quotes = self.cashu_quotes.clone();
1050 let peer_pubkey = Some(self.peer_id.pubkey.clone());
1051
1052 Self::setup_dc_handlers(
1053 dc,
1054 peer_id,
1055 message_tx,
1056 pending_requests,
1057 self.pending_nostr_queries.clone(),
1058 store,
1059 nostr_relay,
1060 mesh_frame_tx,
1061 cashu_quotes,
1062 peer_pubkey,
1063 )
1064 .await;
1065 Ok(())
1066 }
1067
1068 #[allow(clippy::too_many_arguments)]
1070 async fn setup_dc_handlers(
1071 dc: Arc<RTCDataChannel>,
1072 peer_id: PeerId,
1073 message_tx: mpsc::Sender<(DataMessage, Option<Vec<u8>>)>,
1074 pending_requests: Arc<Mutex<HashMap<String, PendingRequest>>>,
1075 pending_nostr_queries: Arc<
1076 Mutex<HashMap<String, mpsc::UnboundedSender<NostrRelayMessage>>>,
1077 >,
1078 store: Option<Arc<dyn ContentStore>>,
1079 nostr_relay: Option<Arc<NostrRelay>>,
1080 mesh_frame_tx: Option<mpsc::Sender<(PeerId, MeshNostrFrame)>>,
1081 cashu_quotes: Option<Arc<CashuQuoteState>>,
1082 peer_pubkey: Option<String>,
1083 ) {
1084 let label = dc.label().to_string();
1085 let peer_short = peer_id.short();
1086
1087 let _pending_binary: Arc<Mutex<Option<u32>>> = Arc::new(Mutex::new(None));
1089
1090 let open_notify = nostr_relay.as_ref().map(|_| Arc::new(Notify::new()));
1091 if let Some(ref notify) = open_notify {
1092 if dc.ready_state() == RTCDataChannelState::Open {
1093 notify.notify_one();
1095 }
1096 }
1097
1098 let mut nostr_client_id: Option<u64> = None;
1099 if let Some(relay) = nostr_relay.clone() {
1100 let client_id = relay.next_client_id();
1101 let (nostr_tx, mut nostr_rx) = mpsc::unbounded_channel::<String>();
1102 relay
1103 .register_client(client_id, nostr_tx, peer_pubkey.clone())
1104 .await;
1105 nostr_client_id = Some(client_id);
1106
1107 if let Some(notify) = open_notify.clone() {
1108 let dc_for_send = dc.clone();
1109 tokio::spawn(async move {
1110 notify.notified().await;
1111 while let Some(text) = nostr_rx.recv().await {
1112 if dc_for_send.send_text(text).await.is_err() {
1113 break;
1114 }
1115 }
1116 });
1117 }
1118 }
1119
1120 if let (Some(relay), Some(client_id)) = (nostr_relay.clone(), nostr_client_id) {
1121 dc.on_close(Box::new(move || {
1122 let relay = relay.clone();
1123 Box::pin(async move {
1124 relay.unregister_client(client_id).await;
1125 })
1126 }));
1127 }
1128
1129 let open_notify_clone = open_notify.clone();
1130 let peer_short_open = peer_short.clone();
1131 let label_clone = label.clone();
1132 dc.on_open(Box::new(move || {
1133 let peer_short_open = peer_short_open.clone();
1134 let label_clone = label_clone.clone();
1135 let open_notify = open_notify_clone.clone();
1136 Box::pin(async move {
1138 info!(
1139 "[Peer {}] Data channel '{}' open",
1140 peer_short_open, label_clone
1141 );
1142 if let Some(notify) = open_notify {
1143 notify.notify_one();
1144 }
1145 })
1146 }));
1147
1148 let dc_for_msg = dc.clone();
1149 let peer_short_msg = peer_short.clone();
1150 let _pending_binary_clone = _pending_binary.clone();
1151 let store_clone = store.clone();
1152 let nostr_relay_for_msg = nostr_relay.clone();
1153 let nostr_client_id_for_msg = nostr_client_id;
1154 let pending_nostr_queries_for_msg = pending_nostr_queries.clone();
1155 let mesh_frame_tx_for_msg = mesh_frame_tx.clone();
1156 let peer_id_for_msg = peer_id.clone();
1157
1158 dc.on_message(Box::new(move |msg: DataChannelMessage| {
1159 let dc = dc_for_msg.clone();
1160 let peer_short = peer_short_msg.clone();
1161 let pending_requests = pending_requests.clone();
1162 let _pending_binary = _pending_binary_clone.clone();
1163 let _message_tx = message_tx.clone();
1164 let store = store_clone.clone();
1165 let nostr_relay = nostr_relay_for_msg.clone();
1166 let nostr_client_id = nostr_client_id_for_msg;
1167 let pending_nostr_queries = pending_nostr_queries_for_msg.clone();
1168 let mesh_frame_tx = mesh_frame_tx_for_msg.clone();
1169 let cashu_quotes = cashu_quotes.clone();
1170 let peer_id = peer_id_for_msg.clone();
1171 let msg_data = msg.data.clone();
1172
1173 Box::pin(async move {
1175 if msg.is_string {
1176 if let Ok(text) = std::str::from_utf8(&msg_data) {
1177 if let Ok(mesh_frame) = serde_json::from_str::<MeshNostrFrame>(text) {
1178 match validate_mesh_frame(&mesh_frame) {
1179 Ok(()) => {
1180 if let Some(tx) = mesh_frame_tx {
1181 let _ = tx.send((peer_id.clone(), mesh_frame)).await;
1182 }
1183 return;
1184 }
1185 Err(reason) => {
1186 debug!(
1187 "[Peer {}] Ignoring invalid mesh frame: {}",
1188 peer_short, reason
1189 );
1190 }
1191 }
1192 }
1193
1194 if let Ok(relay_msg) = NostrRelayMessage::from_json(text) {
1196 if let Some(sub_id) = relay_subscription_id(&relay_msg) {
1197 let sender = {
1198 let pending = pending_nostr_queries.lock().await;
1199 pending.get(&sub_id).cloned()
1200 };
1201 if let Some(tx) = sender {
1202 debug!(
1203 "[Peer {}] Routed Nostr relay message for subscription {}",
1204 peer_short, sub_id
1205 );
1206 let _ = tx.send(relay_msg);
1207 return;
1208 } else {
1209 debug!(
1210 "[Peer {}] Dropping Nostr relay message for unknown subscription {}",
1211 peer_short, sub_id
1212 );
1213 }
1214 }
1215 }
1216
1217 if let Some(relay) = nostr_relay {
1219 if let Ok(nostr_msg) = NostrClientMessage::from_json(text) {
1220 if let Some(client_id) = nostr_client_id {
1221 relay.handle_client_message(client_id, nostr_msg).await;
1222 }
1223 }
1224 }
1225 }
1226 return;
1227 }
1228 debug!(
1230 "[Peer {}] Received {} bytes on data channel",
1231 peer_short,
1232 msg_data.len()
1233 );
1234 match parse_message(&msg_data) {
1235 Ok(data_msg) => match data_msg {
1236 DataMessage::Request(req) => {
1237 let hash_hex = hash_to_hex(&req.h);
1238 let hash_short = &hash_hex[..8.min(hash_hex.len())];
1239 info!("[Peer {}] Received request for {}", peer_short, hash_short);
1240
1241 if let Some(cashu_quotes) = cashu_quotes.as_ref() {
1242 if cashu_quotes
1243 .should_refuse_requests_from_peer(&peer_id.to_string())
1244 .await
1245 {
1246 info!(
1247 "[Peer {}] Refusing request from peer with unpaid defaults",
1248 peer_short
1249 );
1250 return;
1251 }
1252 }
1253
1254 let quoted_settlement = if let Some(quote_id) = req.q {
1255 let Some(cashu_quotes) = cashu_quotes.as_ref() else {
1256 info!(
1257 "[Peer {}] Ignoring quoted request without Cashu settlement state",
1258 peer_short
1259 );
1260 return;
1261 };
1262 match cashu_quotes
1263 .take_valid_quote(&peer_id.to_string(), &req.h, quote_id)
1264 .await
1265 {
1266 Some(settlement) => Some((quote_id, settlement)),
1267 None => {
1268 info!(
1269 "[Peer {}] Ignoring request with invalid or expired quote {}",
1270 peer_short, quote_id
1271 );
1272 return;
1273 }
1274 }
1275 } else {
1276 None
1277 };
1278
1279 let data = if let Some(ref store) = store {
1281 match store.get(&hash_hex) {
1282 Ok(Some(data)) => {
1283 info!(
1284 "[Peer {}] Found {} in store ({} bytes)",
1285 peer_short,
1286 hash_short,
1287 data.len()
1288 );
1289 Some(data)
1290 }
1291 Ok(None) => {
1292 info!(
1293 "[Peer {}] Hash {} not in store",
1294 peer_short, hash_short
1295 );
1296 None
1297 }
1298 Err(e) => {
1299 warn!("[Peer {}] Store error: {}", peer_short, e);
1300 None
1301 }
1302 }
1303 } else {
1304 warn!(
1305 "[Peer {}] No store configured - cannot serve requests",
1306 peer_short
1307 );
1308 None
1309 };
1310
1311 if let Some(data) = data {
1313 let data_len = data.len();
1314 if let (Some(cashu_quotes), Some((quote_id, settlement))) =
1315 (cashu_quotes.as_ref(), quoted_settlement)
1316 {
1317 match cashu_quotes
1318 .prepare_quoted_transfer(
1319 &peer_id.to_string(),
1320 &req.h,
1321 quote_id,
1322 &settlement,
1323 data,
1324 )
1325 .await
1326 {
1327 Some((first_chunk, first_expected)) => {
1328 if send_quoted_chunk(
1329 &dc,
1330 &peer_id,
1331 &peer_short,
1332 cashu_quotes,
1333 first_chunk,
1334 first_expected,
1335 )
1336 .await
1337 {
1338 info!(
1339 "[Peer {}] Started quoted chunked response for {} ({} bytes)",
1340 peer_short, hash_short, data_len
1341 );
1342 }
1343 }
1344 None => {
1345 warn!(
1346 "[Peer {}] Failed to prepare quoted transfer for {}",
1347 peer_short, hash_short
1348 );
1349 }
1350 }
1351 } else {
1352 let response = DataResponse {
1353 h: req.h,
1354 d: data,
1355 i: None,
1356 n: None,
1357 };
1358 if let Ok(wire) = encode_response(&response) {
1359 if let Err(e) = dc.send(&Bytes::from(wire)).await {
1360 error!(
1361 "[Peer {}] Failed to send response: {}",
1362 peer_short, e
1363 );
1364 } else {
1365 info!(
1366 "[Peer {}] Sent response for {} ({} bytes)",
1367 peer_short, hash_short, data_len
1368 );
1369 }
1370 }
1371 }
1372 } else {
1373 info!("[Peer {}] Content not found for {}", peer_short, hash_short);
1374 }
1375 }
1376 DataMessage::Response(res) => {
1377 let hash_hex = hash_to_hex(&res.h);
1378 let hash_short = &hash_hex[..8.min(hash_hex.len())];
1379 debug!(
1380 "[Peer {}] Received response for {} ({} bytes)",
1381 peer_short,
1382 hash_short,
1383 res.d.len()
1384 );
1385
1386 let mut pending = pending_requests.lock().await;
1388 if let Some(req) = pending.remove(&hash_hex) {
1389 let _ = req.response_tx.send(Some(res.d));
1390 }
1391 }
1392 DataMessage::QuoteRequest(req) => {
1393 let response = handle_quote_request_message(
1394 &peer_short,
1395 &peer_id,
1396 &store,
1397 cashu_quotes.as_ref(),
1398 &req,
1399 )
1400 .await;
1401 if let Some(response) = response {
1402 if let Ok(wire) = encode_quote_response(&response) {
1403 if let Err(e) = dc.send(&Bytes::from(wire)).await {
1404 warn!(
1405 "[Peer {}] Failed to send quote response: {}",
1406 peer_short, e
1407 );
1408 }
1409 }
1410 }
1411 }
1412 DataMessage::QuoteResponse(res) => {
1413 if let Some(cashu_quotes) = cashu_quotes.as_ref() {
1414 let _ = cashu_quotes
1415 .handle_quote_response(&peer_id.to_string(), res)
1416 .await;
1417 }
1418 }
1419 DataMessage::Chunk(chunk) => {
1420 process_chunk_message(
1421 &peer_short,
1422 &peer_id,
1423 &dc,
1424 &pending_requests,
1425 cashu_quotes.as_ref(),
1426 chunk,
1427 )
1428 .await;
1429 }
1430 DataMessage::Payment(req) => {
1431 let outcome =
1432 handle_payment_message(&peer_id, cashu_quotes.as_ref(), &req).await;
1433 if let Ok(wire) = encode_payment_ack(&outcome.ack) {
1434 if let Err(e) = dc.send(&Bytes::from(wire)).await {
1435 warn!(
1436 "[Peer {}] Failed to send payment ack: {}",
1437 peer_short, e
1438 );
1439 }
1440 }
1441 if let (Some(cashu_quotes), Some((next_chunk, next_expected))) =
1442 (cashu_quotes.as_ref(), outcome.next_chunk)
1443 {
1444 let _ = send_quoted_chunk(
1445 &dc,
1446 &peer_id,
1447 &peer_short,
1448 cashu_quotes,
1449 next_chunk,
1450 next_expected,
1451 )
1452 .await;
1453 }
1454 }
1455 DataMessage::PaymentAck(res) => {
1456 handle_payment_ack_message(
1457 &peer_short,
1458 &peer_id,
1459 &dc,
1460 &pending_requests,
1461 cashu_quotes.as_ref(),
1462 res,
1463 )
1464 .await;
1465 }
1466 },
1467 Err(e) => {
1468 warn!("[Peer {}] Failed to parse message: {:?}", peer_short, e);
1469 let hex_dump: String = msg_data
1471 .iter()
1472 .take(50)
1473 .map(|b| format!("{:02x}", b))
1474 .collect();
1475 warn!("[Peer {}] Message hex: {}", peer_short, hex_dump);
1476 }
1477 }
1478 })
1479 }));
1480 }
1481
1482 pub fn has_data_channel(&self) -> bool {
1484 self.data_channel
1486 .try_lock()
1487 .map(|guard| {
1488 guard
1489 .as_ref()
1490 .map(|dc| dc.ready_state() == RTCDataChannelState::Open)
1491 .unwrap_or(false)
1492 })
1493 .unwrap_or(false)
1494 }
1495
1496 pub async fn request(&self, hash_hex: &str) -> Result<Option<Vec<u8>>> {
1498 self.request_with_timeout(hash_hex, std::time::Duration::from_secs(10))
1499 .await
1500 }
1501
1502 pub async fn request_with_timeout(
1504 &self,
1505 hash_hex: &str,
1506 timeout: std::time::Duration,
1507 ) -> Result<Option<Vec<u8>>> {
1508 let dc_guard = self.data_channel.lock().await;
1509 let dc = dc_guard
1510 .as_ref()
1511 .ok_or_else(|| anyhow::anyhow!("No data channel"))?
1512 .clone();
1513 drop(dc_guard); let hash = hex::decode(hash_hex).map_err(|e| anyhow::anyhow!("Invalid hex hash: {}", e))?;
1517
1518 let (tx, rx) = oneshot::channel();
1520
1521 {
1523 let mut pending = self.pending_requests.lock().await;
1524 pending.insert(
1525 hash_hex.to_string(),
1526 PendingRequest::standard(hash.clone(), tx),
1527 );
1528 }
1529
1530 let req = DataRequest {
1532 h: hash,
1533 htl: BLOB_REQUEST_POLICY.max_htl,
1534 q: None,
1535 };
1536 let wire = encode_request(&req)?;
1537 dc.send(&Bytes::from(wire)).await?;
1538
1539 debug!(
1540 "[Peer {}] Sent request for {}",
1541 self.peer_id.short(),
1542 &hash_hex[..8.min(hash_hex.len())]
1543 );
1544
1545 match tokio::time::timeout(timeout, rx).await {
1547 Ok(Ok(data)) => Ok(data),
1548 Ok(Err(_)) => {
1549 Ok(None)
1551 }
1552 Err(_) => {
1553 let mut pending = self.pending_requests.lock().await;
1555 pending.remove(hash_hex);
1556 Ok(None)
1557 }
1558 }
1559 }
1560
1561 pub async fn query_nostr_events(
1564 &self,
1565 filters: Vec<NostrFilter>,
1566 timeout: std::time::Duration,
1567 ) -> Result<Vec<nostr::Event>> {
1568 let dc_guard = self.data_channel.lock().await;
1569 let dc = dc_guard
1570 .as_ref()
1571 .ok_or_else(|| anyhow::anyhow!("No data channel"))?
1572 .clone();
1573 drop(dc_guard);
1574
1575 let subscription_id = NostrSubscriptionId::generate();
1576 let subscription_key = subscription_id.to_string();
1577 let (tx, mut rx) = mpsc::unbounded_channel::<NostrRelayMessage>();
1578
1579 {
1580 let mut pending = self.pending_nostr_queries.lock().await;
1581 pending.insert(subscription_key.clone(), tx);
1582 }
1583
1584 let req = NostrClientMessage::req(subscription_id.clone(), filters);
1585 if let Err(e) = dc.send_text(req.as_json()).await {
1586 let mut pending = self.pending_nostr_queries.lock().await;
1587 pending.remove(&subscription_key);
1588 return Err(e.into());
1589 }
1590 debug!(
1591 "[Peer {}] Sent Nostr REQ subscription {}",
1592 self.peer_id.short(),
1593 subscription_id
1594 );
1595
1596 let mut events = Vec::new();
1597 let deadline = tokio::time::Instant::now() + timeout;
1598
1599 loop {
1600 let now = tokio::time::Instant::now();
1601 if now >= deadline {
1602 break;
1603 }
1604 let remaining = deadline - now;
1605
1606 let next = tokio::time::timeout(remaining, rx.recv()).await;
1607 match next {
1608 Ok(Some(NostrRelayMessage::Event {
1609 subscription_id: sid,
1610 event,
1611 })) if sid == subscription_id => {
1612 debug!(
1613 "[Peer {}] Received Nostr EVENT for subscription {}",
1614 self.peer_id.short(),
1615 subscription_id
1616 );
1617 events.push(*event);
1618 }
1619 Ok(Some(NostrRelayMessage::EndOfStoredEvents(sid))) if sid == subscription_id => {
1620 debug!(
1621 "[Peer {}] Received Nostr EOSE for subscription {}",
1622 self.peer_id.short(),
1623 subscription_id
1624 );
1625 break;
1626 }
1627 Ok(Some(NostrRelayMessage::Closed {
1628 subscription_id: sid,
1629 message,
1630 })) if sid == subscription_id => {
1631 warn!(
1632 "[Peer {}] Nostr query closed for subscription {}: {}",
1633 self.peer_id.short(),
1634 subscription_id,
1635 message
1636 );
1637 break;
1638 }
1639 Ok(Some(_)) => {}
1640 Ok(None) => break,
1641 Err(_) => {
1642 warn!(
1643 "[Peer {}] Nostr query timed out for subscription {}",
1644 self.peer_id.short(),
1645 subscription_id
1646 );
1647 break;
1648 }
1649 }
1650 }
1651
1652 let close = NostrClientMessage::close(subscription_id.clone());
1653 let _ = dc.send_text(close.as_json()).await;
1654
1655 let mut pending = self.pending_nostr_queries.lock().await;
1656 pending.remove(&subscription_key);
1657 debug!(
1658 "[Peer {}] Nostr query subscription {} collected {} event(s)",
1659 self.peer_id.short(),
1660 subscription_id,
1661 events.len()
1662 );
1663
1664 Ok(events)
1665 }
1666
1667 pub async fn send_mesh_frame_text(&self, frame: &MeshNostrFrame) -> Result<()> {
1669 let dc_guard = self.data_channel.lock().await;
1670 let dc = dc_guard
1671 .as_ref()
1672 .ok_or_else(|| anyhow::anyhow!("No data channel"))?
1673 .clone();
1674 drop(dc_guard);
1675
1676 let text = serde_json::to_string(frame)?;
1677 dc.send_text(text).await?;
1678 Ok(())
1679 }
1680
1681 pub async fn send_message(&self, msg: &DataMessage) -> Result<()> {
1683 let dc_guard = self.data_channel.lock().await;
1684 if let Some(ref dc) = *dc_guard {
1685 let wire = encode_message(msg)?;
1686 dc.send(&Bytes::from(wire)).await?;
1687 }
1688 Ok(())
1689 }
1690
1691 pub async fn close(&self) -> Result<()> {
1693 {
1694 let dc_guard = self.data_channel.lock().await;
1695 if let Some(ref dc) = *dc_guard {
1696 dc.close().await?;
1697 }
1698 }
1699 self.pc.close().await?;
1700 Ok(())
1701 }
1702}
1703
1704fn relay_subscription_id(msg: &NostrRelayMessage) -> Option<String> {
1705 match msg {
1706 NostrRelayMessage::Event {
1707 subscription_id, ..
1708 } => Some(subscription_id.to_string()),
1709 NostrRelayMessage::EndOfStoredEvents(subscription_id) => Some(subscription_id.to_string()),
1710 NostrRelayMessage::Closed {
1711 subscription_id, ..
1712 } => Some(subscription_id.to_string()),
1713 NostrRelayMessage::Count {
1714 subscription_id, ..
1715 } => Some(subscription_id.to_string()),
1716 _ => None,
1717 }
1718}
1719
1720#[async_trait]
1721impl RoutedPeerLink for Peer {
1722 async fn send(&self, data: Vec<u8>) -> std::result::Result<(), RoutedTransportError> {
1723 let dc = self
1724 .data_channel
1725 .lock()
1726 .await
1727 .as_ref()
1728 .cloned()
1729 .ok_or(RoutedTransportError::NotConnected)?;
1730 dc.send(&Bytes::from(data))
1731 .await
1732 .map(|_| ())
1733 .map_err(|e| RoutedTransportError::SendFailed(e.to_string()))
1734 }
1735
1736 async fn recv(&self) -> Option<Vec<u8>> {
1737 None
1738 }
1739
1740 fn try_recv(&self) -> Option<Vec<u8>> {
1741 None
1742 }
1743
1744 fn is_open(&self) -> bool {
1745 self.has_data_channel()
1746 }
1747
1748 async fn close(&self) {
1749 let _ = Peer::close(self).await;
1750 }
1751}