1use async_trait::async_trait;
8use futures::{stream::FuturesUnordered, FutureExt, StreamExt};
9use std::collections::hash_map::DefaultHasher;
10use std::collections::{HashMap, HashSet};
11use std::future::Future;
12use std::hash::{Hash as _, Hasher};
13use std::ops::Range;
14use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
15use std::sync::Arc;
16use std::time::Duration;
17use tokio::sync::{oneshot, Mutex, RwLock};
18use tokio::time::Instant;
19
20use hashtree_core::{Hash, Store, StoreError};
21
22use crate::peer_selector::{PeerMetadataSnapshot, PeerSelector, SelectionStrategy};
23use crate::protocol::{
24 create_quote_request, create_quote_response_available, create_quote_response_unavailable,
25 create_request, create_request_with_quote, create_response, encode_quote_request,
26 encode_quote_response, encode_request, encode_response, hash_to_key, parse_message,
27 DataMessage, DataQuoteRequest, DataQuoteResponse,
28};
29use crate::signaling::MeshRouter;
30use crate::transport::{PeerLinkFactory, SignalingTransport, TransportError};
31use crate::types::{should_forward_htl, PeerHTLConfig, SignalingMessage, TimedSeenSet, MAX_HTL};
32
/// Byte string that `peer_metadata_pointer_slot_hash` feeds to `hashtree_core::sha256` to derive
/// the slot hash for the latest peer-metadata pointer.
const PEER_METADATA_POINTER_SLOT_KEY: &[u8] = b"hashtree-webrtc/peer-metadata/latest/v1";
/// Capacity handed to the `TimedSeenSet` that backs `recent_forward_misses`.
const RECENT_FORWARD_MISS_CAPACITY: usize = 4096;
/// Lower bound (milliseconds) applied by `recent_forward_miss_ttl` to the forward-miss TTL.
const MIN_RECENT_FORWARD_MISS_TTL_MS: u64 = 250;
38
/// Bookkeeping kept per entry of `pending_requests`.
///
/// NOTE(review): the code that creates and resolves these entries is outside this chunk; the
/// field notes below are inferred from names and types — confirm against the request path.
struct PendingRequest {
    /// One-shot sender carrying the result (`Some(bytes)` or `None`).
    response_tx: oneshot::Sender<Option<Vec<u8>>>,
    /// Instant recorded for the request (presumably when it was dispatched).
    started_at: Instant,
    /// Peer ids associated with the request (presumably those already queried).
    queried_peers: Vec<String>,
}
45
/// Bookkeeping kept per entry of `pending_quotes`.
///
/// NOTE(review): producers/consumers are outside this chunk; the field names mirror the
/// `preferred_mint_url` / `offered_payment_sat` parameters of `should_accept_quote_response`.
struct PendingQuoteRequest {
    /// One-shot sender carrying the negotiated quote, or `None`.
    response_tx: oneshot::Sender<Option<NegotiatedQuote>>,
    /// Mint URL preferred by the requester, if any.
    preferred_mint_url: Option<String>,
    /// Payment offered with the quote request, in sats.
    offered_payment_sat: u64,
}
51
/// Bookkeeping kept per entry of `pending_forward_requests`.
struct PendingForwardRequest {
    /// Set of requester ids (presumably the peers awaiting the forwarded response — confirm).
    requester_ids: HashSet<String>,
}
55
/// Per-peer byte counters plus the scheduling debt used by the response scheduler.
#[derive(Debug, Clone, Default)]
struct PeerWireStats {
    /// Total bytes sent to the peer (saturating counter).
    bytes_sent: u64,
    /// Total bytes received from the peer (saturating counter).
    bytes_received: u64,
    /// Accumulated "finish" value of the last response sent to this peer; the starting point
    /// for the next job's finish value in `choose_ready_response_job`, and rebased by
    /// `run_response_scheduler` after every send.
    bandwidth_debt: f64,
}
62
/// A response payload queued for delivery by `run_response_scheduler`.
struct PendingResponseSend {
    /// Unique id taken from `next_response_job_id`; used to locate the job in the queue.
    job_id: u64,
    /// Destination peer.
    peer_id: String,
    /// Encoded bytes to send on the peer's channel.
    bytes: Vec<u8>,
    /// Earliest instant at which the job may be sent.
    ready_at: Instant,
    /// Tie-breaker between jobs with equal finish values (set to `job_id` on enqueue).
    queue_sequence: u64,
}
70
/// A pluggable source that can be asked for the bytes stored under a hash.
///
/// Implementations are stored as `Arc<dyn MeshReadSource>` in `read_sources`.
#[async_trait]
pub trait MeshReadSource: Send + Sync {
    /// Identifier of this source.
    fn id(&self) -> &str;

    /// Whether the source can currently be queried; defaults to `true`.
    fn is_available(&self) -> bool {
        true
    }

    /// Fetches the data for `hash`, returning `None` when nothing is produced.
    async fn get(&self, hash: &Hash) -> Option<Vec<u8>>;
}
81
/// Result delivered through `PendingQuoteRequest::response_tx`.
///
/// NOTE(review): construction happens outside this chunk; field meanings inferred from names.
#[derive(Debug, Clone)]
struct NegotiatedQuote {
    /// Peer the quote belongs to.
    peer_id: String,
    /// Quote identifier.
    quote_id: u64,
    /// Mint URL attached to the quote, if any (currently unread, hence `dead_code`).
    #[allow(dead_code)]
    mint_url: Option<String>,
}
89
/// A quote handed out by `issue_quote`, stored in `issued_quotes`.
struct IssuedQuote {
    /// Instant after which `take_valid_quote` no longer accepts the quote.
    expires_at: Instant,
    /// Payment amount recorded with the quote, in sats (currently unread).
    #[allow(dead_code)]
    payment_sat: u64,
    /// Mint URL recorded with the quote, if any (currently unread).
    #[allow(dead_code)]
    mint_url: Option<String>,
}
97
/// Running statistics for one read source, consumed by the scoring helpers
/// (`source_reliability_score`, `source_latency_score`, `adaptive_source_score`).
///
/// NOTE(review): the code that updates these counters is outside this chunk.
#[derive(Debug, Clone, Default)]
struct AdaptiveSourceStats {
    /// Number of requests counted for the source.
    requests: u64,
    /// Number of successful responses.
    successes: u64,
    /// Number of misses.
    misses: u64,
    /// Number of failures.
    failures: u64,
    /// Number of timeouts.
    timeouts: u64,
    /// Smoothed round-trip time in milliseconds; values `<= 0.0` are treated as "unknown".
    srtt_ms: f64,
    /// Round-trip time variance in milliseconds.
    rttvar_ms: f64,
    /// Current backoff level.
    backoff_level: u32,
    /// While this instant is in the future the source scores `f64::NEG_INFINITY`.
    backed_off_until: Option<Instant>,
    /// Instant of the last success; earns a recency bonus inside
    /// `RECENT_SOURCE_SUCCESS_WINDOW`.
    last_success_at: Option<Instant>,
    /// Instant of the last failure.
    last_failure_at: Option<Instant>,
}
112
/// Outcome of a fetch attempt, delivered to the waiters of an `InflightSourceFetch`.
#[derive(Debug, Clone)]
enum RouteFetchOutcome {
    /// Data was obtained.
    Hit(Vec<u8>),
    /// No data was obtained.
    Miss,
    /// The attempt timed out.
    Timeout,
}
119
/// Entry of `inflight_source_fetches`.
struct InflightSourceFetch {
    /// One-shot senders that each receive a `RouteFetchOutcome` (presumably callers waiting
    /// on the same in-flight fetch — confirm against the fetch path).
    waiters: Vec<oneshot::Sender<RouteFetchOutcome>>,
}
123
/// Result of querying a single read source, tagged with the source id.
enum SourceFetchOutcome {
    /// The source returned data.
    Hit {
        /// Id of the answering source.
        source_id: String,
        /// Returned bytes.
        data: Vec<u8>,
        /// Elapsed time of the fetch in milliseconds.
        elapsed_ms: u64,
    },
    /// The source had no data.
    Miss {
        /// Id of the source.
        source_id: String,
    },
    /// The source failed.
    Failure {
        /// Id of the source.
        source_id: String,
    },
}
137
/// Initial source backoff in milliseconds (NOTE(review): consumer is outside this chunk).
const INITIAL_SOURCE_BACKOFF_MS: u64 = 250;
/// Maximum source backoff in milliseconds (NOTE(review): consumer is outside this chunk).
const MAX_SOURCE_BACKOFF_MS: u64 = 10_000;
/// Score delta (presumably below which two scores count as a tie — confirm at the use site).
const SOURCE_SCORE_TIE_DELTA: f64 = 0.15;
/// A success younger than this window earns the recency bonus in the scoring functions.
const RECENT_SOURCE_SUCCESS_WINDOW: Duration = Duration::from_secs(60);
/// Rank positions added per in-flight request when ordering peers in
/// `ordered_connected_peers`.
const ACTIVE_PEER_REQUEST_RANK_PENALTY: usize = 3;
143
144fn source_reliability_score(stats: &AdaptiveSourceStats) -> f64 {
145 (stats.successes as f64 + 1.0) / (stats.requests as f64 + 2.0)
146}
147
148fn source_latency_score(stats: &AdaptiveSourceStats) -> f64 {
149 if stats.srtt_ms <= 0.0 {
150 return 0.5;
151 }
152 (500.0 / (stats.srtt_ms + 50.0)).min(1.0)
153}
154
155fn source_has_history(stats: &AdaptiveSourceStats) -> bool {
156 stats.requests > 0
157 || stats.successes > 0
158 || stats.misses > 0
159 || stats.failures > 0
160 || stats.timeouts > 0
161}
162
163fn adaptive_source_score(stats: &AdaptiveSourceStats, now: Instant) -> f64 {
164 if let Some(backed_off_until) = stats.backed_off_until {
165 if backed_off_until > now {
166 return f64::NEG_INFINITY;
167 }
168 }
169
170 let miss_penalty = if stats.requests > 0 {
171 (stats.misses as f64 / stats.requests as f64) * 0.15
172 } else {
173 0.0
174 };
175 let failure_penalty = if stats.requests > 0 {
176 ((stats.failures + stats.timeouts) as f64 / stats.requests as f64) * 0.3
177 } else {
178 0.0
179 };
180 let recency_bonus = if stats
181 .last_success_at
182 .is_some_and(|last| now.duration_since(last) < RECENT_SOURCE_SUCCESS_WINDOW)
183 {
184 0.1
185 } else {
186 0.0
187 };
188
189 0.6 * source_reliability_score(stats) + 0.3 * source_latency_score(stats) + recency_bonus
190 - miss_penalty
191 - failure_penalty
192}
193
194fn peer_endpoint_has_history(stats: &crate::peer_selector::PeerStats) -> bool {
195 stats.requests_sent > 0 || stats.successes > 0 || stats.failures > 0 || stats.timeouts > 0
196}
197
198fn peer_endpoint_score(stats: &crate::peer_selector::PeerStats, now: Instant) -> f64 {
199 if stats.backed_off_until.is_some_and(|until| until > now) {
200 return f64::NEG_INFINITY;
201 }
202
203 let miss_penalty = 0.0;
204 let failure_penalty = if stats.requests_sent > 0 {
205 ((stats.failures + stats.timeouts) as f64 / stats.requests_sent as f64) * 0.3
206 } else {
207 0.0
208 };
209 let recency_bonus = if stats
210 .last_success
211 .is_some_and(|last| now.duration_since(last) < RECENT_SOURCE_SUCCESS_WINDOW)
212 {
213 0.1
214 } else {
215 0.0
216 };
217
218 0.6 * stats.success_rate()
219 + 0.3
220 * source_latency_score(&AdaptiveSourceStats {
221 srtt_ms: stats.srtt_ms,
222 ..AdaptiveSourceStats::default()
223 })
224 + recency_bonus
225 - miss_penalty
226 - failure_penalty
227}
228
/// A way of satisfying a read: a list of peers, or the registered read sources.
#[derive(Clone)]
enum ReadRoute {
    /// Ask the listed peer ids.
    Peers(Vec<String>),
    /// Ask the registered read sources.
    Sources,
}

