1use anyhow::Result;
4use bytes::Bytes;
5use std::collections::HashMap;
6use std::sync::Arc;
7use tokio::sync::{mpsc, oneshot, Mutex, Notify};
8use tracing::{debug, error, info, warn};
9use webrtc::api::interceptor_registry::register_default_interceptors;
10use webrtc::api::media_engine::MediaEngine;
11use webrtc::api::setting_engine::SettingEngine;
12use webrtc::api::APIBuilder;
13use webrtc::data_channel::data_channel_init::RTCDataChannelInit;
14use webrtc::data_channel::data_channel_message::DataChannelMessage;
15use webrtc::data_channel::data_channel_state::RTCDataChannelState;
16use webrtc::data_channel::RTCDataChannel;
17use webrtc::ice_transport::ice_candidate::RTCIceCandidate;
18use webrtc::ice_transport::ice_server::RTCIceServer;
19use webrtc::interceptor::registry::Registry;
20use webrtc::peer_connection::configuration::RTCConfiguration;
21use webrtc::peer_connection::peer_connection_state::RTCPeerConnectionState;
22use webrtc::peer_connection::sdp::session_description::RTCSessionDescription;
23use webrtc::peer_connection::RTCPeerConnection;
24
25use super::cashu::{CashuQuoteState, ExpectedSettlement};
26use super::types::{
27 encode_chunk, encode_message, encode_payment, encode_payment_ack, encode_quote_response,
28 encode_request, encode_response, hash_to_hex, parse_message, validate_mesh_frame, DataChunk,
29 DataMessage, DataPayment, DataPaymentAck, DataQuoteRequest, DataRequest, DataResponse,
30 MeshNostrFrame, PeerDirection, PeerHTLConfig, PeerId, PeerStateEvent, SignalingMessage,
31 BLOB_REQUEST_POLICY,
32};
33use crate::nostr_relay::NostrRelay;
34use nostr::{
35 ClientMessage as NostrClientMessage, Filter as NostrFilter, JsonUtil as NostrJsonUtil,
36 RelayMessage as NostrRelayMessage, SubscriptionId as NostrSubscriptionId,
37};
38
39pub trait ContentStore: Send + Sync + 'static {
41 fn get(&self, hash_hex: &str) -> Result<Option<Vec<u8>>>;
43}
44
45pub struct PendingRequest {
47 pub hash: Vec<u8>,
48 pub response_tx: oneshot::Sender<Option<Vec<u8>>>,
49 pub quoted: Option<PendingQuotedRequest>,
50}
51
52pub struct PendingQuotedRequest {
53 pub quote_id: u64,
54 pub mint_url: String,
55 pub total_payment_sat: u64,
56 pub confirmed_payment_sat: u64,
57 pub next_chunk_index: u32,
58 pub total_chunks: Option<u32>,
59 pub assembled_data: Vec<u8>,
60 pub in_flight_payment: Option<PendingChunkPayment>,
61 pub buffered_chunk: Option<DataChunk>,
62}
63
64pub struct PendingChunkPayment {
65 pub chunk_index: u32,
66 pub amount_sat: u64,
67 pub mint_url: String,
68 pub operation_id: String,
69 pub final_chunk: bool,
70}
71
72impl PendingRequest {
73 pub fn standard(hash: Vec<u8>, response_tx: oneshot::Sender<Option<Vec<u8>>>) -> Self {
74 Self {
75 hash,
76 response_tx,
77 quoted: None,
78 }
79 }
80
81 pub fn quoted(
82 hash: Vec<u8>,
83 response_tx: oneshot::Sender<Option<Vec<u8>>>,
84 quote_id: u64,
85 mint_url: String,
86 total_payment_sat: u64,
87 ) -> Self {
88 Self {
89 hash,
90 response_tx,
91 quoted: Some(PendingQuotedRequest {
92 quote_id,
93 mint_url,
94 total_payment_sat,
95 confirmed_payment_sat: 0,
96 next_chunk_index: 0,
97 total_chunks: None,
98 assembled_data: Vec::new(),
99 in_flight_payment: None,
100 buffered_chunk: None,
101 }),
102 }
103 }
104}
105
106async fn handle_quote_request_message(
107 peer_short: &str,
108 peer_id: &PeerId,
109 store: &Option<Arc<dyn ContentStore>>,
110 cashu_quotes: Option<&Arc<CashuQuoteState>>,
111 req: &DataQuoteRequest,
112) -> Option<super::types::DataQuoteResponse> {
113 let Some(cashu_quotes) = cashu_quotes else {
114 debug!(
115 "[Peer {}] Ignoring quote request without Cashu policy",
116 peer_short
117 );
118 return None;
119 };
120
121 if cashu_quotes
122 .should_refuse_requests_from_peer(&peer_id.to_string())
123 .await
124 {
125 return Some(
126 cashu_quotes
127 .build_quote_response(&peer_id.to_string(), req, false)
128 .await,
129 );
130 }
131
132 let hash_hex = hash_to_hex(&req.h);
133 let can_serve = if let Some(store) = store {
134 match store.get(&hash_hex) {
135 Ok(Some(_)) => true,
136 Ok(None) => false,
137 Err(e) => {
138 warn!("[Peer {}] Store error during quote: {}", peer_short, e);
139 false
140 }
141 }
142 } else {
143 false
144 };
145
146 Some(
147 cashu_quotes
148 .build_quote_response(&peer_id.to_string(), req, can_serve)
149 .await,
150 )
151}
152
153struct PaymentHandlingOutcome {
154 ack: DataPaymentAck,
155 next_chunk: Option<(DataChunk, ExpectedSettlement)>,
156}
157
158async fn send_quoted_chunk(
159 dc: &Arc<RTCDataChannel>,
160 peer_id: &PeerId,
161 peer_short: &str,
162 cashu_quotes: &Arc<CashuQuoteState>,
163 chunk: DataChunk,
164 expected: ExpectedSettlement,
165) -> bool {
166 let hash_hex = hash_to_hex(&chunk.h);
167 let wire = match encode_chunk(&chunk) {
168 Ok(wire) => wire,
169 Err(err) => {
170 warn!(
171 "[Peer {}] Failed to encode quoted chunk {} for quote {}: {}",
172 peer_short, chunk.c, chunk.q, err
173 );
174 return false;
175 }
176 };
177
178 if let Err(err) = dc.send(&Bytes::from(wire)).await {
179 warn!(
180 "[Peer {}] Failed to send quoted chunk {} for quote {}: {}",
181 peer_short, chunk.c, chunk.q, err
182 );
183 return false;
184 }
185
186 cashu_quotes
187 .register_expected_payment(peer_id.to_string(), hash_hex, chunk.q, expected)
188 .await;
189 true
190}
191
192async fn fail_pending_request(
193 pending_requests: &Arc<Mutex<HashMap<String, PendingRequest>>>,
194 cashu_quotes: Option<&Arc<CashuQuoteState>>,
195 hash_hex: &str,
196) {
197 let pending = pending_requests.lock().await.remove(hash_hex);
198 let Some(pending) = pending else {
199 return;
200 };
201
202 if let (Some(cashu_quotes), Some(quoted)) = (cashu_quotes, pending.quoted) {
203 if let Some(in_flight) = quoted.in_flight_payment {
204 let _ = cashu_quotes
205 .revoke_payment_token(&in_flight.mint_url, &in_flight.operation_id)
206 .await;
207 }
208 }
209 let _ = pending.response_tx.send(None);
210}
211
212async fn process_chunk_message(
213 peer_short: &str,
214 _peer_id: &PeerId,
215 dc: &Arc<RTCDataChannel>,
216 pending_requests: &Arc<Mutex<HashMap<String, PendingRequest>>>,
217 cashu_quotes: Option<&Arc<CashuQuoteState>>,
218 chunk: DataChunk,
219) {
220 let hash_hex = hash_to_hex(&chunk.h);
221 let Some(cashu_quotes) = cashu_quotes else {
222 fail_pending_request(pending_requests, None, &hash_hex).await;
223 return;
224 };
225
226 enum ChunkAction {
227 BufferOnly,
228 Fail,
229 Pay {
230 mint_url: String,
231 amount_sat: u64,
232 final_chunk: bool,
233 },
234 }
235
236 let action = {
237 let mut pending = pending_requests.lock().await;
238 let Some(request) = pending.get_mut(&hash_hex) else {
239 return;
240 };
241 match request.quoted.as_mut() {
242 None => ChunkAction::Fail,
243 Some(quoted) if quoted.quote_id != chunk.q || chunk.n == 0 => ChunkAction::Fail,
244 Some(quoted) => {
245 if let Some(in_flight) = quoted.in_flight_payment.as_ref() {
246 let expected_buffer_index = in_flight.chunk_index + 1;
247 if chunk.c == expected_buffer_index && quoted.buffered_chunk.is_none() {
248 quoted.buffered_chunk = Some(chunk.clone());
249 ChunkAction::BufferOnly
250 } else {
251 ChunkAction::Fail
252 }
253 } else if chunk.c != quoted.next_chunk_index {
254 ChunkAction::Fail
255 } else if let Some(total_chunks) = quoted.total_chunks {
256 if total_chunks != chunk.n {
257 ChunkAction::Fail
258 } else {
259 let next_total = quoted.confirmed_payment_sat.saturating_add(chunk.p);
260 if next_total > quoted.total_payment_sat
261 || (chunk.c + 1 == chunk.n && next_total != quoted.total_payment_sat)
262 {
263 ChunkAction::Fail
264 } else {
265 quoted.total_chunks = Some(chunk.n);
266 quoted.assembled_data.extend_from_slice(&chunk.d);
267 quoted.next_chunk_index += 1;
268 ChunkAction::Pay {
269 mint_url: quoted.mint_url.clone(),
270 amount_sat: chunk.p,
271 final_chunk: chunk.c + 1 == chunk.n,
272 }
273 }
274 }
275 } else {
276 let next_total = quoted.confirmed_payment_sat.saturating_add(chunk.p);
277 if next_total > quoted.total_payment_sat
278 || (chunk.c + 1 == chunk.n && next_total != quoted.total_payment_sat)
279 {
280 ChunkAction::Fail
281 } else {
282 quoted.total_chunks = Some(chunk.n);
283 quoted.assembled_data.extend_from_slice(&chunk.d);
284 quoted.next_chunk_index += 1;
285 ChunkAction::Pay {
286 mint_url: quoted.mint_url.clone(),
287 amount_sat: chunk.p,
288 final_chunk: chunk.c + 1 == chunk.n,
289 }
290 }
291 }
292 }
293 }
294 };
295
296 match action {
297 ChunkAction::BufferOnly => (),
298 ChunkAction::Fail => {
299 warn!(
300 "[Peer {}] Invalid quoted chunk {} for hash {}",
301 peer_short, chunk.c, hash_hex
302 );
303 fail_pending_request(pending_requests, Some(cashu_quotes), &hash_hex).await;
304 }
305 ChunkAction::Pay {
306 mint_url,
307 amount_sat,
308 final_chunk,
309 } => {
310 let payment = match cashu_quotes
311 .create_payment_token(&mint_url, amount_sat)
312 .await
313 {
314 Ok(payment) => payment,
315 Err(err) => {
316 warn!(
317 "[Peer {}] Failed to create payment token for chunk {} of {}: {}",
318 peer_short, chunk.c, hash_hex, err
319 );
320 fail_pending_request(pending_requests, Some(cashu_quotes), &hash_hex).await;
321 return;
322 }
323 };
324
325 {
326 let mut pending = pending_requests.lock().await;
327 let Some(request) = pending.get_mut(&hash_hex) else {
328 let _ = cashu_quotes
329 .revoke_payment_token(&payment.mint_url, &payment.operation_id)
330 .await;
331 return;
332 };
333 let Some(quoted) = request.quoted.as_mut() else {
334 let _ = cashu_quotes
335 .revoke_payment_token(&payment.mint_url, &payment.operation_id)
336 .await;
337 return;
338 };
339 quoted.in_flight_payment = Some(PendingChunkPayment {
340 chunk_index: chunk.c,
341 amount_sat,
342 mint_url: payment.mint_url.clone(),
343 operation_id: payment.operation_id.clone(),
344 final_chunk,
345 });
346 }
347
348 let payment_msg = DataPayment {
349 h: chunk.h,
350 q: chunk.q,
351 c: chunk.c,
352 p: amount_sat,
353 m: Some(payment.mint_url.clone()),
354 tok: payment.token,
355 };
356 let wire = match encode_payment(&payment_msg) {
357 Ok(wire) => wire,
358 Err(err) => {
359 warn!(
360 "[Peer {}] Failed to encode payment for chunk {} of {}: {}",
361 peer_short, chunk.c, hash_hex, err
362 );
363 fail_pending_request(pending_requests, Some(cashu_quotes), &hash_hex).await;
364 return;
365 }
366 };
367 if let Err(err) = dc.send(&Bytes::from(wire)).await {
368 warn!(
369 "[Peer {}] Failed to send payment for chunk {} of {}: {}",
370 peer_short, chunk.c, hash_hex, err
371 );
372 fail_pending_request(pending_requests, Some(cashu_quotes), &hash_hex).await;
373 }
374 }
375 }
376}
377
378async fn handle_payment_ack_message(
379 peer_short: &str,
380 peer_id: &PeerId,
381 dc: &Arc<RTCDataChannel>,
382 pending_requests: &Arc<Mutex<HashMap<String, PendingRequest>>>,
383 cashu_quotes: Option<&Arc<CashuQuoteState>>,
384 ack: DataPaymentAck,
385) {
386 let Some(cashu_quotes) = cashu_quotes else {
387 return;
388 };
389 let hash_hex = hash_to_hex(&ack.h);
390 let mut buffered_next = None;
391 let mut completed = None;
392 let mut failed = None;
393 let mut confirmed_amount = None;
394 let mut completed_data = None;
395
396 {
397 let mut pending = pending_requests.lock().await;
398 let Some(request) = pending.get_mut(&hash_hex) else {
399 return;
400 };
401 let Some(quoted) = request.quoted.as_mut() else {
402 return;
403 };
404 let Some(in_flight) = quoted.in_flight_payment.take() else {
405 return;
406 };
407 if ack.q != quoted.quote_id || ack.c != in_flight.chunk_index {
408 quoted.in_flight_payment = Some(in_flight);
409 return;
410 }
411
412 if !ack.a {
413 failed = Some(in_flight);
414 } else {
415 quoted.confirmed_payment_sat = quoted
416 .confirmed_payment_sat
417 .saturating_add(in_flight.amount_sat);
418 confirmed_amount = Some(in_flight.amount_sat);
419 if in_flight.final_chunk {
420 completed_data = Some(quoted.assembled_data.clone());
421 } else if let Some(next_chunk) = quoted.buffered_chunk.take() {
422 buffered_next = Some(next_chunk);
423 }
424 }
425
426 if let Some(data) = completed_data.take() {
427 let finished = pending
428 .remove(&hash_hex)
429 .expect("pending request must exist");
430 completed = Some((finished.response_tx, data));
431 }
432 }
433
434 if let Some(amount_sat) = confirmed_amount {
435 cashu_quotes
436 .record_paid_peer(&peer_id.to_string(), amount_sat)
437 .await;
438 }
439
440 if let Some(in_flight) = failed {
441 warn!(
442 "[Peer {}] Payment ack rejected chunk {} for {}: {}",
443 peer_short,
444 ack.c,
445 hash_hex,
446 ack.e.as_deref().unwrap_or("payment rejected")
447 );
448 let _ = cashu_quotes
449 .revoke_payment_token(&in_flight.mint_url, &in_flight.operation_id)
450 .await;
451 let removed = pending_requests.lock().await.remove(&hash_hex);
452 if let Some(removed) = removed {
453 let _ = removed.response_tx.send(None);
454 }
455 return;
456 }
457
458 if let Some((tx, data)) = completed {
459 let _ = tx.send(Some(data));
460 return;
461 }
462
463 if let Some(next_chunk) = buffered_next {
464 process_chunk_message(
465 peer_short,
466 peer_id,
467 dc,
468 pending_requests,
469 Some(cashu_quotes),
470 next_chunk,
471 )
472 .await;
473 }
474}
475
476async fn handle_payment_message(
477 peer_id: &PeerId,
478 cashu_quotes: Option<&Arc<CashuQuoteState>>,
479 req: &DataPayment,
480) -> PaymentHandlingOutcome {
481 let nack = |err: String| PaymentHandlingOutcome {
482 ack: DataPaymentAck {
483 h: req.h.clone(),
484 q: req.q,
485 c: req.c,
486 a: false,
487 e: Some(err),
488 },
489 next_chunk: None,
490 };
491
492 let Some(cashu_quotes) = cashu_quotes else {
493 return nack("Cashu settlement unavailable".to_string());
494 };
495
496 let expected = match cashu_quotes
497 .claim_expected_payment(
498 &peer_id.to_string(),
499 &req.h,
500 req.q,
501 req.c,
502 req.p,
503 req.m.as_deref(),
504 )
505 .await
506 {
507 Ok(expected) => expected,
508 Err(err) => {
509 cashu_quotes
510 .record_payment_default_from_peer(&peer_id.to_string())
511 .await;
512 return nack(err.to_string());
513 }
514 };
515
516 match cashu_quotes.receive_payment_token(&req.tok).await {
517 Ok(received) if received.amount_sat >= expected.payment_sat => {
518 if expected.mint_url.as_deref() != Some(received.mint_url.as_str()) {
519 cashu_quotes
520 .record_payment_default_from_peer(&peer_id.to_string())
521 .await;
522 return nack("Received payment mint did not match quoted mint".to_string());
523 }
524 if let Err(err) = cashu_quotes
525 .record_receipt_from_peer(
526 &peer_id.to_string(),
527 &received.mint_url,
528 received.amount_sat,
529 )
530 .await
531 {
532 warn!(
533 "[Peer {}] Failed to persist Cashu mint success for {}: {}",
534 peer_id.short(),
535 received.mint_url,
536 err
537 );
538 }
539
540 let next_chunk = if expected.final_chunk {
541 None
542 } else {
543 cashu_quotes
544 .next_outgoing_chunk(&peer_id.to_string(), &req.h, req.q)
545 .await
546 };
547
548 PaymentHandlingOutcome {
549 ack: DataPaymentAck {
550 h: req.h.clone(),
551 q: req.q,
552 c: req.c,
553 a: true,
554 e: None,
555 },
556 next_chunk,
557 }
558 }
559 Ok(_) => {
560 cashu_quotes
561 .record_payment_default_from_peer(&peer_id.to_string())
562 .await;
563 nack("Received payment amount was below the quoted amount".to_string())
564 }
565 Err(err) => {
566 if let Some(mint_url) = expected.mint_url.as_deref().or(req.m.as_deref()) {
567 let _ = cashu_quotes.record_mint_receive_failure(mint_url).await;
568 }
569 nack(err.to_string())
570 }
571 }
572}
573
574pub struct Peer {
576 pub peer_id: PeerId,
577 pub direction: PeerDirection,
578 pub created_at: std::time::Instant,
579 pub connected_at: Option<std::time::Instant>,
580
581 pc: Arc<RTCPeerConnection>,
582 pub data_channel: Arc<Mutex<Option<Arc<RTCDataChannel>>>>,
584 signaling_tx: mpsc::Sender<SignalingMessage>,
585 my_peer_id: PeerId,
586
587 store: Option<Arc<dyn ContentStore>>,
589
590 pub pending_requests: Arc<Mutex<HashMap<String, PendingRequest>>>,
592 pending_nostr_queries: Arc<Mutex<HashMap<String, mpsc::UnboundedSender<NostrRelayMessage>>>>,
594
595 #[allow(dead_code)]
597 message_tx: mpsc::Sender<(DataMessage, Option<Vec<u8>>)>,
598 #[allow(dead_code)]
599 message_rx: Option<mpsc::Receiver<(DataMessage, Option<Vec<u8>>)>>,
600
601 state_event_tx: Option<mpsc::Sender<PeerStateEvent>>,
603
604 nostr_relay: Option<Arc<NostrRelay>>,
606 mesh_frame_tx: Option<mpsc::Sender<(PeerId, MeshNostrFrame)>>,
608 cashu_quotes: Option<Arc<CashuQuoteState>>,
610 htl_config: PeerHTLConfig,
612}
613
614impl Peer {
615 pub async fn new(
617 peer_id: PeerId,
618 direction: PeerDirection,
619 my_peer_id: PeerId,
620 signaling_tx: mpsc::Sender<SignalingMessage>,
621 stun_servers: Vec<String>,
622 ) -> Result<Self> {
623 Self::new_with_store_and_events(
624 peer_id,
625 direction,
626 my_peer_id,
627 signaling_tx,
628 stun_servers,
629 None,
630 None,
631 None,
632 None,
633 None,
634 )
635 .await
636 }
637
638 pub async fn new_with_store(
640 peer_id: PeerId,
641 direction: PeerDirection,
642 my_peer_id: PeerId,
643 signaling_tx: mpsc::Sender<SignalingMessage>,
644 stun_servers: Vec<String>,
645 store: Option<Arc<dyn ContentStore>>,
646 ) -> Result<Self> {
647 Self::new_with_store_and_events(
648 peer_id,
649 direction,
650 my_peer_id,
651 signaling_tx,
652 stun_servers,
653 store,
654 None,
655 None,
656 None,
657 None,
658 )
659 .await
660 }
661
662 #[allow(clippy::too_many_arguments)]
664 pub(crate) async fn new_with_store_and_events(
665 peer_id: PeerId,
666 direction: PeerDirection,
667 my_peer_id: PeerId,
668 signaling_tx: mpsc::Sender<SignalingMessage>,
669 stun_servers: Vec<String>,
670 store: Option<Arc<dyn ContentStore>>,
671 state_event_tx: Option<mpsc::Sender<PeerStateEvent>>,
672 nostr_relay: Option<Arc<NostrRelay>>,
673 mesh_frame_tx: Option<mpsc::Sender<(PeerId, MeshNostrFrame)>>,
674 cashu_quotes: Option<Arc<CashuQuoteState>>,
675 ) -> Result<Self> {
676 let mut m = MediaEngine::default();
678 m.register_default_codecs()?;
679
680 let mut registry = Registry::new();
681 registry = register_default_interceptors(registry, &mut m)?;
682
683 let setting_engine = SettingEngine::default();
686 let api = APIBuilder::new()
689 .with_media_engine(m)
690 .with_interceptor_registry(registry)
691 .with_setting_engine(setting_engine)
692 .build();
693
694 let ice_servers: Vec<RTCIceServer> = stun_servers
696 .iter()
697 .map(|url| RTCIceServer {
698 urls: vec![url.clone()],
699 ..Default::default()
700 })
701 .collect();
702
703 let config = RTCConfiguration {
704 ice_servers,
705 ..Default::default()
706 };
707
708 let pc = Arc::new(api.new_peer_connection(config).await?);
709 let (message_tx, message_rx) = mpsc::channel(100);
710 Ok(Self {
711 peer_id,
712 direction,
713 created_at: std::time::Instant::now(),
714 connected_at: None,
715 pc,
716 data_channel: Arc::new(Mutex::new(None)),
717 signaling_tx,
718 my_peer_id,
719 store,
720 pending_requests: Arc::new(Mutex::new(HashMap::new())),
721 pending_nostr_queries: Arc::new(Mutex::new(HashMap::new())),
722 message_tx,
723 message_rx: Some(message_rx),
724 state_event_tx,
725 nostr_relay,
726 mesh_frame_tx,
727 cashu_quotes,
728 htl_config: PeerHTLConfig::random(),
729 })
730 }
731
732 pub fn set_store(&mut self, store: Arc<dyn ContentStore>) {
734 self.store = Some(store);
735 }
736
737 pub fn state(&self) -> RTCPeerConnectionState {
739 self.pc.connection_state()
740 }
741
742 pub fn signaling_state(&self) -> webrtc::peer_connection::signaling_state::RTCSignalingState {
744 self.pc.signaling_state()
745 }
746
747 pub fn is_connected(&self) -> bool {
749 self.pc.connection_state() == RTCPeerConnectionState::Connected
750 }
751
752 pub fn htl_config(&self) -> &PeerHTLConfig {
753 &self.htl_config
754 }
755
756 pub async fn setup_handlers(&self) -> Result<()> {
758 let peer_id = self.peer_id.clone();
759 let signaling_tx = self.signaling_tx.clone();
760 let my_peer_id_str = self.my_peer_id.to_string();
761 let target_peer_id = self.peer_id.to_string();
762
763 self.pc
765 .on_ice_candidate(Box::new(move |candidate: Option<RTCIceCandidate>| {
766 let signaling_tx = signaling_tx.clone();
767 let my_peer_id_str = my_peer_id_str.clone();
768 let target_peer_id = target_peer_id.clone();
769
770 Box::pin(async move {
771 if let Some(c) = candidate {
772 if let Ok(init) = c.to_json() {
773 info!(
774 "ICE candidate generated: {}",
775 &init.candidate[..init.candidate.len().min(60)]
776 );
777 let msg = SignalingMessage::Candidate {
778 peer_id: my_peer_id_str.clone(),
779 target_peer_id: target_peer_id.clone(),
780 candidate: init.candidate,
781 sdp_m_line_index: init.sdp_mline_index,
782 sdp_mid: init.sdp_mid,
783 };
784 if let Err(e) = signaling_tx.send(msg).await {
785 error!("Failed to send ICE candidate: {}", e);
786 }
787 }
788 }
789 })
790 }));
791
792 let peer_id_log = peer_id.clone();
794 let state_event_tx = self.state_event_tx.clone();
795 self.pc
796 .on_peer_connection_state_change(Box::new(move |state: RTCPeerConnectionState| {
797 let peer_id = peer_id_log.clone();
798 let state_event_tx = state_event_tx.clone();
799 Box::pin(async move {
800 info!("Peer {} connection state: {:?}", peer_id.short(), state);
801
802 if let Some(tx) = state_event_tx {
804 let event = match state {
805 RTCPeerConnectionState::Connected => {
806 Some(PeerStateEvent::Connected(peer_id))
807 }
808 RTCPeerConnectionState::Failed => Some(PeerStateEvent::Failed(peer_id)),
809 RTCPeerConnectionState::Disconnected
810 | RTCPeerConnectionState::Closed => {
811 Some(PeerStateEvent::Disconnected(peer_id))
812 }
813 _ => None,
814 };
815 if let Some(event) = event {
816 if let Err(e) = tx.send(event).await {
817 error!("Failed to send peer state event: {}", e);
818 }
819 }
820 }
821 })
822 }));
823
824 Ok(())
825 }
826
827 pub async fn connect(&self) -> Result<serde_json::Value> {
829 println!("[Peer {}] Creating data channel...", self.peer_id.short());
830 let dc_init = RTCDataChannelInit {
833 ordered: Some(false),
834 ..Default::default()
835 };
836 let dc = self
837 .pc
838 .create_data_channel("hashtree", Some(dc_init))
839 .await?;
840 println!(
841 "[Peer {}] Data channel created, setting up handlers...",
842 self.peer_id.short()
843 );
844 self.setup_data_channel(dc.clone()).await?;
845 println!(
846 "[Peer {}] Handlers set up, storing data channel...",
847 self.peer_id.short()
848 );
849 {
850 let mut dc_guard = self.data_channel.lock().await;
851 *dc_guard = Some(dc);
852 }
853 println!("[Peer {}] Data channel stored", self.peer_id.short());
854
855 let offer = self.pc.create_offer(None).await?;
858 let mut gathering_complete = self.pc.gathering_complete_promise().await;
859 self.pc.set_local_description(offer).await?;
860
861 let _ = tokio::time::timeout(
863 std::time::Duration::from_secs(10),
864 gathering_complete.recv(),
865 )
866 .await;
867
868 let local_desc = self
870 .pc
871 .local_description()
872 .await
873 .ok_or_else(|| anyhow::anyhow!("No local description after gathering"))?;
874
875 debug!(
876 "Offer created, SDP len: {}, ice_gathering: {:?}",
877 local_desc.sdp.len(),
878 self.pc.ice_gathering_state()
879 );
880
881 let offer_json = serde_json::json!({
883 "type": local_desc.sdp_type.to_string().to_lowercase(),
884 "sdp": local_desc.sdp
885 });
886
887 Ok(offer_json)
888 }
889
890 pub async fn handle_offer(&self, offer: serde_json::Value) -> Result<serde_json::Value> {
892 let sdp = offer
893 .get("sdp")
894 .and_then(|s| s.as_str())
895 .ok_or_else(|| anyhow::anyhow!("Missing SDP in offer"))?;
896
897 let peer_id = self.peer_id.clone();
900 let message_tx = self.message_tx.clone();
901 let pending_requests = self.pending_requests.clone();
902 let pending_nostr_queries = self.pending_nostr_queries.clone();
903 let store = self.store.clone();
904 let data_channel_holder = self.data_channel.clone();
905 let nostr_relay = self.nostr_relay.clone();
906 let mesh_frame_tx = self.mesh_frame_tx.clone();
907 let cashu_quotes = self.cashu_quotes.clone();
908 let peer_pubkey = Some(self.peer_id.pubkey.clone());
909
910 self.pc
911 .on_data_channel(Box::new(move |dc: Arc<RTCDataChannel>| {
912 let peer_id = peer_id.clone();
913 let message_tx = message_tx.clone();
914 let pending_requests = pending_requests.clone();
915 let pending_nostr_queries = pending_nostr_queries.clone();
916 let store = store.clone();
917 let data_channel_holder = data_channel_holder.clone();
918 let nostr_relay = nostr_relay.clone();
919 let mesh_frame_tx = mesh_frame_tx.clone();
920 let cashu_quotes = cashu_quotes.clone();
921 let peer_pubkey = peer_pubkey.clone();
922
923 Box::pin(async move {
925 info!(
926 "Peer {} received data channel: {}",
927 peer_id.short(),
928 dc.label()
929 );
930
931 {
933 let mut dc_guard = data_channel_holder.lock().await;
934 *dc_guard = Some(dc.clone());
935 }
936
937 Self::setup_dc_handlers(
939 dc.clone(),
940 peer_id,
941 message_tx,
942 pending_requests,
943 pending_nostr_queries.clone(),
944 store,
945 nostr_relay,
946 mesh_frame_tx,
947 cashu_quotes,
948 peer_pubkey,
949 )
950 .await;
951 })
952 }));
953
954 let offer_desc = RTCSessionDescription::offer(sdp.to_string())?;
956 self.pc.set_remote_description(offer_desc).await?;
957
958 let answer = self.pc.create_answer(None).await?;
961 let mut gathering_complete = self.pc.gathering_complete_promise().await;
962 self.pc.set_local_description(answer).await?;
963
964 let _ = tokio::time::timeout(
966 std::time::Duration::from_secs(10),
967 gathering_complete.recv(),
968 )
969 .await;
970
971 let local_desc = self
973 .pc
974 .local_description()
975 .await
976 .ok_or_else(|| anyhow::anyhow!("No local description after gathering"))?;
977
978 debug!(
979 "Answer created, SDP len: {}, ice_gathering: {:?}",
980 local_desc.sdp.len(),
981 self.pc.ice_gathering_state()
982 );
983
984 let answer_json = serde_json::json!({
985 "type": local_desc.sdp_type.to_string().to_lowercase(),
986 "sdp": local_desc.sdp
987 });
988
989 Ok(answer_json)
990 }
991
992 pub async fn handle_answer(&self, answer: serde_json::Value) -> Result<()> {
994 let sdp = answer
995 .get("sdp")
996 .and_then(|s| s.as_str())
997 .ok_or_else(|| anyhow::anyhow!("Missing SDP in answer"))?;
998
999 let answer_desc = RTCSessionDescription::answer(sdp.to_string())?;
1000 self.pc.set_remote_description(answer_desc).await?;
1001
1002 Ok(())
1003 }
1004
1005 pub async fn handle_candidate(&self, candidate: serde_json::Value) -> Result<()> {
1007 let candidate_str = candidate
1008 .get("candidate")
1009 .and_then(|c| c.as_str())
1010 .unwrap_or("");
1011
1012 let sdp_mid = candidate
1013 .get("sdpMid")
1014 .and_then(|m| m.as_str())
1015 .map(|s| s.to_string());
1016
1017 let sdp_mline_index = candidate
1018 .get("sdpMLineIndex")
1019 .and_then(|i| i.as_u64())
1020 .map(|i| i as u16);
1021
1022 if !candidate_str.is_empty() {
1023 use webrtc::ice_transport::ice_candidate::RTCIceCandidateInit;
1024 let init = RTCIceCandidateInit {
1025 candidate: candidate_str.to_string(),
1026 sdp_mid,
1027 sdp_mline_index,
1028 username_fragment: candidate
1029 .get("usernameFragment")
1030 .and_then(|u| u.as_str())
1031 .map(|s| s.to_string()),
1032 };
1033 self.pc.add_ice_candidate(init).await?;
1034 }
1035
1036 Ok(())
1037 }
1038
1039 async fn setup_data_channel(&self, dc: Arc<RTCDataChannel>) -> Result<()> {
1041 let peer_id = self.peer_id.clone();
1042 let message_tx = self.message_tx.clone();
1043 let pending_requests = self.pending_requests.clone();
1044 let store = self.store.clone();
1045 let nostr_relay = self.nostr_relay.clone();
1046 let mesh_frame_tx = self.mesh_frame_tx.clone();
1047 let cashu_quotes = self.cashu_quotes.clone();
1048 let peer_pubkey = Some(self.peer_id.pubkey.clone());
1049
1050 Self::setup_dc_handlers(
1051 dc,
1052 peer_id,
1053 message_tx,
1054 pending_requests,
1055 self.pending_nostr_queries.clone(),
1056 store,
1057 nostr_relay,
1058 mesh_frame_tx,
1059 cashu_quotes,
1060 peer_pubkey,
1061 )
1062 .await;
1063 Ok(())
1064 }
1065
1066 #[allow(clippy::too_many_arguments)]
1068 async fn setup_dc_handlers(
1069 dc: Arc<RTCDataChannel>,
1070 peer_id: PeerId,
1071 message_tx: mpsc::Sender<(DataMessage, Option<Vec<u8>>)>,
1072 pending_requests: Arc<Mutex<HashMap<String, PendingRequest>>>,
1073 pending_nostr_queries: Arc<
1074 Mutex<HashMap<String, mpsc::UnboundedSender<NostrRelayMessage>>>,
1075 >,
1076 store: Option<Arc<dyn ContentStore>>,
1077 nostr_relay: Option<Arc<NostrRelay>>,
1078 mesh_frame_tx: Option<mpsc::Sender<(PeerId, MeshNostrFrame)>>,
1079 cashu_quotes: Option<Arc<CashuQuoteState>>,
1080 peer_pubkey: Option<String>,
1081 ) {
1082 let label = dc.label().to_string();
1083 let peer_short = peer_id.short();
1084
1085 let _pending_binary: Arc<Mutex<Option<u32>>> = Arc::new(Mutex::new(None));
1087
1088 let open_notify = nostr_relay.as_ref().map(|_| Arc::new(Notify::new()));
1089 if let Some(ref notify) = open_notify {
1090 if dc.ready_state() == RTCDataChannelState::Open {
1091 notify.notify_one();
1093 }
1094 }
1095
1096 let mut nostr_client_id: Option<u64> = None;
1097 if let Some(relay) = nostr_relay.clone() {
1098 let client_id = relay.next_client_id();
1099 let (nostr_tx, mut nostr_rx) = mpsc::unbounded_channel::<String>();
1100 relay
1101 .register_client(client_id, nostr_tx, peer_pubkey.clone())
1102 .await;
1103 nostr_client_id = Some(client_id);
1104
1105 if let Some(notify) = open_notify.clone() {
1106 let dc_for_send = dc.clone();
1107 tokio::spawn(async move {
1108 notify.notified().await;
1109 while let Some(text) = nostr_rx.recv().await {
1110 if dc_for_send.send_text(text).await.is_err() {
1111 break;
1112 }
1113 }
1114 });
1115 }
1116 }
1117
1118 if let (Some(relay), Some(client_id)) = (nostr_relay.clone(), nostr_client_id) {
1119 dc.on_close(Box::new(move || {
1120 let relay = relay.clone();
1121 Box::pin(async move {
1122 relay.unregister_client(client_id).await;
1123 })
1124 }));
1125 }
1126
1127 let open_notify_clone = open_notify.clone();
1128 let peer_short_open = peer_short.clone();
1129 let label_clone = label.clone();
1130 dc.on_open(Box::new(move || {
1131 let peer_short_open = peer_short_open.clone();
1132 let label_clone = label_clone.clone();
1133 let open_notify = open_notify_clone.clone();
1134 Box::pin(async move {
1136 info!(
1137 "[Peer {}] Data channel '{}' open",
1138 peer_short_open, label_clone
1139 );
1140 if let Some(notify) = open_notify {
1141 notify.notify_one();
1142 }
1143 })
1144 }));
1145
1146 let dc_for_msg = dc.clone();
1147 let peer_short_msg = peer_short.clone();
1148 let _pending_binary_clone = _pending_binary.clone();
1149 let store_clone = store.clone();
1150 let nostr_relay_for_msg = nostr_relay.clone();
1151 let nostr_client_id_for_msg = nostr_client_id;
1152 let pending_nostr_queries_for_msg = pending_nostr_queries.clone();
1153 let mesh_frame_tx_for_msg = mesh_frame_tx.clone();
1154 let peer_id_for_msg = peer_id.clone();
1155
1156 dc.on_message(Box::new(move |msg: DataChannelMessage| {
1157 let dc = dc_for_msg.clone();
1158 let peer_short = peer_short_msg.clone();
1159 let pending_requests = pending_requests.clone();
1160 let _pending_binary = _pending_binary_clone.clone();
1161 let _message_tx = message_tx.clone();
1162 let store = store_clone.clone();
1163 let nostr_relay = nostr_relay_for_msg.clone();
1164 let nostr_client_id = nostr_client_id_for_msg;
1165 let pending_nostr_queries = pending_nostr_queries_for_msg.clone();
1166 let mesh_frame_tx = mesh_frame_tx_for_msg.clone();
1167 let cashu_quotes = cashu_quotes.clone();
1168 let peer_id = peer_id_for_msg.clone();
1169 let msg_data = msg.data.clone();
1170
1171 Box::pin(async move {
1173 if msg.is_string {
1174 if let Ok(text) = std::str::from_utf8(&msg_data) {
1175 if let Ok(mesh_frame) = serde_json::from_str::<MeshNostrFrame>(text) {
1176 match validate_mesh_frame(&mesh_frame) {
1177 Ok(()) => {
1178 if let Some(tx) = mesh_frame_tx {
1179 let _ = tx.send((peer_id.clone(), mesh_frame)).await;
1180 }
1181 return;
1182 }
1183 Err(reason) => {
1184 debug!(
1185 "[Peer {}] Ignoring invalid mesh frame: {}",
1186 peer_short, reason
1187 );
1188 }
1189 }
1190 }
1191
1192 if let Ok(relay_msg) = NostrRelayMessage::from_json(text) {
1194 if let Some(sub_id) = relay_subscription_id(&relay_msg) {
1195 let sender = {
1196 let pending = pending_nostr_queries.lock().await;
1197 pending.get(&sub_id).cloned()
1198 };
1199 if let Some(tx) = sender {
1200 debug!(
1201 "[Peer {}] Routed Nostr relay message for subscription {}",
1202 peer_short, sub_id
1203 );
1204 let _ = tx.send(relay_msg);
1205 return;
1206 } else {
1207 debug!(
1208 "[Peer {}] Dropping Nostr relay message for unknown subscription {}",
1209 peer_short, sub_id
1210 );
1211 }
1212 }
1213 }
1214
1215 if let Some(relay) = nostr_relay {
1217 if let Ok(nostr_msg) = NostrClientMessage::from_json(text) {
1218 if let Some(client_id) = nostr_client_id {
1219 relay.handle_client_message(client_id, nostr_msg).await;
1220 }
1221 }
1222 }
1223 }
1224 return;
1225 }
1226 debug!(
1228 "[Peer {}] Received {} bytes on data channel",
1229 peer_short,
1230 msg_data.len()
1231 );
1232 match parse_message(&msg_data) {
1233 Ok(data_msg) => match data_msg {
1234 DataMessage::Request(req) => {
1235 let hash_hex = hash_to_hex(&req.h);
1236 let hash_short = &hash_hex[..8.min(hash_hex.len())];
1237 info!("[Peer {}] Received request for {}", peer_short, hash_short);
1238
1239 if let Some(cashu_quotes) = cashu_quotes.as_ref() {
1240 if cashu_quotes
1241 .should_refuse_requests_from_peer(&peer_id.to_string())
1242 .await
1243 {
1244 info!(
1245 "[Peer {}] Refusing request from peer with unpaid defaults",
1246 peer_short
1247 );
1248 return;
1249 }
1250 }
1251
1252 let quoted_settlement = if let Some(quote_id) = req.q {
1253 let Some(cashu_quotes) = cashu_quotes.as_ref() else {
1254 info!(
1255 "[Peer {}] Ignoring quoted request without Cashu settlement state",
1256 peer_short
1257 );
1258 return;
1259 };
1260 match cashu_quotes
1261 .take_valid_quote(&peer_id.to_string(), &req.h, quote_id)
1262 .await
1263 {
1264 Some(settlement) => Some((quote_id, settlement)),
1265 None => {
1266 info!(
1267 "[Peer {}] Ignoring request with invalid or expired quote {}",
1268 peer_short, quote_id
1269 );
1270 return;
1271 }
1272 }
1273 } else {
1274 None
1275 };
1276
1277 let data = if let Some(ref store) = store {
1279 match store.get(&hash_hex) {
1280 Ok(Some(data)) => {
1281 info!(
1282 "[Peer {}] Found {} in store ({} bytes)",
1283 peer_short,
1284 hash_short,
1285 data.len()
1286 );
1287 Some(data)
1288 }
1289 Ok(None) => {
1290 info!(
1291 "[Peer {}] Hash {} not in store",
1292 peer_short, hash_short
1293 );
1294 None
1295 }
1296 Err(e) => {
1297 warn!("[Peer {}] Store error: {}", peer_short, e);
1298 None
1299 }
1300 }
1301 } else {
1302 warn!(
1303 "[Peer {}] No store configured - cannot serve requests",
1304 peer_short
1305 );
1306 None
1307 };
1308
1309 if let Some(data) = data {
1311 let data_len = data.len();
1312 if let (Some(cashu_quotes), Some((quote_id, settlement))) =
1313 (cashu_quotes.as_ref(), quoted_settlement)
1314 {
1315 match cashu_quotes
1316 .prepare_quoted_transfer(
1317 &peer_id.to_string(),
1318 &req.h,
1319 quote_id,
1320 &settlement,
1321 data,
1322 )
1323 .await
1324 {
1325 Some((first_chunk, first_expected)) => {
1326 if send_quoted_chunk(
1327 &dc,
1328 &peer_id,
1329 &peer_short,
1330 cashu_quotes,
1331 first_chunk,
1332 first_expected,
1333 )
1334 .await
1335 {
1336 info!(
1337 "[Peer {}] Started quoted chunked response for {} ({} bytes)",
1338 peer_short, hash_short, data_len
1339 );
1340 }
1341 }
1342 None => {
1343 warn!(
1344 "[Peer {}] Failed to prepare quoted transfer for {}",
1345 peer_short, hash_short
1346 );
1347 }
1348 }
1349 } else {
1350 let response = DataResponse { h: req.h, d: data };
1351 if let Ok(wire) = encode_response(&response) {
1352 if let Err(e) = dc.send(&Bytes::from(wire)).await {
1353 error!(
1354 "[Peer {}] Failed to send response: {}",
1355 peer_short, e
1356 );
1357 } else {
1358 info!(
1359 "[Peer {}] Sent response for {} ({} bytes)",
1360 peer_short, hash_short, data_len
1361 );
1362 }
1363 }
1364 }
1365 } else {
1366 info!("[Peer {}] Content not found for {}", peer_short, hash_short);
1367 }
1368 }
1369 DataMessage::Response(res) => {
1370 let hash_hex = hash_to_hex(&res.h);
1371 let hash_short = &hash_hex[..8.min(hash_hex.len())];
1372 debug!(
1373 "[Peer {}] Received response for {} ({} bytes)",
1374 peer_short,
1375 hash_short,
1376 res.d.len()
1377 );
1378
1379 let mut pending = pending_requests.lock().await;
1381 if let Some(req) = pending.remove(&hash_hex) {
1382 let _ = req.response_tx.send(Some(res.d));
1383 }
1384 }
1385 DataMessage::QuoteRequest(req) => {
1386 let response = handle_quote_request_message(
1387 &peer_short,
1388 &peer_id,
1389 &store,
1390 cashu_quotes.as_ref(),
1391 &req,
1392 )
1393 .await;
1394 if let Some(response) = response {
1395 if let Ok(wire) = encode_quote_response(&response) {
1396 if let Err(e) = dc.send(&Bytes::from(wire)).await {
1397 warn!(
1398 "[Peer {}] Failed to send quote response: {}",
1399 peer_short, e
1400 );
1401 }
1402 }
1403 }
1404 }
1405 DataMessage::QuoteResponse(res) => {
1406 if let Some(cashu_quotes) = cashu_quotes.as_ref() {
1407 let _ = cashu_quotes
1408 .handle_quote_response(&peer_id.to_string(), res)
1409 .await;
1410 }
1411 }
1412 DataMessage::Chunk(chunk) => {
1413 process_chunk_message(
1414 &peer_short,
1415 &peer_id,
1416 &dc,
1417 &pending_requests,
1418 cashu_quotes.as_ref(),
1419 chunk,
1420 )
1421 .await;
1422 }
1423 DataMessage::Payment(req) => {
1424 let outcome =
1425 handle_payment_message(&peer_id, cashu_quotes.as_ref(), &req).await;
1426 if let Ok(wire) = encode_payment_ack(&outcome.ack) {
1427 if let Err(e) = dc.send(&Bytes::from(wire)).await {
1428 warn!(
1429 "[Peer {}] Failed to send payment ack: {}",
1430 peer_short, e
1431 );
1432 }
1433 }
1434 if let (Some(cashu_quotes), Some((next_chunk, next_expected))) =
1435 (cashu_quotes.as_ref(), outcome.next_chunk)
1436 {
1437 let _ = send_quoted_chunk(
1438 &dc,
1439 &peer_id,
1440 &peer_short,
1441 cashu_quotes,
1442 next_chunk,
1443 next_expected,
1444 )
1445 .await;
1446 }
1447 }
1448 DataMessage::PaymentAck(res) => {
1449 handle_payment_ack_message(
1450 &peer_short,
1451 &peer_id,
1452 &dc,
1453 &pending_requests,
1454 cashu_quotes.as_ref(),
1455 res,
1456 )
1457 .await;
1458 }
1459 },
1460 Err(e) => {
1461 warn!("[Peer {}] Failed to parse message: {:?}", peer_short, e);
1462 let hex_dump: String = msg_data
1464 .iter()
1465 .take(50)
1466 .map(|b| format!("{:02x}", b))
1467 .collect();
1468 warn!("[Peer {}] Message hex: {}", peer_short, hex_dump);
1469 }
1470 }
1471 })
1472 }));
1473 }
1474
1475 pub fn has_data_channel(&self) -> bool {
1477 self.data_channel
1479 .try_lock()
1480 .map(|guard| {
1481 guard
1482 .as_ref()
1483 .map(|dc| dc.ready_state() == RTCDataChannelState::Open)
1484 .unwrap_or(false)
1485 })
1486 .unwrap_or(false)
1487 }
1488
1489 pub async fn request(&self, hash_hex: &str) -> Result<Option<Vec<u8>>> {
1491 self.request_with_timeout(hash_hex, std::time::Duration::from_secs(10))
1492 .await
1493 }
1494
1495 pub async fn request_with_timeout(
1497 &self,
1498 hash_hex: &str,
1499 timeout: std::time::Duration,
1500 ) -> Result<Option<Vec<u8>>> {
1501 let dc_guard = self.data_channel.lock().await;
1502 let dc = dc_guard
1503 .as_ref()
1504 .ok_or_else(|| anyhow::anyhow!("No data channel"))?
1505 .clone();
1506 drop(dc_guard); let hash = hex::decode(hash_hex).map_err(|e| anyhow::anyhow!("Invalid hex hash: {}", e))?;
1510
1511 let (tx, rx) = oneshot::channel();
1513
1514 {
1516 let mut pending = self.pending_requests.lock().await;
1517 pending.insert(
1518 hash_hex.to_string(),
1519 PendingRequest::standard(hash.clone(), tx),
1520 );
1521 }
1522
1523 let req = DataRequest {
1525 h: hash,
1526 htl: BLOB_REQUEST_POLICY.max_htl,
1527 q: None,
1528 };
1529 let wire = encode_request(&req)?;
1530 dc.send(&Bytes::from(wire)).await?;
1531
1532 debug!(
1533 "[Peer {}] Sent request for {}",
1534 self.peer_id.short(),
1535 &hash_hex[..8.min(hash_hex.len())]
1536 );
1537
1538 match tokio::time::timeout(timeout, rx).await {
1540 Ok(Ok(data)) => Ok(data),
1541 Ok(Err(_)) => {
1542 Ok(None)
1544 }
1545 Err(_) => {
1546 let mut pending = self.pending_requests.lock().await;
1548 pending.remove(hash_hex);
1549 Ok(None)
1550 }
1551 }
1552 }
1553
1554 pub async fn query_nostr_events(
1557 &self,
1558 filters: Vec<NostrFilter>,
1559 timeout: std::time::Duration,
1560 ) -> Result<Vec<nostr::Event>> {
1561 let dc_guard = self.data_channel.lock().await;
1562 let dc = dc_guard
1563 .as_ref()
1564 .ok_or_else(|| anyhow::anyhow!("No data channel"))?
1565 .clone();
1566 drop(dc_guard);
1567
1568 let subscription_id = NostrSubscriptionId::generate();
1569 let subscription_key = subscription_id.to_string();
1570 let (tx, mut rx) = mpsc::unbounded_channel::<NostrRelayMessage>();
1571
1572 {
1573 let mut pending = self.pending_nostr_queries.lock().await;
1574 pending.insert(subscription_key.clone(), tx);
1575 }
1576
1577 let req = NostrClientMessage::req(subscription_id.clone(), filters);
1578 if let Err(e) = dc.send_text(req.as_json()).await {
1579 let mut pending = self.pending_nostr_queries.lock().await;
1580 pending.remove(&subscription_key);
1581 return Err(e.into());
1582 }
1583 debug!(
1584 "[Peer {}] Sent Nostr REQ subscription {}",
1585 self.peer_id.short(),
1586 subscription_id
1587 );
1588
1589 let mut events = Vec::new();
1590 let deadline = tokio::time::Instant::now() + timeout;
1591
1592 loop {
1593 let now = tokio::time::Instant::now();
1594 if now >= deadline {
1595 break;
1596 }
1597 let remaining = deadline - now;
1598
1599 let next = tokio::time::timeout(remaining, rx.recv()).await;
1600 match next {
1601 Ok(Some(NostrRelayMessage::Event {
1602 subscription_id: sid,
1603 event,
1604 })) if sid == subscription_id => {
1605 debug!(
1606 "[Peer {}] Received Nostr EVENT for subscription {}",
1607 self.peer_id.short(),
1608 subscription_id
1609 );
1610 events.push(*event);
1611 }
1612 Ok(Some(NostrRelayMessage::EndOfStoredEvents(sid))) if sid == subscription_id => {
1613 debug!(
1614 "[Peer {}] Received Nostr EOSE for subscription {}",
1615 self.peer_id.short(),
1616 subscription_id
1617 );
1618 break;
1619 }
1620 Ok(Some(NostrRelayMessage::Closed {
1621 subscription_id: sid,
1622 message,
1623 })) if sid == subscription_id => {
1624 warn!(
1625 "[Peer {}] Nostr query closed for subscription {}: {}",
1626 self.peer_id.short(),
1627 subscription_id,
1628 message
1629 );
1630 break;
1631 }
1632 Ok(Some(_)) => {}
1633 Ok(None) => break,
1634 Err(_) => {
1635 warn!(
1636 "[Peer {}] Nostr query timed out for subscription {}",
1637 self.peer_id.short(),
1638 subscription_id
1639 );
1640 break;
1641 }
1642 }
1643 }
1644
1645 let close = NostrClientMessage::close(subscription_id.clone());
1646 let _ = dc.send_text(close.as_json()).await;
1647
1648 let mut pending = self.pending_nostr_queries.lock().await;
1649 pending.remove(&subscription_key);
1650 debug!(
1651 "[Peer {}] Nostr query subscription {} collected {} event(s)",
1652 self.peer_id.short(),
1653 subscription_id,
1654 events.len()
1655 );
1656
1657 Ok(events)
1658 }
1659
1660 pub async fn send_mesh_frame_text(&self, frame: &MeshNostrFrame) -> Result<()> {
1662 let dc_guard = self.data_channel.lock().await;
1663 let dc = dc_guard
1664 .as_ref()
1665 .ok_or_else(|| anyhow::anyhow!("No data channel"))?
1666 .clone();
1667 drop(dc_guard);
1668
1669 let text = serde_json::to_string(frame)?;
1670 dc.send_text(text).await?;
1671 Ok(())
1672 }
1673
1674 pub async fn send_message(&self, msg: &DataMessage) -> Result<()> {
1676 let dc_guard = self.data_channel.lock().await;
1677 if let Some(ref dc) = *dc_guard {
1678 let wire = encode_message(msg)?;
1679 dc.send(&Bytes::from(wire)).await?;
1680 }
1681 Ok(())
1682 }
1683
1684 pub async fn close(&self) -> Result<()> {
1686 {
1687 let dc_guard = self.data_channel.lock().await;
1688 if let Some(ref dc) = *dc_guard {
1689 dc.close().await?;
1690 }
1691 }
1692 self.pc.close().await?;
1693 Ok(())
1694 }
1695}
1696
1697fn relay_subscription_id(msg: &NostrRelayMessage) -> Option<String> {
1698 match msg {
1699 NostrRelayMessage::Event {
1700 subscription_id, ..
1701 } => Some(subscription_id.to_string()),
1702 NostrRelayMessage::EndOfStoredEvents(subscription_id) => Some(subscription_id.to_string()),
1703 NostrRelayMessage::Closed {
1704 subscription_id, ..
1705 } => Some(subscription_id.to_string()),
1706 NostrRelayMessage::Count {
1707 subscription_id, ..
1708 } => Some(subscription_id.to_string()),
1709 _ => None,
1710 }
1711}