1use anyhow::Result;
4use bytes::Bytes;
5use std::collections::HashMap;
6use std::sync::Arc;
7use tokio::sync::{mpsc, oneshot, Mutex, Notify};
8use tracing::{debug, error, info, warn};
9use webrtc::api::interceptor_registry::register_default_interceptors;
10use webrtc::api::media_engine::MediaEngine;
11use webrtc::api::setting_engine::SettingEngine;
12use webrtc::api::APIBuilder;
13use webrtc::data_channel::data_channel_init::RTCDataChannelInit;
14use webrtc::data_channel::data_channel_message::DataChannelMessage;
15use webrtc::data_channel::data_channel_state::RTCDataChannelState;
16use webrtc::data_channel::RTCDataChannel;
17use webrtc::ice_transport::ice_candidate::RTCIceCandidate;
18use webrtc::ice_transport::ice_server::RTCIceServer;
19use webrtc::interceptor::registry::Registry;
20use webrtc::peer_connection::configuration::RTCConfiguration;
21use webrtc::peer_connection::peer_connection_state::RTCPeerConnectionState;
22use webrtc::peer_connection::sdp::session_description::RTCSessionDescription;
23use webrtc::peer_connection::RTCPeerConnection;
24
25use super::cashu::{CashuQuoteState, ExpectedSettlement};
26use super::types::{
27 encode_chunk, encode_message, encode_payment, encode_payment_ack, encode_quote_response,
28 encode_request, encode_response, hash_to_hex, parse_message, validate_mesh_frame, DataChunk,
29 DataMessage, DataPayment, DataPaymentAck, DataQuoteRequest, DataRequest, DataResponse,
30 MeshNostrFrame, PeerDirection, PeerHTLConfig, PeerId, PeerStateEvent, SignalingMessage,
31 BLOB_REQUEST_POLICY,
32};
33use crate::nostr_relay::NostrRelay;
34use nostr::{
35 ClientMessage as NostrClientMessage, Filter as NostrFilter, JsonUtil as NostrJsonUtil,
36 RelayMessage as NostrRelayMessage, SubscriptionId as NostrSubscriptionId,
37};
38
39pub trait ContentStore: Send + Sync + 'static {
41 fn get(&self, hash_hex: &str) -> Result<Option<Vec<u8>>>;
43}
44
45pub struct PendingRequest {
47 pub hash: Vec<u8>,
48 pub response_tx: oneshot::Sender<Option<Vec<u8>>>,
49 pub quoted: Option<PendingQuotedRequest>,
50}
51
52pub struct PendingQuotedRequest {
53 pub quote_id: u64,
54 pub mint_url: String,
55 pub total_payment_sat: u64,
56 pub confirmed_payment_sat: u64,
57 pub next_chunk_index: u32,
58 pub total_chunks: Option<u32>,
59 pub assembled_data: Vec<u8>,
60 pub in_flight_payment: Option<PendingChunkPayment>,
61 pub buffered_chunk: Option<DataChunk>,
62}
63
/// A per-chunk payment that has been created but not yet acknowledged by the
/// peer.
pub struct PendingChunkPayment {
    /// Index of the chunk this payment covers.
    pub chunk_index: u32,
    /// Amount of the payment in sat.
    pub amount_sat: u64,
    /// Mint URL of the created payment; passed to `revoke_payment_token` when
    /// the payment has to be undone.
    pub mint_url: String,
    /// Operation id of the created payment; passed to `revoke_payment_token`
    /// together with `mint_url`.
    pub operation_id: String,
    /// Whether the covered chunk is the last chunk of the transfer.
    pub final_chunk: bool,
}
71
72impl PendingRequest {
73 pub fn standard(hash: Vec<u8>, response_tx: oneshot::Sender<Option<Vec<u8>>>) -> Self {
74 Self {
75 hash,
76 response_tx,
77 quoted: None,
78 }
79 }
80
81 pub fn quoted(
82 hash: Vec<u8>,
83 response_tx: oneshot::Sender<Option<Vec<u8>>>,
84 quote_id: u64,
85 mint_url: String,
86 total_payment_sat: u64,
87 ) -> Self {
88 Self {
89 hash,
90 response_tx,
91 quoted: Some(PendingQuotedRequest {
92 quote_id,
93 mint_url,
94 total_payment_sat,
95 confirmed_payment_sat: 0,
96 next_chunk_index: 0,
97 total_chunks: None,
98 assembled_data: Vec::new(),
99 in_flight_payment: None,
100 buffered_chunk: None,
101 }),
102 }
103 }
104}
105
106async fn handle_quote_request_message(
107 peer_short: &str,
108 peer_id: &PeerId,
109 store: &Option<Arc<dyn ContentStore>>,
110 cashu_quotes: Option<&Arc<CashuQuoteState>>,
111 req: &DataQuoteRequest,
112) -> Option<super::types::DataQuoteResponse> {
113 let Some(cashu_quotes) = cashu_quotes else {
114 debug!(
115 "[Peer {}] Ignoring quote request without Cashu policy",
116 peer_short
117 );
118 return None;
119 };
120
121 if cashu_quotes
122 .should_refuse_requests_from_peer(&peer_id.to_string())
123 .await
124 {
125 return Some(
126 cashu_quotes
127 .build_quote_response(&peer_id.to_string(), req, false)
128 .await,
129 );
130 }
131
132 let hash_hex = hash_to_hex(&req.h);
133 let can_serve = if let Some(store) = store {
134 match store.get(&hash_hex) {
135 Ok(Some(_)) => true,
136 Ok(None) => false,
137 Err(e) => {
138 warn!("[Peer {}] Store error during quote: {}", peer_short, e);
139 false
140 }
141 }
142 } else {
143 false
144 };
145
146 Some(
147 cashu_quotes
148 .build_quote_response(&peer_id.to_string(), req, can_serve)
149 .await,
150 )
151}
152
153struct PaymentHandlingOutcome {
154 ack: DataPaymentAck,
155 next_chunk: Option<(DataChunk, ExpectedSettlement)>,
156}
157
158async fn send_quoted_chunk(
159 dc: &Arc<RTCDataChannel>,
160 peer_id: &PeerId,
161 peer_short: &str,
162 cashu_quotes: &Arc<CashuQuoteState>,
163 chunk: DataChunk,
164 expected: ExpectedSettlement,
165) -> bool {
166 let hash_hex = hash_to_hex(&chunk.h);
167 let wire = match encode_chunk(&chunk) {
168 Ok(wire) => wire,
169 Err(err) => {
170 warn!(
171 "[Peer {}] Failed to encode quoted chunk {} for quote {}: {}",
172 peer_short, chunk.c, chunk.q, err
173 );
174 return false;
175 }
176 };
177
178 if let Err(err) = dc.send(&Bytes::from(wire)).await {
179 warn!(
180 "[Peer {}] Failed to send quoted chunk {} for quote {}: {}",
181 peer_short, chunk.c, chunk.q, err
182 );
183 return false;
184 }
185
186 cashu_quotes
187 .register_expected_payment(peer_id.to_string(), hash_hex, chunk.q, expected)
188 .await;
189 true
190}
191
192async fn fail_pending_request(
193 pending_requests: &Arc<Mutex<HashMap<String, PendingRequest>>>,
194 cashu_quotes: Option<&Arc<CashuQuoteState>>,
195 hash_hex: &str,
196) {
197 let pending = pending_requests.lock().await.remove(hash_hex);
198 let Some(pending) = pending else {
199 return;
200 };
201
202 if let (Some(cashu_quotes), Some(quoted)) = (cashu_quotes, pending.quoted) {
203 if let Some(in_flight) = quoted.in_flight_payment {
204 let _ = cashu_quotes
205 .revoke_payment_token(&in_flight.mint_url, &in_flight.operation_id)
206 .await;
207 }
208 }
209 let _ = pending.response_tx.send(None);
210}
211
212async fn process_chunk_message(
213 peer_short: &str,
214 _peer_id: &PeerId,
215 dc: &Arc<RTCDataChannel>,
216 pending_requests: &Arc<Mutex<HashMap<String, PendingRequest>>>,
217 cashu_quotes: Option<&Arc<CashuQuoteState>>,
218 chunk: DataChunk,
219) {
220 let hash_hex = hash_to_hex(&chunk.h);
221 let Some(cashu_quotes) = cashu_quotes else {
222 fail_pending_request(pending_requests, None, &hash_hex).await;
223 return;
224 };
225
226 enum ChunkAction {
227 BufferOnly,
228 Fail,
229 Pay {
230 mint_url: String,
231 amount_sat: u64,
232 final_chunk: bool,
233 },
234 }
235
236 let action = {
237 let mut pending = pending_requests.lock().await;
238 let Some(request) = pending.get_mut(&hash_hex) else {
239 return;
240 };
241 match request.quoted.as_mut() {
242 None => ChunkAction::Fail,
243 Some(quoted) if quoted.quote_id != chunk.q || chunk.n == 0 => ChunkAction::Fail,
244 Some(quoted) => {
245 if let Some(in_flight) = quoted.in_flight_payment.as_ref() {
246 let expected_buffer_index = in_flight.chunk_index + 1;
247 if chunk.c == expected_buffer_index && quoted.buffered_chunk.is_none() {
248 quoted.buffered_chunk = Some(chunk.clone());
249 ChunkAction::BufferOnly
250 } else {
251 ChunkAction::Fail
252 }
253 } else if chunk.c != quoted.next_chunk_index {
254 ChunkAction::Fail
255 } else if let Some(total_chunks) = quoted.total_chunks {
256 if total_chunks != chunk.n {
257 ChunkAction::Fail
258 } else {
259 let next_total = quoted.confirmed_payment_sat.saturating_add(chunk.p);
260 if next_total > quoted.total_payment_sat
261 || (chunk.c + 1 == chunk.n && next_total != quoted.total_payment_sat)
262 {
263 ChunkAction::Fail
264 } else {
265 quoted.total_chunks = Some(chunk.n);
266 quoted.assembled_data.extend_from_slice(&chunk.d);
267 quoted.next_chunk_index += 1;
268 ChunkAction::Pay {
269 mint_url: quoted.mint_url.clone(),
270 amount_sat: chunk.p,
271 final_chunk: chunk.c + 1 == chunk.n,
272 }
273 }
274 }
275 } else {
276 let next_total = quoted.confirmed_payment_sat.saturating_add(chunk.p);
277 if next_total > quoted.total_payment_sat
278 || (chunk.c + 1 == chunk.n && next_total != quoted.total_payment_sat)
279 {
280 ChunkAction::Fail
281 } else {
282 quoted.total_chunks = Some(chunk.n);
283 quoted.assembled_data.extend_from_slice(&chunk.d);
284 quoted.next_chunk_index += 1;
285 ChunkAction::Pay {
286 mint_url: quoted.mint_url.clone(),
287 amount_sat: chunk.p,
288 final_chunk: chunk.c + 1 == chunk.n,
289 }
290 }
291 }
292 }
293 }
294 };
295
296 match action {
297 ChunkAction::BufferOnly => (),
298 ChunkAction::Fail => {
299 warn!(
300 "[Peer {}] Invalid quoted chunk {} for hash {}",
301 peer_short, chunk.c, hash_hex
302 );
303 fail_pending_request(pending_requests, Some(cashu_quotes), &hash_hex).await;
304 }
305 ChunkAction::Pay {
306 mint_url,
307 amount_sat,
308 final_chunk,
309 } => {
310 let payment = match cashu_quotes
311 .create_payment_token(&mint_url, amount_sat)
312 .await
313 {
314 Ok(payment) => payment,
315 Err(err) => {
316 warn!(
317 "[Peer {}] Failed to create payment token for chunk {} of {}: {}",
318 peer_short, chunk.c, hash_hex, err
319 );
320 fail_pending_request(pending_requests, Some(cashu_quotes), &hash_hex).await;
321 return;
322 }
323 };
324
325 {
326 let mut pending = pending_requests.lock().await;
327 let Some(request) = pending.get_mut(&hash_hex) else {
328 let _ = cashu_quotes
329 .revoke_payment_token(&payment.mint_url, &payment.operation_id)
330 .await;
331 return;
332 };
333 let Some(quoted) = request.quoted.as_mut() else {
334 let _ = cashu_quotes
335 .revoke_payment_token(&payment.mint_url, &payment.operation_id)
336 .await;
337 return;
338 };
339 quoted.in_flight_payment = Some(PendingChunkPayment {
340 chunk_index: chunk.c,
341 amount_sat,
342 mint_url: payment.mint_url.clone(),
343 operation_id: payment.operation_id.clone(),
344 final_chunk,
345 });
346 }
347
348 let payment_msg = DataPayment {
349 h: chunk.h,
350 q: chunk.q,
351 c: chunk.c,
352 p: amount_sat,
353 m: Some(payment.mint_url.clone()),
354 tok: payment.token,
355 };
356 let wire = match encode_payment(&payment_msg) {
357 Ok(wire) => wire,
358 Err(err) => {
359 warn!(
360 "[Peer {}] Failed to encode payment for chunk {} of {}: {}",
361 peer_short, chunk.c, hash_hex, err
362 );
363 fail_pending_request(pending_requests, Some(cashu_quotes), &hash_hex).await;
364 return;
365 }
366 };
367 if let Err(err) = dc.send(&Bytes::from(wire)).await {
368 warn!(
369 "[Peer {}] Failed to send payment for chunk {} of {}: {}",
370 peer_short, chunk.c, hash_hex, err
371 );
372 fail_pending_request(pending_requests, Some(cashu_quotes), &hash_hex).await;
373 }
374 }
375 }
376}
377
378async fn handle_payment_ack_message(
379 peer_short: &str,
380 peer_id: &PeerId,
381 dc: &Arc<RTCDataChannel>,
382 pending_requests: &Arc<Mutex<HashMap<String, PendingRequest>>>,
383 cashu_quotes: Option<&Arc<CashuQuoteState>>,
384 ack: DataPaymentAck,
385) {
386 let Some(cashu_quotes) = cashu_quotes else {
387 return;
388 };
389 let hash_hex = hash_to_hex(&ack.h);
390 let mut buffered_next = None;
391 let mut completed = None;
392 let mut failed = None;
393 let mut confirmed_amount = None;
394 let mut completed_data = None;
395
396 {
397 let mut pending = pending_requests.lock().await;
398 let Some(request) = pending.get_mut(&hash_hex) else {
399 return;
400 };
401 let Some(quoted) = request.quoted.as_mut() else {
402 return;
403 };
404 let Some(in_flight) = quoted.in_flight_payment.take() else {
405 return;
406 };
407 if ack.q != quoted.quote_id || ack.c != in_flight.chunk_index {
408 quoted.in_flight_payment = Some(in_flight);
409 return;
410 }
411
412 if !ack.a {
413 failed = Some(in_flight);
414 } else {
415 quoted.confirmed_payment_sat = quoted
416 .confirmed_payment_sat
417 .saturating_add(in_flight.amount_sat);
418 confirmed_amount = Some(in_flight.amount_sat);
419 if in_flight.final_chunk {
420 completed_data = Some(quoted.assembled_data.clone());
421 } else if let Some(next_chunk) = quoted.buffered_chunk.take() {
422 buffered_next = Some(next_chunk);
423 }
424 }
425
426 if let Some(data) = completed_data.take() {
427 let finished = pending
428 .remove(&hash_hex)
429 .expect("pending request must exist");
430 completed = Some((finished.response_tx, data));
431 }
432 }
433
434 if let Some(amount_sat) = confirmed_amount {
435 cashu_quotes
436 .record_paid_peer(&peer_id.to_string(), amount_sat)
437 .await;
438 }
439
440 if let Some(in_flight) = failed {
441 warn!(
442 "[Peer {}] Payment ack rejected chunk {} for {}: {}",
443 peer_short,
444 ack.c,
445 hash_hex,
446 ack.e.as_deref().unwrap_or("payment rejected")
447 );
448 let _ = cashu_quotes
449 .revoke_payment_token(&in_flight.mint_url, &in_flight.operation_id)
450 .await;
451 let removed = pending_requests.lock().await.remove(&hash_hex);
452 if let Some(removed) = removed {
453 let _ = removed.response_tx.send(None);
454 }
455 return;
456 }
457
458 if let Some((tx, data)) = completed {
459 let _ = tx.send(Some(data));
460 return;
461 }
462
463 if let Some(next_chunk) = buffered_next {
464 process_chunk_message(
465 peer_short,
466 peer_id,
467 dc,
468 pending_requests,
469 Some(cashu_quotes),
470 next_chunk,
471 )
472 .await;
473 }
474}
475
476async fn handle_payment_message(
477 peer_id: &PeerId,
478 cashu_quotes: Option<&Arc<CashuQuoteState>>,
479 req: &DataPayment,
480) -> PaymentHandlingOutcome {
481 let nack = |err: String| PaymentHandlingOutcome {
482 ack: DataPaymentAck {
483 h: req.h.clone(),
484 q: req.q,
485 c: req.c,
486 a: false,
487 e: Some(err),
488 },
489 next_chunk: None,
490 };
491
492 let Some(cashu_quotes) = cashu_quotes else {
493 return nack("Cashu settlement unavailable".to_string());
494 };
495
496 let expected = match cashu_quotes
497 .claim_expected_payment(
498 &peer_id.to_string(),
499 &req.h,
500 req.q,
501 req.c,
502 req.p,
503 req.m.as_deref(),
504 )
505 .await
506 {
507 Ok(expected) => expected,
508 Err(err) => {
509 cashu_quotes
510 .record_payment_default_from_peer(&peer_id.to_string())
511 .await;
512 return nack(err.to_string());
513 }
514 };
515
516 match cashu_quotes.receive_payment_token(&req.tok).await {
517 Ok(received) if received.amount_sat >= expected.payment_sat => {
518 if expected.mint_url.as_deref() != Some(received.mint_url.as_str()) {
519 cashu_quotes
520 .record_payment_default_from_peer(&peer_id.to_string())
521 .await;
522 return nack("Received payment mint did not match quoted mint".to_string());
523 }
524 if let Err(err) = cashu_quotes
525 .record_receipt_from_peer(
526 &peer_id.to_string(),
527 &received.mint_url,
528 received.amount_sat,
529 )
530 .await
531 {
532 warn!(
533 "[Peer {}] Failed to persist Cashu mint success for {}: {}",
534 peer_id.short(),
535 received.mint_url,
536 err
537 );
538 }
539
540 let next_chunk = if expected.final_chunk {
541 None
542 } else {
543 cashu_quotes
544 .next_outgoing_chunk(&peer_id.to_string(), &req.h, req.q)
545 .await
546 };
547
548 PaymentHandlingOutcome {
549 ack: DataPaymentAck {
550 h: req.h.clone(),
551 q: req.q,
552 c: req.c,
553 a: true,
554 e: None,
555 },
556 next_chunk,
557 }
558 }
559 Ok(_) => {
560 cashu_quotes
561 .record_payment_default_from_peer(&peer_id.to_string())
562 .await;
563 nack("Received payment amount was below the quoted amount".to_string())
564 }
565 Err(err) => {
566 if let Some(mint_url) = expected.mint_url.as_deref().or(req.m.as_deref()) {
567 let _ = cashu_quotes.record_mint_receive_failure(mint_url).await;
568 }
569 nack(err.to_string())
570 }
571 }
572}
573
574pub struct Peer {
576 pub peer_id: PeerId,
577 pub direction: PeerDirection,
578 pub created_at: std::time::Instant,
579 pub connected_at: Option<std::time::Instant>,
580
581 pc: Arc<RTCPeerConnection>,
582 pub data_channel: Arc<Mutex<Option<Arc<RTCDataChannel>>>>,
584 signaling_tx: mpsc::Sender<SignalingMessage>,
585 my_peer_id: PeerId,
586
587 store: Option<Arc<dyn ContentStore>>,
589
590 pub pending_requests: Arc<Mutex<HashMap<String, PendingRequest>>>,
592 pending_nostr_queries: Arc<Mutex<HashMap<String, mpsc::UnboundedSender<NostrRelayMessage>>>>,
594
595 #[allow(dead_code)]
597 message_tx: mpsc::Sender<(DataMessage, Option<Vec<u8>>)>,
598 #[allow(dead_code)]
599 message_rx: Option<mpsc::Receiver<(DataMessage, Option<Vec<u8>>)>>,
600
601 state_event_tx: Option<mpsc::Sender<PeerStateEvent>>,
603
604 nostr_relay: Option<Arc<NostrRelay>>,
606 mesh_frame_tx: Option<mpsc::Sender<(PeerId, MeshNostrFrame)>>,
608 cashu_quotes: Option<Arc<CashuQuoteState>>,
610 htl_config: PeerHTLConfig,
612}
613
614impl Peer {
615 pub async fn new(
617 peer_id: PeerId,
618 direction: PeerDirection,
619 my_peer_id: PeerId,
620 signaling_tx: mpsc::Sender<SignalingMessage>,
621 stun_servers: Vec<String>,
622 ) -> Result<Self> {
623 Self::new_with_store_and_events(
624 peer_id,
625 direction,
626 my_peer_id,
627 signaling_tx,
628 stun_servers,
629 None,
630 None,
631 None,
632 None,
633 None,
634 )
635 .await
636 }
637
638 pub async fn new_with_store(
640 peer_id: PeerId,
641 direction: PeerDirection,
642 my_peer_id: PeerId,
643 signaling_tx: mpsc::Sender<SignalingMessage>,
644 stun_servers: Vec<String>,
645 store: Option<Arc<dyn ContentStore>>,
646 ) -> Result<Self> {
647 Self::new_with_store_and_events(
648 peer_id,
649 direction,
650 my_peer_id,
651 signaling_tx,
652 stun_servers,
653 store,
654 None,
655 None,
656 None,
657 None,
658 )
659 .await
660 }
661
662 #[allow(clippy::too_many_arguments)]
664 pub(crate) async fn new_with_store_and_events(
665 peer_id: PeerId,
666 direction: PeerDirection,
667 my_peer_id: PeerId,
668 signaling_tx: mpsc::Sender<SignalingMessage>,
669 stun_servers: Vec<String>,
670 store: Option<Arc<dyn ContentStore>>,
671 state_event_tx: Option<mpsc::Sender<PeerStateEvent>>,
672 nostr_relay: Option<Arc<NostrRelay>>,
673 mesh_frame_tx: Option<mpsc::Sender<(PeerId, MeshNostrFrame)>>,
674 cashu_quotes: Option<Arc<CashuQuoteState>>,
675 ) -> Result<Self> {
676 let mut m = MediaEngine::default();
678 m.register_default_codecs()?;
679
680 let mut registry = Registry::new();
681 registry = register_default_interceptors(registry, &mut m)?;
682
683 let setting_engine = SettingEngine::default();
686 let api = APIBuilder::new()
689 .with_media_engine(m)
690 .with_interceptor_registry(registry)
691 .with_setting_engine(setting_engine)
692 .build();
693
694 let ice_servers: Vec<RTCIceServer> = stun_servers
696 .iter()
697 .map(|url| RTCIceServer {
698 urls: vec![url.clone()],
699 ..Default::default()
700 })
701 .collect();
702
703 let config = RTCConfiguration {
704 ice_servers,
705 ..Default::default()
706 };
707
708 let pc = Arc::new(api.new_peer_connection(config).await?);
709 let (message_tx, message_rx) = mpsc::channel(100);
710 Ok(Self {
711 peer_id,
712 direction,
713 created_at: std::time::Instant::now(),
714 connected_at: None,
715 pc,
716 data_channel: Arc::new(Mutex::new(None)),
717 signaling_tx,
718 my_peer_id,
719 store,
720 pending_requests: Arc::new(Mutex::new(HashMap::new())),
721 pending_nostr_queries: Arc::new(Mutex::new(HashMap::new())),
722 message_tx,
723 message_rx: Some(message_rx),
724 state_event_tx,
725 nostr_relay,
726 mesh_frame_tx,
727 cashu_quotes,
728 htl_config: PeerHTLConfig::random(),
729 })
730 }
731
732 pub fn set_store(&mut self, store: Arc<dyn ContentStore>) {
734 self.store = Some(store);
735 }
736
737 pub fn state(&self) -> RTCPeerConnectionState {
739 self.pc.connection_state()
740 }
741
742 pub fn signaling_state(&self) -> webrtc::peer_connection::signaling_state::RTCSignalingState {
744 self.pc.signaling_state()
745 }
746
747 pub fn is_connected(&self) -> bool {
749 self.pc.connection_state() == RTCPeerConnectionState::Connected
750 }
751
752 pub fn htl_config(&self) -> &PeerHTLConfig {
753 &self.htl_config
754 }
755
756 pub async fn setup_handlers(&self) -> Result<()> {
758 let peer_id = self.peer_id.clone();
759 let signaling_tx = self.signaling_tx.clone();
760 let my_peer_id_str = self.my_peer_id.to_string();
761 let recipient = self.peer_id.to_string();
762
763 self.pc
765 .on_ice_candidate(Box::new(move |candidate: Option<RTCIceCandidate>| {
766 let signaling_tx = signaling_tx.clone();
767 let my_peer_id_str = my_peer_id_str.clone();
768 let recipient = recipient.clone();
769
770 Box::pin(async move {
771 if let Some(c) = candidate {
772 if let Ok(init) = c.to_json() {
773 info!(
774 "ICE candidate generated: {}",
775 &init.candidate[..init.candidate.len().min(60)]
776 );
777 let msg = SignalingMessage::candidate(
778 serde_json::to_value(&init).unwrap_or_default(),
779 &recipient,
780 &my_peer_id_str,
781 );
782 if let Err(e) = signaling_tx.send(msg).await {
783 error!("Failed to send ICE candidate: {}", e);
784 }
785 }
786 }
787 })
788 }));
789
790 let peer_id_log = peer_id.clone();
792 let state_event_tx = self.state_event_tx.clone();
793 self.pc
794 .on_peer_connection_state_change(Box::new(move |state: RTCPeerConnectionState| {
795 let peer_id = peer_id_log.clone();
796 let state_event_tx = state_event_tx.clone();
797 Box::pin(async move {
798 info!("Peer {} connection state: {:?}", peer_id.short(), state);
799
800 if let Some(tx) = state_event_tx {
802 let event = match state {
803 RTCPeerConnectionState::Connected => {
804 Some(PeerStateEvent::Connected(peer_id))
805 }
806 RTCPeerConnectionState::Failed => Some(PeerStateEvent::Failed(peer_id)),
807 RTCPeerConnectionState::Disconnected
808 | RTCPeerConnectionState::Closed => {
809 Some(PeerStateEvent::Disconnected(peer_id))
810 }
811 _ => None,
812 };
813 if let Some(event) = event {
814 if let Err(e) = tx.send(event).await {
815 error!("Failed to send peer state event: {}", e);
816 }
817 }
818 }
819 })
820 }));
821
822 Ok(())
823 }
824
825 pub async fn connect(&self) -> Result<serde_json::Value> {
827 println!("[Peer {}] Creating data channel...", self.peer_id.short());
828 let dc_init = RTCDataChannelInit {
831 ordered: Some(false),
832 ..Default::default()
833 };
834 let dc = self
835 .pc
836 .create_data_channel("hashtree", Some(dc_init))
837 .await?;
838 println!(
839 "[Peer {}] Data channel created, setting up handlers...",
840 self.peer_id.short()
841 );
842 self.setup_data_channel(dc.clone()).await?;
843 println!(
844 "[Peer {}] Handlers set up, storing data channel...",
845 self.peer_id.short()
846 );
847 {
848 let mut dc_guard = self.data_channel.lock().await;
849 *dc_guard = Some(dc);
850 }
851 println!("[Peer {}] Data channel stored", self.peer_id.short());
852
853 let offer = self.pc.create_offer(None).await?;
856 let mut gathering_complete = self.pc.gathering_complete_promise().await;
857 self.pc.set_local_description(offer).await?;
858
859 let _ = tokio::time::timeout(
861 std::time::Duration::from_secs(10),
862 gathering_complete.recv(),
863 )
864 .await;
865
866 let local_desc = self
868 .pc
869 .local_description()
870 .await
871 .ok_or_else(|| anyhow::anyhow!("No local description after gathering"))?;
872
873 debug!(
874 "Offer created, SDP len: {}, ice_gathering: {:?}",
875 local_desc.sdp.len(),
876 self.pc.ice_gathering_state()
877 );
878
879 let offer_json = serde_json::json!({
881 "type": local_desc.sdp_type.to_string().to_lowercase(),
882 "sdp": local_desc.sdp
883 });
884
885 Ok(offer_json)
886 }
887
888 pub async fn handle_offer(&self, offer: serde_json::Value) -> Result<serde_json::Value> {
890 let sdp = offer
891 .get("sdp")
892 .and_then(|s| s.as_str())
893 .ok_or_else(|| anyhow::anyhow!("Missing SDP in offer"))?;
894
895 let peer_id = self.peer_id.clone();
898 let message_tx = self.message_tx.clone();
899 let pending_requests = self.pending_requests.clone();
900 let pending_nostr_queries = self.pending_nostr_queries.clone();
901 let store = self.store.clone();
902 let data_channel_holder = self.data_channel.clone();
903 let nostr_relay = self.nostr_relay.clone();
904 let mesh_frame_tx = self.mesh_frame_tx.clone();
905 let cashu_quotes = self.cashu_quotes.clone();
906 let peer_pubkey = Some(self.peer_id.pubkey.clone());
907
908 self.pc
909 .on_data_channel(Box::new(move |dc: Arc<RTCDataChannel>| {
910 let peer_id = peer_id.clone();
911 let message_tx = message_tx.clone();
912 let pending_requests = pending_requests.clone();
913 let pending_nostr_queries = pending_nostr_queries.clone();
914 let store = store.clone();
915 let data_channel_holder = data_channel_holder.clone();
916 let nostr_relay = nostr_relay.clone();
917 let mesh_frame_tx = mesh_frame_tx.clone();
918 let cashu_quotes = cashu_quotes.clone();
919 let peer_pubkey = peer_pubkey.clone();
920
921 Box::pin(async move {
923 info!(
924 "Peer {} received data channel: {}",
925 peer_id.short(),
926 dc.label()
927 );
928
929 {
931 let mut dc_guard = data_channel_holder.lock().await;
932 *dc_guard = Some(dc.clone());
933 }
934
935 Self::setup_dc_handlers(
937 dc.clone(),
938 peer_id,
939 message_tx,
940 pending_requests,
941 pending_nostr_queries.clone(),
942 store,
943 nostr_relay,
944 mesh_frame_tx,
945 cashu_quotes,
946 peer_pubkey,
947 )
948 .await;
949 })
950 }));
951
952 let offer_desc = RTCSessionDescription::offer(sdp.to_string())?;
954 self.pc.set_remote_description(offer_desc).await?;
955
956 let answer = self.pc.create_answer(None).await?;
959 let mut gathering_complete = self.pc.gathering_complete_promise().await;
960 self.pc.set_local_description(answer).await?;
961
962 let _ = tokio::time::timeout(
964 std::time::Duration::from_secs(10),
965 gathering_complete.recv(),
966 )
967 .await;
968
969 let local_desc = self
971 .pc
972 .local_description()
973 .await
974 .ok_or_else(|| anyhow::anyhow!("No local description after gathering"))?;
975
976 debug!(
977 "Answer created, SDP len: {}, ice_gathering: {:?}",
978 local_desc.sdp.len(),
979 self.pc.ice_gathering_state()
980 );
981
982 let answer_json = serde_json::json!({
983 "type": local_desc.sdp_type.to_string().to_lowercase(),
984 "sdp": local_desc.sdp
985 });
986
987 Ok(answer_json)
988 }
989
990 pub async fn handle_answer(&self, answer: serde_json::Value) -> Result<()> {
992 let sdp = answer
993 .get("sdp")
994 .and_then(|s| s.as_str())
995 .ok_or_else(|| anyhow::anyhow!("Missing SDP in answer"))?;
996
997 let answer_desc = RTCSessionDescription::answer(sdp.to_string())?;
998 self.pc.set_remote_description(answer_desc).await?;
999
1000 Ok(())
1001 }
1002
1003 pub async fn handle_candidate(&self, candidate: serde_json::Value) -> Result<()> {
1005 let candidate_str = candidate
1006 .get("candidate")
1007 .and_then(|c| c.as_str())
1008 .unwrap_or("");
1009
1010 let sdp_mid = candidate
1011 .get("sdpMid")
1012 .and_then(|m| m.as_str())
1013 .map(|s| s.to_string());
1014
1015 let sdp_mline_index = candidate
1016 .get("sdpMLineIndex")
1017 .and_then(|i| i.as_u64())
1018 .map(|i| i as u16);
1019
1020 if !candidate_str.is_empty() {
1021 use webrtc::ice_transport::ice_candidate::RTCIceCandidateInit;
1022 let init = RTCIceCandidateInit {
1023 candidate: candidate_str.to_string(),
1024 sdp_mid,
1025 sdp_mline_index,
1026 username_fragment: candidate
1027 .get("usernameFragment")
1028 .and_then(|u| u.as_str())
1029 .map(|s| s.to_string()),
1030 };
1031 self.pc.add_ice_candidate(init).await?;
1032 }
1033
1034 Ok(())
1035 }
1036
1037 async fn setup_data_channel(&self, dc: Arc<RTCDataChannel>) -> Result<()> {
1039 let peer_id = self.peer_id.clone();
1040 let message_tx = self.message_tx.clone();
1041 let pending_requests = self.pending_requests.clone();
1042 let store = self.store.clone();
1043 let nostr_relay = self.nostr_relay.clone();
1044 let mesh_frame_tx = self.mesh_frame_tx.clone();
1045 let cashu_quotes = self.cashu_quotes.clone();
1046 let peer_pubkey = Some(self.peer_id.pubkey.clone());
1047
1048 Self::setup_dc_handlers(
1049 dc,
1050 peer_id,
1051 message_tx,
1052 pending_requests,
1053 self.pending_nostr_queries.clone(),
1054 store,
1055 nostr_relay,
1056 mesh_frame_tx,
1057 cashu_quotes,
1058 peer_pubkey,
1059 )
1060 .await;
1061 Ok(())
1062 }
1063
1064 #[allow(clippy::too_many_arguments)]
1066 async fn setup_dc_handlers(
1067 dc: Arc<RTCDataChannel>,
1068 peer_id: PeerId,
1069 message_tx: mpsc::Sender<(DataMessage, Option<Vec<u8>>)>,
1070 pending_requests: Arc<Mutex<HashMap<String, PendingRequest>>>,
1071 pending_nostr_queries: Arc<
1072 Mutex<HashMap<String, mpsc::UnboundedSender<NostrRelayMessage>>>,
1073 >,
1074 store: Option<Arc<dyn ContentStore>>,
1075 nostr_relay: Option<Arc<NostrRelay>>,
1076 mesh_frame_tx: Option<mpsc::Sender<(PeerId, MeshNostrFrame)>>,
1077 cashu_quotes: Option<Arc<CashuQuoteState>>,
1078 peer_pubkey: Option<String>,
1079 ) {
1080 let label = dc.label().to_string();
1081 let peer_short = peer_id.short();
1082
1083 let _pending_binary: Arc<Mutex<Option<u32>>> = Arc::new(Mutex::new(None));
1085
1086 let open_notify = nostr_relay.as_ref().map(|_| Arc::new(Notify::new()));
1087 if let Some(ref notify) = open_notify {
1088 if dc.ready_state() == RTCDataChannelState::Open {
1089 notify.notify_one();
1091 }
1092 }
1093
1094 let mut nostr_client_id: Option<u64> = None;
1095 if let Some(relay) = nostr_relay.clone() {
1096 let client_id = relay.next_client_id();
1097 let (nostr_tx, mut nostr_rx) = mpsc::unbounded_channel::<String>();
1098 relay
1099 .register_client(client_id, nostr_tx, peer_pubkey.clone())
1100 .await;
1101 nostr_client_id = Some(client_id);
1102
1103 if let Some(notify) = open_notify.clone() {
1104 let dc_for_send = dc.clone();
1105 tokio::spawn(async move {
1106 notify.notified().await;
1107 while let Some(text) = nostr_rx.recv().await {
1108 if dc_for_send.send_text(text).await.is_err() {
1109 break;
1110 }
1111 }
1112 });
1113 }
1114 }
1115
1116 if let (Some(relay), Some(client_id)) = (nostr_relay.clone(), nostr_client_id) {
1117 dc.on_close(Box::new(move || {
1118 let relay = relay.clone();
1119 Box::pin(async move {
1120 relay.unregister_client(client_id).await;
1121 })
1122 }));
1123 }
1124
1125 let open_notify_clone = open_notify.clone();
1126 let peer_short_open = peer_short.clone();
1127 let label_clone = label.clone();
1128 dc.on_open(Box::new(move || {
1129 let peer_short_open = peer_short_open.clone();
1130 let label_clone = label_clone.clone();
1131 let open_notify = open_notify_clone.clone();
1132 Box::pin(async move {
1134 info!(
1135 "[Peer {}] Data channel '{}' open",
1136 peer_short_open, label_clone
1137 );
1138 if let Some(notify) = open_notify {
1139 notify.notify_one();
1140 }
1141 })
1142 }));
1143
1144 let dc_for_msg = dc.clone();
1145 let peer_short_msg = peer_short.clone();
1146 let _pending_binary_clone = _pending_binary.clone();
1147 let store_clone = store.clone();
1148 let nostr_relay_for_msg = nostr_relay.clone();
1149 let nostr_client_id_for_msg = nostr_client_id;
1150 let pending_nostr_queries_for_msg = pending_nostr_queries.clone();
1151 let mesh_frame_tx_for_msg = mesh_frame_tx.clone();
1152 let peer_id_for_msg = peer_id.clone();
1153
1154 dc.on_message(Box::new(move |msg: DataChannelMessage| {
1155 let dc = dc_for_msg.clone();
1156 let peer_short = peer_short_msg.clone();
1157 let pending_requests = pending_requests.clone();
1158 let _pending_binary = _pending_binary_clone.clone();
1159 let _message_tx = message_tx.clone();
1160 let store = store_clone.clone();
1161 let nostr_relay = nostr_relay_for_msg.clone();
1162 let nostr_client_id = nostr_client_id_for_msg;
1163 let pending_nostr_queries = pending_nostr_queries_for_msg.clone();
1164 let mesh_frame_tx = mesh_frame_tx_for_msg.clone();
1165 let cashu_quotes = cashu_quotes.clone();
1166 let peer_id = peer_id_for_msg.clone();
1167 let msg_data = msg.data.clone();
1168
1169 Box::pin(async move {
1171 if msg.is_string {
1172 if let Ok(text) = std::str::from_utf8(&msg_data) {
1173 if let Ok(mesh_frame) = serde_json::from_str::<MeshNostrFrame>(text) {
1174 match validate_mesh_frame(&mesh_frame) {
1175 Ok(()) => {
1176 if let Some(tx) = mesh_frame_tx {
1177 let _ = tx.send((peer_id.clone(), mesh_frame)).await;
1178 }
1179 return;
1180 }
1181 Err(reason) => {
1182 debug!(
1183 "[Peer {}] Ignoring invalid mesh frame: {}",
1184 peer_short, reason
1185 );
1186 }
1187 }
1188 }
1189
1190 if let Ok(relay_msg) = NostrRelayMessage::from_json(text) {
1192 if let Some(sub_id) = relay_subscription_id(&relay_msg) {
1193 let sender = {
1194 let pending = pending_nostr_queries.lock().await;
1195 pending.get(&sub_id).cloned()
1196 };
1197 if let Some(tx) = sender {
1198 debug!(
1199 "[Peer {}] Routed Nostr relay message for subscription {}",
1200 peer_short, sub_id
1201 );
1202 let _ = tx.send(relay_msg);
1203 return;
1204 } else {
1205 debug!(
1206 "[Peer {}] Dropping Nostr relay message for unknown subscription {}",
1207 peer_short, sub_id
1208 );
1209 }
1210 }
1211 }
1212
1213 if let Some(relay) = nostr_relay {
1215 if let Ok(nostr_msg) = NostrClientMessage::from_json(text) {
1216 if let Some(client_id) = nostr_client_id {
1217 relay.handle_client_message(client_id, nostr_msg).await;
1218 }
1219 }
1220 }
1221 }
1222 return;
1223 }
1224 debug!(
1226 "[Peer {}] Received {} bytes on data channel",
1227 peer_short,
1228 msg_data.len()
1229 );
1230 match parse_message(&msg_data) {
1231 Ok(data_msg) => match data_msg {
1232 DataMessage::Request(req) => {
1233 let hash_hex = hash_to_hex(&req.h);
1234 let hash_short = &hash_hex[..8.min(hash_hex.len())];
1235 info!("[Peer {}] Received request for {}", peer_short, hash_short);
1236
1237 if let Some(cashu_quotes) = cashu_quotes.as_ref() {
1238 if cashu_quotes
1239 .should_refuse_requests_from_peer(&peer_id.to_string())
1240 .await
1241 {
1242 info!(
1243 "[Peer {}] Refusing request from peer with unpaid defaults",
1244 peer_short
1245 );
1246 return;
1247 }
1248 }
1249
1250 let quoted_settlement = if let Some(quote_id) = req.q {
1251 let Some(cashu_quotes) = cashu_quotes.as_ref() else {
1252 info!(
1253 "[Peer {}] Ignoring quoted request without Cashu settlement state",
1254 peer_short
1255 );
1256 return;
1257 };
1258 match cashu_quotes
1259 .take_valid_quote(&peer_id.to_string(), &req.h, quote_id)
1260 .await
1261 {
1262 Some(settlement) => Some((quote_id, settlement)),
1263 None => {
1264 info!(
1265 "[Peer {}] Ignoring request with invalid or expired quote {}",
1266 peer_short, quote_id
1267 );
1268 return;
1269 }
1270 }
1271 } else {
1272 None
1273 };
1274
1275 let data = if let Some(ref store) = store {
1277 match store.get(&hash_hex) {
1278 Ok(Some(data)) => {
1279 info!(
1280 "[Peer {}] Found {} in store ({} bytes)",
1281 peer_short,
1282 hash_short,
1283 data.len()
1284 );
1285 Some(data)
1286 }
1287 Ok(None) => {
1288 info!(
1289 "[Peer {}] Hash {} not in store",
1290 peer_short, hash_short
1291 );
1292 None
1293 }
1294 Err(e) => {
1295 warn!("[Peer {}] Store error: {}", peer_short, e);
1296 None
1297 }
1298 }
1299 } else {
1300 warn!(
1301 "[Peer {}] No store configured - cannot serve requests",
1302 peer_short
1303 );
1304 None
1305 };
1306
1307 if let Some(data) = data {
1309 let data_len = data.len();
1310 if let (Some(cashu_quotes), Some((quote_id, settlement))) =
1311 (cashu_quotes.as_ref(), quoted_settlement)
1312 {
1313 match cashu_quotes
1314 .prepare_quoted_transfer(
1315 &peer_id.to_string(),
1316 &req.h,
1317 quote_id,
1318 &settlement,
1319 data,
1320 )
1321 .await
1322 {
1323 Some((first_chunk, first_expected)) => {
1324 if send_quoted_chunk(
1325 &dc,
1326 &peer_id,
1327 &peer_short,
1328 cashu_quotes,
1329 first_chunk,
1330 first_expected,
1331 )
1332 .await
1333 {
1334 info!(
1335 "[Peer {}] Started quoted chunked response for {} ({} bytes)",
1336 peer_short, hash_short, data_len
1337 );
1338 }
1339 }
1340 None => {
1341 warn!(
1342 "[Peer {}] Failed to prepare quoted transfer for {}",
1343 peer_short, hash_short
1344 );
1345 }
1346 }
1347 } else {
1348 let response = DataResponse { h: req.h, d: data };
1349 if let Ok(wire) = encode_response(&response) {
1350 if let Err(e) = dc.send(&Bytes::from(wire)).await {
1351 error!(
1352 "[Peer {}] Failed to send response: {}",
1353 peer_short, e
1354 );
1355 } else {
1356 info!(
1357 "[Peer {}] Sent response for {} ({} bytes)",
1358 peer_short, hash_short, data_len
1359 );
1360 }
1361 }
1362 }
1363 } else {
1364 info!("[Peer {}] Content not found for {}", peer_short, hash_short);
1365 }
1366 }
1367 DataMessage::Response(res) => {
1368 let hash_hex = hash_to_hex(&res.h);
1369 let hash_short = &hash_hex[..8.min(hash_hex.len())];
1370 debug!(
1371 "[Peer {}] Received response for {} ({} bytes)",
1372 peer_short,
1373 hash_short,
1374 res.d.len()
1375 );
1376
1377 let mut pending = pending_requests.lock().await;
1379 if let Some(req) = pending.remove(&hash_hex) {
1380 let _ = req.response_tx.send(Some(res.d));
1381 }
1382 }
1383 DataMessage::QuoteRequest(req) => {
1384 let response = handle_quote_request_message(
1385 &peer_short,
1386 &peer_id,
1387 &store,
1388 cashu_quotes.as_ref(),
1389 &req,
1390 )
1391 .await;
1392 if let Some(response) = response {
1393 if let Ok(wire) = encode_quote_response(&response) {
1394 if let Err(e) = dc.send(&Bytes::from(wire)).await {
1395 warn!(
1396 "[Peer {}] Failed to send quote response: {}",
1397 peer_short, e
1398 );
1399 }
1400 }
1401 }
1402 }
1403 DataMessage::QuoteResponse(res) => {
1404 if let Some(cashu_quotes) = cashu_quotes.as_ref() {
1405 let _ = cashu_quotes
1406 .handle_quote_response(&peer_id.to_string(), res)
1407 .await;
1408 }
1409 }
1410 DataMessage::Chunk(chunk) => {
1411 process_chunk_message(
1412 &peer_short,
1413 &peer_id,
1414 &dc,
1415 &pending_requests,
1416 cashu_quotes.as_ref(),
1417 chunk,
1418 )
1419 .await;
1420 }
1421 DataMessage::Payment(req) => {
1422 let outcome =
1423 handle_payment_message(&peer_id, cashu_quotes.as_ref(), &req).await;
1424 if let Ok(wire) = encode_payment_ack(&outcome.ack) {
1425 if let Err(e) = dc.send(&Bytes::from(wire)).await {
1426 warn!(
1427 "[Peer {}] Failed to send payment ack: {}",
1428 peer_short, e
1429 );
1430 }
1431 }
1432 if let (Some(cashu_quotes), Some((next_chunk, next_expected))) =
1433 (cashu_quotes.as_ref(), outcome.next_chunk)
1434 {
1435 let _ = send_quoted_chunk(
1436 &dc,
1437 &peer_id,
1438 &peer_short,
1439 cashu_quotes,
1440 next_chunk,
1441 next_expected,
1442 )
1443 .await;
1444 }
1445 }
1446 DataMessage::PaymentAck(res) => {
1447 handle_payment_ack_message(
1448 &peer_short,
1449 &peer_id,
1450 &dc,
1451 &pending_requests,
1452 cashu_quotes.as_ref(),
1453 res,
1454 )
1455 .await;
1456 }
1457 },
1458 Err(e) => {
1459 warn!("[Peer {}] Failed to parse message: {:?}", peer_short, e);
1460 let hex_dump: String = msg_data
1462 .iter()
1463 .take(50)
1464 .map(|b| format!("{:02x}", b))
1465 .collect();
1466 warn!("[Peer {}] Message hex: {}", peer_short, hex_dump);
1467 }
1468 }
1469 })
1470 }));
1471 }
1472
1473 pub fn has_data_channel(&self) -> bool {
1475 self.data_channel
1477 .try_lock()
1478 .map(|guard| {
1479 guard
1480 .as_ref()
1481 .map(|dc| dc.ready_state() == RTCDataChannelState::Open)
1482 .unwrap_or(false)
1483 })
1484 .unwrap_or(false)
1485 }
1486
1487 pub async fn request(&self, hash_hex: &str) -> Result<Option<Vec<u8>>> {
1489 self.request_with_timeout(hash_hex, std::time::Duration::from_secs(10))
1490 .await
1491 }
1492
1493 pub async fn request_with_timeout(
1495 &self,
1496 hash_hex: &str,
1497 timeout: std::time::Duration,
1498 ) -> Result<Option<Vec<u8>>> {
1499 let dc_guard = self.data_channel.lock().await;
1500 let dc = dc_guard
1501 .as_ref()
1502 .ok_or_else(|| anyhow::anyhow!("No data channel"))?
1503 .clone();
1504 drop(dc_guard); let hash = hex::decode(hash_hex).map_err(|e| anyhow::anyhow!("Invalid hex hash: {}", e))?;
1508
1509 let (tx, rx) = oneshot::channel();
1511
1512 {
1514 let mut pending = self.pending_requests.lock().await;
1515 pending.insert(
1516 hash_hex.to_string(),
1517 PendingRequest::standard(hash.clone(), tx),
1518 );
1519 }
1520
1521 let req = DataRequest {
1523 h: hash,
1524 htl: BLOB_REQUEST_POLICY.max_htl,
1525 q: None,
1526 };
1527 let wire = encode_request(&req)?;
1528 dc.send(&Bytes::from(wire)).await?;
1529
1530 debug!(
1531 "[Peer {}] Sent request for {}",
1532 self.peer_id.short(),
1533 &hash_hex[..8.min(hash_hex.len())]
1534 );
1535
1536 match tokio::time::timeout(timeout, rx).await {
1538 Ok(Ok(data)) => Ok(data),
1539 Ok(Err(_)) => {
1540 Ok(None)
1542 }
1543 Err(_) => {
1544 let mut pending = self.pending_requests.lock().await;
1546 pending.remove(hash_hex);
1547 Ok(None)
1548 }
1549 }
1550 }
1551
1552 pub async fn query_nostr_events(
1555 &self,
1556 filters: Vec<NostrFilter>,
1557 timeout: std::time::Duration,
1558 ) -> Result<Vec<nostr::Event>> {
1559 let dc_guard = self.data_channel.lock().await;
1560 let dc = dc_guard
1561 .as_ref()
1562 .ok_or_else(|| anyhow::anyhow!("No data channel"))?
1563 .clone();
1564 drop(dc_guard);
1565
1566 let subscription_id = NostrSubscriptionId::generate();
1567 let subscription_key = subscription_id.to_string();
1568 let (tx, mut rx) = mpsc::unbounded_channel::<NostrRelayMessage>();
1569
1570 {
1571 let mut pending = self.pending_nostr_queries.lock().await;
1572 pending.insert(subscription_key.clone(), tx);
1573 }
1574
1575 let req = NostrClientMessage::req(subscription_id.clone(), filters);
1576 if let Err(e) = dc.send_text(req.as_json()).await {
1577 let mut pending = self.pending_nostr_queries.lock().await;
1578 pending.remove(&subscription_key);
1579 return Err(e.into());
1580 }
1581 debug!(
1582 "[Peer {}] Sent Nostr REQ subscription {}",
1583 self.peer_id.short(),
1584 subscription_id
1585 );
1586
1587 let mut events = Vec::new();
1588 let deadline = tokio::time::Instant::now() + timeout;
1589
1590 loop {
1591 let now = tokio::time::Instant::now();
1592 if now >= deadline {
1593 break;
1594 }
1595 let remaining = deadline - now;
1596
1597 let next = tokio::time::timeout(remaining, rx.recv()).await;
1598 match next {
1599 Ok(Some(NostrRelayMessage::Event {
1600 subscription_id: sid,
1601 event,
1602 })) if sid == subscription_id => {
1603 debug!(
1604 "[Peer {}] Received Nostr EVENT for subscription {}",
1605 self.peer_id.short(),
1606 subscription_id
1607 );
1608 events.push(*event);
1609 }
1610 Ok(Some(NostrRelayMessage::EndOfStoredEvents(sid))) if sid == subscription_id => {
1611 debug!(
1612 "[Peer {}] Received Nostr EOSE for subscription {}",
1613 self.peer_id.short(),
1614 subscription_id
1615 );
1616 break;
1617 }
1618 Ok(Some(NostrRelayMessage::Closed {
1619 subscription_id: sid,
1620 message,
1621 })) if sid == subscription_id => {
1622 warn!(
1623 "[Peer {}] Nostr query closed for subscription {}: {}",
1624 self.peer_id.short(),
1625 subscription_id,
1626 message
1627 );
1628 break;
1629 }
1630 Ok(Some(_)) => {}
1631 Ok(None) => break,
1632 Err(_) => {
1633 warn!(
1634 "[Peer {}] Nostr query timed out for subscription {}",
1635 self.peer_id.short(),
1636 subscription_id
1637 );
1638 break;
1639 }
1640 }
1641 }
1642
1643 let close = NostrClientMessage::close(subscription_id.clone());
1644 let _ = dc.send_text(close.as_json()).await;
1645
1646 let mut pending = self.pending_nostr_queries.lock().await;
1647 pending.remove(&subscription_key);
1648 debug!(
1649 "[Peer {}] Nostr query subscription {} collected {} event(s)",
1650 self.peer_id.short(),
1651 subscription_id,
1652 events.len()
1653 );
1654
1655 Ok(events)
1656 }
1657
1658 pub async fn send_mesh_frame_text(&self, frame: &MeshNostrFrame) -> Result<()> {
1660 let dc_guard = self.data_channel.lock().await;
1661 let dc = dc_guard
1662 .as_ref()
1663 .ok_or_else(|| anyhow::anyhow!("No data channel"))?
1664 .clone();
1665 drop(dc_guard);
1666
1667 let text = serde_json::to_string(frame)?;
1668 dc.send_text(text).await?;
1669 Ok(())
1670 }
1671
1672 pub async fn send_message(&self, msg: &DataMessage) -> Result<()> {
1674 let dc_guard = self.data_channel.lock().await;
1675 if let Some(ref dc) = *dc_guard {
1676 let wire = encode_message(msg)?;
1677 dc.send(&Bytes::from(wire)).await?;
1678 }
1679 Ok(())
1680 }
1681
1682 pub async fn close(&self) -> Result<()> {
1684 {
1685 let dc_guard = self.data_channel.lock().await;
1686 if let Some(ref dc) = *dc_guard {
1687 dc.close().await?;
1688 }
1689 }
1690 self.pc.close().await?;
1691 Ok(())
1692 }
1693}
1694
1695fn relay_subscription_id(msg: &NostrRelayMessage) -> Option<String> {
1696 match msg {
1697 NostrRelayMessage::Event {
1698 subscription_id, ..
1699 } => Some(subscription_id.to_string()),
1700 NostrRelayMessage::EndOfStoredEvents(subscription_id) => Some(subscription_id.to_string()),
1701 NostrRelayMessage::Closed {
1702 subscription_id, ..
1703 } => Some(subscription_id.to_string()),
1704 NostrRelayMessage::Count {
1705 subscription_id, ..
1706 } => Some(subscription_id.to_string()),
1707 _ => None,
1708 }
1709}