impl ReadRoute {
    /// Static label of the route kind: `"peers"` or `"sources"`.
    fn id(&self) -> &'static str {
        match self {
            ReadRoute::Sources => "sources",
            ReadRoute::Peers(_) => "peers",
        }
    }
}
243
/// A `ReadRoute` annotated with ranking data.
///
/// NOTE(review): the ranking code is outside this chunk; field meanings inferred from names.
struct RankedReadRoute {
    /// The route being ranked.
    route: ReadRoute,
    /// Id of the best endpoint on this route.
    best_endpoint_id: String,
    /// Score of the route (presumably that of its best endpoint — confirm).
    score: f64,
    /// Whether any history exists for the route's endpoints.
    has_history: bool,
}
250
251fn ranked_route_kind(route: &ReadRoute) -> u8 {
252 match route {
253 ReadRoute::Sources => 0,
254 ReadRoute::Peers(_) => 1,
255 }
256}
257
/// Per-read parameters.
#[derive(Debug, Clone)]
struct MeshReadContext {
    /// Peer to leave out of the candidate set, if any.
    exclude_peer_id: Option<String>,
    /// Hops-to-live value to use for the request.
    request_htl: u8,
}

impl Default for MeshReadContext {
    /// No excluded peer and the maximum hops-to-live (`MAX_HTL`).
    fn default() -> Self {
        Self {
            exclude_peer_id: None,
            request_htl: MAX_HTL,
        }
    }
}
272
/// Counters describing a batch of processed data messages.
///
/// NOTE(review): the code that fills these counters is outside this chunk.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct DataPumpStats {
    /// Number of messages processed.
    pub processed: usize,
    /// Number of request messages.
    pub request_messages: usize,
    /// Number of response messages.
    pub response_messages: usize,
    /// Number of quote-request messages.
    pub quote_request_messages: u64,
    /// Number of quote-response messages.
    pub quote_response_messages: u64,
    /// Total bytes processed.
    pub processed_bytes: u64,
}
283
/// Controls how many peers are queried per wave of a hedged request.
#[derive(Debug, Clone, Copy)]
pub struct RequestDispatchConfig {
    /// Number of peers queried in the first wave.
    pub initial_fanout: usize,
    /// Number of peers added in every later wave.
    pub hedge_fanout: usize,
    /// Upper bound on the total number of peers queried; `0` is normalized to "all peers".
    pub max_fanout: usize,
    /// Pause between waves in milliseconds; `0` sends the next wave without waiting.
    pub hedge_interval_ms: u64,
}

impl Default for RequestDispatchConfig {
    /// Unlimited fanouts with no hedge interval.
    fn default() -> Self {
        let unlimited = usize::MAX;
        Self {
            initial_fanout: unlimited,
            hedge_fanout: unlimited,
            max_fanout: unlimited,
            hedge_interval_ms: 0,
        }
    }
}

/// Clamps `dispatch` to the number of `available_peers`.
///
/// `max_fanout` becomes the peer count when it is `0` and is otherwise capped at it. Both wave
/// sizes become `1` when they are `0` and are otherwise capped at `max(max_fanout, 1)`.
pub fn normalize_dispatch_config(
    dispatch: RequestDispatchConfig,
    available_peers: usize,
) -> RequestDispatchConfig {
    let cap = match dispatch.max_fanout {
        0 => available_peers,
        limit => limit.min(available_peers),
    };
    let wave_ceiling = cap.max(1);
    let clamp_wave = |requested: usize| {
        if requested == 0 {
            1
        } else {
            requested.min(wave_ceiling)
        }
    };

    RequestDispatchConfig {
        initial_fanout: clamp_wave(dispatch.initial_fanout),
        hedge_fanout: clamp_wave(dispatch.hedge_fanout),
        max_fanout: cap,
        hedge_interval_ms: dispatch.hedge_interval_ms,
    }
}

/// Builds the list of wave sizes for a hedged request over `peer_count` peers.
///
/// The sizes sum to `min(dispatch.max_fanout, peer_count)`; an empty plan is returned when
/// that cap is zero. Every wave contains at least one peer.
pub fn build_hedged_wave_plan(peer_count: usize, dispatch: RequestDispatchConfig) -> Vec<usize> {
    let cap = dispatch.max_fanout.min(peer_count);
    if cap == 0 {
        // Also covers `peer_count == 0`.
        return Vec::new();
    }

    let first = dispatch.initial_fanout.min(cap).max(1);
    let mut plan = vec![first];
    let mut remaining = cap - first;
    while remaining > 0 {
        let wave = dispatch.hedge_fanout.min(remaining).max(1);
        plan.push(wave);
        remaining -= wave;
    }
    plan
}
360
/// Decision returned by the `wait_wave` callback of `run_hedged_waves`.
#[derive(Debug)]
pub enum HedgedWaveAction<T> {
    /// Nothing conclusive yet; proceed to the next wave.
    Continue,
    /// A result is available; `run_hedged_waves` returns it immediately.
    Success(T),
    /// Stop dispatching; `run_hedged_waves` returns `None`.
    Abort,
}
368
369pub async fn run_hedged_waves<T, SendWave, SendWaveFut, WaitWave, WaitWaveFut>(
374 peer_count: usize,
375 dispatch: RequestDispatchConfig,
376 request_timeout: Duration,
377 mut send_wave: SendWave,
378 mut wait_wave: WaitWave,
379) -> Option<T>
380where
381 SendWave: FnMut(Range<usize>) -> SendWaveFut,
382 SendWaveFut: Future<Output = usize>,
383 WaitWave: FnMut(Duration) -> WaitWaveFut,
384 WaitWaveFut: Future<Output = HedgedWaveAction<T>>,
385{
386 let dispatch = normalize_dispatch_config(dispatch, peer_count);
387 let wave_plan = build_hedged_wave_plan(peer_count, dispatch);
388 if wave_plan.is_empty() {
389 return None;
390 }
391
392 let deadline = Instant::now() + request_timeout;
393 let mut sent_total = 0usize;
394 let mut next_peer_idx = 0usize;
395
396 for (wave_idx, wave_size) in wave_plan.iter().copied().enumerate() {
397 let from = next_peer_idx;
398 let to = (next_peer_idx + wave_size).min(peer_count);
399 next_peer_idx = to;
400
401 if from == to {
402 continue;
403 }
404
405 sent_total += send_wave(from..to).await;
406 if sent_total == 0 {
407 if next_peer_idx >= peer_count {
408 break;
409 }
410 continue;
411 }
412
413 let now = Instant::now();
414 if now >= deadline {
415 break;
416 }
417 let remaining = deadline.saturating_duration_since(now);
418 let is_last_wave = wave_idx + 1 == wave_plan.len() || next_peer_idx >= peer_count;
419 let wait = if is_last_wave {
420 remaining
421 } else if dispatch.hedge_interval_ms == 0 {
422 Duration::ZERO
423 } else {
424 Duration::from_millis(dispatch.hedge_interval_ms).min(remaining)
425 };
426
427 if wait.is_zero() {
428 continue;
429 }
430
431 match wait_wave(wait).await {
432 HedgedWaveAction::Continue => {}
433 HedgedWaveAction::Success(value) => return Some(value),
434 HedgedWaveAction::Abort => break,
435 }
436 }
437
438 None
439}
440
441pub async fn sync_selector_peers(selector: &RwLock<PeerSelector>, current_peer_ids: &[String]) {
443 let mut selector = selector.write().await;
444 let current: HashSet<&str> = current_peer_ids.iter().map(String::as_str).collect();
445 let known: Vec<String> = selector.all_stats().map(|s| s.peer_id.clone()).collect();
446 for peer_id in known {
447 if !current.contains(peer_id.as_str()) {
448 selector.remove_peer(&peer_id);
449 }
450 }
451 for peer_id in current_peer_ids {
452 selector.add_peer(peer_id.clone());
453 }
454}
455
/// Knobs that alter how responses are sent: probabilistic drop/corruption/stall and
/// artificial delays.
#[derive(Debug, Clone, Copy)]
pub struct ResponseBehaviorConfig {
    /// Probability in `[0, 1]` that a response is dropped.
    pub drop_response_prob: f64,
    /// Probability in `[0, 1]` that a response is corrupted.
    pub corrupt_response_prob: f64,
    /// Fixed extra delay added to every response, in milliseconds.
    pub extra_delay_ms: u64,
    /// Additional fixed delay added to every response, in milliseconds.
    pub first_byte_delay_ms: u64,
    /// Throughput limit used to derive a size-proportional delay; `0` disables it.
    pub bytes_per_second: u64,
    /// Probability in `[0, 1]` that a response is stalled by `stall_delay_ms`.
    pub stall_response_prob: f64,
    /// Delay added to stalled responses, in milliseconds.
    pub stall_delay_ms: u64,
}

impl Default for ResponseBehaviorConfig {
    /// All probabilities and delays are zero.
    fn default() -> Self {
        Self {
            drop_response_prob: 0.0,
            corrupt_response_prob: 0.0,
            extra_delay_ms: 0,
            first_byte_delay_ms: 0,
            bytes_per_second: 0,
            stall_response_prob: 0.0,
            stall_delay_ms: 0,
        }
    }
}

impl ResponseBehaviorConfig {
    /// Returns a copy whose probabilities are valid values in `[0, 1]`.
    ///
    /// Out-of-range values are clamped. `NaN` is mapped to `0.0`, because `f64::clamp`
    /// propagates `NaN` and would otherwise leave a non-probability in the result.
    fn normalized(self) -> Self {
        fn unit_probability(p: f64) -> f64 {
            if p.is_nan() {
                0.0
            } else {
                p.clamp(0.0, 1.0)
            }
        }

        Self {
            drop_response_prob: unit_probability(self.drop_response_prob),
            corrupt_response_prob: unit_probability(self.corrupt_response_prob),
            stall_response_prob: unit_probability(self.stall_response_prob),
            ..self
        }
    }
}
504
/// Routing, payment and dispatch settings for a `MeshStoreCore`.
#[derive(Debug, Clone)]
pub struct MeshRoutingConfig {
    /// Strategy handed to `PeerSelector::with_strategy`.
    pub selection_strategy: SelectionStrategy,
    /// Passed to `PeerSelector::set_fairness`.
    pub fairness_enabled: bool,
    /// Passed to `PeerSelector::set_cashu_payment_weight`.
    pub cashu_payment_weight: f64,
    /// Threshold of payment defaults (NOTE(review): consumer is outside this chunk).
    pub cashu_payment_default_block_threshold: u64,
    /// Mint URLs accepted for quotes; an empty list accepts any mint.
    pub cashu_accepted_mints: Vec<String>,
    /// Mint URL preferred when requesting or choosing a quote mint.
    pub cashu_default_mint: Option<String>,
    /// Base payment cap (sats) for quotes on a mint suggested by a peer; `0` disables them.
    pub cashu_peer_suggested_mint_base_cap_sat: u64,
    /// Cap increase (sats) per recorded success of the peer.
    pub cashu_peer_suggested_mint_success_step_sat: u64,
    /// Cap increase (sats) per recorded payment receipt of the peer.
    pub cashu_peer_suggested_mint_receipt_step_sat: u64,
    /// Upper bound (sats) on the peer-suggested-mint cap; `0` means no upper bound.
    pub cashu_peer_suggested_mint_max_cap_sat: u64,
    /// Hedged-dispatch settings.
    pub dispatch: RequestDispatchConfig,
    /// Response-behavior settings (normalized on every read via `response_behavior`).
    pub response_behavior: ResponseBehaviorConfig,
}

