1use async_trait::async_trait;
8use futures::{stream::FuturesUnordered, FutureExt, StreamExt};
9use std::collections::hash_map::DefaultHasher;
10use std::collections::{HashMap, HashSet};
11use std::future::Future;
12use std::hash::{Hash as _, Hasher};
13use std::ops::Range;
14use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
15use std::sync::Arc;
16use std::time::Duration;
17use tokio::sync::{oneshot, Mutex, RwLock};
18use tokio::time::Instant;
19
20use hashtree_core::{Hash, Store, StoreError};
21
22use crate::peer_selector::{PeerMetadataSnapshot, PeerSelector, SelectionStrategy};
23use crate::protocol::{
24 create_quote_request, create_quote_response_available, create_quote_response_unavailable,
25 create_request, create_request_with_quote, create_response, encode_quote_request,
26 encode_quote_response, encode_request, encode_response, hash_to_key, parse_message,
27 DataMessage, DataQuoteRequest, DataQuoteResponse,
28};
29use crate::signaling::MeshRouter;
30use crate::transport::{PeerLinkFactory, SignalingTransport, TransportError};
31use crate::types::{should_forward_htl, PeerHTLConfig, SignalingMessage, TimedSeenSet, MAX_HTL};
32
33const PEER_METADATA_POINTER_SLOT_KEY: &[u8] = b"hashtree-webrtc/peer-metadata/latest/v1";
36const RECENT_FORWARD_MISS_CAPACITY: usize = 4096;
37const MIN_RECENT_FORWARD_MISS_TTL_MS: u64 = 250;
38
39struct PendingRequest {
41 response_tx: oneshot::Sender<Option<Vec<u8>>>,
42 started_at: Instant,
43 queried_peers: Vec<String>,
44}
45
46struct PendingQuoteRequest {
47 response_tx: oneshot::Sender<Option<NegotiatedQuote>>,
48 preferred_mint_url: Option<String>,
49 offered_payment_sat: u64,
50}
51
52struct PendingForwardRequest {
53 requester_ids: HashSet<String>,
54}
55
56#[derive(Debug, Clone, Default)]
57struct PeerWireStats {
58 bytes_sent: u64,
59 bytes_received: u64,
60 bandwidth_debt: f64,
61}
62
63struct PendingResponseSend {
64 job_id: u64,
65 peer_id: String,
66 bytes: Vec<u8>,
67 ready_at: Instant,
68 queue_sequence: u64,
69}
70
71#[async_trait]
72pub trait MeshReadSource: Send + Sync {
73 fn id(&self) -> &str;
74
75 fn is_available(&self) -> bool {
76 true
77 }
78
79 async fn get(&self, hash: &Hash) -> Option<Vec<u8>>;
80}
81
82#[derive(Debug, Clone)]
83struct NegotiatedQuote {
84 peer_id: String,
85 quote_id: u64,
86 #[allow(dead_code)]
87 mint_url: Option<String>,
88}
89
90struct IssuedQuote {
91 expires_at: Instant,
92 #[allow(dead_code)]
93 payment_sat: u64,
94 #[allow(dead_code)]
95 mint_url: Option<String>,
96}
97
98#[derive(Debug, Clone, Default)]
99struct AdaptiveSourceStats {
100 requests: u64,
101 successes: u64,
102 misses: u64,
103 failures: u64,
104 timeouts: u64,
105 srtt_ms: f64,
106 rttvar_ms: f64,
107 backoff_level: u32,
108 backed_off_until: Option<Instant>,
109 last_success_at: Option<Instant>,
110 last_failure_at: Option<Instant>,
111}
112
113#[derive(Debug, Clone)]
114enum RouteFetchOutcome {
115 Hit(Vec<u8>),
116 Miss,
117 Timeout,
118}
119
120struct InflightSourceFetch {
121 waiters: Vec<oneshot::Sender<RouteFetchOutcome>>,
122}
123
124enum SourceFetchOutcome {
125 Hit {
126 source_id: String,
127 data: Vec<u8>,
128 elapsed_ms: u64,
129 },
130 Miss {
131 source_id: String,
132 },
133 Failure {
134 source_id: String,
135 },
136}
137
138const INITIAL_SOURCE_BACKOFF_MS: u64 = 250;
139const MAX_SOURCE_BACKOFF_MS: u64 = 10_000;
140const SOURCE_SCORE_TIE_DELTA: f64 = 0.15;
141const RECENT_SOURCE_SUCCESS_WINDOW: Duration = Duration::from_secs(60);
142const ACTIVE_PEER_REQUEST_RANK_PENALTY: usize = 3;
143
144fn source_reliability_score(stats: &AdaptiveSourceStats) -> f64 {
145 (stats.successes as f64 + 1.0) / (stats.requests as f64 + 2.0)
146}
147
148fn source_latency_score(stats: &AdaptiveSourceStats) -> f64 {
149 if stats.srtt_ms <= 0.0 {
150 return 0.5;
151 }
152 (500.0 / (stats.srtt_ms + 50.0)).min(1.0)
153}
154
155fn source_has_history(stats: &AdaptiveSourceStats) -> bool {
156 stats.requests > 0
157 || stats.successes > 0
158 || stats.misses > 0
159 || stats.failures > 0
160 || stats.timeouts > 0
161}
162
163fn adaptive_source_score(stats: &AdaptiveSourceStats, now: Instant) -> f64 {
164 if let Some(backed_off_until) = stats.backed_off_until {
165 if backed_off_until > now {
166 return f64::NEG_INFINITY;
167 }
168 }
169
170 let miss_penalty = if stats.requests > 0 {
171 (stats.misses as f64 / stats.requests as f64) * 0.15
172 } else {
173 0.0
174 };
175 let failure_penalty = if stats.requests > 0 {
176 ((stats.failures + stats.timeouts) as f64 / stats.requests as f64) * 0.3
177 } else {
178 0.0
179 };
180 let recency_bonus = if stats
181 .last_success_at
182 .is_some_and(|last| now.duration_since(last) < RECENT_SOURCE_SUCCESS_WINDOW)
183 {
184 0.1
185 } else {
186 0.0
187 };
188
189 0.6 * source_reliability_score(stats) + 0.3 * source_latency_score(stats) + recency_bonus
190 - miss_penalty
191 - failure_penalty
192}
193
194fn peer_endpoint_has_history(stats: &crate::peer_selector::PeerStats) -> bool {
195 stats.requests_sent > 0 || stats.successes > 0 || stats.failures > 0 || stats.timeouts > 0
196}
197
198fn peer_endpoint_score(stats: &crate::peer_selector::PeerStats, now: Instant) -> f64 {
199 if stats.backed_off_until.is_some_and(|until| until > now) {
200 return f64::NEG_INFINITY;
201 }
202
203 let miss_penalty = 0.0;
204 let failure_penalty = if stats.requests_sent > 0 {
205 ((stats.failures + stats.timeouts) as f64 / stats.requests_sent as f64) * 0.3
206 } else {
207 0.0
208 };
209 let recency_bonus = if stats
210 .last_success
211 .is_some_and(|last| now.duration_since(last) < RECENT_SOURCE_SUCCESS_WINDOW)
212 {
213 0.1
214 } else {
215 0.0
216 };
217
218 0.6 * stats.success_rate()
219 + 0.3
220 * source_latency_score(&AdaptiveSourceStats {
221 srtt_ms: stats.srtt_ms,
222 ..AdaptiveSourceStats::default()
223 })
224 + recency_bonus
225 - miss_penalty
226 - failure_penalty
227}
228
229#[derive(Clone)]
230enum ReadRoute {
231 Peers(Vec<String>),
232 Sources,
233}
234
235impl ReadRoute {
236 fn id(&self) -> &'static str {
237 match self {
238 Self::Peers(_) => "peers",
239 Self::Sources => "sources",
240 }
241 }
242}
243
244struct RankedReadRoute {
245 route: ReadRoute,
246 best_endpoint_id: String,
247 score: f64,
248 has_history: bool,
249}
250
251fn ranked_route_kind(route: &ReadRoute) -> u8 {
252 match route {
253 ReadRoute::Sources => 0,
254 ReadRoute::Peers(_) => 1,
255 }
256}
257
258#[derive(Debug, Clone)]
259struct MeshReadContext {
260 exclude_peer_id: Option<String>,
261 request_htl: u8,
262}
263
264impl Default for MeshReadContext {
265 fn default() -> Self {
266 Self {
267 exclude_peer_id: None,
268 request_htl: MAX_HTL,
269 }
270 }
271}
272
273#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
275pub struct DataPumpStats {
276 pub processed: usize,
277 pub request_messages: usize,
278 pub response_messages: usize,
279 pub quote_request_messages: u64,
280 pub quote_response_messages: u64,
281 pub processed_bytes: u64,
282}
283
284#[derive(Debug, Clone, Copy)]
290pub struct RequestDispatchConfig {
291 pub initial_fanout: usize,
293 pub hedge_fanout: usize,
295 pub max_fanout: usize,
297 pub hedge_interval_ms: u64,
299}
300
301impl Default for RequestDispatchConfig {
302 fn default() -> Self {
303 Self {
304 initial_fanout: usize::MAX,
305 hedge_fanout: usize::MAX,
306 max_fanout: usize::MAX,
307 hedge_interval_ms: 0,
308 }
309 }
310}
311
312pub fn normalize_dispatch_config(
314 dispatch: RequestDispatchConfig,
315 available_peers: usize,
316) -> RequestDispatchConfig {
317 let mut cfg = dispatch;
318 let cap = if cfg.max_fanout == 0 {
319 available_peers
320 } else {
321 cfg.max_fanout.min(available_peers)
322 };
323 cfg.max_fanout = cap;
324 cfg.initial_fanout = if cfg.initial_fanout == 0 {
325 1
326 } else {
327 cfg.initial_fanout.min(cap.max(1))
328 };
329 cfg.hedge_fanout = if cfg.hedge_fanout == 0 {
330 1
331 } else {
332 cfg.hedge_fanout.min(cap.max(1))
333 };
334 cfg
335}
336
337pub fn build_hedged_wave_plan(peer_count: usize, dispatch: RequestDispatchConfig) -> Vec<usize> {
339 if peer_count == 0 {
340 return Vec::new();
341 }
342 let cap = dispatch.max_fanout.min(peer_count);
343 if cap == 0 {
344 return Vec::new();
345 }
346
347 let mut plan = Vec::new();
348 let mut sent = 0usize;
349 let first = dispatch.initial_fanout.min(cap).max(1);
350 plan.push(first);
351 sent += first;
352
353 while sent < cap {
354 let next = dispatch.hedge_fanout.min(cap - sent).max(1);
355 plan.push(next);
356 sent += next;
357 }
358 plan
359}
360
361#[derive(Debug)]
363pub enum HedgedWaveAction<T> {
364 Continue,
365 Success(T),
366 Abort,
367}
368
369pub async fn run_hedged_waves<T, SendWave, SendWaveFut, WaitWave, WaitWaveFut>(
374 peer_count: usize,
375 dispatch: RequestDispatchConfig,
376 request_timeout: Duration,
377 mut send_wave: SendWave,
378 mut wait_wave: WaitWave,
379) -> Option<T>
380where
381 SendWave: FnMut(Range<usize>) -> SendWaveFut,
382 SendWaveFut: Future<Output = usize>,
383 WaitWave: FnMut(Duration) -> WaitWaveFut,
384 WaitWaveFut: Future<Output = HedgedWaveAction<T>>,
385{
386 let dispatch = normalize_dispatch_config(dispatch, peer_count);
387 let wave_plan = build_hedged_wave_plan(peer_count, dispatch);
388 if wave_plan.is_empty() {
389 return None;
390 }
391
392 let deadline = Instant::now() + request_timeout;
393 let mut sent_total = 0usize;
394 let mut next_peer_idx = 0usize;
395
396 for (wave_idx, wave_size) in wave_plan.iter().copied().enumerate() {
397 let from = next_peer_idx;
398 let to = (next_peer_idx + wave_size).min(peer_count);
399 next_peer_idx = to;
400
401 if from == to {
402 continue;
403 }
404
405 sent_total += send_wave(from..to).await;
406 if sent_total == 0 {
407 if next_peer_idx >= peer_count {
408 break;
409 }
410 continue;
411 }
412
413 let now = Instant::now();
414 if now >= deadline {
415 break;
416 }
417 let remaining = deadline.saturating_duration_since(now);
418 let is_last_wave = wave_idx + 1 == wave_plan.len() || next_peer_idx >= peer_count;
419 let wait = if is_last_wave {
420 remaining
421 } else if dispatch.hedge_interval_ms == 0 {
422 Duration::ZERO
423 } else {
424 Duration::from_millis(dispatch.hedge_interval_ms).min(remaining)
425 };
426
427 if wait.is_zero() {
428 continue;
429 }
430
431 match wait_wave(wait).await {
432 HedgedWaveAction::Continue => {}
433 HedgedWaveAction::Success(value) => return Some(value),
434 HedgedWaveAction::Abort => break,
435 }
436 }
437
438 None
439}
440
441pub async fn sync_selector_peers(selector: &RwLock<PeerSelector>, current_peer_ids: &[String]) {
443 let mut selector = selector.write().await;
444 let current: HashSet<&str> = current_peer_ids.iter().map(String::as_str).collect();
445 let known: Vec<String> = selector.all_stats().map(|s| s.peer_id.clone()).collect();
446 for peer_id in known {
447 if !current.contains(peer_id.as_str()) {
448 selector.remove_peer(&peer_id);
449 }
450 }
451 for peer_id in current_peer_ids {
452 selector.add_peer(peer_id.clone());
453 }
454}
455
456#[derive(Debug, Clone, Copy)]
460pub struct ResponseBehaviorConfig {
461 pub drop_response_prob: f64,
463 pub corrupt_response_prob: f64,
465 pub extra_delay_ms: u64,
467 pub first_byte_delay_ms: u64,
469 pub bytes_per_second: u64,
471 pub stall_response_prob: f64,
473 pub stall_delay_ms: u64,
475}
476
477impl Default for ResponseBehaviorConfig {
478 fn default() -> Self {
479 Self {
480 drop_response_prob: 0.0,
481 corrupt_response_prob: 0.0,
482 extra_delay_ms: 0,
483 first_byte_delay_ms: 0,
484 bytes_per_second: 0,
485 stall_response_prob: 0.0,
486 stall_delay_ms: 0,
487 }
488 }
489}
490
491impl ResponseBehaviorConfig {
492 fn normalized(self) -> Self {
493 Self {
494 drop_response_prob: self.drop_response_prob.clamp(0.0, 1.0),
495 corrupt_response_prob: self.corrupt_response_prob.clamp(0.0, 1.0),
496 extra_delay_ms: self.extra_delay_ms,
497 first_byte_delay_ms: self.first_byte_delay_ms,
498 bytes_per_second: self.bytes_per_second,
499 stall_response_prob: self.stall_response_prob.clamp(0.0, 1.0),
500 stall_delay_ms: self.stall_delay_ms,
501 }
502 }
503}
504
505#[derive(Debug, Clone)]
507pub struct MeshRoutingConfig {
508 pub selection_strategy: SelectionStrategy,
509 pub fairness_enabled: bool,
510 pub cashu_payment_weight: f64,
512 pub cashu_payment_default_block_threshold: u64,
515 pub cashu_accepted_mints: Vec<String>,
517 pub cashu_default_mint: Option<String>,
519 pub cashu_peer_suggested_mint_base_cap_sat: u64,
521 pub cashu_peer_suggested_mint_success_step_sat: u64,
523 pub cashu_peer_suggested_mint_receipt_step_sat: u64,
525 pub cashu_peer_suggested_mint_max_cap_sat: u64,
527 pub dispatch: RequestDispatchConfig,
528 pub response_behavior: ResponseBehaviorConfig,
529}
530
531impl Default for MeshRoutingConfig {
532 fn default() -> Self {
533 Self {
534 selection_strategy: SelectionStrategy::Weighted,
535 fairness_enabled: true,
536 cashu_payment_weight: 0.0,
537 cashu_payment_default_block_threshold: 0,
538 cashu_accepted_mints: Vec::new(),
539 cashu_default_mint: None,
540 cashu_peer_suggested_mint_base_cap_sat: 0,
541 cashu_peer_suggested_mint_success_step_sat: 0,
542 cashu_peer_suggested_mint_receipt_step_sat: 0,
543 cashu_peer_suggested_mint_max_cap_sat: 0,
544 dispatch: RequestDispatchConfig::default(),
545 response_behavior: ResponseBehaviorConfig::default(),
546 }
547 }
548}
549
550pub struct MeshStoreCore<S, R, F>
557where
558 S: Store + Send + Sync + 'static,
559 R: SignalingTransport + Send + Sync + 'static,
560 F: PeerLinkFactory + Send + Sync + 'static,
561{
562 local_store: Arc<S>,
564 signaling: Arc<MeshRouter<R, F>>,
566 htl_configs: RwLock<HashMap<String, PeerHTLConfig>>,
568 pending_requests: RwLock<HashMap<String, PendingRequest>>,
570 pending_quotes: RwLock<HashMap<String, PendingQuoteRequest>>,
572 pending_forward_requests: RwLock<HashMap<String, PendingForwardRequest>>,
574 recent_forward_misses: Mutex<TimedSeenSet>,
576 issued_quotes: RwLock<HashMap<(String, String, u64), IssuedQuote>>,
578 next_quote_id: RwLock<u64>,
580 read_sources: RwLock<HashMap<String, Arc<dyn MeshReadSource>>>,
582 read_source_stats: RwLock<HashMap<String, AdaptiveSourceStats>>,
584 inflight_source_fetches: Mutex<HashMap<String, InflightSourceFetch>>,
586 peer_selector: RwLock<PeerSelector>,
588 peer_active_requests: RwLock<HashMap<String, usize>>,
590 peer_wire_stats: RwLock<HashMap<String, PeerWireStats>>,
592 pending_response_sends: Mutex<Vec<PendingResponseSend>>,
594 response_scheduler_running: AtomicBool,
596 next_response_job_id: AtomicU64,
598 routing: MeshRoutingConfig,
600 request_timeout: Duration,
602 debug: bool,
604 running: RwLock<bool>,
606}
607
608impl<S, R, F> MeshStoreCore<S, R, F>
609where
610 S: Store + Send + Sync + 'static,
611 R: SignalingTransport + Send + Sync + 'static,
612 F: PeerLinkFactory + Send + Sync + 'static,
613{
614 pub fn new(
616 local_store: Arc<S>,
617 signaling: Arc<MeshRouter<R, F>>,
618 request_timeout: Duration,
619 debug: bool,
620 ) -> Self {
621 Self::new_with_routing(
622 local_store,
623 signaling,
624 request_timeout,
625 debug,
626 Default::default(),
627 )
628 }
629
630 pub fn new_with_routing(
632 local_store: Arc<S>,
633 signaling: Arc<MeshRouter<R, F>>,
634 request_timeout: Duration,
635 debug: bool,
636 routing: MeshRoutingConfig,
637 ) -> Self {
638 let mut selector = PeerSelector::with_strategy(routing.selection_strategy);
639 selector.set_fairness(routing.fairness_enabled);
640 selector.set_cashu_payment_weight(routing.cashu_payment_weight);
641 Self {
642 local_store,
643 signaling,
644 htl_configs: RwLock::new(HashMap::new()),
645 pending_requests: RwLock::new(HashMap::new()),
646 pending_quotes: RwLock::new(HashMap::new()),
647 pending_forward_requests: RwLock::new(HashMap::new()),
648 recent_forward_misses: Mutex::new(TimedSeenSet::new(
649 RECENT_FORWARD_MISS_CAPACITY,
650 Self::recent_forward_miss_ttl(request_timeout),
651 )),
652 issued_quotes: RwLock::new(HashMap::new()),
653 next_quote_id: RwLock::new(1),
654 read_sources: RwLock::new(HashMap::new()),
655 read_source_stats: RwLock::new(HashMap::new()),
656 inflight_source_fetches: Mutex::new(HashMap::new()),
657 peer_selector: RwLock::new(selector),
658 peer_active_requests: RwLock::new(HashMap::new()),
659 peer_wire_stats: RwLock::new(HashMap::new()),
660 pending_response_sends: Mutex::new(Vec::new()),
661 response_scheduler_running: AtomicBool::new(false),
662 next_response_job_id: AtomicU64::new(1),
663 routing,
664 request_timeout,
665 debug,
666 running: RwLock::new(false),
667 }
668 }
669
670 fn recent_forward_miss_ttl(request_timeout: Duration) -> Duration {
671 let ttl_ms = request_timeout
672 .as_millis()
673 .saturating_mul(2)
674 .max(MIN_RECENT_FORWARD_MISS_TTL_MS as u128)
675 .min(u64::MAX as u128) as u64;
676 Duration::from_millis(ttl_ms)
677 }
678
679 pub async fn start(&self) -> Result<(), TransportError> {
681 *self.running.write().await = true;
682
683 self.signaling.send_hello(vec![]).await?;
685
686 Ok(())
687 }
688
689 pub async fn stop(&self) {
691 *self.running.write().await = false;
692 }
693
694 pub async fn process_signaling(&self, msg: SignalingMessage) -> Result<(), TransportError> {
696 let peer_id = msg.peer_id().to_string();
698 {
699 let mut configs = self.htl_configs.write().await;
700 if !configs.contains_key(&peer_id) {
701 configs.insert(peer_id.clone(), PeerHTLConfig::random());
702 }
703 }
704 self.peer_selector.write().await.add_peer(peer_id);
705
706 self.signaling.handle_message(msg).await
707 }
708
709 pub fn signaling(&self) -> &Arc<MeshRouter<R, F>> {
711 &self.signaling
712 }
713
714 fn response_behavior(&self) -> ResponseBehaviorConfig {
715 self.routing.response_behavior.normalized()
716 }
717
718 async fn record_peer_wire_sent(&self, peer_id: &str, bytes: u64) {
719 if bytes == 0 {
720 return;
721 }
722 let mut stats = self.peer_wire_stats.write().await;
723 let entry = stats.entry(peer_id.to_string()).or_default();
724 entry.bytes_sent = entry.bytes_sent.saturating_add(bytes);
725 }
726
727 async fn record_peer_wire_received(&self, peer_id: &str, bytes: u64) {
728 if bytes == 0 {
729 return;
730 }
731 let mut stats = self.peer_wire_stats.write().await;
732 let entry = stats.entry(peer_id.to_string()).or_default();
733 entry.bytes_received = entry.bytes_received.saturating_add(bytes);
734 }
735
736 fn peer_upload_weight(stats: &PeerWireStats) -> f64 {
737 let raw_ratio = (stats.bytes_received.saturating_add(1024) as f64)
738 / (stats.bytes_sent.saturating_add(1024) as f64);
739 let bounded_ratio = raw_ratio / (1.0 + raw_ratio);
740 0.5 + 1.5 * bounded_ratio
741 }
742
743 fn choose_ready_response_job(
744 ready_jobs: &[(u64, String, usize, Instant, u64)],
745 stats: &HashMap<String, PeerWireStats>,
746 ) -> Option<(u64, f64)> {
747 ready_jobs
748 .iter()
749 .map(|job| {
750 let peer_stats = stats.get(&job.1).cloned().unwrap_or_default();
751 let finish = peer_stats.bandwidth_debt
752 + (job.2 as f64 / Self::peer_upload_weight(&peer_stats));
753 (job.0, job.1.as_str(), job.4, finish)
754 })
755 .min_by(|left, right| {
756 left.3
757 .partial_cmp(&right.3)
758 .unwrap_or(std::cmp::Ordering::Equal)
759 .then_with(|| left.2.cmp(&right.2))
760 .then_with(|| left.1.cmp(right.1))
761 })
762 .map(|choice| (choice.0, choice.3))
763 }
764
765 async fn enqueue_response_send(
766 self: &Arc<Self>,
767 peer_id: String,
768 bytes: Vec<u8>,
769 ready_at: Instant,
770 ) {
771 let job_id = self.next_response_job_id.fetch_add(1, Ordering::Relaxed);
772 {
773 let mut queue = self.pending_response_sends.lock().await;
774 queue.push(PendingResponseSend {
775 job_id,
776 peer_id,
777 bytes,
778 ready_at,
779 queue_sequence: job_id,
780 });
781 }
782
783 if self
784 .response_scheduler_running
785 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
786 .is_ok()
787 {
788 let this = Arc::clone(self);
789 tokio::spawn(async move {
790 this.run_response_scheduler().await;
791 });
792 }
793 }
794
795 async fn run_response_scheduler(self: Arc<Self>) {
796 loop {
797 let snapshot = {
798 let queue = self.pending_response_sends.lock().await;
799 if queue.is_empty() {
800 self.response_scheduler_running
801 .store(false, Ordering::Release);
802 return;
803 }
804 queue
805 .iter()
806 .map(|job| {
807 (
808 job.job_id,
809 job.peer_id.clone(),
810 job.bytes.len(),
811 job.ready_at,
812 job.queue_sequence,
813 )
814 })
815 .collect::<Vec<_>>()
816 };
817
818 let now = Instant::now();
819 let mut earliest_ready_at: Option<Instant> = None;
820 let mut ready_jobs = Vec::new();
821 for job in &snapshot {
822 if job.3 <= now {
823 ready_jobs.push(job.clone());
824 } else {
825 earliest_ready_at = Some(match earliest_ready_at {
826 Some(current) => current.min(job.3),
827 None => job.3,
828 });
829 }
830 }
831
832 if ready_jobs.is_empty() {
833 if let Some(ready_at) = earliest_ready_at {
834 tokio::time::sleep(ready_at.saturating_duration_since(Instant::now())).await;
835 continue;
836 }
837 self.response_scheduler_running
838 .store(false, Ordering::Release);
839 return;
840 }
841
842 let (selected_job_id, selected_finish) = {
843 let stats = self.peer_wire_stats.read().await;
844 Self::choose_ready_response_job(&ready_jobs, &stats).expect("ready response job")
845 };
846
847 let selected = {
848 let mut queue = self.pending_response_sends.lock().await;
849 let Some(index) = queue.iter().position(|job| job.job_id == selected_job_id) else {
850 continue;
851 };
852 queue.swap_remove(index)
853 };
854
855 let sent = if let Some(channel) = self.signaling.get_channel(&selected.peer_id).await {
856 channel.send(selected.bytes.clone()).await.is_ok()
857 } else {
858 false
859 };
860
861 let queued_peers = {
862 let queue = self.pending_response_sends.lock().await;
863 queue
864 .iter()
865 .map(|job| job.peer_id.clone())
866 .collect::<HashSet<_>>()
867 };
868 let mut stats = self.peer_wire_stats.write().await;
869 let entry = stats.entry(selected.peer_id.clone()).or_default();
870 if sent {
871 entry.bytes_sent = entry.bytes_sent.saturating_add(selected.bytes.len() as u64);
872 entry.bandwidth_debt = selected_finish;
873 }
874 if queued_peers.is_empty() {
875 for peer_stats in stats.values_mut() {
876 peer_stats.bandwidth_debt = 0.0;
877 }
878 } else {
879 let floor = queued_peers
880 .iter()
881 .filter_map(|peer_id| stats.get(peer_id).map(|peer| peer.bandwidth_debt))
882 .fold(f64::INFINITY, f64::min);
883 if floor.is_finite() && floor > 0.0 {
884 for peer_id in queued_peers {
885 if let Some(peer_stats) = stats.get_mut(&peer_id) {
886 peer_stats.bandwidth_debt =
887 (peer_stats.bandwidth_debt - floor).max(0.0);
888 }
889 }
890 }
891 }
892 }
893 }
894
895 fn deterministic_actor_draw_for(peer_id: &str, hash: &Hash, salt: u64) -> f64 {
896 let mut hasher = DefaultHasher::new();
897 peer_id.hash(&mut hasher);
898 hash.hash(&mut hasher);
899 salt.hash(&mut hasher);
900 let v = hasher.finish();
901 (v as f64) / (u64::MAX as f64)
902 }
903
904 fn deterministic_actor_draw(&self, hash: &Hash, salt: u64) -> f64 {
905 Self::deterministic_actor_draw_for(self.signaling.peer_id(), hash, salt)
906 }
907
908 fn peer_metadata_pointer_slot_hash() -> Hash {
909 hashtree_core::sha256(PEER_METADATA_POINTER_SLOT_KEY)
910 }
911
912 fn decode_hash_hex(hash_hex: &str) -> Result<Hash, StoreError> {
913 let bytes = hex::decode(hash_hex)
914 .map_err(|e| StoreError::Other(format!("Invalid hash hex: {e}")))?;
915 if bytes.len() != 32 {
916 return Err(StoreError::Other(format!(
917 "Invalid hash length {}, expected 32 bytes",
918 bytes.len()
919 )));
920 }
921 let mut hash = [0u8; 32];
922 hash.copy_from_slice(&bytes);
923 Ok(hash)
924 }
925
926 fn should_drop_response(&self, hash: &Hash) -> bool {
927 let p = self.response_behavior().drop_response_prob;
928 if p <= 0.0 {
929 return false;
930 }
931 self.deterministic_actor_draw(hash, 0xD0_D0_D0_D0_D0_D0_D0_D0) < p
932 }
933
934 fn should_corrupt_response(&self, hash: &Hash) -> bool {
935 let p = self.response_behavior().corrupt_response_prob;
936 if p <= 0.0 {
937 return false;
938 }
939 self.deterministic_actor_draw(hash, 0xC0_C0_C0_C0_C0_C0_C0_C0) < p
940 }
941
942 fn should_stall_response(&self, hash: &Hash) -> bool {
943 let p = self.response_behavior().stall_response_prob;
944 if p <= 0.0 {
945 return false;
946 }
947 self.deterministic_actor_draw(hash, 0x5A_11_5A_11_5A_11_5A_11) < p
948 }
949
950 fn response_send_delay(&self, hash: &Hash, payload_len: usize) -> Duration {
951 let behavior = self.response_behavior();
952 let mut total_ms = behavior
953 .extra_delay_ms
954 .saturating_add(behavior.first_byte_delay_ms);
955
956 if behavior.bytes_per_second > 0 && payload_len > 0 {
957 let throughput_ms = ((payload_len as u128) * 1000)
958 .div_ceil(behavior.bytes_per_second as u128)
959 .min(u64::MAX as u128) as u64;
960 total_ms = total_ms.saturating_add(throughput_ms);
961 }
962
963 if behavior.stall_delay_ms > 0 && self.should_stall_response(hash) {
964 total_ms = total_ms.saturating_add(behavior.stall_delay_ms);
965 }
966
967 Duration::from_millis(total_ms)
968 }
969
970 async fn ordered_connected_peers(&self, exclude_peer_id: Option<&str>) -> Vec<String> {
971 let current_peer_ids = self.signaling.peer_ids().await;
972 if current_peer_ids.is_empty() {
973 return Vec::new();
974 }
975
976 sync_selector_peers(&self.peer_selector, ¤t_peer_ids).await;
977 let hash_get_peer_ids: HashSet<String> = self
978 .signaling
979 .hash_get_peer_ids()
980 .await
981 .into_iter()
982 .collect();
983 let mut candidate_peer_ids: Vec<String> = current_peer_ids
984 .into_iter()
985 .filter(|peer_id| hash_get_peer_ids.contains(peer_id))
986 .filter(|peer_id| exclude_peer_id.is_none_or(|exclude| peer_id != exclude))
987 .collect();
988 if candidate_peer_ids.is_empty() {
989 return Vec::new();
990 }
991
992 let current_set: HashSet<&str> = candidate_peer_ids.iter().map(String::as_str).collect();
993 let mut selector = self.peer_selector.write().await;
994 let mut selector_order = selector.select_peers();
995 selector_order.retain(|peer_id| current_set.contains(peer_id.as_str()));
996 if selector_order.is_empty() {
997 let mut fallback = candidate_peer_ids;
998 fallback.sort();
999 return fallback;
1000 }
1001 let backed_off: HashMap<String, bool> = candidate_peer_ids
1002 .iter()
1003 .map(|peer_id| (peer_id.clone(), selector.is_peer_backed_off(peer_id)))
1004 .collect();
1005 drop(selector);
1006
1007 let rank: HashMap<&str, usize> = selector_order
1008 .iter()
1009 .enumerate()
1010 .map(|(idx, peer_id)| (peer_id.as_str(), idx))
1011 .collect();
1012 let active = self.peer_active_requests.read().await;
1013 candidate_peer_ids.sort_by(|left, right| {
1014 let left_backed_off = backed_off.get(left).copied().unwrap_or(false);
1015 let right_backed_off = backed_off.get(right).copied().unwrap_or(false);
1016 if left_backed_off != right_backed_off {
1017 return if left_backed_off {
1018 std::cmp::Ordering::Greater
1019 } else {
1020 std::cmp::Ordering::Less
1021 };
1022 }
1023 let left_rank = rank.get(left.as_str()).copied().unwrap_or(usize::MAX / 2);
1024 let right_rank = rank.get(right.as_str()).copied().unwrap_or(usize::MAX / 2);
1025 let left_load = active.get(left).copied().unwrap_or(0);
1026 let right_load = active.get(right).copied().unwrap_or(0);
1027 (left_rank + left_load.saturating_mul(ACTIVE_PEER_REQUEST_RANK_PENALTY))
1028 .cmp(&(right_rank + right_load.saturating_mul(ACTIVE_PEER_REQUEST_RANK_PENALTY)))
1029 .then_with(|| left.cmp(right))
1030 });
1031 candidate_peer_ids
1032 }
1033
1034 async fn reserve_peer_request(&self, peer_id: &str) {
1035 let mut active = self.peer_active_requests.write().await;
1036 *active.entry(peer_id.to_string()).or_insert(0) += 1;
1037 }
1038
1039 async fn release_peer_request(&self, peer_id: &str) {
1040 let mut active = self.peer_active_requests.write().await;
1041 let Some(count) = active.get_mut(peer_id) else {
1042 return;
1043 };
1044 if *count <= 1 {
1045 active.remove(peer_id);
1046 } else {
1047 *count -= 1;
1048 }
1049 }
1050
1051 async fn release_queried_peer_requests(&self, peer_ids: &[String]) {
1052 for peer_id in peer_ids {
1053 self.release_peer_request(peer_id).await;
1054 }
1055 }
1056
1057 fn requested_quote_mint(&self) -> Option<&str> {
1058 if let Some(default_mint) = self.routing.cashu_default_mint.as_deref() {
1059 if self.routing.cashu_accepted_mints.is_empty()
1060 || self
1061 .routing
1062 .cashu_accepted_mints
1063 .iter()
1064 .any(|mint| mint == default_mint)
1065 {
1066 return Some(default_mint);
1067 }
1068 }
1069
1070 self.routing
1071 .cashu_accepted_mints
1072 .first()
1073 .map(String::as_str)
1074 }
1075
1076 fn choose_quote_mint(&self, requested_mint: Option<&str>) -> Option<String> {
1077 if let Some(requested_mint) = requested_mint {
1078 if self.accepts_quote_mint(Some(requested_mint)) {
1079 return Some(requested_mint.to_string());
1080 }
1081 }
1082 if let Some(default_mint) = self.routing.cashu_default_mint.as_ref() {
1083 return Some(default_mint.clone());
1084 }
1085 if let Some(first_mint) = self.routing.cashu_accepted_mints.first() {
1086 return Some(first_mint.clone());
1087 }
1088 requested_mint.map(str::to_string)
1089 }
1090
1091 fn accepts_quote_mint(&self, mint_url: Option<&str>) -> bool {
1092 if self.routing.cashu_accepted_mints.is_empty() {
1093 return true;
1094 }
1095
1096 let Some(mint_url) = mint_url else {
1097 return false;
1098 };
1099 self.routing
1100 .cashu_accepted_mints
1101 .iter()
1102 .any(|mint| mint == mint_url)
1103 }
1104
1105 fn trusts_quote_mint(&self, mint_url: Option<&str>) -> bool {
1106 let Some(mint_url) = mint_url else {
1107 return self.routing.cashu_default_mint.is_none()
1108 && self.routing.cashu_accepted_mints.is_empty();
1109 };
1110 self.routing.cashu_default_mint.as_deref() == Some(mint_url)
1111 || self
1112 .routing
1113 .cashu_accepted_mints
1114 .iter()
1115 .any(|mint| mint == mint_url)
1116 }
1117
1118 async fn peer_suggested_mint_cap_sat(&self, peer_id: &str) -> u64 {
1119 let base = self.routing.cashu_peer_suggested_mint_base_cap_sat;
1120 if base == 0 {
1121 return 0;
1122 }
1123
1124 let selector = self.peer_selector.read().await;
1125 let Some(stats) = selector.get_stats(peer_id) else {
1126 let max_cap = self.routing.cashu_peer_suggested_mint_max_cap_sat;
1127 return if max_cap > 0 { base.min(max_cap) } else { base };
1128 };
1129
1130 if stats.cashu_payment_defaults > 0
1131 && stats.cashu_payment_defaults >= stats.cashu_payment_receipts
1132 {
1133 return 0;
1134 }
1135
1136 let success_bonus = stats
1137 .successes
1138 .saturating_mul(self.routing.cashu_peer_suggested_mint_success_step_sat);
1139 let receipt_bonus = stats
1140 .cashu_payment_receipts
1141 .saturating_mul(self.routing.cashu_peer_suggested_mint_receipt_step_sat);
1142 let mut cap = base
1143 .saturating_add(success_bonus)
1144 .saturating_add(receipt_bonus);
1145 let max_cap = self.routing.cashu_peer_suggested_mint_max_cap_sat;
1146 if max_cap > 0 {
1147 cap = cap.min(max_cap);
1148 }
1149 cap
1150 }
1151
1152 async fn should_accept_quote_response(
1153 &self,
1154 from_peer: &str,
1155 preferred_mint_url: Option<&str>,
1156 offered_payment_sat: u64,
1157 res: &DataQuoteResponse,
1158 ) -> bool {
1159 let Some(payment_sat) = res.p else {
1160 return false;
1161 };
1162 if payment_sat > offered_payment_sat {
1163 return false;
1164 }
1165
1166 let response_mint = res.m.as_deref();
1167 if response_mint == preferred_mint_url {
1168 return true;
1169 }
1170 if self.trusts_quote_mint(response_mint) {
1171 return true;
1172 }
1173 if response_mint.is_none() {
1174 return false;
1175 }
1176
1177 payment_sat <= self.peer_suggested_mint_cap_sat(from_peer).await
1178 }
1179
1180 async fn issue_quote(
1181 &self,
1182 peer_id: &str,
1183 hash_key: &str,
1184 payment_sat: u64,
1185 ttl_ms: u32,
1186 mint_url: Option<&str>,
1187 ) -> u64 {
1188 let quote_id = {
1189 let mut next = self.next_quote_id.write().await;
1190 let quote_id = *next;
1191 *next = next.saturating_add(1);
1192 quote_id
1193 };
1194
1195 let expires_at = Instant::now() + Duration::from_millis(ttl_ms as u64);
1196 self.issued_quotes.write().await.insert(
1197 (peer_id.to_string(), hash_key.to_string(), quote_id),
1198 IssuedQuote {
1199 expires_at,
1200 payment_sat,
1201 mint_url: mint_url.map(str::to_string),
1202 },
1203 );
1204 quote_id
1205 }
1206
1207 async fn take_valid_quote(&self, peer_id: &str, hash_key: &str, quote_id: u64) -> bool {
1208 let key = (peer_id.to_string(), hash_key.to_string(), quote_id);
1209 let Some(quote) = self.issued_quotes.write().await.remove(&key) else {
1210 return false;
1211 };
1212 quote.expires_at > Instant::now()
1213 }
1214
1215 async fn send_request_to_peer(
1216 &self,
1217 peer_id: &str,
1218 hash: &Hash,
1219 request_htl: u8,
1220 quote_id: Option<u64>,
1221 ) -> bool {
1222 if !should_forward_htl(request_htl) {
1223 return false;
1224 }
1225
1226 let channel = match self.signaling.get_channel(peer_id).await {
1227 Some(c) => c,
1228 None => return false,
1229 };
1230
1231 let htl_config = {
1232 let configs = self.htl_configs.read().await;
1233 configs
1234 .get(peer_id)
1235 .cloned()
1236 .unwrap_or_else(PeerHTLConfig::random)
1237 };
1238
1239 let send_htl = htl_config.decrement(request_htl);
1240 let req = match quote_id {
1241 Some(quote_id) => create_request_with_quote(hash, send_htl, quote_id),
1242 None => create_request(hash, send_htl),
1243 };
1244 let request_bytes = encode_request(&req);
1245 let request_len = request_bytes.len() as u64;
1246
1247 {
1248 let mut selector = self.peer_selector.write().await;
1249 selector.record_request(peer_id, request_len);
1250 }
1251
1252 match channel.send(request_bytes).await {
1253 Ok(()) => {
1254 self.record_peer_wire_sent(peer_id, request_len).await;
1255 true
1256 }
1257 Err(_) => {
1258 self.peer_selector.write().await.record_failure(peer_id);
1259 false
1260 }
1261 }
1262 }
1263
1264 async fn send_quote_request_to_peer(
1265 &self,
1266 peer_id: &str,
1267 hash: &Hash,
1268 payment_sat: u64,
1269 ttl_ms: u32,
1270 mint_url: Option<&str>,
1271 ) -> bool {
1272 let channel = match self.signaling.get_channel(peer_id).await {
1273 Some(c) => c,
1274 None => return false,
1275 };
1276
1277 let req = create_quote_request(hash, ttl_ms, payment_sat, mint_url);
1278 let request_bytes = encode_quote_request(&req);
1279 let request_len = request_bytes.len() as u64;
1280
1281 match channel.send(request_bytes).await {
1282 Ok(()) => {
1283 self.record_peer_wire_sent(peer_id, request_len).await;
1284 true
1285 }
1286 Err(_) => false,
1287 }
1288 }
1289
1290 pub async fn set_read_sources(&self, sources: Vec<Arc<dyn MeshReadSource>>) {
1291 let mut by_id = HashMap::new();
1292 let mut stats = self.read_source_stats.write().await;
1293 for source in sources {
1294 let source_id = source.id().to_string();
1295 by_id.insert(source_id.clone(), source);
1296 stats
1297 .entry(source_id)
1298 .or_insert_with(AdaptiveSourceStats::default);
1299 }
1300 *self.read_sources.write().await = by_id;
1301 }
1302
1303 async fn record_read_source_request(&self, source_id: &str) {
1304 let mut stats = self.read_source_stats.write().await;
1305 stats
1306 .entry(source_id.to_string())
1307 .or_insert_with(AdaptiveSourceStats::default)
1308 .requests += 1;
1309 }
1310
1311 async fn record_read_source_miss(&self, source_id: &str) {
1312 let mut stats = self.read_source_stats.write().await;
1313 stats
1314 .entry(source_id.to_string())
1315 .or_insert_with(AdaptiveSourceStats::default)
1316 .misses += 1;
1317 }
1318
1319 async fn record_read_source_success(&self, source_id: &str, elapsed_ms: u64) {
1320 let now = Instant::now();
1321 let mut stats = self.read_source_stats.write().await;
1322 let stats = stats
1323 .entry(source_id.to_string())
1324 .or_insert_with(AdaptiveSourceStats::default);
1325 stats.successes += 1;
1326 stats.last_success_at = Some(now);
1327 stats.backoff_level = 0;
1328 stats.backed_off_until = None;
1329 if stats.srtt_ms <= 0.0 {
1330 stats.srtt_ms = elapsed_ms as f64;
1331 stats.rttvar_ms = elapsed_ms as f64 / 2.0;
1332 return;
1333 }
1334 let elapsed = elapsed_ms as f64;
1335 stats.rttvar_ms = 0.75 * stats.rttvar_ms + 0.25 * (stats.srtt_ms - elapsed).abs();
1336 stats.srtt_ms = 0.875 * stats.srtt_ms + 0.125 * elapsed;
1337 }
1338
1339 async fn record_read_source_failure(&self, source_id: &str) {
1340 let now = Instant::now();
1341 let mut stats = self.read_source_stats.write().await;
1342 let stats = stats
1343 .entry(source_id.to_string())
1344 .or_insert_with(AdaptiveSourceStats::default);
1345 stats.failures += 1;
1346 stats.last_failure_at = Some(now);
1347 Self::apply_source_backoff(stats, now);
1348 }
1349
1350 async fn record_read_source_timeout(&self, source_id: &str) {
1351 let now = Instant::now();
1352 let mut stats = self.read_source_stats.write().await;
1353 let stats = stats
1354 .entry(source_id.to_string())
1355 .or_insert_with(AdaptiveSourceStats::default);
1356 stats.timeouts += 1;
1357 stats.last_failure_at = Some(now);
1358 Self::apply_source_backoff(stats, now);
1359 }
1360
1361 fn apply_source_backoff(stats: &mut AdaptiveSourceStats, now: Instant) {
1362 stats.backoff_level = stats.backoff_level.saturating_add(1);
1363 let backoff_ms = (INITIAL_SOURCE_BACKOFF_MS
1364 .saturating_mul(2u64.saturating_pow(stats.backoff_level.saturating_sub(1))))
1365 .min(MAX_SOURCE_BACKOFF_MS);
1366 stats.backed_off_until = Some(now + Duration::from_millis(backoff_ms));
1367 }
1368
1369 async fn ordered_read_sources(&self) -> Vec<Arc<dyn MeshReadSource>> {
1370 let sources = self.read_sources.read().await;
1371 if sources.is_empty() {
1372 return Vec::new();
1373 }
1374
1375 let mut available: Vec<Arc<dyn MeshReadSource>> = sources
1376 .values()
1377 .filter(|source| source.is_available())
1378 .cloned()
1379 .collect();
1380 if available.is_empty() {
1381 return Vec::new();
1382 }
1383
1384 let now = Instant::now();
1385 let stats = self.read_source_stats.read().await;
1386 let mut healthy: Vec<Arc<dyn MeshReadSource>> = available
1387 .iter()
1388 .filter(|source| {
1389 stats
1390 .get(source.id())
1391 .and_then(|s| s.backed_off_until)
1392 .is_none_or(|until| until <= now)
1393 })
1394 .cloned()
1395 .collect();
1396 if !healthy.is_empty() {
1397 available = std::mem::take(&mut healthy);
1398 }
1399
1400 available.sort_by(|left, right| {
1401 let left_stats = stats.get(left.id()).cloned().unwrap_or_default();
1402 let right_stats = stats.get(right.id()).cloned().unwrap_or_default();
1403 adaptive_source_score(&right_stats, now)
1404 .partial_cmp(&adaptive_source_score(&left_stats, now))
1405 .unwrap_or(std::cmp::Ordering::Equal)
1406 .then_with(|| left.id().cmp(right.id()))
1407 });
1408 available
1409 }
1410
1411 async fn should_probe_multiple_read_sources(
1412 &self,
1413 ordered_sources: &[Arc<dyn MeshReadSource>],
1414 ) -> bool {
1415 if ordered_sources.len() <= 1 {
1416 return false;
1417 }
1418 let stats = self.read_source_stats.read().await;
1419 let best = stats
1420 .get(ordered_sources[0].id())
1421 .cloned()
1422 .unwrap_or_default();
1423 let second = stats
1424 .get(ordered_sources[1].id())
1425 .cloned()
1426 .unwrap_or_default();
1427 if !source_has_history(&best) || !source_has_history(&second) {
1428 return false;
1429 }
1430 let now = Instant::now();
1431 adaptive_source_score(&best, now) - adaptive_source_score(&second, now)
1432 < SOURCE_SCORE_TIE_DELTA
1433 }
1434
1435 async fn source_dispatch_for(&self, source_count: usize) -> RequestDispatchConfig {
1436 if source_count == 0 {
1437 return self.routing.dispatch;
1438 }
1439 let ordered_sources = self.ordered_read_sources().await;
1440 let probe_multiple = self
1441 .should_probe_multiple_read_sources(&ordered_sources)
1442 .await;
1443 let initial_fanout = if probe_multiple {
1444 source_count.min(2)
1445 } else {
1446 1
1447 };
1448 RequestDispatchConfig {
1449 initial_fanout,
1450 hedge_fanout: self.routing.dispatch.hedge_fanout,
1451 max_fanout: self.routing.dispatch.max_fanout.min(source_count),
1452 hedge_interval_ms: self.routing.dispatch.hedge_interval_ms,
1453 }
1454 }
1455
1456 pub async fn peer_count(&self) -> usize {
1458 self.signaling.peer_count().await
1459 }
1460
1461 pub async fn needs_peers(&self) -> bool {
1463 self.signaling.needs_peers().await
1464 }
1465
1466 pub async fn send_hello(&self) -> Result<(), TransportError> {
1468 self.signaling.send_hello(vec![]).await
1469 }
1470
1471 pub async fn drain_available_data_messages(self: &Arc<Self>) -> DataPumpStats {
1476 let mut stats = DataPumpStats::default();
1477 let peer_ids = self.signaling.peer_ids().await;
1478 for peer_id in peer_ids {
1479 let Some(channel) = self.signaling.get_channel(&peer_id).await else {
1480 continue;
1481 };
1482
1483 while let Some(data) = channel.try_recv() {
1484 stats.processed += 1;
1485 stats.processed_bytes += data.len() as u64;
1486 if let Some(msg) = parse_message(&data) {
1487 match msg {
1488 DataMessage::Request(_) => stats.request_messages += 1,
1489 DataMessage::Response(_) => stats.response_messages += 1,
1490 DataMessage::QuoteRequest(_) => stats.quote_request_messages += 1,
1491 DataMessage::QuoteResponse(_) => stats.quote_response_messages += 1,
1492 DataMessage::Payment(_)
1493 | DataMessage::PaymentAck(_)
1494 | DataMessage::Chunk(_)
1495 | DataMessage::PeerHints(_) => {}
1496 }
1497 }
1498 self.handle_data_message(&peer_id, &data).await;
1499 }
1500 }
1501 stats
1502 }
1503
1504 pub async fn record_cashu_payment_for_peer(&self, peer_id: &str, amount_sat: u64) {
1506 self.peer_selector
1507 .write()
1508 .await
1509 .record_cashu_payment(peer_id, amount_sat);
1510 }
1511
1512 pub async fn record_cashu_receipt_from_peer(&self, peer_id: &str, amount_sat: u64) {
1514 self.peer_selector
1515 .write()
1516 .await
1517 .record_cashu_receipt(peer_id, amount_sat);
1518 }
1519
1520 pub async fn record_cashu_payment_default_from_peer(&self, peer_id: &str) {
1522 self.peer_selector
1523 .write()
1524 .await
1525 .record_cashu_payment_default(peer_id);
1526 }
1527
1528 pub async fn selector_summary(&self) -> crate::peer_selector::SelectorSummary {
1530 self.peer_selector.read().await.summary()
1531 }
1532
1533 fn should_refuse_requests_from_peer(&self, selector: &PeerSelector, peer_id: &str) -> bool {
1534 selector.is_peer_blocked_for_payment_defaults(
1535 peer_id,
1536 self.routing.cashu_payment_default_block_threshold,
1537 )
1538 }
1539
1540 pub async fn peer_metadata_snapshot(&self) -> PeerMetadataSnapshot {
1542 self.peer_selector
1543 .read()
1544 .await
1545 .export_peer_metadata_snapshot()
1546 }
1547
1548 pub async fn persist_peer_metadata(&self) -> Result<Hash, StoreError> {
1553 let snapshot = self
1554 .peer_selector
1555 .read()
1556 .await
1557 .export_peer_metadata_snapshot();
1558 let bytes = serde_json::to_vec(&snapshot).map_err(|e| {
1559 StoreError::Other(format!("Failed to encode peer metadata snapshot: {e}"))
1560 })?;
1561 let snapshot_hash = hashtree_core::sha256(&bytes);
1562 let _ = self.local_store.put(snapshot_hash, bytes).await?;
1563
1564 let pointer_slot = Self::peer_metadata_pointer_slot_hash();
1565 let pointer_bytes = hex::encode(snapshot_hash).into_bytes();
1566 let _ = self.local_store.delete(&pointer_slot).await?;
1567 let _ = self.local_store.put(pointer_slot, pointer_bytes).await?;
1568
1569 Ok(snapshot_hash)
1570 }
1571
1572 pub async fn load_peer_metadata(&self) -> Result<bool, StoreError> {
1574 let pointer_slot = Self::peer_metadata_pointer_slot_hash();
1575 let Some(pointer_bytes) = self.local_store.get(&pointer_slot).await? else {
1576 return Ok(false);
1577 };
1578 let pointer_hex = std::str::from_utf8(&pointer_bytes).map_err(|e| {
1579 StoreError::Other(format!("Peer metadata pointer is not valid UTF-8: {e}"))
1580 })?;
1581 let snapshot_hash = Self::decode_hash_hex(pointer_hex.trim())?;
1582
1583 let Some(snapshot_bytes) = self.local_store.get(&snapshot_hash).await? else {
1584 return Ok(false);
1585 };
1586 let snapshot: PeerMetadataSnapshot =
1587 serde_json::from_slice(&snapshot_bytes).map_err(|e| {
1588 StoreError::Other(format!("Failed to decode peer metadata snapshot: {e}"))
1589 })?;
1590 self.peer_selector
1591 .write()
1592 .await
1593 .import_peer_metadata_snapshot(&snapshot);
1594 Ok(true)
1595 }
1596
1597 pub async fn get_with_quote(
1602 &self,
1603 hash: &Hash,
1604 payment_sat: u64,
1605 quote_ttl: Duration,
1606 ) -> Result<Option<Vec<u8>>, StoreError> {
1607 if let Some(data) = self.local_store.get(hash).await? {
1608 return Ok(Some(data));
1609 }
1610 Ok(self
1611 .request_from_peers_with_quote(hash, payment_sat, quote_ttl)
1612 .await)
1613 }
1614
1615 async fn request_from_peers_with_quote(
1616 &self,
1617 hash: &Hash,
1618 payment_sat: u64,
1619 quote_ttl: Duration,
1620 ) -> Option<Vec<u8>> {
1621 let ordered_peer_ids = self.ordered_connected_peers(None).await;
1622 if ordered_peer_ids.is_empty() {
1623 return None;
1624 }
1625
1626 if let Some(quote) = self
1627 .request_quote_from_peers(hash, payment_sat, quote_ttl, &ordered_peer_ids)
1628 .await
1629 {
1630 if let Some(data) = self
1631 .request_from_single_peer(hash, "e.peer_id, MAX_HTL, Some(quote.quote_id))
1632 .await
1633 {
1634 return Some(data);
1635 }
1636 }
1637
1638 self.request_from_mesh(hash).await
1639 }
1640
1641 async fn request_quote_from_peers(
1642 &self,
1643 hash: &Hash,
1644 payment_sat: u64,
1645 quote_ttl: Duration,
1646 ordered_peer_ids: &[String],
1647 ) -> Option<NegotiatedQuote> {
1648 if ordered_peer_ids.is_empty() {
1649 return None;
1650 }
1651 let ttl_ms = quote_ttl.as_millis().min(u32::MAX as u128) as u32;
1652 if ttl_ms == 0 {
1653 return None;
1654 }
1655 let requested_mint = self.requested_quote_mint().map(str::to_string);
1656
1657 let hash_key = hash_to_key(hash);
1658 let (tx, rx) = oneshot::channel();
1659 self.pending_quotes.write().await.insert(
1660 hash_key.clone(),
1661 PendingQuoteRequest {
1662 response_tx: tx,
1663 preferred_mint_url: requested_mint.clone(),
1664 offered_payment_sat: payment_sat,
1665 },
1666 );
1667
1668 let rx = Arc::new(Mutex::new(rx));
1669 let result = run_hedged_waves(
1670 ordered_peer_ids.len(),
1671 self.routing.dispatch,
1672 self.request_timeout,
1673 |range| {
1674 let wave_peer_ids = ordered_peer_ids[range].to_vec();
1675 let requested_mint = requested_mint.clone();
1676 let hash = *hash;
1677 async move {
1678 let mut sent = 0usize;
1679 for peer_id in wave_peer_ids {
1680 if self
1681 .send_quote_request_to_peer(
1682 &peer_id,
1683 &hash,
1684 payment_sat,
1685 ttl_ms,
1686 requested_mint.as_deref(),
1687 )
1688 .await
1689 {
1690 sent += 1;
1691 }
1692 }
1693 sent
1694 }
1695 },
1696 |wait| {
1697 let rx = rx.clone();
1698 async move {
1699 let mut rx = rx.lock().await;
1700 match tokio::time::timeout(wait, &mut *rx).await {
1701 Ok(Ok(Some(quote))) => HedgedWaveAction::Success(quote),
1702 Ok(Ok(None)) | Ok(Err(_)) => HedgedWaveAction::Abort,
1703 Err(_) => HedgedWaveAction::Continue,
1704 }
1705 }
1706 },
1707 )
1708 .await;
1709 let _ = self.pending_quotes.write().await.remove(&hash_key);
1710 result
1711 }
1712
1713 async fn request_from_single_peer(
1714 &self,
1715 hash: &Hash,
1716 peer_id: &str,
1717 request_htl: u8,
1718 quote_id: Option<u64>,
1719 ) -> Option<Vec<u8>> {
1720 let hash_key = hash_to_key(hash);
1721 let (tx, rx) = oneshot::channel();
1722 self.pending_requests.write().await.insert(
1723 hash_key.clone(),
1724 PendingRequest {
1725 response_tx: tx,
1726 started_at: Instant::now(),
1727 queried_peers: vec![peer_id.to_string()],
1728 },
1729 );
1730
1731 let mut rx = rx;
1732 if !self
1733 .send_request_to_peer(peer_id, hash, request_htl, quote_id)
1734 .await
1735 {
1736 let _ = self.pending_requests.write().await.remove(&hash_key);
1737 return None;
1738 }
1739 self.reserve_peer_request(peer_id).await;
1740
1741 if let Ok(Ok(Some(data))) = tokio::time::timeout(self.request_timeout, &mut rx).await {
1742 if hashtree_core::sha256(&data) == *hash {
1743 let _ = self.local_store.put(*hash, data.clone()).await;
1744 return Some(data);
1745 }
1746 }
1747
1748 if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
1749 self.release_queried_peer_requests(&pending.queried_peers)
1750 .await;
1751 for peer_id in pending.queried_peers {
1752 self.peer_selector.write().await.record_timeout(&peer_id);
1753 }
1754 }
1755 let _ = self.take_forward_requesters(&hash_key).await;
1756 None
1757 }
1758
1759 async fn request_from_ordered_peers(
1760 &self,
1761 hash: &Hash,
1762 ordered_peer_ids: &[String],
1763 request_htl: u8,
1764 ) -> RouteFetchOutcome {
1765 let hash_key = hash_to_key(hash);
1766 let (tx, rx) = oneshot::channel();
1767 self.pending_requests.write().await.insert(
1768 hash_key.clone(),
1769 PendingRequest {
1770 response_tx: tx,
1771 started_at: Instant::now(),
1772 queried_peers: Vec::new(),
1773 },
1774 );
1775
1776 let rx = Arc::new(Mutex::new(rx));
1777 let result = run_hedged_waves(
1778 ordered_peer_ids.len(),
1779 self.routing.dispatch,
1780 self.request_timeout,
1781 |range| {
1782 let wave_peer_ids = ordered_peer_ids[range].to_vec();
1783 let hash = *hash;
1784 let hash_key = hash_key.clone();
1785 async move {
1786 let mut sent = 0usize;
1787 for peer_id in wave_peer_ids {
1788 if self
1789 .send_request_to_peer(&peer_id, &hash, request_htl, None)
1790 .await
1791 {
1792 sent += 1;
1793 self.reserve_peer_request(&peer_id).await;
1794 if let Some(pending) =
1795 self.pending_requests.write().await.get_mut(&hash_key)
1796 {
1797 pending.queried_peers.push(peer_id);
1798 }
1799 }
1800 }
1801 sent
1802 }
1803 },
1804 |wait| {
1805 let rx = rx.clone();
1806 async move {
1807 let mut rx = rx.lock().await;
1808 match tokio::time::timeout(wait, &mut *rx).await {
1809 Ok(Ok(Some(data))) if hashtree_core::sha256(&data) == *hash => {
1810 HedgedWaveAction::Success(data)
1811 }
1812 Ok(Ok(Some(_))) => HedgedWaveAction::Continue,
1813 Ok(Ok(None)) | Ok(Err(_)) => HedgedWaveAction::Abort,
1814 Err(_) => HedgedWaveAction::Continue,
1815 }
1816 }
1817 },
1818 )
1819 .await;
1820
1821 let Some(data) = result else {
1822 if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
1823 self.release_queried_peer_requests(&pending.queried_peers)
1824 .await;
1825 for peer_id in pending.queried_peers {
1826 self.peer_selector.write().await.record_timeout(&peer_id);
1827 }
1828 }
1829 let _ = self.take_forward_requesters(&hash_key).await;
1830 return RouteFetchOutcome::Timeout;
1831 };
1832
1833 let _ = self.local_store.put(*hash, data.clone()).await;
1834 RouteFetchOutcome::Hit(data)
1835 }
1836
1837 async fn request_from_read_sources_inner(&self, hash: &Hash) -> RouteFetchOutcome {
1838 let ordered_sources = self.ordered_read_sources().await;
1839 if ordered_sources.is_empty() {
1840 return RouteFetchOutcome::Miss;
1841 }
1842
1843 let dispatch = normalize_dispatch_config(
1844 self.source_dispatch_for(ordered_sources.len()).await,
1845 ordered_sources.len(),
1846 );
1847 let wave_plan = build_hedged_wave_plan(ordered_sources.len(), dispatch);
1848 if wave_plan.is_empty() {
1849 return RouteFetchOutcome::Miss;
1850 }
1851
1852 let deadline = Instant::now() + self.request_timeout;
1853 let mut pending = FuturesUnordered::new();
1854 let mut pending_source_ids = HashSet::new();
1855 let mut saw_timeout = false;
1856 let mut next_source_idx = 0usize;
1857
1858 for (wave_idx, wave_size) in wave_plan.iter().copied().enumerate() {
1859 let from = next_source_idx;
1860 let to = (next_source_idx + wave_size).min(ordered_sources.len());
1861 next_source_idx = to;
1862
1863 for source in &ordered_sources[from..to] {
1864 let source = Arc::clone(source);
1865 let source_id = source.id().to_string();
1866 self.record_read_source_request(&source_id).await;
1867 pending_source_ids.insert(source_id.clone());
1868 let hash = *hash;
1869 pending.push(tokio::spawn(async move {
1870 let started_at = Instant::now();
1871 let result = std::panic::AssertUnwindSafe(source.get(&hash))
1872 .catch_unwind()
1873 .await;
1874 match result {
1875 Ok(Some(data)) => SourceFetchOutcome::Hit {
1876 source_id,
1877 data,
1878 elapsed_ms: started_at.elapsed().as_millis().max(1) as u64,
1879 },
1880 Ok(None) => SourceFetchOutcome::Miss { source_id },
1881 Err(_) => SourceFetchOutcome::Failure { source_id },
1882 }
1883 }));
1884 }
1885
1886 let is_last_wave =
1887 wave_idx + 1 == wave_plan.len() || next_source_idx >= ordered_sources.len();
1888 let window_end = if is_last_wave {
1889 deadline
1890 } else {
1891 (Instant::now() + Duration::from_millis(dispatch.hedge_interval_ms)).min(deadline)
1892 };
1893
1894 while Instant::now() < window_end {
1895 let remaining = window_end.saturating_duration_since(Instant::now());
1896 let Some(result) = tokio::time::timeout(remaining, pending.next())
1897 .await
1898 .ok()
1899 .flatten()
1900 else {
1901 break;
1902 };
1903 let Ok(outcome) = result else {
1904 continue;
1905 };
1906 match outcome {
1907 SourceFetchOutcome::Hit {
1908 source_id,
1909 data,
1910 elapsed_ms,
1911 } => {
1912 pending_source_ids.remove(&source_id);
1913 self.record_read_source_success(&source_id, elapsed_ms)
1914 .await;
1915 return RouteFetchOutcome::Hit(data);
1916 }
1917 SourceFetchOutcome::Miss { source_id } => {
1918 pending_source_ids.remove(&source_id);
1919 self.record_read_source_miss(&source_id).await;
1920 }
1921 SourceFetchOutcome::Failure { source_id } => {
1922 pending_source_ids.remove(&source_id);
1923 self.record_read_source_failure(&source_id).await;
1924 }
1925 }
1926 }
1927
1928 if Instant::now() >= deadline {
1929 break;
1930 }
1931 }
1932
1933 for source_id in pending_source_ids {
1934 saw_timeout = true;
1935 self.record_read_source_timeout(&source_id).await;
1936 }
1937 if saw_timeout {
1938 RouteFetchOutcome::Timeout
1939 } else {
1940 RouteFetchOutcome::Miss
1941 }
1942 }
1943
1944 async fn request_from_read_sources(&self, hash: &Hash) -> RouteFetchOutcome {
1945 let hash_key = hash_to_key(hash);
1946 let existing_wait = {
1947 let mut inflight = self.inflight_source_fetches.lock().await;
1948 if let Some(existing) = inflight.get_mut(&hash_key) {
1949 let (tx, rx) = oneshot::channel();
1950 existing.waiters.push(tx);
1951 Some(rx)
1952 } else {
1953 inflight.insert(
1954 hash_key.clone(),
1955 InflightSourceFetch {
1956 waiters: Vec::new(),
1957 },
1958 );
1959 None
1960 }
1961 };
1962
1963 if let Some(wait) = existing_wait {
1964 return wait.await.unwrap_or(RouteFetchOutcome::Timeout);
1965 }
1966
1967 let result = self.request_from_read_sources_inner(hash).await;
1968 if let RouteFetchOutcome::Hit(hit) = &result {
1969 let _ = self.local_store.put(*hash, hit.clone()).await;
1970 }
1971 self.complete_inflight_source_fetch(&hash_key, result.clone())
1972 .await;
1973
1974 result
1975 }
1976
1977 async fn complete_inflight_source_fetch(&self, hash_key: &str, result: RouteFetchOutcome) {
1978 let waiters = self
1979 .inflight_source_fetches
1980 .lock()
1981 .await
1982 .remove(hash_key)
1983 .map(|inflight| inflight.waiters)
1984 .unwrap_or_default();
1985 for waiter in waiters {
1986 let _ = waiter.send(result.clone());
1987 }
1988 }
1989
1990 async fn cancel_pending_peer_route(&self, hash: &Hash) {
1991 let hash_key = hash_to_key(hash);
1992 if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
1993 self.release_queried_peer_requests(&pending.queried_peers)
1994 .await;
1995 }
1996 }
1997
1998 async fn cancel_losing_route(&self, hash: &Hash, route: &ReadRoute, winner_data: &[u8]) {
1999 match route {
2000 ReadRoute::Peers(_) => self.cancel_pending_peer_route(hash).await,
2001 ReadRoute::Sources => {
2002 let hash_key = hash_to_key(hash);
2003 self.complete_inflight_source_fetch(
2004 &hash_key,
2005 RouteFetchOutcome::Hit(winner_data.to_vec()),
2006 )
2007 .await;
2008 }
2009 }
2010 }
2011
2012 async fn ranked_read_routes(&self, context: &MeshReadContext) -> Vec<RankedReadRoute> {
2013 let mut routes = Vec::new();
2014 let ordered_peers = if should_forward_htl(context.request_htl) {
2015 self.ordered_connected_peers(context.exclude_peer_id.as_deref())
2016 .await
2017 } else {
2018 Vec::new()
2019 };
2020 if !ordered_peers.is_empty() {
2021 let best_peer_id = ordered_peers[0].clone();
2022 let selector = self.peer_selector.read().await;
2023 let best_peer = selector.get_stats(&best_peer_id).cloned();
2024 let now = Instant::now();
2025 let (score, has_history) = match best_peer.as_ref() {
2026 Some(stats) => (
2027 peer_endpoint_score(stats, now),
2028 peer_endpoint_has_history(stats),
2029 ),
2030 None => (0.0, false),
2031 };
2032 routes.push(RankedReadRoute {
2033 route: ReadRoute::Peers(ordered_peers),
2034 best_endpoint_id: format!("peer:{best_peer_id}"),
2035 score,
2036 has_history,
2037 });
2038 }
2039 let ordered_sources = self.ordered_read_sources().await;
2040 if let Some(best_source) = ordered_sources.first() {
2041 let stats = self.read_source_stats.read().await;
2042 let best_source_stats = stats.get(best_source.id()).cloned().unwrap_or_default();
2043 let now = Instant::now();
2044 routes.push(RankedReadRoute {
2045 route: ReadRoute::Sources,
2046 best_endpoint_id: format!("source:{}", best_source.id()),
2047 score: adaptive_source_score(&best_source_stats, now),
2048 has_history: source_has_history(&best_source_stats),
2049 });
2050 }
2051 if routes.len() <= 1 {
2052 return routes;
2053 }
2054
2055 routes.sort_by(|left, right| {
2056 right
2057 .score
2058 .partial_cmp(&left.score)
2059 .unwrap_or(std::cmp::Ordering::Equal)
2060 .then_with(|| ranked_route_kind(&left.route).cmp(&ranked_route_kind(&right.route)))
2061 .then_with(|| left.best_endpoint_id.cmp(&right.best_endpoint_id))
2062 .then_with(|| left.route.id().cmp(right.route.id()))
2063 });
2064 routes
2065 }
2066
2067 fn should_probe_multiple_routes(&self, routes: &[RankedReadRoute]) -> bool {
2068 if routes.len() <= 1 {
2069 return false;
2070 }
2071 if !routes[0].has_history || !routes[1].has_history {
2072 return false;
2073 }
2074 (routes[0].score - routes[1].score) < SOURCE_SCORE_TIE_DELTA
2075 }
2076
2077 async fn run_read_route(
2078 &self,
2079 hash: &Hash,
2080 route: &ReadRoute,
2081 context: &MeshReadContext,
2082 ) -> RouteFetchOutcome {
2083 match route {
2084 ReadRoute::Peers(peer_ids) => {
2085 self.request_from_ordered_peers(hash, peer_ids, context.request_htl)
2086 .await
2087 }
2088 ReadRoute::Sources => self.request_from_read_sources(hash).await,
2089 }
2090 }
2091
2092 async fn request_from_mesh_with_context(
2093 &self,
2094 hash: &Hash,
2095 context: &MeshReadContext,
2096 ) -> Option<Vec<u8>> {
2097 let routes = self.ranked_read_routes(context).await;
2098 match routes.as_slice() {
2099 [] => None,
2100 [ranked] => match self.run_read_route(hash, &ranked.route, context).await {
2101 RouteFetchOutcome::Hit(data) => Some(data),
2102 RouteFetchOutcome::Miss | RouteFetchOutcome::Timeout => None,
2103 },
2104 [first, second, ..] => {
2105 if self.should_probe_multiple_routes(&routes) {
2106 let first_fut = self.run_read_route(hash, &first.route, context);
2107 let second_fut = self.run_read_route(hash, &second.route, context);
2108 tokio::pin!(first_fut);
2109 tokio::pin!(second_fut);
2110 let mut first_done = false;
2111 let mut second_done = false;
2112 loop {
2113 tokio::select! {
2114 result = &mut first_fut, if !first_done => {
2115 first_done = true;
2116 if let RouteFetchOutcome::Hit(data) = result {
2117 if !second_done {
2118 self.cancel_losing_route(hash, &second.route, &data).await;
2119 }
2120 return Some(data);
2121 }
2122 }
2123 result = &mut second_fut, if !second_done => {
2124 second_done = true;
2125 if let RouteFetchOutcome::Hit(data) = result {
2126 if !first_done {
2127 self.cancel_losing_route(hash, &first.route, &data).await;
2128 }
2129 return Some(data);
2130 }
2131 }
2132 else => break,
2133 }
2134 if first_done && second_done {
2135 break;
2136 }
2137 }
2138 None
2139 } else {
2140 match self.run_read_route(hash, &first.route, context).await {
2141 RouteFetchOutcome::Hit(data) => return Some(data),
2142 RouteFetchOutcome::Miss | RouteFetchOutcome::Timeout => {}
2143 }
2144 for ranked in routes.iter().skip(1) {
2145 match self.run_read_route(hash, &ranked.route, context).await {
2146 RouteFetchOutcome::Hit(data) => return Some(data),
2147 RouteFetchOutcome::Miss | RouteFetchOutcome::Timeout => {}
2148 }
2149 }
2150 None
2151 }
2152 }
2153 }
2154 }
2155
2156 async fn request_from_mesh(&self, hash: &Hash) -> Option<Vec<u8>> {
2157 self.request_from_mesh_with_context(hash, &MeshReadContext::default())
2158 .await
2159 }
2160
2161 async fn begin_forward_request(&self, hash_key: &str, requester_id: &str) -> bool {
2162 let mut pending = self.pending_forward_requests.write().await;
2163 if let Some(existing) = pending.get_mut(hash_key) {
2164 existing.requester_ids.insert(requester_id.to_string());
2165 return false;
2166 }
2167
2168 let mut requester_ids = HashSet::new();
2169 requester_ids.insert(requester_id.to_string());
2170 pending.insert(
2171 hash_key.to_string(),
2172 PendingForwardRequest { requester_ids },
2173 );
2174 true
2175 }
2176
2177 async fn was_recent_forward_miss(&self, hash_key: &str) -> bool {
2178 self.recent_forward_misses.lock().await.contains(hash_key)
2179 }
2180
2181 async fn mark_recent_forward_miss(&self, hash_key: &str) {
2182 let _ = self
2183 .recent_forward_misses
2184 .lock()
2185 .await
2186 .insert_if_new(hash_key.to_string());
2187 }
2188
2189 async fn take_forward_requesters(&self, hash_key: &str) -> Vec<String> {
2190 self.pending_forward_requests
2191 .write()
2192 .await
2193 .remove(hash_key)
2194 .map(|pending| pending.requester_ids.into_iter().collect())
2195 .unwrap_or_default()
2196 }
2197
2198 async fn complete_pending_response(
2199 self: &Arc<Self>,
2200 from_peer: &str,
2201 hash: &Hash,
2202 hash_key: String,
2203 payload: Vec<u8>,
2204 ) {
2205 if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
2206 self.release_queried_peer_requests(&pending.queried_peers)
2207 .await;
2208 let rtt_ms = pending.started_at.elapsed().as_millis() as u64;
2209 self.peer_selector.write().await.record_success(
2210 from_peer,
2211 rtt_ms,
2212 payload.len() as u64,
2213 );
2214 let forward_requesters = self.take_forward_requesters(&hash_key).await;
2215 let response_bytes = if forward_requesters.is_empty() {
2216 None
2217 } else {
2218 Some(encode_response(&create_response(hash, payload.clone())))
2219 };
2220 let _ = pending.response_tx.send(Some(payload));
2221 if let Some(response_bytes) = response_bytes {
2222 for requester_id in forward_requesters {
2223 Arc::clone(self)
2224 .enqueue_response_send(requester_id, response_bytes.clone(), Instant::now())
2225 .await;
2226 }
2227 }
2228 }
2229 }
2230
2231 async fn handle_quote_response_message(&self, from_peer: &str, res: DataQuoteResponse) {
2232 if !res.a {
2233 return;
2234 }
2235
2236 let Some(quote_id) = res.q else {
2237 return;
2238 };
2239
2240 let hash_key = hash_to_key(&res.h);
2241 let (preferred_mint_url, offered_payment_sat) = {
2242 let pending_quotes = self.pending_quotes.read().await;
2243 let Some(pending) = pending_quotes.get(&hash_key) else {
2244 return;
2245 };
2246 (
2247 pending.preferred_mint_url.clone(),
2248 pending.offered_payment_sat,
2249 )
2250 };
2251 if !self
2252 .should_accept_quote_response(
2253 from_peer,
2254 preferred_mint_url.as_deref(),
2255 offered_payment_sat,
2256 &res,
2257 )
2258 .await
2259 {
2260 return;
2261 }
2262 let mut pending_quotes = self.pending_quotes.write().await;
2263 if let Some(pending) = pending_quotes.remove(&hash_key) {
2264 let _ = pending.response_tx.send(Some(NegotiatedQuote {
2265 peer_id: from_peer.to_string(),
2266 quote_id,
2267 mint_url: res.m,
2268 }));
2269 }
2270 }
2271
2272 async fn handle_response_message(
2273 self: &Arc<Self>,
2274 from_peer: &str,
2275 res: crate::protocol::DataResponse,
2276 ) {
2277 let hash_key = hash_to_key(&res.h);
2278 let hash = match crate::protocol::bytes_to_hash(&res.h) {
2279 Some(h) => h,
2280 None => return,
2281 };
2282
2283 if hashtree_core::sha256(&res.d) != hash {
2285 self.peer_selector.write().await.record_failure(from_peer);
2286 if self.debug {
2287 println!("[MeshStoreCore] Ignoring invalid response payload for {hash_key}");
2288 }
2289 return;
2290 }
2291
2292 self.complete_pending_response(from_peer, &hash, hash_key, res.d)
2293 .await;
2294 }
2295
2296 async fn handle_quote_request_message(&self, from_peer: &str, req: DataQuoteRequest) {
2297 let hash = match crate::protocol::bytes_to_hash(&req.h) {
2298 Some(h) => h,
2299 None => return,
2300 };
2301 let hash_key = hash_to_key(&hash);
2302
2303 {
2304 let selector = self.peer_selector.read().await;
2305 if self.should_refuse_requests_from_peer(&selector, from_peer) {
2306 if self.debug {
2307 println!(
2308 "[MeshStoreCore] Refusing quote request from delinquent peer {}",
2309 from_peer
2310 );
2311 }
2312 return;
2313 }
2314 }
2315
2316 let chosen_mint = self.choose_quote_mint(req.m.as_deref());
2317 let can_serve = self.local_store.has(&hash).await.ok().unwrap_or(false)
2318 && !self.should_drop_response(&hash)
2319 && !self.should_corrupt_response(&hash);
2320
2321 let res = if can_serve {
2322 let quote_id = self
2323 .issue_quote(from_peer, &hash_key, req.p, req.t, chosen_mint.as_deref())
2324 .await;
2325 create_quote_response_available(&hash, quote_id, req.p, req.t, chosen_mint.as_deref())
2326 } else {
2327 create_quote_response_unavailable(&hash)
2328 };
2329 let response_bytes = encode_quote_response(&res);
2330 if let Some(channel) = self.signaling.get_channel(from_peer).await {
2331 if channel.send(response_bytes.clone()).await.is_ok() {
2332 self.record_peer_wire_sent(from_peer, response_bytes.len() as u64)
2333 .await;
2334 }
2335 }
2336 }
2337
2338 async fn handle_request_message(
2339 self: &Arc<Self>,
2340 from_peer: &str,
2341 req: crate::protocol::DataRequest,
2342 ) {
2343 let hash = match crate::protocol::bytes_to_hash(&req.h) {
2344 Some(h) => h,
2345 None => return,
2346 };
2347 let hash_key = hash_to_key(&hash);
2348
2349 if let Some(quote_id) = req.q {
2350 if !self.take_valid_quote(from_peer, &hash_key, quote_id).await {
2351 if self.debug {
2352 println!(
2353 "[MeshStoreCore] Refusing request with invalid or expired quote {} from {}",
2354 quote_id, from_peer
2355 );
2356 }
2357 return;
2358 }
2359 }
2360
2361 let allow_peer_forwarding = {
2362 let selector = self.peer_selector.read().await;
2363 !self.should_refuse_requests_from_peer(&selector, from_peer)
2364 };
2365
2366 if let Ok(Some(mut data)) = self.local_store.get(&hash).await {
2368 if self.should_drop_response(&hash) {
2369 if self.debug {
2370 println!(
2371 "[MeshStoreCore] Dropping response for {} due to actor profile",
2372 hash_to_key(&hash)
2373 );
2374 }
2375 return;
2376 }
2377
2378 let response_delay = self.response_send_delay(&hash, data.len());
2379 if self.should_corrupt_response(&hash) {
2380 if data.is_empty() {
2381 data.push(0x80);
2382 } else {
2383 data[0] ^= 0x80;
2384 }
2385 }
2386
2387 let res = create_response(&hash, data);
2389 let response_bytes = encode_response(&res);
2390 let ready_at = Instant::now() + response_delay;
2391 Arc::clone(self)
2392 .enqueue_response_send(from_peer.to_string(), response_bytes, ready_at)
2393 .await;
2394 return;
2395 }
2396
2397 if self.pending_requests.read().await.contains_key(&hash_key) {
2398 let _ = self.begin_forward_request(&hash_key, from_peer).await;
2399 return;
2400 }
2401
2402 if self.was_recent_forward_miss(&hash_key).await {
2403 if self.debug {
2404 println!(
2405 "[MeshStoreCore] Suppressing recently missed forwarded request for {}",
2406 hash_key
2407 );
2408 }
2409 return;
2410 }
2411
2412 if !self.begin_forward_request(&hash_key, from_peer).await {
2413 return;
2414 }
2415
2416 let from_peer = from_peer.to_string();
2417 let this = Arc::clone(self);
2418 let request_htl = req.htl;
2419 tokio::spawn(async move {
2420 let result = if allow_peer_forwarding {
2421 let context = MeshReadContext {
2422 exclude_peer_id: Some(from_peer.clone()),
2423 request_htl,
2424 };
2425 this.request_from_mesh_with_context(&hash, &context).await
2426 } else {
2427 if this.debug {
2428 println!(
2429 "[MeshStoreCore] Serving request from delinquent peer {} via read sources only",
2430 from_peer
2431 );
2432 }
2433 match this.request_from_read_sources(&hash).await {
2434 RouteFetchOutcome::Hit(data) => Some(data),
2435 RouteFetchOutcome::Miss | RouteFetchOutcome::Timeout => None,
2436 }
2437 };
2438 let requester_ids = this.take_forward_requesters(&hash_key).await;
2439 if let Some(data) = result {
2440 let ready_at = Instant::now() + this.response_send_delay(&hash, data.len());
2441 let res = create_response(&hash, data);
2442 let response_bytes = encode_response(&res);
2443 for requester_id in requester_ids {
2444 Arc::clone(&this)
2445 .enqueue_response_send(requester_id, response_bytes.clone(), ready_at)
2446 .await;
2447 }
2448 } else {
2449 this.mark_recent_forward_miss(&hash_key).await;
2450 }
2451 });
2452 }
2453
2454 pub async fn handle_data_message(self: &Arc<Self>, from_peer: &str, data: &[u8]) {
2456 self.record_peer_wire_received(from_peer, data.len() as u64)
2457 .await;
2458 let parsed = match parse_message(data) {
2459 Some(m) => m,
2460 None => return,
2461 };
2462
2463 match parsed {
2464 DataMessage::Request(req) => {
2465 self.handle_request_message(from_peer, req).await;
2466 }
2467 DataMessage::Response(res) => {
2468 self.handle_response_message(from_peer, res).await;
2469 }
2470 DataMessage::QuoteRequest(req) => {
2471 self.handle_quote_request_message(from_peer, req).await;
2472 }
2473 DataMessage::QuoteResponse(res) => {
2474 self.handle_quote_response_message(from_peer, res).await;
2475 }
2476 DataMessage::Payment(_)
2477 | DataMessage::PaymentAck(_)
2478 | DataMessage::Chunk(_)
2479 | DataMessage::PeerHints(_) => {}
2480 }
2481 }
2482}
2483
2484#[async_trait]
2485impl<S, R, F> Store for MeshStoreCore<S, R, F>
2486where
2487 S: Store + Send + Sync + 'static,
2488 R: SignalingTransport + Send + Sync + 'static,
2489 F: PeerLinkFactory + Send + Sync + 'static,
2490{
2491 async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
2492 self.local_store.put(hash, data).await
2493 }
2494
2495 async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
2496 if let Some(data) = self.local_store.get(hash).await? {
2498 return Ok(Some(data));
2499 }
2500
2501 Ok(self.request_from_mesh(hash).await)
2503 }
2504
2505 async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
2506 self.local_store.has(hash).await
2507 }
2508
2509 async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
2510 self.local_store.delete(hash).await
2511 }
2512}
2513
2514#[cfg(test)]
2515mod tests;
2516
2517pub type SimMeshStore<S> =
2519 MeshStoreCore<S, crate::mock::MockRelayTransport, crate::mock::MockConnectionFactory>;
2520
2521pub type ProductionMeshStore<S> =
2523 MeshStoreCore<S, crate::nostr::NostrRelayTransport, crate::real_factory::WebRtcPeerLinkFactory>;