impl Default for MeshRoutingConfig {
    /// Weighted selection with fairness enabled; all Cashu settings zero/empty; default
    /// dispatch and response behavior.
    fn default() -> Self {
        Self {
            selection_strategy: SelectionStrategy::Weighted,
            fairness_enabled: true,
            cashu_payment_weight: 0.0,
            cashu_payment_default_block_threshold: 0,
            cashu_accepted_mints: Vec::new(),
            cashu_default_mint: None,
            cashu_peer_suggested_mint_base_cap_sat: 0,
            cashu_peer_suggested_mint_success_step_sat: 0,
            cashu_peer_suggested_mint_receipt_step_sat: 0,
            cashu_peer_suggested_mint_max_cap_sat: 0,
            dispatch: RequestDispatchConfig::default(),
            response_behavior: ResponseBehaviorConfig::default(),
        }
    }
}
549
/// Core state of a mesh-backed store: a local `Store`, a signaling router, and the
/// bookkeeping for requests, quotes, read sources and outgoing responses.
pub struct MeshStoreCore<S, R, F>
where
    S: Store + Send + Sync + 'static,
    R: SignalingTransport + Send + Sync + 'static,
    F: PeerLinkFactory + Send + Sync + 'static,
{
    /// Shared handle to the local store.
    local_store: Arc<S>,
    /// Router used for hello/signaling messages, peer ids and per-peer channels.
    signaling: Arc<MeshRouter<R, F>>,
    /// Per-peer HTL configuration, created randomly on the peer's first signaling message.
    htl_configs: RwLock<HashMap<String, PeerHTLConfig>>,
    /// Outstanding data requests by string key (presumably the hash key — confirm).
    pending_requests: RwLock<HashMap<String, PendingRequest>>,
    /// Outstanding quote requests by string key (presumably the hash key — confirm).
    pending_quotes: RwLock<HashMap<String, PendingQuoteRequest>>,
    /// Outstanding forwarded requests by string key (presumably the hash key — confirm).
    pending_forward_requests: RwLock<HashMap<String, PendingForwardRequest>>,
    /// Time-limited set sized by `RECENT_FORWARD_MISS_CAPACITY` with the TTL from
    /// `recent_forward_miss_ttl`.
    recent_forward_misses: Mutex<TimedSeenSet>,
    /// Quotes handed out by `issue_quote`, keyed by `(peer id, hash key, quote id)`.
    issued_quotes: RwLock<HashMap<(String, String, u64), IssuedQuote>>,
    /// Next quote id to hand out; starts at `1`.
    next_quote_id: RwLock<u64>,
    /// Registered read sources by string key (presumably the source id — confirm).
    read_sources: RwLock<HashMap<String, Arc<dyn MeshReadSource>>>,
    /// Statistics per read source.
    read_source_stats: RwLock<HashMap<String, AdaptiveSourceStats>>,
    /// Source fetches currently in flight, with their waiters.
    inflight_source_fetches: Mutex<HashMap<String, InflightSourceFetch>>,
    /// Peer selector providing ordering, backoff state and per-peer stats.
    peer_selector: RwLock<PeerSelector>,
    /// Number of reserved (in-flight) requests per peer.
    peer_active_requests: RwLock<HashMap<String, usize>>,
    /// Byte counters and scheduling debt per peer.
    peer_wire_stats: RwLock<HashMap<String, PeerWireStats>>,
    /// Responses waiting to be sent by the response scheduler.
    pending_response_sends: Mutex<Vec<PendingResponseSend>>,
    /// `true` while a response scheduler task is running.
    response_scheduler_running: AtomicBool,
    /// Source of response job ids; starts at `1`.
    next_response_job_id: AtomicU64,
    /// Routing configuration.
    routing: MeshRoutingConfig,
    /// Request timeout supplied at construction.
    request_timeout: Duration,
    /// Debug flag supplied at construction.
    debug: bool,
    /// Set by `start`, cleared by `stop`.
    running: RwLock<bool>,
}
607
608impl<S, R, F> MeshStoreCore<S, R, F>
609where
610 S: Store + Send + Sync + 'static,
611 R: SignalingTransport + Send + Sync + 'static,
612 F: PeerLinkFactory + Send + Sync + 'static,
613{
614 pub fn new(
616 local_store: Arc<S>,
617 signaling: Arc<MeshRouter<R, F>>,
618 request_timeout: Duration,
619 debug: bool,
620 ) -> Self {
621 Self::new_with_routing(
622 local_store,
623 signaling,
624 request_timeout,
625 debug,
626 Default::default(),
627 )
628 }
629
630 pub fn new_with_routing(
632 local_store: Arc<S>,
633 signaling: Arc<MeshRouter<R, F>>,
634 request_timeout: Duration,
635 debug: bool,
636 routing: MeshRoutingConfig,
637 ) -> Self {
638 let mut selector = PeerSelector::with_strategy(routing.selection_strategy);
639 selector.set_fairness(routing.fairness_enabled);
640 selector.set_cashu_payment_weight(routing.cashu_payment_weight);
641 Self {
642 local_store,
643 signaling,
644 htl_configs: RwLock::new(HashMap::new()),
645 pending_requests: RwLock::new(HashMap::new()),
646 pending_quotes: RwLock::new(HashMap::new()),
647 pending_forward_requests: RwLock::new(HashMap::new()),
648 recent_forward_misses: Mutex::new(TimedSeenSet::new(
649 RECENT_FORWARD_MISS_CAPACITY,
650 Self::recent_forward_miss_ttl(request_timeout),
651 )),
652 issued_quotes: RwLock::new(HashMap::new()),
653 next_quote_id: RwLock::new(1),
654 read_sources: RwLock::new(HashMap::new()),
655 read_source_stats: RwLock::new(HashMap::new()),
656 inflight_source_fetches: Mutex::new(HashMap::new()),
657 peer_selector: RwLock::new(selector),
658 peer_active_requests: RwLock::new(HashMap::new()),
659 peer_wire_stats: RwLock::new(HashMap::new()),
660 pending_response_sends: Mutex::new(Vec::new()),
661 response_scheduler_running: AtomicBool::new(false),
662 next_response_job_id: AtomicU64::new(1),
663 routing,
664 request_timeout,
665 debug,
666 running: RwLock::new(false),
667 }
668 }
669
670 fn recent_forward_miss_ttl(request_timeout: Duration) -> Duration {
671 let ttl_ms = request_timeout
672 .as_millis()
673 .saturating_mul(2)
674 .max(MIN_RECENT_FORWARD_MISS_TTL_MS as u128)
675 .min(u64::MAX as u128) as u64;
676 Duration::from_millis(ttl_ms)
677 }
678
679 pub async fn start(&self) -> Result<(), TransportError> {
681 *self.running.write().await = true;
682
683 self.signaling.send_hello(vec![]).await?;
685
686 Ok(())
687 }
688
689 pub async fn stop(&self) {
691 *self.running.write().await = false;
692 }
693
694 pub async fn process_signaling(&self, msg: SignalingMessage) -> Result<(), TransportError> {
696 let peer_id = msg.peer_id().to_string();
698 {
699 let mut configs = self.htl_configs.write().await;
700 if !configs.contains_key(&peer_id) {
701 configs.insert(peer_id.clone(), PeerHTLConfig::random());
702 }
703 }
704 self.peer_selector.write().await.add_peer(peer_id);
705
706 self.signaling.handle_message(msg).await
707 }
708
709 pub fn signaling(&self) -> &Arc<MeshRouter<R, F>> {
711 &self.signaling
712 }
713
714 fn response_behavior(&self) -> ResponseBehaviorConfig {
715 self.routing.response_behavior.normalized()
716 }
717
718 async fn record_peer_wire_sent(&self, peer_id: &str, bytes: u64) {
719 if bytes == 0 {
720 return;
721 }
722 let mut stats = self.peer_wire_stats.write().await;
723 let entry = stats.entry(peer_id.to_string()).or_default();
724 entry.bytes_sent = entry.bytes_sent.saturating_add(bytes);
725 }
726
727 async fn record_peer_wire_received(&self, peer_id: &str, bytes: u64) {
728 if bytes == 0 {
729 return;
730 }
731 let mut stats = self.peer_wire_stats.write().await;
732 let entry = stats.entry(peer_id.to_string()).or_default();
733 entry.bytes_received = entry.bytes_received.saturating_add(bytes);
734 }
735
736 fn peer_upload_weight(stats: &PeerWireStats) -> f64 {
737 let raw_ratio = (stats.bytes_received.saturating_add(1024) as f64)
738 / (stats.bytes_sent.saturating_add(1024) as f64);
739 let bounded_ratio = raw_ratio / (1.0 + raw_ratio);
740 0.5 + 1.5 * bounded_ratio
741 }
742
743 fn choose_ready_response_job(
744 ready_jobs: &[(u64, String, usize, Instant, u64)],
745 stats: &HashMap<String, PeerWireStats>,
746 ) -> Option<(u64, f64)> {
747 ready_jobs
748 .iter()
749 .map(|job| {
750 let peer_stats = stats.get(&job.1).cloned().unwrap_or_default();
751 let finish = peer_stats.bandwidth_debt
752 + (job.2 as f64 / Self::peer_upload_weight(&peer_stats));
753 (job.0, job.1.as_str(), job.4, finish)
754 })
755 .min_by(|left, right| {
756 left.3
757 .partial_cmp(&right.3)
758 .unwrap_or(std::cmp::Ordering::Equal)
759 .then_with(|| left.2.cmp(&right.2))
760 .then_with(|| left.1.cmp(right.1))
761 })
762 .map(|choice| (choice.0, choice.3))
763 }
764
765 async fn enqueue_response_send(
766 self: &Arc<Self>,
767 peer_id: String,
768 bytes: Vec<u8>,
769 ready_at: Instant,
770 ) {
771 let job_id = self.next_response_job_id.fetch_add(1, Ordering::Relaxed);
772 {
773 let mut queue = self.pending_response_sends.lock().await;
774 queue.push(PendingResponseSend {
775 job_id,
776 peer_id,
777 bytes,
778 ready_at,
779 queue_sequence: job_id,
780 });
781 }
782
783 if self
784 .response_scheduler_running
785 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
786 .is_ok()
787 {
788 let this = Arc::clone(self);
789 tokio::spawn(async move {
790 this.run_response_scheduler().await;
791 });
792 }
793 }
794
795 async fn run_response_scheduler(self: Arc<Self>) {
796 loop {
797 let snapshot = {
798 let queue = self.pending_response_sends.lock().await;
799 if queue.is_empty() {
800 self.response_scheduler_running
801 .store(false, Ordering::Release);
802 return;
803 }
804 queue
805 .iter()
806 .map(|job| {
807 (
808 job.job_id,
809 job.peer_id.clone(),
810 job.bytes.len(),
811 job.ready_at,
812 job.queue_sequence,
813 )
814 })
815 .collect::<Vec<_>>()
816 };
817
818 let now = Instant::now();
819 let mut earliest_ready_at: Option<Instant> = None;
820 let mut ready_jobs = Vec::new();
821 for job in &snapshot {
822 if job.3 <= now {
823 ready_jobs.push(job.clone());
824 } else {
825 earliest_ready_at = Some(match earliest_ready_at {
826 Some(current) => current.min(job.3),
827 None => job.3,
828 });
829 }
830 }
831
832 if ready_jobs.is_empty() {
833 if let Some(ready_at) = earliest_ready_at {
834 tokio::time::sleep(ready_at.saturating_duration_since(Instant::now())).await;
835 continue;
836 }
837 self.response_scheduler_running
838 .store(false, Ordering::Release);
839 return;
840 }
841
842 let (selected_job_id, selected_finish) = {
843 let stats = self.peer_wire_stats.read().await;
844 Self::choose_ready_response_job(&ready_jobs, &stats).expect("ready response job")
845 };
846
847 let selected = {
848 let mut queue = self.pending_response_sends.lock().await;
849 let Some(index) = queue.iter().position(|job| job.job_id == selected_job_id) else {
850 continue;
851 };
852 queue.swap_remove(index)
853 };
854
855 let sent = if let Some(channel) = self.signaling.get_channel(&selected.peer_id).await {
856 channel.send(selected.bytes.clone()).await.is_ok()
857 } else {
858 false
859 };
860
861 let queued_peers = {
862 let queue = self.pending_response_sends.lock().await;
863 queue
864 .iter()
865 .map(|job| job.peer_id.clone())
866 .collect::<HashSet<_>>()
867 };
868 let mut stats = self.peer_wire_stats.write().await;
869 let entry = stats.entry(selected.peer_id.clone()).or_default();
870 if sent {
871 entry.bytes_sent = entry.bytes_sent.saturating_add(selected.bytes.len() as u64);
872 entry.bandwidth_debt = selected_finish;
873 }
874 if queued_peers.is_empty() {
875 for peer_stats in stats.values_mut() {
876 peer_stats.bandwidth_debt = 0.0;
877 }
878 } else {
879 let floor = queued_peers
880 .iter()
881 .filter_map(|peer_id| stats.get(peer_id).map(|peer| peer.bandwidth_debt))
882 .fold(f64::INFINITY, f64::min);
883 if floor.is_finite() && floor > 0.0 {
884 for peer_id in queued_peers {
885 if let Some(peer_stats) = stats.get_mut(&peer_id) {
886 peer_stats.bandwidth_debt =
887 (peer_stats.bandwidth_debt - floor).max(0.0);
888 }
889 }
890 }
891 }
892 }
893 }
894
895 fn deterministic_actor_draw_for(peer_id: &str, hash: &Hash, salt: u64) -> f64 {
896 let mut hasher = DefaultHasher::new();
897 peer_id.hash(&mut hasher);
898 hash.hash(&mut hasher);
899 salt.hash(&mut hasher);
900 let v = hasher.finish();
901 (v as f64) / (u64::MAX as f64)
902 }
903
904 fn deterministic_actor_draw(&self, hash: &Hash, salt: u64) -> f64 {
905 Self::deterministic_actor_draw_for(self.signaling.peer_id(), hash, salt)
906 }
907
908 fn peer_metadata_pointer_slot_hash() -> Hash {
909 hashtree_core::sha256(PEER_METADATA_POINTER_SLOT_KEY)
910 }
911
912 fn decode_hash_hex(hash_hex: &str) -> Result<Hash, StoreError> {
913 let bytes = hex::decode(hash_hex)
914 .map_err(|e| StoreError::Other(format!("Invalid hash hex: {e}")))?;
915 if bytes.len() != 32 {
916 return Err(StoreError::Other(format!(
917 "Invalid hash length {}, expected 32 bytes",
918 bytes.len()
919 )));
920 }
921 let mut hash = [0u8; 32];
922 hash.copy_from_slice(&bytes);
923 Ok(hash)
924 }
925
926 fn should_drop_response(&self, hash: &Hash) -> bool {
927 let p = self.response_behavior().drop_response_prob;
928 if p <= 0.0 {
929 return false;
930 }
931 self.deterministic_actor_draw(hash, 0xD0_D0_D0_D0_D0_D0_D0_D0) < p
932 }
933
934 fn should_corrupt_response(&self, hash: &Hash) -> bool {
935 let p = self.response_behavior().corrupt_response_prob;
936 if p <= 0.0 {
937 return false;
938 }
939 self.deterministic_actor_draw(hash, 0xC0_C0_C0_C0_C0_C0_C0_C0) < p
940 }
941
942 fn should_stall_response(&self, hash: &Hash) -> bool {
943 let p = self.response_behavior().stall_response_prob;
944 if p <= 0.0 {
945 return false;
946 }
947 self.deterministic_actor_draw(hash, 0x5A_11_5A_11_5A_11_5A_11) < p
948 }
949
950 fn response_send_delay(&self, hash: &Hash, payload_len: usize) -> Duration {
951 let behavior = self.response_behavior();
952 let mut total_ms = behavior
953 .extra_delay_ms
954 .saturating_add(behavior.first_byte_delay_ms);
955
956 if behavior.bytes_per_second > 0 && payload_len > 0 {
957 let throughput_ms = ((payload_len as u128) * 1000)
958 .div_ceil(behavior.bytes_per_second as u128)
959 .min(u64::MAX as u128) as u64;
960 total_ms = total_ms.saturating_add(throughput_ms);
961 }
962
963 if behavior.stall_delay_ms > 0 && self.should_stall_response(hash) {
964 total_ms = total_ms.saturating_add(behavior.stall_delay_ms);
965 }
966
967 Duration::from_millis(total_ms)
968 }
969
970 async fn ordered_connected_peers(&self, exclude_peer_id: Option<&str>) -> Vec<String> {
971 let current_peer_ids = self.signaling.peer_ids().await;
972 if current_peer_ids.is_empty() {
973 return Vec::new();
974 }
975
976 sync_selector_peers(&self.peer_selector, ¤t_peer_ids).await;
977 let mut candidate_peer_ids: Vec<String> = current_peer_ids
978 .into_iter()
979 .filter(|peer_id| exclude_peer_id.is_none_or(|exclude| peer_id != exclude))
980 .collect();
981 if candidate_peer_ids.is_empty() {
982 return Vec::new();
983 }
984
985 let current_set: HashSet<&str> = candidate_peer_ids.iter().map(String::as_str).collect();
986 let mut selector = self.peer_selector.write().await;
987 let mut selector_order = selector.select_peers();
988 selector_order.retain(|peer_id| current_set.contains(peer_id.as_str()));
989 if selector_order.is_empty() {
990 let mut fallback = candidate_peer_ids;
991 fallback.sort();
992 return fallback;
993 }
994 let backed_off: HashMap<String, bool> = candidate_peer_ids
995 .iter()
996 .map(|peer_id| (peer_id.clone(), selector.is_peer_backed_off(peer_id)))
997 .collect();
998 drop(selector);
999
1000 let rank: HashMap<&str, usize> = selector_order
1001 .iter()
1002 .enumerate()
1003 .map(|(idx, peer_id)| (peer_id.as_str(), idx))
1004 .collect();
1005 let active = self.peer_active_requests.read().await;
1006 candidate_peer_ids.sort_by(|left, right| {
1007 let left_backed_off = backed_off.get(left).copied().unwrap_or(false);
1008 let right_backed_off = backed_off.get(right).copied().unwrap_or(false);
1009 if left_backed_off != right_backed_off {
1010 return if left_backed_off {
1011 std::cmp::Ordering::Greater
1012 } else {
1013 std::cmp::Ordering::Less
1014 };
1015 }
1016 let left_rank = rank.get(left.as_str()).copied().unwrap_or(usize::MAX / 2);
1017 let right_rank = rank.get(right.as_str()).copied().unwrap_or(usize::MAX / 2);
1018 let left_load = active.get(left).copied().unwrap_or(0);
1019 let right_load = active.get(right).copied().unwrap_or(0);
1020 (left_rank + left_load.saturating_mul(ACTIVE_PEER_REQUEST_RANK_PENALTY))
1021 .cmp(&(right_rank + right_load.saturating_mul(ACTIVE_PEER_REQUEST_RANK_PENALTY)))
1022 .then_with(|| left.cmp(right))
1023 });
1024 candidate_peer_ids
1025 }
1026
1027 async fn reserve_peer_request(&self, peer_id: &str) {
1028 let mut active = self.peer_active_requests.write().await;
1029 *active.entry(peer_id.to_string()).or_insert(0) += 1;
1030 }
1031
1032 async fn release_peer_request(&self, peer_id: &str) {
1033 let mut active = self.peer_active_requests.write().await;
1034 let Some(count) = active.get_mut(peer_id) else {
1035 return;
1036 };
1037 if *count <= 1 {
1038 active.remove(peer_id);
1039 } else {
1040 *count -= 1;
1041 }
1042 }
1043
1044 async fn release_queried_peer_requests(&self, peer_ids: &[String]) {
1045 for peer_id in peer_ids {
1046 self.release_peer_request(peer_id).await;
1047 }
1048 }
1049
1050 fn requested_quote_mint(&self) -> Option<&str> {
1051 if let Some(default_mint) = self.routing.cashu_default_mint.as_deref() {
1052 if self.routing.cashu_accepted_mints.is_empty()
1053 || self
1054 .routing
1055 .cashu_accepted_mints
1056 .iter()
1057 .any(|mint| mint == default_mint)
1058 {
1059 return Some(default_mint);
1060 }
1061 }
1062
1063 self.routing
1064 .cashu_accepted_mints
1065 .first()
1066 .map(String::as_str)
1067 }
1068
1069 fn choose_quote_mint(&self, requested_mint: Option<&str>) -> Option<String> {
1070 if let Some(requested_mint) = requested_mint {
1071 if self.accepts_quote_mint(Some(requested_mint)) {
1072 return Some(requested_mint.to_string());
1073 }
1074 }
1075 if let Some(default_mint) = self.routing.cashu_default_mint.as_ref() {
1076 return Some(default_mint.clone());
1077 }
1078 if let Some(first_mint) = self.routing.cashu_accepted_mints.first() {
1079 return Some(first_mint.clone());
1080 }
1081 requested_mint.map(str::to_string)
1082 }
1083
1084 fn accepts_quote_mint(&self, mint_url: Option<&str>) -> bool {
1085 if self.routing.cashu_accepted_mints.is_empty() {
1086 return true;
1087 }
1088
1089 let Some(mint_url) = mint_url else {
1090 return false;
1091 };
1092 self.routing
1093 .cashu_accepted_mints
1094 .iter()
1095 .any(|mint| mint == mint_url)
1096 }
1097
1098 fn trusts_quote_mint(&self, mint_url: Option<&str>) -> bool {
1099 let Some(mint_url) = mint_url else {
1100 return self.routing.cashu_default_mint.is_none()
1101 && self.routing.cashu_accepted_mints.is_empty();
1102 };
1103 self.routing.cashu_default_mint.as_deref() == Some(mint_url)
1104 || self
1105 .routing
1106 .cashu_accepted_mints
1107 .iter()
1108 .any(|mint| mint == mint_url)
1109 }
1110
1111 async fn peer_suggested_mint_cap_sat(&self, peer_id: &str) -> u64 {
1112 let base = self.routing.cashu_peer_suggested_mint_base_cap_sat;
1113 if base == 0 {
1114 return 0;
1115 }
1116
1117 let selector = self.peer_selector.read().await;
1118 let Some(stats) = selector.get_stats(peer_id) else {
1119 let max_cap = self.routing.cashu_peer_suggested_mint_max_cap_sat;
1120 return if max_cap > 0 { base.min(max_cap) } else { base };
1121 };
1122
1123 if stats.cashu_payment_defaults > 0
1124 && stats.cashu_payment_defaults >= stats.cashu_payment_receipts
1125 {
1126 return 0;
1127 }
1128
1129 let success_bonus = stats
1130 .successes
1131 .saturating_mul(self.routing.cashu_peer_suggested_mint_success_step_sat);
1132 let receipt_bonus = stats
1133 .cashu_payment_receipts
1134 .saturating_mul(self.routing.cashu_peer_suggested_mint_receipt_step_sat);
1135 let mut cap = base
1136 .saturating_add(success_bonus)
1137 .saturating_add(receipt_bonus);
1138 let max_cap = self.routing.cashu_peer_suggested_mint_max_cap_sat;
1139 if max_cap > 0 {
1140 cap = cap.min(max_cap);
1141 }
1142 cap
1143 }
1144
1145 async fn should_accept_quote_response(
1146 &self,
1147 from_peer: &str,
1148 preferred_mint_url: Option<&str>,
1149 offered_payment_sat: u64,
1150 res: &DataQuoteResponse,
1151 ) -> bool {
1152 let Some(payment_sat) = res.p else {
1153 return false;
1154 };
1155 if payment_sat > offered_payment_sat {
1156 return false;
1157 }
1158
1159 let response_mint = res.m.as_deref();
1160 if response_mint == preferred_mint_url {
1161 return true;
1162 }
1163 if self.trusts_quote_mint(response_mint) {
1164 return true;
1165 }
1166 if response_mint.is_none() {
1167 return false;
1168 }
1169
1170 payment_sat <= self.peer_suggested_mint_cap_sat(from_peer).await
1171 }
1172
1173 async fn issue_quote(
1174 &self,
1175 peer_id: &str,
1176 hash_key: &str,
1177 payment_sat: u64,
1178 ttl_ms: u32,
1179 mint_url: Option<&str>,
1180 ) -> u64 {
1181 let quote_id = {
1182 let mut next = self.next_quote_id.write().await;
1183 let quote_id = *next;
1184 *next = next.saturating_add(1);
1185 quote_id
1186 };
1187
1188 let expires_at = Instant::now() + Duration::from_millis(ttl_ms as u64);
1189 self.issued_quotes.write().await.insert(
1190 (peer_id.to_string(), hash_key.to_string(), quote_id),
1191 IssuedQuote {
1192 expires_at,
1193 payment_sat,
1194 mint_url: mint_url.map(str::to_string),
1195 },
1196 );
1197 quote_id
1198 }
1199
1200 async fn take_valid_quote(&self, peer_id: &str, hash_key: &str, quote_id: u64) -> bool {
1201 let key = (peer_id.to_string(), hash_key.to_string(), quote_id);
1202 let Some(quote) = self.issued_quotes.write().await.remove(&key) else {
1203 return false;
1204 };
1205 quote.expires_at > Instant::now()
1206 }
1207
1208 async fn send_request_to_peer(
1209 &self,
1210 peer_id: &str,
1211 hash: &Hash,
1212 request_htl: u8,
1213 quote_id: Option<u64>,
1214 ) -> bool {
1215 if !should_forward_htl(request_htl) {
1216 return false;
1217 }
1218
1219 let channel = match self.signaling.get_channel(peer_id).await {
1220 Some(c) => c,
1221 None => return false,
1222 };
1223
1224 let htl_config = {
1225 let configs = self.htl_configs.read().await;
1226 configs
1227 .get(peer_id)
1228 .cloned()
1229 .unwrap_or_else(PeerHTLConfig::random)
1230 };
1231
1232 let send_htl = htl_config.decrement(request_htl);
1233 let req = match quote_id {
1234 Some(quote_id) => create_request_with_quote(hash, send_htl, quote_id),
1235 None => create_request(hash, send_htl),
1236 };
1237 let request_bytes = encode_request(&req);
1238 let request_len = request_bytes.len() as u64;
1239
1240 {
1241 let mut selector = self.peer_selector.write().await;
1242 selector.record_request(peer_id, request_len);
1243 }
1244
1245 match channel.send(request_bytes).await {
1246 Ok(()) => {
1247 self.record_peer_wire_sent(peer_id, request_len).await;
1248 true
1249 }
1250 Err(_) => {
1251 self.peer_selector.write().await.record_failure(peer_id);
1252 false
1253 }
1254 }
1255 }
1256
1257 async fn send_quote_request_to_peer(
1258 &self,
1259 peer_id: &str,
1260 hash: &Hash,
1261 payment_sat: u64,
1262 ttl_ms: u32,
1263 mint_url: Option<&str>,
1264 ) -> bool {
1265 let channel = match self.signaling.get_channel(peer_id).await {
1266 Some(c) => c,
1267 None => return false,
1268 };
1269
1270 let req = create_quote_request(hash, ttl_ms, payment_sat, mint_url);
1271 let request_bytes = encode_quote_request(&req);
1272 let request_len = request_bytes.len() as u64;
1273
1274 match channel.send(request_bytes).await {
1275 Ok(()) => {
1276 self.record_peer_wire_sent(peer_id, request_len).await;
1277 true
1278 }
1279 Err(_) => false,
1280 }
1281 }
1282
1283 pub async fn set_read_sources(&self, sources: Vec<Arc<dyn MeshReadSource>>) {
1284 let mut by_id = HashMap::new();
1285 let mut stats = self.read_source_stats.write().await;
1286 for source in sources {
1287 let source_id = source.id().to_string();
1288 by_id.insert(source_id.clone(), source);
1289 stats
1290 .entry(source_id)
1291 .or_insert_with(AdaptiveSourceStats::default);
1292 }
1293 *self.read_sources.write().await = by_id;
1294 }
1295
1296 async fn record_read_source_request(&self, source_id: &str) {
1297 let mut stats = self.read_source_stats.write().await;
1298 stats
1299 .entry(source_id.to_string())
1300 .or_insert_with(AdaptiveSourceStats::default)
1301 .requests += 1;
1302 }
1303
1304 async fn record_read_source_miss(&self, source_id: &str) {
1305 let mut stats = self.read_source_stats.write().await;
1306 stats
1307 .entry(source_id.to_string())
1308 .or_insert_with(AdaptiveSourceStats::default)
1309 .misses += 1;
1310 }
1311
1312 async fn record_read_source_success(&self, source_id: &str, elapsed_ms: u64) {
1313 let now = Instant::now();
1314 let mut stats = self.read_source_stats.write().await;
1315 let stats = stats
1316 .entry(source_id.to_string())
1317 .or_insert_with(AdaptiveSourceStats::default);
1318 stats.successes += 1;
1319 stats.last_success_at = Some(now);
1320 stats.backoff_level = 0;
1321 stats.backed_off_until = None;
1322 if stats.srtt_ms <= 0.0 {
1323 stats.srtt_ms = elapsed_ms as f64;
1324 stats.rttvar_ms = elapsed_ms as f64 / 2.0;
1325 return;
1326 }
1327 let elapsed = elapsed_ms as f64;
1328 stats.rttvar_ms = 0.75 * stats.rttvar_ms + 0.25 * (stats.srtt_ms - elapsed).abs();
1329 stats.srtt_ms = 0.875 * stats.srtt_ms + 0.125 * elapsed;
1330 }
1331
1332 async fn record_read_source_failure(&self, source_id: &str) {
1333 let now = Instant::now();
1334 let mut stats = self.read_source_stats.write().await;
1335 let stats = stats
1336 .entry(source_id.to_string())
1337 .or_insert_with(AdaptiveSourceStats::default);
1338 stats.failures += 1;
1339 stats.last_failure_at = Some(now);
1340 Self::apply_source_backoff(stats, now);
1341 }
1342
1343 async fn record_read_source_timeout(&self, source_id: &str) {
1344 let now = Instant::now();
1345 let mut stats = self.read_source_stats.write().await;
1346 let stats = stats
1347 .entry(source_id.to_string())
1348 .or_insert_with(AdaptiveSourceStats::default);
1349 stats.timeouts += 1;
1350 stats.last_failure_at = Some(now);
1351 Self::apply_source_backoff(stats, now);
1352 }
1353
1354 fn apply_source_backoff(stats: &mut AdaptiveSourceStats, now: Instant) {
1355 stats.backoff_level = stats.backoff_level.saturating_add(1);
1356 let backoff_ms = (INITIAL_SOURCE_BACKOFF_MS
1357 .saturating_mul(2u64.saturating_pow(stats.backoff_level.saturating_sub(1))))
1358 .min(MAX_SOURCE_BACKOFF_MS);
1359 stats.backed_off_until = Some(now + Duration::from_millis(backoff_ms));
1360 }
1361
1362 async fn ordered_read_sources(&self) -> Vec<Arc<dyn MeshReadSource>> {
1363 let sources = self.read_sources.read().await;
1364 if sources.is_empty() {
1365 return Vec::new();
1366 }
1367
1368 let mut available: Vec<Arc<dyn MeshReadSource>> = sources
1369 .values()
1370 .filter(|source| source.is_available())
1371 .cloned()
1372 .collect();
1373 if available.is_empty() {
1374 return Vec::new();
1375 }
1376
1377 let now = Instant::now();
1378 let stats = self.read_source_stats.read().await;
1379 let mut healthy: Vec<Arc<dyn MeshReadSource>> = available
1380 .iter()
1381 .filter(|source| {
1382 stats
1383 .get(source.id())
1384 .and_then(|s| s.backed_off_until)
1385 .is_none_or(|until| until <= now)
1386 })
1387 .cloned()
1388 .collect();
1389 if !healthy.is_empty() {
1390 available = std::mem::take(&mut healthy);
1391 }
1392
1393 available.sort_by(|left, right| {
1394 let left_stats = stats.get(left.id()).cloned().unwrap_or_default();
1395 let right_stats = stats.get(right.id()).cloned().unwrap_or_default();
1396 adaptive_source_score(&right_stats, now)
1397 .partial_cmp(&adaptive_source_score(&left_stats, now))
1398 .unwrap_or(std::cmp::Ordering::Equal)
1399 .then_with(|| left.id().cmp(right.id()))
1400 });
1401 available
1402 }
1403
1404 async fn should_probe_multiple_read_sources(
1405 &self,
1406 ordered_sources: &[Arc<dyn MeshReadSource>],
1407 ) -> bool {
1408 if ordered_sources.len() <= 1 {
1409 return false;
1410 }
1411 let stats = self.read_source_stats.read().await;
1412 let best = stats
1413 .get(ordered_sources[0].id())
1414 .cloned()
1415 .unwrap_or_default();
1416 let second = stats
1417 .get(ordered_sources[1].id())
1418 .cloned()
1419 .unwrap_or_default();
1420 if !source_has_history(&best) || !source_has_history(&second) {
1421 return false;
1422 }
1423 let now = Instant::now();
1424 adaptive_source_score(&best, now) - adaptive_source_score(&second, now)
1425 < SOURCE_SCORE_TIE_DELTA
1426 }
1427
1428 async fn source_dispatch_for(&self, source_count: usize) -> RequestDispatchConfig {
1429 if source_count == 0 {
1430 return self.routing.dispatch;
1431 }
1432 let ordered_sources = self.ordered_read_sources().await;
1433 let probe_multiple = self
1434 .should_probe_multiple_read_sources(&ordered_sources)
1435 .await;
1436 let initial_fanout = if probe_multiple {
1437 source_count.min(2)
1438 } else {
1439 1
1440 };
1441 RequestDispatchConfig {
1442 initial_fanout,
1443 hedge_fanout: self.routing.dispatch.hedge_fanout,
1444 max_fanout: self.routing.dispatch.max_fanout.min(source_count),
1445 hedge_interval_ms: self.routing.dispatch.hedge_interval_ms,
1446 }
1447 }
1448
/// Returns the peer count reported by the signaling layer.
pub async fn peer_count(&self) -> usize {
    self.signaling.peer_count().await
}
1453
/// Returns the signaling layer's answer to whether more peers are needed.
pub async fn needs_peers(&self) -> bool {
    self.signaling.needs_peers().await
}
1458
/// Sends a hello through the signaling layer, passing an empty list as its
/// argument, and returns the signaling layer's result.
pub async fn send_hello(&self) -> Result<(), TransportError> {
    self.signaling.send_hello(vec![]).await
}
1463
1464 pub async fn drain_available_data_messages(self: &Arc<Self>) -> DataPumpStats {
1469 let mut stats = DataPumpStats::default();
1470 let peer_ids = self.signaling.peer_ids().await;
1471 for peer_id in peer_ids {
1472 let Some(channel) = self.signaling.get_channel(&peer_id).await else {
1473 continue;
1474 };
1475
1476 while let Some(data) = channel.try_recv() {
1477 stats.processed += 1;
1478 stats.processed_bytes += data.len() as u64;
1479 if let Some(msg) = parse_message(&data) {
1480 match msg {
1481 DataMessage::Request(_) => stats.request_messages += 1,
1482 DataMessage::Response(_) => stats.response_messages += 1,
1483 DataMessage::QuoteRequest(_) => stats.quote_request_messages += 1,
1484 DataMessage::QuoteResponse(_) => stats.quote_response_messages += 1,
1485 DataMessage::Payment(_)
1486 | DataMessage::PaymentAck(_)
1487 | DataMessage::Chunk(_) => {}
1488 }
1489 }
1490 self.handle_data_message(&peer_id, &data).await;
1491 }
1492 }
1493 stats
1494 }
1495
/// Forwards a Cashu payment of `amount_sat` for `peer_id` to the peer
/// selector's `record_cashu_payment`, under the selector's write lock.
pub async fn record_cashu_payment_for_peer(&self, peer_id: &str, amount_sat: u64) {
    self.peer_selector
        .write()
        .await
        .record_cashu_payment(peer_id, amount_sat);
}
1503
/// Forwards a Cashu receipt of `amount_sat` from `peer_id` to the peer
/// selector's `record_cashu_receipt`, under the selector's write lock.
pub async fn record_cashu_receipt_from_peer(&self, peer_id: &str, amount_sat: u64) {
    self.peer_selector
        .write()
        .await
        .record_cashu_receipt(peer_id, amount_sat);
}
1511
/// Forwards a Cashu payment default by `peer_id` to the peer selector's
/// `record_cashu_payment_default`, under the selector's write lock.
pub async fn record_cashu_payment_default_from_peer(&self, peer_id: &str) {
    self.peer_selector
        .write()
        .await
        .record_cashu_payment_default(peer_id);
}
1519
/// Returns the peer selector's summary, read under the selector's read lock.
pub async fn selector_summary(&self) -> crate::peer_selector::SelectorSummary {
    self.peer_selector.read().await.summary()
}
1524
/// Asks `selector` whether `peer_id` is blocked for payment defaults, using
/// the routing config's `cashu_payment_default_block_threshold`.
fn should_refuse_requests_from_peer(&self, selector: &PeerSelector, peer_id: &str) -> bool {
    selector.is_peer_blocked_for_payment_defaults(
        peer_id,
        self.routing.cashu_payment_default_block_threshold,
    )
}
1531
/// Returns the peer metadata snapshot exported by the peer selector, read
/// under the selector's read lock.
pub async fn peer_metadata_snapshot(&self) -> PeerMetadataSnapshot {
    self.peer_selector
        .read()
        .await
        .export_peer_metadata_snapshot()
}
1539
1540 pub async fn persist_peer_metadata(&self) -> Result<Hash, StoreError> {
1545 let snapshot = self
1546 .peer_selector
1547 .read()
1548 .await
1549 .export_peer_metadata_snapshot();
1550 let bytes = serde_json::to_vec(&snapshot).map_err(|e| {
1551 StoreError::Other(format!("Failed to encode peer metadata snapshot: {e}"))
1552 })?;
1553 let snapshot_hash = hashtree_core::sha256(&bytes);
1554 let _ = self.local_store.put(snapshot_hash, bytes).await?;
1555
1556 let pointer_slot = Self::peer_metadata_pointer_slot_hash();
1557 let pointer_bytes = hex::encode(snapshot_hash).into_bytes();
1558 let _ = self.local_store.delete(&pointer_slot).await?;
1559 let _ = self.local_store.put(pointer_slot, pointer_bytes).await?;
1560
1561 Ok(snapshot_hash)
1562 }
1563
1564 pub async fn load_peer_metadata(&self) -> Result<bool, StoreError> {
1566 let pointer_slot = Self::peer_metadata_pointer_slot_hash();
1567 let Some(pointer_bytes) = self.local_store.get(&pointer_slot).await? else {
1568 return Ok(false);
1569 };
1570 let pointer_hex = std::str::from_utf8(&pointer_bytes).map_err(|e| {
1571 StoreError::Other(format!("Peer metadata pointer is not valid UTF-8: {e}"))
1572 })?;
1573 let snapshot_hash = Self::decode_hash_hex(pointer_hex.trim())?;
1574
1575 let Some(snapshot_bytes) = self.local_store.get(&snapshot_hash).await? else {
1576 return Ok(false);
1577 };
1578 let snapshot: PeerMetadataSnapshot =
1579 serde_json::from_slice(&snapshot_bytes).map_err(|e| {
1580 StoreError::Other(format!("Failed to decode peer metadata snapshot: {e}"))
1581 })?;
1582 self.peer_selector
1583 .write()
1584 .await
1585 .import_peer_metadata_snapshot(&snapshot);
1586 Ok(true)
1587 }
1588
1589 pub async fn get_with_quote(
1594 &self,
1595 hash: &Hash,
1596 payment_sat: u64,
1597 quote_ttl: Duration,
1598 ) -> Result<Option<Vec<u8>>, StoreError> {
1599 if let Some(data) = self.local_store.get(hash).await? {
1600 return Ok(Some(data));
1601 }
1602 Ok(self
1603 .request_from_peers_with_quote(hash, payment_sat, quote_ttl)
1604 .await)
1605 }
1606
1607 async fn request_from_peers_with_quote(
1608 &self,
1609 hash: &Hash,
1610 payment_sat: u64,
1611 quote_ttl: Duration,
1612 ) -> Option<Vec<u8>> {
1613 let ordered_peer_ids = self.ordered_connected_peers(None).await;
1614 if ordered_peer_ids.is_empty() {
1615 return None;
1616 }
1617
1618 if let Some(quote) = self
1619 .request_quote_from_peers(hash, payment_sat, quote_ttl, &ordered_peer_ids)
1620 .await
1621 {
1622 if let Some(data) = self
1623 .request_from_single_peer(hash, "e.peer_id, MAX_HTL, Some(quote.quote_id))
1624 .await
1625 {
1626 return Some(data);
1627 }
1628 }
1629
1630 self.request_from_mesh(hash).await
1631 }
1632
/// Requests a quote for `hash` from `ordered_peer_ids` in hedged waves.
///
/// Returns `None` when the peer list is empty or when `quote_ttl` is
/// shorter than one millisecond. A pending quote entry keyed by the hash
/// key is registered before sending and removed again before returning.
/// Inserting under the same key replaces any entry that was already there.
async fn request_quote_from_peers(
    &self,
    hash: &Hash,
    payment_sat: u64,
    quote_ttl: Duration,
    ordered_peer_ids: &[String],
) -> Option<NegotiatedQuote> {
    if ordered_peer_ids.is_empty() {
        return None;
    }
    // Clamp to the u32 range before narrowing.
    let ttl_ms = quote_ttl.as_millis().min(u32::MAX as u128) as u32;
    if ttl_ms == 0 {
        return None;
    }
    let requested_mint = self.requested_quote_mint().map(str::to_string);

    let hash_key = hash_to_key(hash);
    let (tx, rx) = oneshot::channel();
    self.pending_quotes.write().await.insert(
        hash_key.clone(),
        PendingQuoteRequest {
            response_tx: tx,
            preferred_mint_url: requested_mint.clone(),
            offered_payment_sat: payment_sat,
        },
    );

    // The receiver is shared so that every wave's wait polls the same channel.
    let rx = Arc::new(Mutex::new(rx));
    let result = run_hedged_waves(
        ordered_peer_ids.len(),
        self.routing.dispatch,
        self.request_timeout,
        // Send step: send the quote request to each peer in the given range
        // and report how many sends succeeded.
        |range| {
            let wave_peer_ids = ordered_peer_ids[range].to_vec();
            let requested_mint = requested_mint.clone();
            let hash = *hash;
            async move {
                let mut sent = 0usize;
                for peer_id in wave_peer_ids {
                    if self
                        .send_quote_request_to_peer(
                            &peer_id,
                            &hash,
                            payment_sat,
                            ttl_ms,
                            requested_mint.as_deref(),
                        )
                        .await
                    {
                        sent += 1;
                    }
                }
                sent
            }
        },
        // Wait step: a received quote is a success, an explicit `None` or a
        // closed channel aborts, and an elapsed wait continues.
        |wait| {
            let rx = rx.clone();
            async move {
                let mut rx = rx.lock().await;
                match tokio::time::timeout(wait, &mut *rx).await {
                    Ok(Ok(Some(quote))) => HedgedWaveAction::Success(quote),
                    Ok(Ok(None)) | Ok(Err(_)) => HedgedWaveAction::Abort,
                    Err(_) => HedgedWaveAction::Continue,
                }
            }
        },
    )
    .await;
    let _ = self.pending_quotes.write().await.remove(&hash_key);
    result
}
1704
1705 async fn request_from_single_peer(
1706 &self,
1707 hash: &Hash,
1708 peer_id: &str,
1709 request_htl: u8,
1710 quote_id: Option<u64>,
1711 ) -> Option<Vec<u8>> {
1712 let hash_key = hash_to_key(hash);
1713 let (tx, rx) = oneshot::channel();
1714 self.pending_requests.write().await.insert(
1715 hash_key.clone(),
1716 PendingRequest {
1717 response_tx: tx,
1718 started_at: Instant::now(),
1719 queried_peers: vec![peer_id.to_string()],
1720 },
1721 );
1722
1723 let mut rx = rx;
1724 if !self
1725 .send_request_to_peer(peer_id, hash, request_htl, quote_id)
1726 .await
1727 {
1728 let _ = self.pending_requests.write().await.remove(&hash_key);
1729 return None;
1730 }
1731 self.reserve_peer_request(peer_id).await;
1732
1733 if let Ok(Ok(Some(data))) = tokio::time::timeout(self.request_timeout, &mut rx).await {
1734 if hashtree_core::sha256(&data) == *hash {
1735 let _ = self.local_store.put(*hash, data.clone()).await;
1736 return Some(data);
1737 }
1738 }
1739
1740 if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
1741 self.release_queried_peer_requests(&pending.queried_peers)
1742 .await;
1743 for peer_id in pending.queried_peers {
1744 self.peer_selector.write().await.record_timeout(&peer_id);
1745 }
1746 }
1747 let _ = self.take_forward_requesters(&hash_key).await;
1748 None
1749 }
1750
1751 async fn request_from_ordered_peers(
1752 &self,
1753 hash: &Hash,
1754 ordered_peer_ids: &[String],
1755 request_htl: u8,
1756 ) -> RouteFetchOutcome {
1757 let hash_key = hash_to_key(hash);
1758 let (tx, rx) = oneshot::channel();
1759 self.pending_requests.write().await.insert(
1760 hash_key.clone(),
1761 PendingRequest {
1762 response_tx: tx,
1763 started_at: Instant::now(),
1764 queried_peers: Vec::new(),
1765 },
1766 );
1767
1768 let rx = Arc::new(Mutex::new(rx));
1769 let result = run_hedged_waves(
1770 ordered_peer_ids.len(),
1771 self.routing.dispatch,
1772 self.request_timeout,
1773 |range| {
1774 let wave_peer_ids = ordered_peer_ids[range].to_vec();
1775 let hash = *hash;
1776 let hash_key = hash_key.clone();
1777 async move {
1778 let mut sent = 0usize;
1779 for peer_id in wave_peer_ids {
1780 if self
1781 .send_request_to_peer(&peer_id, &hash, request_htl, None)
1782 .await
1783 {
1784 sent += 1;
1785 self.reserve_peer_request(&peer_id).await;
1786 if let Some(pending) =
1787 self.pending_requests.write().await.get_mut(&hash_key)
1788 {
1789 pending.queried_peers.push(peer_id);
1790 }
1791 }
1792 }
1793 sent
1794 }
1795 },
1796 |wait| {
1797 let rx = rx.clone();
1798 async move {
1799 let mut rx = rx.lock().await;
1800 match tokio::time::timeout(wait, &mut *rx).await {
1801 Ok(Ok(Some(data))) if hashtree_core::sha256(&data) == *hash => {
1802 HedgedWaveAction::Success(data)
1803 }
1804 Ok(Ok(Some(_))) => HedgedWaveAction::Continue,
1805 Ok(Ok(None)) | Ok(Err(_)) => HedgedWaveAction::Abort,
1806 Err(_) => HedgedWaveAction::Continue,
1807 }
1808 }
1809 },
1810 )
1811 .await;
1812
1813 let Some(data) = result else {
1814 if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
1815 self.release_queried_peer_requests(&pending.queried_peers)
1816 .await;
1817 for peer_id in pending.queried_peers {
1818 self.peer_selector.write().await.record_timeout(&peer_id);
1819 }
1820 }
1821 let _ = self.take_forward_requesters(&hash_key).await;
1822 return RouteFetchOutcome::Timeout;
1823 };
1824
1825 let _ = self.local_store.put(*hash, data.clone()).await;
1826 RouteFetchOutcome::Hit(data)
1827 }
1828
/// Fetches `hash` from the read sources in hedged waves.
///
/// Returns `Miss` when there are no ordered sources or the wave plan is
/// empty. Each wave spawns one task per source; the first hit is recorded
/// as a success and returned immediately. Misses and failures are recorded
/// per source as they arrive. Sources that have not produced an outcome by
/// the time the loop ends are recorded as timeouts; the result is then
/// `Timeout` if there was at least one such source, else `Miss`.
async fn request_from_read_sources_inner(&self, hash: &Hash) -> RouteFetchOutcome {
    let ordered_sources = self.ordered_read_sources().await;
    if ordered_sources.is_empty() {
        return RouteFetchOutcome::Miss;
    }

    let dispatch = normalize_dispatch_config(
        self.source_dispatch_for(ordered_sources.len()).await,
        ordered_sources.len(),
    );
    let wave_plan = build_hedged_wave_plan(ordered_sources.len(), dispatch);
    if wave_plan.is_empty() {
        return RouteFetchOutcome::Miss;
    }

    // Overall deadline shared by all waves.
    let deadline = Instant::now() + self.request_timeout;
    let mut pending = FuturesUnordered::new();
    // Ids of sources that were queried and have not reported an outcome yet.
    let mut pending_source_ids = HashSet::new();
    let mut saw_timeout = false;
    let mut next_source_idx = 0usize;

    for (wave_idx, wave_size) in wave_plan.iter().copied().enumerate() {
        let from = next_source_idx;
        let to = (next_source_idx + wave_size).min(ordered_sources.len());
        next_source_idx = to;

        for source in ordered_sources[from..to].iter().cloned() {
            let source_id = source.id().to_string();
            self.record_read_source_request(&source_id).await;
            pending_source_ids.insert(source_id.clone());
            let hash = *hash;
            pending.push(tokio::spawn(async move {
                let started_at = Instant::now();
                // A panic inside the source's `get` is caught and reported as a failure.
                let result = std::panic::AssertUnwindSafe(source.get(&hash))
                    .catch_unwind()
                    .await;
                match result {
                    Ok(Some(data)) => SourceFetchOutcome::Hit {
                        source_id,
                        data,
                        // Reported elapsed time is at least 1 ms.
                        elapsed_ms: started_at.elapsed().as_millis().max(1) as u64,
                    },
                    Ok(None) => SourceFetchOutcome::Miss { source_id },
                    Err(_) => SourceFetchOutcome::Failure { source_id },
                }
            }));
        }

        // The last wave waits until the overall deadline; earlier waves wait
        // one hedge interval, never past the deadline.
        let is_last_wave =
            wave_idx + 1 == wave_plan.len() || next_source_idx >= ordered_sources.len();
        let window_end = if is_last_wave {
            deadline
        } else {
            (Instant::now() + Duration::from_millis(dispatch.hedge_interval_ms)).min(deadline)
        };

        while Instant::now() < window_end {
            let remaining = window_end.saturating_duration_since(Instant::now());
            // Leaves the window when it elapses or when no task is left to wait on.
            let Some(result) = tokio::time::timeout(remaining, pending.next())
                .await
                .ok()
                .flatten()
            else {
                break;
            };
            // A join error yields no outcome; the source id stays in
            // `pending_source_ids` and is counted as a timeout below.
            let Ok(outcome) = result else {
                continue;
            };
            match outcome {
                SourceFetchOutcome::Hit {
                    source_id,
                    data,
                    elapsed_ms,
                } => {
                    pending_source_ids.remove(&source_id);
                    self.record_read_source_success(&source_id, elapsed_ms)
                        .await;
                    return RouteFetchOutcome::Hit(data);
                }
                SourceFetchOutcome::Miss { source_id } => {
                    pending_source_ids.remove(&source_id);
                    self.record_read_source_miss(&source_id).await;
                }
                SourceFetchOutcome::Failure { source_id } => {
                    pending_source_ids.remove(&source_id);
                    self.record_read_source_failure(&source_id).await;
                }
            }
        }

        if Instant::now() >= deadline {
            break;
        }
    }

    for source_id in pending_source_ids {
        saw_timeout = true;
        self.record_read_source_timeout(&source_id).await;
    }
    if saw_timeout {
        RouteFetchOutcome::Timeout
    } else {
        RouteFetchOutcome::Miss
    }
}
1934
1935 async fn request_from_read_sources(&self, hash: &Hash) -> RouteFetchOutcome {
1936 let hash_key = hash_to_key(hash);
1937 let existing_wait = {
1938 let mut inflight = self.inflight_source_fetches.lock().await;
1939 if let Some(existing) = inflight.get_mut(&hash_key) {
1940 let (tx, rx) = oneshot::channel();
1941 existing.waiters.push(tx);
1942 Some(rx)
1943 } else {
1944 inflight.insert(
1945 hash_key.clone(),
1946 InflightSourceFetch {
1947 waiters: Vec::new(),
1948 },
1949 );
1950 None
1951 }
1952 };
1953
1954 if let Some(wait) = existing_wait {
1955 return wait.await.unwrap_or(RouteFetchOutcome::Timeout);
1956 }
1957
1958 let result = self.request_from_read_sources_inner(hash).await;
1959 if let RouteFetchOutcome::Hit(hit) = &result {
1960 let _ = self.local_store.put(*hash, hit.clone()).await;
1961 }
1962
1963 let waiters = self
1964 .inflight_source_fetches
1965 .lock()
1966 .await
1967 .remove(&hash_key)
1968 .map(|inflight| inflight.waiters)
1969 .unwrap_or_default();
1970 for waiter in waiters {
1971 let _ = waiter.send(result.clone());
1972 }
1973
1974 result
1975 }
1976
1977 async fn ranked_read_routes(&self, context: &MeshReadContext) -> Vec<RankedReadRoute> {
1978 let mut routes = Vec::new();
1979 let ordered_peers = if should_forward_htl(context.request_htl) {
1980 self.ordered_connected_peers(context.exclude_peer_id.as_deref())
1981 .await
1982 } else {
1983 Vec::new()
1984 };
1985 if !ordered_peers.is_empty() {
1986 let best_peer_id = ordered_peers[0].clone();
1987 let selector = self.peer_selector.read().await;
1988 let best_peer = selector.get_stats(&best_peer_id).cloned();
1989 let now = Instant::now();
1990 let (score, has_history) = match best_peer.as_ref() {
1991 Some(stats) => (
1992 peer_endpoint_score(stats, now),
1993 peer_endpoint_has_history(stats),
1994 ),
1995 None => (0.0, false),
1996 };
1997 routes.push(RankedReadRoute {
1998 route: ReadRoute::Peers(ordered_peers),
1999 best_endpoint_id: format!("peer:{best_peer_id}"),
2000 score,
2001 has_history,
2002 });
2003 }
2004 let ordered_sources = self.ordered_read_sources().await;
2005 if let Some(best_source) = ordered_sources.first() {
2006 let stats = self.read_source_stats.read().await;
2007 let best_source_stats = stats.get(best_source.id()).cloned().unwrap_or_default();
2008 let now = Instant::now();
2009 routes.push(RankedReadRoute {
2010 route: ReadRoute::Sources,
2011 best_endpoint_id: format!("source:{}", best_source.id()),
2012 score: adaptive_source_score(&best_source_stats, now),
2013 has_history: source_has_history(&best_source_stats),
2014 });
2015 }
2016 if routes.len() <= 1 {
2017 return routes;
2018 }
2019
2020 routes.sort_by(|left, right| {
2021 right
2022 .score
2023 .partial_cmp(&left.score)
2024 .unwrap_or(std::cmp::Ordering::Equal)
2025 .then_with(|| ranked_route_kind(&left.route).cmp(&ranked_route_kind(&right.route)))
2026 .then_with(|| left.best_endpoint_id.cmp(&right.best_endpoint_id))
2027 .then_with(|| left.route.id().cmp(right.route.id()))
2028 });
2029 routes
2030 }
2031
2032 fn should_probe_multiple_routes(&self, routes: &[RankedReadRoute]) -> bool {
2033 if routes.len() <= 1 {
2034 return false;
2035 }
2036 if !routes[0].has_history || !routes[1].has_history {
2037 return false;
2038 }
2039 (routes[0].score - routes[1].score) < SOURCE_SCORE_TIE_DELTA
2040 }
2041
/// Executes a single read route for `hash`.
///
/// A peers route requests from its ordered peer ids with the context's
/// request HTL; a sources route requests from the read sources.
async fn run_read_route(
    &self,
    hash: &Hash,
    route: &ReadRoute,
    context: &MeshReadContext,
) -> RouteFetchOutcome {
    match route {
        ReadRoute::Peers(peer_ids) => {
            self.request_from_ordered_peers(hash, peer_ids, context.request_htl)
                .await
        }
        ReadRoute::Sources => self.request_from_read_sources(hash).await,
    }
}
2056
/// Fetches `hash` through the ranked read routes for `context`.
///
/// With no route the result is `None`; with one route only that route is
/// run. With two or more routes, the first two are raced when
/// `should_probe_multiple_routes` says so, otherwise the routes are tried
/// one after another in ranked order. The first hit is returned; `None`
/// means every attempted route ended in a miss or timeout.
async fn request_from_mesh_with_context(
    &self,
    hash: &Hash,
    context: &MeshReadContext,
) -> Option<Vec<u8>> {
    let routes = self.ranked_read_routes(context).await;
    match routes.as_slice() {
        [] => None,
        [ranked] => match self.run_read_route(hash, &ranked.route, context).await {
            RouteFetchOutcome::Hit(data) => Some(data),
            RouteFetchOutcome::Miss | RouteFetchOutcome::Timeout => None,
        },
        [first, second, ..] => {
            if self.should_probe_multiple_routes(&routes) {
                let first_fut = self.run_read_route(hash, &first.route, context);
                let second_fut = self.run_read_route(hash, &second.route, context);
                tokio::pin!(first_fut);
                tokio::pin!(second_fut);
                // The flags disable a branch once its future has completed,
                // so a finished future is never polled again.
                let mut first_done = false;
                let mut second_done = false;
                loop {
                    tokio::select! {
                        result = &mut first_fut, if !first_done => {
                            first_done = true;
                            // Returning here drops the other, still pending future.
                            if let RouteFetchOutcome::Hit(data) = result {
                                return Some(data);
                            }
                        }
                        result = &mut second_fut, if !second_done => {
                            second_done = true;
                            if let RouteFetchOutcome::Hit(data) = result {
                                return Some(data);
                            }
                        }
                        else => break,
                    }
                    if first_done && second_done {
                        break;
                    }
                }
                None
            } else {
                match self.run_read_route(hash, &first.route, context).await {
                    RouteFetchOutcome::Hit(data) => return Some(data),
                    RouteFetchOutcome::Miss | RouteFetchOutcome::Timeout => {}
                }
                // Fall through to the remaining routes in ranked order.
                for ranked in routes.iter().skip(1) {
                    match self.run_read_route(hash, &ranked.route, context).await {
                        RouteFetchOutcome::Hit(data) => return Some(data),
                        RouteFetchOutcome::Miss | RouteFetchOutcome::Timeout => {}
                    }
                }
                None
            }
        }
    }
}
2114
/// Fetches `hash` through the mesh using the default `MeshReadContext`.
async fn request_from_mesh(&self, hash: &Hash) -> Option<Vec<u8>> {
    self.request_from_mesh_with_context(hash, &MeshReadContext::default())
        .await
}
2119
2120 async fn begin_forward_request(&self, hash_key: &str, requester_id: &str) -> bool {
2121 let mut pending = self.pending_forward_requests.write().await;
2122 if let Some(existing) = pending.get_mut(hash_key) {
2123 existing.requester_ids.insert(requester_id.to_string());
2124 return false;
2125 }
2126
2127 let mut requester_ids = HashSet::new();
2128 requester_ids.insert(requester_id.to_string());
2129 pending.insert(
2130 hash_key.to_string(),
2131 PendingForwardRequest { requester_ids },
2132 );
2133 true
2134 }
2135
/// Returns whether `hash_key` is currently contained in the recent
/// forward-miss set.
async fn was_recent_forward_miss(&self, hash_key: &str) -> bool {
    self.recent_forward_misses.lock().await.contains(hash_key)
}
2139
/// Adds `hash_key` to the recent forward-miss set via `insert_if_new`,
/// ignoring that call's return value.
async fn mark_recent_forward_miss(&self, hash_key: &str) {
    let _ = self
        .recent_forward_misses
        .lock()
        .await
        .insert_if_new(hash_key.to_string());
}
2147
/// Removes the pending forward entry for `hash_key` and returns its
/// requester ids, or an empty vector when no entry exists.
async fn take_forward_requesters(&self, hash_key: &str) -> Vec<String> {
    self.pending_forward_requests
        .write()
        .await
        .remove(hash_key)
        .map(|pending| pending.requester_ids.into_iter().collect())
        .unwrap_or_default()
}
2156
2157 async fn complete_pending_response(
2158 self: &Arc<Self>,
2159 from_peer: &str,
2160 hash: &Hash,
2161 hash_key: String,
2162 payload: Vec<u8>,
2163 ) {
2164 if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
2165 self.release_queried_peer_requests(&pending.queried_peers)
2166 .await;
2167 let rtt_ms = pending.started_at.elapsed().as_millis() as u64;
2168 self.peer_selector.write().await.record_success(
2169 from_peer,
2170 rtt_ms,
2171 payload.len() as u64,
2172 );
2173 let forward_requesters = self.take_forward_requesters(&hash_key).await;
2174 let response_bytes = if forward_requesters.is_empty() {
2175 None
2176 } else {
2177 Some(encode_response(&create_response(hash, payload.clone())))
2178 };
2179 let _ = pending.response_tx.send(Some(payload));
2180 if let Some(response_bytes) = response_bytes {
2181 for requester_id in forward_requesters {
2182 Arc::clone(&self)
2183 .enqueue_response_send(requester_id, response_bytes.clone(), Instant::now())
2184 .await;
2185 }
2186 }
2187 }
2188 }
2189
2190 async fn handle_quote_response_message(&self, from_peer: &str, res: DataQuoteResponse) {
2191 if !res.a {
2192 return;
2193 }
2194
2195 let Some(quote_id) = res.q else {
2196 return;
2197 };
2198
2199 let hash_key = hash_to_key(&res.h);
2200 let (preferred_mint_url, offered_payment_sat) = {
2201 let pending_quotes = self.pending_quotes.read().await;
2202 let Some(pending) = pending_quotes.get(&hash_key) else {
2203 return;
2204 };
2205 (
2206 pending.preferred_mint_url.clone(),
2207 pending.offered_payment_sat,
2208 )
2209 };
2210 if !self
2211 .should_accept_quote_response(
2212 from_peer,
2213 preferred_mint_url.as_deref(),
2214 offered_payment_sat,
2215 &res,
2216 )
2217 .await
2218 {
2219 return;
2220 }
2221 let mut pending_quotes = self.pending_quotes.write().await;
2222 if let Some(pending) = pending_quotes.remove(&hash_key) {
2223 let _ = pending.response_tx.send(Some(NegotiatedQuote {
2224 peer_id: from_peer.to_string(),
2225 quote_id,
2226 mint_url: res.m,
2227 }));
2228 }
2229 }
2230
2231 async fn handle_response_message(
2232 self: &Arc<Self>,
2233 from_peer: &str,
2234 res: crate::protocol::DataResponse,
2235 ) {
2236 let hash_key = hash_to_key(&res.h);
2237 let hash = match crate::protocol::bytes_to_hash(&res.h) {
2238 Some(h) => h,
2239 None => return,
2240 };
2241
2242 if hashtree_core::sha256(&res.d) != hash {
2244 self.peer_selector.write().await.record_failure(from_peer);
2245 if self.debug {
2246 println!("[MeshStoreCore] Ignoring invalid response payload for {hash_key}");
2247 }
2248 return;
2249 }
2250
2251 self.complete_pending_response(from_peer, &hash, hash_key, res.d)
2252 .await;
2253 }
2254
2255 async fn handle_quote_request_message(&self, from_peer: &str, req: DataQuoteRequest) {
2256 let hash = match crate::protocol::bytes_to_hash(&req.h) {
2257 Some(h) => h,
2258 None => return,
2259 };
2260 let hash_key = hash_to_key(&hash);
2261
2262 {
2263 let selector = self.peer_selector.read().await;
2264 if self.should_refuse_requests_from_peer(&selector, from_peer) {
2265 if self.debug {
2266 println!(
2267 "[MeshStoreCore] Refusing quote request from delinquent peer {}",
2268 from_peer
2269 );
2270 }
2271 return;
2272 }
2273 }
2274
2275 let chosen_mint = self.choose_quote_mint(req.m.as_deref());
2276 let can_serve = self.local_store.has(&hash).await.ok().unwrap_or(false)
2277 && !self.should_drop_response(&hash)
2278 && !self.should_corrupt_response(&hash);
2279
2280 let res = if can_serve {
2281 let quote_id = self
2282 .issue_quote(from_peer, &hash_key, req.p, req.t, chosen_mint.as_deref())
2283 .await;
2284 create_quote_response_available(&hash, quote_id, req.p, req.t, chosen_mint.as_deref())
2285 } else {
2286 create_quote_response_unavailable(&hash)
2287 };
2288 let response_bytes = encode_quote_response(&res);
2289 if let Some(channel) = self.signaling.get_channel(from_peer).await {
2290 if channel.send(response_bytes.clone()).await.is_ok() {
2291 self.record_peer_wire_sent(from_peer, response_bytes.len() as u64)
2292 .await;
2293 }
2294 }
2295 }
2296
2297 async fn handle_request_message(
2298 self: &Arc<Self>,
2299 from_peer: &str,
2300 req: crate::protocol::DataRequest,
2301 ) {
2302 let hash = match crate::protocol::bytes_to_hash(&req.h) {
2303 Some(h) => h,
2304 None => return,
2305 };
2306 let hash_key = hash_to_key(&hash);
2307
2308 if let Some(quote_id) = req.q {
2309 if !self.take_valid_quote(from_peer, &hash_key, quote_id).await {
2310 if self.debug {
2311 println!(
2312 "[MeshStoreCore] Refusing request with invalid or expired quote {} from {}",
2313 quote_id, from_peer
2314 );
2315 }
2316 return;
2317 }
2318 }
2319
2320 let allow_peer_forwarding = {
2321 let selector = self.peer_selector.read().await;
2322 !self.should_refuse_requests_from_peer(&selector, from_peer)
2323 };
2324
2325 if let Ok(Some(mut data)) = self.local_store.get(&hash).await {
2327 if self.should_drop_response(&hash) {
2328 if self.debug {
2329 println!(
2330 "[MeshStoreCore] Dropping response for {} due to actor profile",
2331 hash_to_key(&hash)
2332 );
2333 }
2334 return;
2335 }
2336
2337 let response_delay = self.response_send_delay(&hash, data.len());
2338 if self.should_corrupt_response(&hash) {
2339 if data.is_empty() {
2340 data.push(0x80);
2341 } else {
2342 data[0] ^= 0x80;
2343 }
2344 }
2345
2346 let res = create_response(&hash, data);
2348 let response_bytes = encode_response(&res);
2349 let ready_at = Instant::now() + response_delay;
2350 Arc::clone(self)
2351 .enqueue_response_send(from_peer.to_string(), response_bytes, ready_at)
2352 .await;
2353 return;
2354 }
2355
2356 if self.pending_requests.read().await.contains_key(&hash_key) {
2357 let _ = self.begin_forward_request(&hash_key, from_peer).await;
2358 return;
2359 }
2360
2361 if self.was_recent_forward_miss(&hash_key).await {
2362 if self.debug {
2363 println!(
2364 "[MeshStoreCore] Suppressing recently missed forwarded request for {}",
2365 hash_key
2366 );
2367 }
2368 return;
2369 }
2370
2371 if !self.begin_forward_request(&hash_key, from_peer).await {
2372 return;
2373 }
2374
2375 let from_peer = from_peer.to_string();
2376 let this = Arc::clone(self);
2377 let request_htl = req.htl;
2378 tokio::spawn(async move {
2379 let result = if allow_peer_forwarding {
2380 let context = MeshReadContext {
2381 exclude_peer_id: Some(from_peer.clone()),
2382 request_htl,
2383 };
2384 this.request_from_mesh_with_context(&hash, &context).await
2385 } else {
2386 if this.debug {
2387 println!(
2388 "[MeshStoreCore] Serving request from delinquent peer {} via read sources only",
2389 from_peer
2390 );
2391 }
2392 match this.request_from_read_sources(&hash).await {
2393 RouteFetchOutcome::Hit(data) => Some(data),
2394 RouteFetchOutcome::Miss | RouteFetchOutcome::Timeout => None,
2395 }
2396 };
2397 let requester_ids = this.take_forward_requesters(&hash_key).await;
2398 if let Some(data) = result {
2399 let ready_at = Instant::now() + this.response_send_delay(&hash, data.len());
2400 let res = create_response(&hash, data);
2401 let response_bytes = encode_response(&res);
2402 for requester_id in requester_ids {
2403 Arc::clone(&this)
2404 .enqueue_response_send(requester_id, response_bytes.clone(), ready_at)
2405 .await;
2406 }
2407 } else {
2408 this.mark_recent_forward_miss(&hash_key).await;
2409 }
2410 });
2411 }
2412
2413 pub async fn handle_data_message(self: &Arc<Self>, from_peer: &str, data: &[u8]) {
2415 self.record_peer_wire_received(from_peer, data.len() as u64)
2416 .await;
2417 let parsed = match parse_message(data) {
2418 Some(m) => m,
2419 None => return,
2420 };
2421
2422 match parsed {
2423 DataMessage::Request(req) => {
2424 self.handle_request_message(from_peer, req).await;
2425 }
2426 DataMessage::Response(res) => {
2427 self.handle_response_message(from_peer, res).await;
2428 }
2429 DataMessage::QuoteRequest(req) => {
2430 self.handle_quote_request_message(from_peer, req).await;
2431 }
2432 DataMessage::QuoteResponse(res) => {
2433 self.handle_quote_response_message(from_peer, res).await;
2434 }
2435 DataMessage::Payment(_) | DataMessage::PaymentAck(_) | DataMessage::Chunk(_) => {}
2436 }
2437 }
2438}
2439
2440#[async_trait]
2441impl<S, R, F> Store for MeshStoreCore<S, R, F>
2442where
2443 S: Store + Send + Sync + 'static,
2444 R: SignalingTransport + Send + Sync + 'static,
2445 F: PeerLinkFactory + Send + Sync + 'static,
2446{
2447 async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
2448 self.local_store.put(hash, data).await
2449 }
2450
2451 async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
2452 if let Some(data) = self.local_store.get(hash).await? {
2454 return Ok(Some(data));
2455 }
2456
2457 Ok(self.request_from_mesh(hash).await)
2459 }
2460
2461 async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
2462 self.local_store.has(hash).await
2463 }
2464
2465 async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
2466 self.local_store.delete(hash).await
2467 }
2468}
2469
2470#[cfg(test)]
2471mod tests;
2472
2473pub type SimMeshStore<S> =
2475 MeshStoreCore<S, crate::mock::MockRelayTransport, crate::mock::MockConnectionFactory>;
2476
2477pub type ProductionMeshStore<S> =
2479 MeshStoreCore<S, crate::nostr::NostrRelayTransport, crate::real_factory::WebRtcPeerLinkFactory>;