1use async_trait::async_trait;
8use futures::{stream::FuturesUnordered, FutureExt, StreamExt};
9use std::collections::hash_map::DefaultHasher;
10use std::collections::{HashMap, HashSet};
11use std::future::Future;
12use std::hash::{Hash as _, Hasher};
13use std::ops::Range;
14use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
15use std::sync::Arc;
16use std::time::{Duration, Instant};
17use tokio::sync::{oneshot, Mutex, RwLock};
18
19use hashtree_core::{Hash, Store, StoreError};
20
21use crate::peer_selector::{PeerMetadataSnapshot, PeerSelector, SelectionStrategy};
22use crate::protocol::{
23 create_quote_request, create_quote_response_available, create_quote_response_unavailable,
24 create_request, create_request_with_quote, create_response, encode_quote_request,
25 encode_quote_response, encode_request, encode_response, hash_to_key, parse_message,
26 DataMessage, DataQuoteRequest, DataQuoteResponse,
27};
28use crate::signaling::MeshRouter;
29use crate::transport::{PeerLinkFactory, SignalingTransport, TransportError};
30use crate::types::{PeerHTLConfig, SignalingMessage, MAX_HTL};
31
/// Fixed byte string identifying the "latest peer metadata" pointer slot.
/// It is hashed with SHA-256 by `peer_metadata_pointer_slot_hash` to derive the slot hash.
const PEER_METADATA_POINTER_SLOT_KEY: &[u8] = b"hashtree-webrtc/peer-metadata/latest/v1";
35
/// Bookkeeping for an outstanding data request that is awaiting a response.
struct PendingRequest {
    /// One-shot channel used to deliver the outcome (`Some(bytes)` or `None`) to the waiter.
    response_tx: oneshot::Sender<Option<Vec<u8>>>,
    /// Instant associated with the request; presumably the send time used for latency
    /// accounting — TODO confirm at the use site.
    started_at: Instant,
    /// Ids of the peers that have been queried for this request.
    queried_peers: Vec<String>,
}
42
/// Bookkeeping for an outstanding quote negotiation.
struct PendingQuoteRequest {
    /// One-shot channel used to deliver the negotiated quote (or `None`) to the waiter.
    response_tx: oneshot::Sender<Option<NegotiatedQuote>>,
    /// Mint URL the requester prefers, if any.
    preferred_mint_url: Option<String>,
    /// Payment offered for the request, in sats.
    offered_payment_sat: u64,
}
48
/// State for a request that is being forwarded on behalf of other peers.
struct PendingForwardRequest {
    /// Ids of the requesters associated with this forward; presumably the peers that
    /// should receive the response — TODO confirm at the use site.
    requester_ids: HashSet<String>,
}
52
/// Per-peer wire accounting used when scheduling response sends.
#[derive(Debug, Clone, Default)]
struct PeerWireStats {
    /// Total bytes sent to the peer (saturating counter).
    bytes_sent: u64,
    /// Total bytes received from the peer (saturating counter).
    bytes_received: u64,
    /// Scheduling debt: set to the virtual finish value of the peer's last sent job and
    /// rebased/reset by the response scheduler.
    bandwidth_debt: f64,
}
59
/// A response payload queued for delivery by the response scheduler.
struct PendingResponseSend {
    /// Unique id used to locate the job in the queue.
    job_id: u64,
    /// Destination peer.
    peer_id: String,
    /// Encoded payload to send.
    bytes: Vec<u8>,
    /// Earliest instant at which the job may be sent.
    ready_at: Instant,
    /// Enqueue order, used as a tie-breaker when choosing among ready jobs.
    queue_sequence: u64,
}
67
/// An additional place blobs can be read from, identified by a string id.
#[async_trait]
pub trait MeshReadSource: Send + Sync {
    /// Identifier of this source; used as the key for the source and its statistics.
    fn id(&self) -> &str;

    /// Whether the source can currently be used. Defaults to `true`.
    fn is_available(&self) -> bool {
        true
    }

    /// Fetches the data for `hash`, returning `None` when the source cannot provide it.
    async fn get(&self, hash: &Hash) -> Option<Vec<u8>>;
}
78
/// A quote accepted from a peer during quote negotiation.
#[derive(Debug, Clone)]
struct NegotiatedQuote {
    /// Peer that issued the quote.
    peer_id: String,
    /// Quote identifier to attach to the follow-up request.
    quote_id: u64,
    /// Mint URL attached to the quote, if any (currently not read, hence `dead_code`).
    #[allow(dead_code)]
    mint_url: Option<String>,
}
86
/// A quote this node has issued to a peer.
struct IssuedQuote {
    /// Instant after which the quote is no longer valid.
    expires_at: Instant,
    /// Quoted payment in sats (stored but currently not read, hence `dead_code`).
    #[allow(dead_code)]
    payment_sat: u64,
    /// Mint URL attached to the quote, if any (stored but currently not read).
    #[allow(dead_code)]
    mint_url: Option<String>,
}
94
/// Adaptive statistics kept per read source and per read route; they feed
/// `adaptive_source_score`.
#[derive(Debug, Clone, Default)]
struct AdaptiveSourceStats {
    /// Number of requests issued.
    requests: u64,
    /// Number of requests that succeeded.
    successes: u64,
    /// Number of requests that ended in a miss.
    misses: u64,
    /// Number of requests that failed.
    failures: u64,
    /// Number of requests that timed out.
    timeouts: u64,
    /// Smoothed round-trip time in milliseconds; `<= 0.0` means "no sample yet".
    srtt_ms: f64,
    /// Smoothed round-trip time variation in milliseconds.
    rttvar_ms: f64,
    /// Current backoff level; reset to 0 on success.
    backoff_level: u32,
    /// While this instant is in the future the source scores `NEG_INFINITY`.
    backed_off_until: Option<Instant>,
    /// Instant of the most recent success, used for the recency bonus.
    last_success_at: Option<Instant>,
    /// Instant of the most recent failure.
    last_failure_at: Option<Instant>,
}
109
/// Result of attempting a fetch over a read route.
#[derive(Debug, Clone)]
enum RouteFetchOutcome {
    /// The data was found; carries the payload bytes.
    Hit(Vec<u8>),
    /// The route answered but did not have the data.
    Miss,
    /// The route did not answer in time.
    Timeout,
}
116
/// An in-progress source fetch together with the callers waiting for its outcome.
struct InflightSourceFetch {
    /// One-shot senders, one per waiter, each receiving a `RouteFetchOutcome`.
    waiters: Vec<oneshot::Sender<RouteFetchOutcome>>,
}
120
/// Result of querying a single read source, tagged with the source's id.
enum SourceFetchOutcome {
    /// The source returned data.
    Hit {
        /// Id of the source that answered.
        source_id: String,
        /// Payload bytes returned by the source.
        data: Vec<u8>,
        /// Time the fetch took, in milliseconds.
        elapsed_ms: u64,
    },
    /// The source did not have the data.
    Miss {
        /// Id of the source that answered.
        source_id: String,
    },
    /// The fetch from the source failed.
    Failure {
        /// Id of the source that failed.
        source_id: String,
    },
}
134
/// Initial backoff for a read source, in milliseconds.
const INITIAL_SOURCE_BACKOFF_MS: u64 = 250;
/// Upper bound on the backoff for a read source, in milliseconds.
const MAX_SOURCE_BACKOFF_MS: u64 = 10_000;
/// Score difference threshold; presumably scores closer than this are treated as a
/// tie — TODO confirm at the use site.
const SOURCE_SCORE_TIE_DELTA: f64 = 0.15;
/// A success more recent than this earns the recency bonus in `adaptive_source_score`.
const RECENT_SOURCE_SUCCESS_WINDOW: Duration = Duration::from_secs(60);
/// Rank positions added per in-flight request when ordering candidate peers.
const ACTIVE_PEER_REQUEST_RANK_PENALTY: usize = 3;
140
141fn source_reliability_score(stats: &AdaptiveSourceStats) -> f64 {
142 (stats.successes as f64 + 1.0) / (stats.requests as f64 + 2.0)
143}
144
145fn source_latency_score(stats: &AdaptiveSourceStats) -> f64 {
146 if stats.srtt_ms <= 0.0 {
147 return 0.5;
148 }
149 (500.0 / (stats.srtt_ms + 50.0)).min(1.0)
150}
151
152fn source_has_history(stats: &AdaptiveSourceStats) -> bool {
153 stats.requests > 0
154 || stats.successes > 0
155 || stats.misses > 0
156 || stats.failures > 0
157 || stats.timeouts > 0
158}
159
160fn adaptive_source_score(stats: &AdaptiveSourceStats, now: Instant) -> f64 {
161 if let Some(backed_off_until) = stats.backed_off_until {
162 if backed_off_until > now {
163 return f64::NEG_INFINITY;
164 }
165 }
166
167 let miss_penalty = if stats.requests > 0 {
168 (stats.misses as f64 / stats.requests as f64) * 0.15
169 } else {
170 0.0
171 };
172 let failure_penalty = if stats.requests > 0 {
173 ((stats.failures + stats.timeouts) as f64 / stats.requests as f64) * 0.3
174 } else {
175 0.0
176 };
177 let recency_bonus = if stats
178 .last_success_at
179 .is_some_and(|last| now.duration_since(last) < RECENT_SOURCE_SUCCESS_WINDOW)
180 {
181 0.1
182 } else {
183 0.0
184 };
185
186 0.6 * source_reliability_score(stats) + 0.3 * source_latency_score(stats) + recency_bonus
187 - miss_penalty
188 - failure_penalty
189}
190
/// A way of satisfying a read: via a list of peers, or via the registered read sources.
#[derive(Clone)]
enum ReadRoute {
    /// Ask the listed peers (by peer id).
    Peers(Vec<String>),
    /// Ask the registered read sources.
    Sources,
}

impl ReadRoute {
    /// Stable identifier of the route kind (`"peers"` or `"sources"`).
    fn id(&self) -> &'static str {
        match *self {
            ReadRoute::Sources => "sources",
            ReadRoute::Peers(..) => "peers",
        }
    }
}
205
206#[derive(Debug, Clone)]
207struct MeshReadContext {
208 exclude_peer_id: Option<String>,
209 request_htl: u8,
210}
211
212impl Default for MeshReadContext {
213 fn default() -> Self {
214 Self {
215 exclude_peer_id: None,
216 request_htl: MAX_HTL,
217 }
218 }
219}
220
/// Counters describing the messages handled by a data pump run.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct DataPumpStats {
    /// Total number of messages processed.
    pub processed: usize,
    /// Number of request messages.
    pub request_messages: usize,
    /// Number of response messages.
    pub response_messages: usize,
    /// Number of quote request messages.
    pub quote_request_messages: u64,
    /// Number of quote response messages.
    pub quote_response_messages: u64,
    /// Total number of bytes processed.
    pub processed_bytes: u64,
}
231
/// Controls how a request is fanned out to peers in hedged waves.
#[derive(Debug, Clone, Copy)]
pub struct RequestDispatchConfig {
    /// Number of peers queried in the first wave.
    pub initial_fanout: usize,
    /// Number of additional peers queried in each later wave.
    pub hedge_fanout: usize,
    /// Upper bound on the total number of peers queried (`0` is normalized to "all").
    pub max_fanout: usize,
    /// Wait between waves, in milliseconds (`0` sends the next wave immediately).
    pub hedge_interval_ms: u64,
}

impl Default for RequestDispatchConfig {
    /// Unbounded fanout with no delay between waves.
    fn default() -> Self {
        const UNBOUNDED: usize = usize::MAX;
        RequestDispatchConfig {
            initial_fanout: UNBOUNDED,
            hedge_fanout: UNBOUNDED,
            max_fanout: UNBOUNDED,
            hedge_interval_ms: 0,
        }
    }
}
259
260pub fn normalize_dispatch_config(
262 dispatch: RequestDispatchConfig,
263 available_peers: usize,
264) -> RequestDispatchConfig {
265 let mut cfg = dispatch;
266 let cap = if cfg.max_fanout == 0 {
267 available_peers
268 } else {
269 cfg.max_fanout.min(available_peers)
270 };
271 cfg.max_fanout = cap;
272 cfg.initial_fanout = if cfg.initial_fanout == 0 {
273 1
274 } else {
275 cfg.initial_fanout.min(cap.max(1))
276 };
277 cfg.hedge_fanout = if cfg.hedge_fanout == 0 {
278 1
279 } else {
280 cfg.hedge_fanout.min(cap.max(1))
281 };
282 cfg
283}
284
285pub fn build_hedged_wave_plan(peer_count: usize, dispatch: RequestDispatchConfig) -> Vec<usize> {
287 if peer_count == 0 {
288 return Vec::new();
289 }
290 let cap = dispatch.max_fanout.min(peer_count);
291 if cap == 0 {
292 return Vec::new();
293 }
294
295 let mut plan = Vec::new();
296 let mut sent = 0usize;
297 let first = dispatch.initial_fanout.min(cap).max(1);
298 plan.push(first);
299 sent += first;
300
301 while sent < cap {
302 let next = dispatch.hedge_fanout.min(cap - sent).max(1);
303 plan.push(next);
304 sent += next;
305 }
306 plan
307}
308
/// Decision returned by the wait step of `run_hedged_waves`.
#[derive(Debug)]
pub enum HedgedWaveAction<T> {
    /// Nothing conclusive yet; proceed to the next wave.
    Continue,
    /// A result was obtained; stop and return it.
    Success(T),
    /// Stop without a result.
    Abort,
}
316
/// Drives a hedged, wave-based request over `peer_count` peers.
///
/// The dispatch configuration is normalized and turned into a wave plan. For each wave,
/// `send_wave` is called with the index range of peers to query and returns how many
/// sends succeeded; `wait_wave` is then given a time budget and decides whether to
/// continue, succeed, or abort.
///
/// Returns `Some(value)` as soon as `wait_wave` reports `Success(value)`, and `None` when
/// the plan is empty, nothing could be sent, the deadline passes, or `wait_wave` aborts.
pub async fn run_hedged_waves<T, SendWave, SendWaveFut, WaitWave, WaitWaveFut>(
    peer_count: usize,
    dispatch: RequestDispatchConfig,
    request_timeout: Duration,
    mut send_wave: SendWave,
    mut wait_wave: WaitWave,
) -> Option<T>
where
    SendWave: FnMut(Range<usize>) -> SendWaveFut,
    SendWaveFut: Future<Output = usize>,
    WaitWave: FnMut(Duration) -> WaitWaveFut,
    WaitWaveFut: Future<Output = HedgedWaveAction<T>>,
{
    let dispatch = normalize_dispatch_config(dispatch, peer_count);
    let wave_plan = build_hedged_wave_plan(peer_count, dispatch);
    if wave_plan.is_empty() {
        return None;
    }

    // One overall deadline shared by all waves.
    let deadline = Instant::now() + request_timeout;
    let mut sent_total = 0usize;
    let mut next_peer_idx = 0usize;

    for (wave_idx, wave_size) in wave_plan.iter().copied().enumerate() {
        let from = next_peer_idx;
        let to = (next_peer_idx + wave_size).min(peer_count);
        next_peer_idx = to;

        if from == to {
            continue;
        }

        sent_total += send_wave(from..to).await;
        // Nothing has been sent so far: there is nothing to wait for, so move straight
        // on to the next wave (or give up once all peers have been tried).
        if sent_total == 0 {
            if next_peer_idx >= peer_count {
                break;
            }
            continue;
        }

        let now = Instant::now();
        if now >= deadline {
            break;
        }
        let remaining = deadline.saturating_duration_since(now);
        let is_last_wave = wave_idx + 1 == wave_plan.len() || next_peer_idx >= peer_count;
        // The last wave waits for everything that is left of the budget; earlier waves
        // wait for the hedge interval only (zero means "send the next wave right away").
        let wait = if is_last_wave {
            remaining
        } else if dispatch.hedge_interval_ms == 0 {
            Duration::ZERO
        } else {
            Duration::from_millis(dispatch.hedge_interval_ms).min(remaining)
        };

        if wait.is_zero() {
            continue;
        }

        match wait_wave(wait).await {
            HedgedWaveAction::Continue => {}
            HedgedWaveAction::Success(value) => return Some(value),
            HedgedWaveAction::Abort => break,
        }
    }

    None
}
388
389pub async fn sync_selector_peers(selector: &RwLock<PeerSelector>, current_peer_ids: &[String]) {
391 let mut selector = selector.write().await;
392 let current: HashSet<&str> = current_peer_ids.iter().map(String::as_str).collect();
393 let known: Vec<String> = selector.all_stats().map(|s| s.peer_id.clone()).collect();
394 for peer_id in known {
395 if !current.contains(peer_id.as_str()) {
396 selector.remove_peer(&peer_id);
397 }
398 }
399 for peer_id in current_peer_ids {
400 selector.add_peer(peer_id.clone());
401 }
402}
403
/// Knobs that alter how responses are delivered (drop, corrupt, delay, throttle, stall).
///
/// The default is "no alteration": every probability and delay is zero, which is
/// exactly what the derived `Default` produces.
#[derive(Debug, Clone, Copy, Default)]
pub struct ResponseBehaviorConfig {
    /// Probability in `[0, 1]` that a response is dropped.
    pub drop_response_prob: f64,
    /// Probability in `[0, 1]` that a response is corrupted.
    pub corrupt_response_prob: f64,
    /// Fixed delay added to every response, in milliseconds.
    pub extra_delay_ms: u64,
    /// Additional fixed delay before the first byte, in milliseconds.
    pub first_byte_delay_ms: u64,
    /// Throughput limit used to derive a size-dependent delay (`0` disables it).
    pub bytes_per_second: u64,
    /// Probability in `[0, 1]` that a response is stalled.
    pub stall_response_prob: f64,
    /// Delay added to a stalled response, in milliseconds.
    pub stall_delay_ms: u64,
}

impl ResponseBehaviorConfig {
    /// Returns a copy with every probability clamped to `[0, 1]`; all other fields are
    /// carried over unchanged.
    fn normalized(self) -> Self {
        Self {
            drop_response_prob: self.drop_response_prob.clamp(0.0, 1.0),
            corrupt_response_prob: self.corrupt_response_prob.clamp(0.0, 1.0),
            stall_response_prob: self.stall_response_prob.clamp(0.0, 1.0),
            ..self
        }
    }
}
452
453#[derive(Debug, Clone)]
455pub struct MeshRoutingConfig {
456 pub selection_strategy: SelectionStrategy,
457 pub fairness_enabled: bool,
458 pub cashu_payment_weight: f64,
460 pub cashu_payment_default_block_threshold: u64,
463 pub cashu_accepted_mints: Vec<String>,
465 pub cashu_default_mint: Option<String>,
467 pub cashu_peer_suggested_mint_base_cap_sat: u64,
469 pub cashu_peer_suggested_mint_success_step_sat: u64,
471 pub cashu_peer_suggested_mint_receipt_step_sat: u64,
473 pub cashu_peer_suggested_mint_max_cap_sat: u64,
475 pub dispatch: RequestDispatchConfig,
476 pub response_behavior: ResponseBehaviorConfig,
477}
478
479impl Default for MeshRoutingConfig {
480 fn default() -> Self {
481 Self {
482 selection_strategy: SelectionStrategy::Weighted,
483 fairness_enabled: true,
484 cashu_payment_weight: 0.0,
485 cashu_payment_default_block_threshold: 0,
486 cashu_accepted_mints: Vec::new(),
487 cashu_default_mint: None,
488 cashu_peer_suggested_mint_base_cap_sat: 0,
489 cashu_peer_suggested_mint_success_step_sat: 0,
490 cashu_peer_suggested_mint_receipt_step_sat: 0,
491 cashu_peer_suggested_mint_max_cap_sat: 0,
492 dispatch: RequestDispatchConfig::default(),
493 response_behavior: ResponseBehaviorConfig::default(),
494 }
495 }
496}
497
/// Core state of a mesh-backed store: a local store plus the peer, quote, read-source
/// and response-scheduling bookkeeping needed to exchange data over the mesh.
pub struct MeshStoreCore<S, R, F>
where
    S: Store + Send + Sync + 'static,
    R: SignalingTransport + Send + Sync + 'static,
    F: PeerLinkFactory + Send + Sync + 'static,
{
    /// Local backing store.
    local_store: Arc<S>,
    /// Mesh router used for signaling and for obtaining per-peer channels.
    signaling: Arc<MeshRouter<R, F>>,
    /// Per-peer HTL configuration, keyed by peer id.
    htl_configs: RwLock<HashMap<String, PeerHTLConfig>>,
    /// Outstanding data requests; presumably keyed by hash key — TODO confirm.
    pending_requests: RwLock<HashMap<String, PendingRequest>>,
    /// Outstanding quote negotiations; presumably keyed by hash key — TODO confirm.
    pending_quotes: RwLock<HashMap<String, PendingQuoteRequest>>,
    /// Requests being forwarded for other peers; presumably keyed by hash key — TODO confirm.
    pending_forward_requests: RwLock<HashMap<String, PendingForwardRequest>>,
    /// Quotes issued by this node, keyed by `(peer id, hash key, quote id)`.
    issued_quotes: RwLock<HashMap<(String, String, u64), IssuedQuote>>,
    /// Next quote id to hand out (starts at 1, saturating).
    next_quote_id: RwLock<u64>,
    /// Registered read sources, keyed by source id.
    read_sources: RwLock<HashMap<String, Arc<dyn MeshReadSource>>>,
    /// Adaptive statistics per read source, keyed by source id.
    read_source_stats: RwLock<HashMap<String, AdaptiveSourceStats>>,
    /// Source fetches currently in flight together with their waiters.
    inflight_source_fetches: Mutex<HashMap<String, InflightSourceFetch>>,
    /// Adaptive statistics per read route, keyed by route id.
    read_route_stats: RwLock<HashMap<String, AdaptiveSourceStats>>,
    /// Peer selector used to order candidate peers.
    peer_selector: RwLock<PeerSelector>,
    /// Number of requests currently reserved per peer.
    peer_active_requests: RwLock<HashMap<String, usize>>,
    /// Wire byte counters and scheduling debt per peer.
    peer_wire_stats: RwLock<HashMap<String, PeerWireStats>>,
    /// Queue of responses waiting to be sent by the response scheduler.
    pending_response_sends: Mutex<Vec<PendingResponseSend>>,
    /// `true` while a response scheduler task is running.
    response_scheduler_running: AtomicBool,
    /// Next response job id to hand out (starts at 1).
    next_response_job_id: AtomicU64,
    /// Routing configuration.
    routing: MeshRoutingConfig,
    /// Timeout applied to requests.
    request_timeout: Duration,
    /// Debug flag supplied at construction.
    debug: bool,
    /// Running flag toggled by `start` / `stop`.
    running: RwLock<bool>,
}
555
556impl<S, R, F> MeshStoreCore<S, R, F>
557where
558 S: Store + Send + Sync + 'static,
559 R: SignalingTransport + Send + Sync + 'static,
560 F: PeerLinkFactory + Send + Sync + 'static,
561{
562 pub fn new(
564 local_store: Arc<S>,
565 signaling: Arc<MeshRouter<R, F>>,
566 request_timeout: Duration,
567 debug: bool,
568 ) -> Self {
569 Self::new_with_routing(
570 local_store,
571 signaling,
572 request_timeout,
573 debug,
574 Default::default(),
575 )
576 }
577
578 pub fn new_with_routing(
580 local_store: Arc<S>,
581 signaling: Arc<MeshRouter<R, F>>,
582 request_timeout: Duration,
583 debug: bool,
584 routing: MeshRoutingConfig,
585 ) -> Self {
586 let mut selector = PeerSelector::with_strategy(routing.selection_strategy);
587 selector.set_fairness(routing.fairness_enabled);
588 selector.set_cashu_payment_weight(routing.cashu_payment_weight);
589 Self {
590 local_store,
591 signaling,
592 htl_configs: RwLock::new(HashMap::new()),
593 pending_requests: RwLock::new(HashMap::new()),
594 pending_quotes: RwLock::new(HashMap::new()),
595 pending_forward_requests: RwLock::new(HashMap::new()),
596 issued_quotes: RwLock::new(HashMap::new()),
597 next_quote_id: RwLock::new(1),
598 read_sources: RwLock::new(HashMap::new()),
599 read_source_stats: RwLock::new(HashMap::new()),
600 inflight_source_fetches: Mutex::new(HashMap::new()),
601 read_route_stats: RwLock::new(HashMap::new()),
602 peer_selector: RwLock::new(selector),
603 peer_active_requests: RwLock::new(HashMap::new()),
604 peer_wire_stats: RwLock::new(HashMap::new()),
605 pending_response_sends: Mutex::new(Vec::new()),
606 response_scheduler_running: AtomicBool::new(false),
607 next_response_job_id: AtomicU64::new(1),
608 routing,
609 request_timeout,
610 debug,
611 running: RwLock::new(false),
612 }
613 }
614
615 pub async fn start(&self) -> Result<(), TransportError> {
617 *self.running.write().await = true;
618
619 self.signaling.send_hello(vec![]).await?;
621
622 Ok(())
623 }
624
625 pub async fn stop(&self) {
627 *self.running.write().await = false;
628 }
629
630 pub async fn process_signaling(&self, msg: SignalingMessage) -> Result<(), TransportError> {
632 let peer_id = msg.peer_id().to_string();
634 {
635 let mut configs = self.htl_configs.write().await;
636 if !configs.contains_key(&peer_id) {
637 configs.insert(peer_id.clone(), PeerHTLConfig::random());
638 }
639 }
640 self.peer_selector.write().await.add_peer(peer_id);
641
642 self.signaling.handle_message(msg).await
643 }
644
/// Returns the mesh router this store uses for signaling.
pub fn signaling(&self) -> &Arc<MeshRouter<R, F>> {
    &self.signaling
}
649
650 fn response_behavior(&self) -> ResponseBehaviorConfig {
651 self.routing.response_behavior.normalized()
652 }
653
654 async fn record_peer_wire_sent(&self, peer_id: &str, bytes: u64) {
655 if bytes == 0 {
656 return;
657 }
658 let mut stats = self.peer_wire_stats.write().await;
659 let entry = stats.entry(peer_id.to_string()).or_default();
660 entry.bytes_sent = entry.bytes_sent.saturating_add(bytes);
661 }
662
663 async fn record_peer_wire_received(&self, peer_id: &str, bytes: u64) {
664 if bytes == 0 {
665 return;
666 }
667 let mut stats = self.peer_wire_stats.write().await;
668 let entry = stats.entry(peer_id.to_string()).or_default();
669 entry.bytes_received = entry.bytes_received.saturating_add(bytes);
670 }
671
672 fn peer_upload_weight(stats: &PeerWireStats) -> f64 {
673 let raw_ratio = (stats.bytes_received.saturating_add(1024) as f64)
674 / (stats.bytes_sent.saturating_add(1024) as f64);
675 let bounded_ratio = raw_ratio / (1.0 + raw_ratio);
676 0.5 + 1.5 * bounded_ratio
677 }
678
679 fn choose_ready_response_job(
680 ready_jobs: &[(u64, String, usize, Instant, u64)],
681 stats: &HashMap<String, PeerWireStats>,
682 ) -> Option<(u64, f64)> {
683 ready_jobs
684 .iter()
685 .map(|job| {
686 let peer_stats = stats.get(&job.1).cloned().unwrap_or_default();
687 let finish = peer_stats.bandwidth_debt
688 + (job.2 as f64 / Self::peer_upload_weight(&peer_stats));
689 (job.0, job.1.as_str(), job.4, finish)
690 })
691 .min_by(|left, right| {
692 left.3
693 .partial_cmp(&right.3)
694 .unwrap_or(std::cmp::Ordering::Equal)
695 .then_with(|| left.2.cmp(&right.2))
696 .then_with(|| left.1.cmp(right.1))
697 })
698 .map(|choice| (choice.0, choice.3))
699 }
700
701 async fn enqueue_response_send(
702 self: &Arc<Self>,
703 peer_id: String,
704 bytes: Vec<u8>,
705 ready_at: Instant,
706 ) {
707 let job_id = self.next_response_job_id.fetch_add(1, Ordering::Relaxed);
708 {
709 let mut queue = self.pending_response_sends.lock().await;
710 queue.push(PendingResponseSend {
711 job_id,
712 peer_id,
713 bytes,
714 ready_at,
715 queue_sequence: job_id,
716 });
717 }
718
719 if self
720 .response_scheduler_running
721 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
722 .is_ok()
723 {
724 let this = Arc::clone(self);
725 tokio::spawn(async move {
726 this.run_response_scheduler().await;
727 });
728 }
729 }
730
731 async fn run_response_scheduler(self: Arc<Self>) {
732 loop {
733 let snapshot = {
734 let queue = self.pending_response_sends.lock().await;
735 if queue.is_empty() {
736 self.response_scheduler_running
737 .store(false, Ordering::Release);
738 return;
739 }
740 queue
741 .iter()
742 .map(|job| {
743 (
744 job.job_id,
745 job.peer_id.clone(),
746 job.bytes.len(),
747 job.ready_at,
748 job.queue_sequence,
749 )
750 })
751 .collect::<Vec<_>>()
752 };
753
754 let now = Instant::now();
755 let mut earliest_ready_at: Option<Instant> = None;
756 let mut ready_jobs = Vec::new();
757 for job in &snapshot {
758 if job.3 <= now {
759 ready_jobs.push(job.clone());
760 } else {
761 earliest_ready_at = Some(match earliest_ready_at {
762 Some(current) => current.min(job.3),
763 None => job.3,
764 });
765 }
766 }
767
768 if ready_jobs.is_empty() {
769 if let Some(ready_at) = earliest_ready_at {
770 tokio::time::sleep(ready_at.saturating_duration_since(Instant::now())).await;
771 continue;
772 }
773 self.response_scheduler_running
774 .store(false, Ordering::Release);
775 return;
776 }
777
778 let (selected_job_id, selected_finish) = {
779 let stats = self.peer_wire_stats.read().await;
780 Self::choose_ready_response_job(&ready_jobs, &stats).expect("ready response job")
781 };
782
783 let selected = {
784 let mut queue = self.pending_response_sends.lock().await;
785 let Some(index) = queue.iter().position(|job| job.job_id == selected_job_id) else {
786 continue;
787 };
788 queue.swap_remove(index)
789 };
790
791 let sent = if let Some(channel) = self.signaling.get_channel(&selected.peer_id).await {
792 channel.send(selected.bytes.clone()).await.is_ok()
793 } else {
794 false
795 };
796
797 let queued_peers = {
798 let queue = self.pending_response_sends.lock().await;
799 queue
800 .iter()
801 .map(|job| job.peer_id.clone())
802 .collect::<HashSet<_>>()
803 };
804 let mut stats = self.peer_wire_stats.write().await;
805 let entry = stats.entry(selected.peer_id.clone()).or_default();
806 if sent {
807 entry.bytes_sent = entry.bytes_sent.saturating_add(selected.bytes.len() as u64);
808 entry.bandwidth_debt = selected_finish;
809 }
810 if queued_peers.is_empty() {
811 for peer_stats in stats.values_mut() {
812 peer_stats.bandwidth_debt = 0.0;
813 }
814 } else {
815 let floor = queued_peers
816 .iter()
817 .filter_map(|peer_id| stats.get(peer_id).map(|peer| peer.bandwidth_debt))
818 .fold(f64::INFINITY, f64::min);
819 if floor.is_finite() && floor > 0.0 {
820 for peer_id in queued_peers {
821 if let Some(peer_stats) = stats.get_mut(&peer_id) {
822 peer_stats.bandwidth_debt =
823 (peer_stats.bandwidth_debt - floor).max(0.0);
824 }
825 }
826 }
827 }
828 }
829 }
830
831 fn deterministic_actor_draw_for(peer_id: &str, hash: &Hash, salt: u64) -> f64 {
832 let mut hasher = DefaultHasher::new();
833 peer_id.hash(&mut hasher);
834 hash.hash(&mut hasher);
835 salt.hash(&mut hasher);
836 let v = hasher.finish();
837 (v as f64) / (u64::MAX as f64)
838 }
839
840 fn deterministic_actor_draw(&self, hash: &Hash, salt: u64) -> f64 {
841 Self::deterministic_actor_draw_for(self.signaling.peer_id(), hash, salt)
842 }
843
844 fn peer_metadata_pointer_slot_hash() -> Hash {
845 hashtree_core::sha256(PEER_METADATA_POINTER_SLOT_KEY)
846 }
847
848 fn decode_hash_hex(hash_hex: &str) -> Result<Hash, StoreError> {
849 let bytes = hex::decode(hash_hex)
850 .map_err(|e| StoreError::Other(format!("Invalid hash hex: {e}")))?;
851 if bytes.len() != 32 {
852 return Err(StoreError::Other(format!(
853 "Invalid hash length {}, expected 32 bytes",
854 bytes.len()
855 )));
856 }
857 let mut hash = [0u8; 32];
858 hash.copy_from_slice(&bytes);
859 Ok(hash)
860 }
861
862 fn should_drop_response(&self, hash: &Hash) -> bool {
863 let p = self.response_behavior().drop_response_prob;
864 if p <= 0.0 {
865 return false;
866 }
867 self.deterministic_actor_draw(hash, 0xD0_D0_D0_D0_D0_D0_D0_D0) < p
868 }
869
870 fn should_corrupt_response(&self, hash: &Hash) -> bool {
871 let p = self.response_behavior().corrupt_response_prob;
872 if p <= 0.0 {
873 return false;
874 }
875 self.deterministic_actor_draw(hash, 0xC0_C0_C0_C0_C0_C0_C0_C0) < p
876 }
877
878 fn should_stall_response(&self, hash: &Hash) -> bool {
879 let p = self.response_behavior().stall_response_prob;
880 if p <= 0.0 {
881 return false;
882 }
883 self.deterministic_actor_draw(hash, 0x5A_11_5A_11_5A_11_5A_11) < p
884 }
885
886 fn response_send_delay(&self, hash: &Hash, payload_len: usize) -> Duration {
887 let behavior = self.response_behavior();
888 let mut total_ms = behavior
889 .extra_delay_ms
890 .saturating_add(behavior.first_byte_delay_ms);
891
892 if behavior.bytes_per_second > 0 && payload_len > 0 {
893 let throughput_ms = ((payload_len as u128) * 1000)
894 .div_ceil(behavior.bytes_per_second as u128)
895 .min(u64::MAX as u128) as u64;
896 total_ms = total_ms.saturating_add(throughput_ms);
897 }
898
899 if behavior.stall_delay_ms > 0 && self.should_stall_response(hash) {
900 total_ms = total_ms.saturating_add(behavior.stall_delay_ms);
901 }
902
903 Duration::from_millis(total_ms)
904 }
905
906 async fn ordered_connected_peers(&self, exclude_peer_id: Option<&str>) -> Vec<String> {
907 let current_peer_ids = self.signaling.peer_ids().await;
908 if current_peer_ids.is_empty() {
909 return Vec::new();
910 }
911
912 sync_selector_peers(&self.peer_selector, ¤t_peer_ids).await;
913 let mut candidate_peer_ids: Vec<String> = current_peer_ids
914 .into_iter()
915 .filter(|peer_id| exclude_peer_id.is_none_or(|exclude| peer_id != exclude))
916 .collect();
917 if candidate_peer_ids.is_empty() {
918 return Vec::new();
919 }
920
921 let current_set: HashSet<&str> = candidate_peer_ids.iter().map(String::as_str).collect();
922 let mut selector = self.peer_selector.write().await;
923 let mut selector_order = selector.select_peers();
924 selector_order.retain(|peer_id| current_set.contains(peer_id.as_str()));
925 if selector_order.is_empty() {
926 let mut fallback = candidate_peer_ids;
927 fallback.sort();
928 return fallback;
929 }
930 let backed_off: HashMap<String, bool> = candidate_peer_ids
931 .iter()
932 .map(|peer_id| (peer_id.clone(), selector.is_peer_backed_off(peer_id)))
933 .collect();
934 drop(selector);
935
936 let rank: HashMap<&str, usize> = selector_order
937 .iter()
938 .enumerate()
939 .map(|(idx, peer_id)| (peer_id.as_str(), idx))
940 .collect();
941 let active = self.peer_active_requests.read().await;
942 candidate_peer_ids.sort_by(|left, right| {
943 let left_backed_off = backed_off.get(left).copied().unwrap_or(false);
944 let right_backed_off = backed_off.get(right).copied().unwrap_or(false);
945 if left_backed_off != right_backed_off {
946 return if left_backed_off {
947 std::cmp::Ordering::Greater
948 } else {
949 std::cmp::Ordering::Less
950 };
951 }
952 let left_rank = rank.get(left.as_str()).copied().unwrap_or(usize::MAX / 2);
953 let right_rank = rank.get(right.as_str()).copied().unwrap_or(usize::MAX / 2);
954 let left_load = active.get(left).copied().unwrap_or(0);
955 let right_load = active.get(right).copied().unwrap_or(0);
956 (left_rank + left_load.saturating_mul(ACTIVE_PEER_REQUEST_RANK_PENALTY))
957 .cmp(&(right_rank + right_load.saturating_mul(ACTIVE_PEER_REQUEST_RANK_PENALTY)))
958 .then_with(|| left.cmp(right))
959 });
960 candidate_peer_ids
961 }
962
963 async fn reserve_peer_request(&self, peer_id: &str) {
964 let mut active = self.peer_active_requests.write().await;
965 *active.entry(peer_id.to_string()).or_insert(0) += 1;
966 }
967
968 async fn release_peer_request(&self, peer_id: &str) {
969 let mut active = self.peer_active_requests.write().await;
970 let Some(count) = active.get_mut(peer_id) else {
971 return;
972 };
973 if *count <= 1 {
974 active.remove(peer_id);
975 } else {
976 *count -= 1;
977 }
978 }
979
980 async fn release_queried_peer_requests(&self, peer_ids: &[String]) {
981 for peer_id in peer_ids {
982 self.release_peer_request(peer_id).await;
983 }
984 }
985
986 fn requested_quote_mint(&self) -> Option<&str> {
987 if let Some(default_mint) = self.routing.cashu_default_mint.as_deref() {
988 if self.routing.cashu_accepted_mints.is_empty()
989 || self
990 .routing
991 .cashu_accepted_mints
992 .iter()
993 .any(|mint| mint == default_mint)
994 {
995 return Some(default_mint);
996 }
997 }
998
999 self.routing
1000 .cashu_accepted_mints
1001 .first()
1002 .map(String::as_str)
1003 }
1004
1005 fn choose_quote_mint(&self, requested_mint: Option<&str>) -> Option<String> {
1006 if let Some(requested_mint) = requested_mint {
1007 if self.accepts_quote_mint(Some(requested_mint)) {
1008 return Some(requested_mint.to_string());
1009 }
1010 }
1011 if let Some(default_mint) = self.routing.cashu_default_mint.as_ref() {
1012 return Some(default_mint.clone());
1013 }
1014 if let Some(first_mint) = self.routing.cashu_accepted_mints.first() {
1015 return Some(first_mint.clone());
1016 }
1017 requested_mint.map(str::to_string)
1018 }
1019
1020 fn accepts_quote_mint(&self, mint_url: Option<&str>) -> bool {
1021 if self.routing.cashu_accepted_mints.is_empty() {
1022 return true;
1023 }
1024
1025 let Some(mint_url) = mint_url else {
1026 return false;
1027 };
1028 self.routing
1029 .cashu_accepted_mints
1030 .iter()
1031 .any(|mint| mint == mint_url)
1032 }
1033
1034 fn trusts_quote_mint(&self, mint_url: Option<&str>) -> bool {
1035 let Some(mint_url) = mint_url else {
1036 return self.routing.cashu_default_mint.is_none()
1037 && self.routing.cashu_accepted_mints.is_empty();
1038 };
1039 self.routing.cashu_default_mint.as_deref() == Some(mint_url)
1040 || self
1041 .routing
1042 .cashu_accepted_mints
1043 .iter()
1044 .any(|mint| mint == mint_url)
1045 }
1046
1047 async fn peer_suggested_mint_cap_sat(&self, peer_id: &str) -> u64 {
1048 let base = self.routing.cashu_peer_suggested_mint_base_cap_sat;
1049 if base == 0 {
1050 return 0;
1051 }
1052
1053 let selector = self.peer_selector.read().await;
1054 let Some(stats) = selector.get_stats(peer_id) else {
1055 let max_cap = self.routing.cashu_peer_suggested_mint_max_cap_sat;
1056 return if max_cap > 0 { base.min(max_cap) } else { base };
1057 };
1058
1059 if stats.cashu_payment_defaults > 0
1060 && stats.cashu_payment_defaults >= stats.cashu_payment_receipts
1061 {
1062 return 0;
1063 }
1064
1065 let success_bonus = stats
1066 .successes
1067 .saturating_mul(self.routing.cashu_peer_suggested_mint_success_step_sat);
1068 let receipt_bonus = stats
1069 .cashu_payment_receipts
1070 .saturating_mul(self.routing.cashu_peer_suggested_mint_receipt_step_sat);
1071 let mut cap = base
1072 .saturating_add(success_bonus)
1073 .saturating_add(receipt_bonus);
1074 let max_cap = self.routing.cashu_peer_suggested_mint_max_cap_sat;
1075 if max_cap > 0 {
1076 cap = cap.min(max_cap);
1077 }
1078 cap
1079 }
1080
1081 async fn should_accept_quote_response(
1082 &self,
1083 from_peer: &str,
1084 preferred_mint_url: Option<&str>,
1085 offered_payment_sat: u64,
1086 res: &DataQuoteResponse,
1087 ) -> bool {
1088 let Some(payment_sat) = res.p else {
1089 return false;
1090 };
1091 if payment_sat > offered_payment_sat {
1092 return false;
1093 }
1094
1095 let response_mint = res.m.as_deref();
1096 if response_mint == preferred_mint_url {
1097 return true;
1098 }
1099 if self.trusts_quote_mint(response_mint) {
1100 return true;
1101 }
1102 if response_mint.is_none() {
1103 return false;
1104 }
1105
1106 payment_sat <= self.peer_suggested_mint_cap_sat(from_peer).await
1107 }
1108
1109 async fn issue_quote(
1110 &self,
1111 peer_id: &str,
1112 hash_key: &str,
1113 payment_sat: u64,
1114 ttl_ms: u32,
1115 mint_url: Option<&str>,
1116 ) -> u64 {
1117 let quote_id = {
1118 let mut next = self.next_quote_id.write().await;
1119 let quote_id = *next;
1120 *next = next.saturating_add(1);
1121 quote_id
1122 };
1123
1124 let expires_at = Instant::now() + Duration::from_millis(ttl_ms as u64);
1125 self.issued_quotes.write().await.insert(
1126 (peer_id.to_string(), hash_key.to_string(), quote_id),
1127 IssuedQuote {
1128 expires_at,
1129 payment_sat,
1130 mint_url: mint_url.map(str::to_string),
1131 },
1132 );
1133 quote_id
1134 }
1135
1136 async fn take_valid_quote(&self, peer_id: &str, hash_key: &str, quote_id: u64) -> bool {
1137 let key = (peer_id.to_string(), hash_key.to_string(), quote_id);
1138 let Some(quote) = self.issued_quotes.write().await.remove(&key) else {
1139 return false;
1140 };
1141 quote.expires_at > Instant::now()
1142 }
1143
1144 async fn send_request_to_peer(
1145 &self,
1146 peer_id: &str,
1147 hash: &Hash,
1148 request_htl: u8,
1149 quote_id: Option<u64>,
1150 ) -> bool {
1151 let channel = match self.signaling.get_channel(peer_id).await {
1152 Some(c) => c,
1153 None => return false,
1154 };
1155
1156 let htl_config = {
1157 let configs = self.htl_configs.read().await;
1158 configs
1159 .get(peer_id)
1160 .cloned()
1161 .unwrap_or_else(PeerHTLConfig::random)
1162 };
1163
1164 let send_htl = htl_config.decrement(request_htl);
1165 let req = match quote_id {
1166 Some(quote_id) => create_request_with_quote(hash, send_htl, quote_id),
1167 None => create_request(hash, send_htl),
1168 };
1169 let request_bytes = encode_request(&req);
1170 let request_len = request_bytes.len() as u64;
1171
1172 {
1173 let mut selector = self.peer_selector.write().await;
1174 selector.record_request(peer_id, request_len);
1175 }
1176
1177 match channel.send(request_bytes).await {
1178 Ok(()) => {
1179 self.record_peer_wire_sent(peer_id, request_len).await;
1180 true
1181 }
1182 Err(_) => {
1183 self.peer_selector.write().await.record_failure(peer_id);
1184 false
1185 }
1186 }
1187 }
1188
1189 async fn send_quote_request_to_peer(
1190 &self,
1191 peer_id: &str,
1192 hash: &Hash,
1193 payment_sat: u64,
1194 ttl_ms: u32,
1195 mint_url: Option<&str>,
1196 ) -> bool {
1197 let channel = match self.signaling.get_channel(peer_id).await {
1198 Some(c) => c,
1199 None => return false,
1200 };
1201
1202 let req = create_quote_request(hash, ttl_ms, payment_sat, mint_url);
1203 let request_bytes = encode_quote_request(&req);
1204 let request_len = request_bytes.len() as u64;
1205
1206 match channel.send(request_bytes).await {
1207 Ok(()) => {
1208 self.record_peer_wire_sent(peer_id, request_len).await;
1209 true
1210 }
1211 Err(_) => false,
1212 }
1213 }
1214
1215 pub async fn set_read_sources(&self, sources: Vec<Arc<dyn MeshReadSource>>) {
1216 let mut by_id = HashMap::new();
1217 let mut stats = self.read_source_stats.write().await;
1218 for source in sources {
1219 let source_id = source.id().to_string();
1220 by_id.insert(source_id.clone(), source);
1221 stats
1222 .entry(source_id)
1223 .or_insert_with(AdaptiveSourceStats::default);
1224 }
1225 *self.read_sources.write().await = by_id;
1226 }
1227
1228 fn route_stats_for<'a>(
1229 stats: &'a mut HashMap<String, AdaptiveSourceStats>,
1230 route_id: &str,
1231 ) -> &'a mut AdaptiveSourceStats {
1232 stats
1233 .entry(route_id.to_string())
1234 .or_insert_with(AdaptiveSourceStats::default)
1235 }
1236
1237 async fn record_route_request(&self, route_id: &str) {
1238 let mut stats = self.read_route_stats.write().await;
1239 Self::route_stats_for(&mut stats, route_id).requests += 1;
1240 }
1241
1242 async fn record_route_success(&self, route_id: &str, elapsed_ms: u64) {
1243 let now = Instant::now();
1244 let mut stats = self.read_route_stats.write().await;
1245 let stats = Self::route_stats_for(&mut stats, route_id);
1246 stats.successes += 1;
1247 stats.last_success_at = Some(now);
1248 stats.backoff_level = 0;
1249 stats.backed_off_until = None;
1250 if stats.srtt_ms <= 0.0 {
1251 stats.srtt_ms = elapsed_ms as f64;
1252 stats.rttvar_ms = elapsed_ms as f64 / 2.0;
1253 return;
1254 }
1255 let elapsed = elapsed_ms as f64;
1256 stats.rttvar_ms = 0.75 * stats.rttvar_ms + 0.25 * (stats.srtt_ms - elapsed).abs();
1257 stats.srtt_ms = 0.875 * stats.srtt_ms + 0.125 * elapsed;
1258 }
1259
1260 async fn record_route_miss(&self, route_id: &str) {
1261 let mut stats = self.read_route_stats.write().await;
1262 Self::route_stats_for(&mut stats, route_id).misses += 1;
1263 }
1264
1265 async fn record_route_timeout(&self, route_id: &str) {
1266 let mut stats = self.read_route_stats.write().await;
1267 Self::route_stats_for(&mut stats, route_id).timeouts += 1;
1268 }
1269
1270 async fn record_read_source_request(&self, source_id: &str) {
1271 let mut stats = self.read_source_stats.write().await;
1272 stats
1273 .entry(source_id.to_string())
1274 .or_insert_with(AdaptiveSourceStats::default)
1275 .requests += 1;
1276 }
1277
1278 async fn record_read_source_miss(&self, source_id: &str) {
1279 let mut stats = self.read_source_stats.write().await;
1280 stats
1281 .entry(source_id.to_string())
1282 .or_insert_with(AdaptiveSourceStats::default)
1283 .misses += 1;
1284 }
1285
1286 async fn record_read_source_success(&self, source_id: &str, elapsed_ms: u64) {
1287 let now = Instant::now();
1288 let mut stats = self.read_source_stats.write().await;
1289 let stats = stats
1290 .entry(source_id.to_string())
1291 .or_insert_with(AdaptiveSourceStats::default);
1292 stats.successes += 1;
1293 stats.last_success_at = Some(now);
1294 stats.backoff_level = 0;
1295 stats.backed_off_until = None;
1296 if stats.srtt_ms <= 0.0 {
1297 stats.srtt_ms = elapsed_ms as f64;
1298 stats.rttvar_ms = elapsed_ms as f64 / 2.0;
1299 return;
1300 }
1301 let elapsed = elapsed_ms as f64;
1302 stats.rttvar_ms = 0.75 * stats.rttvar_ms + 0.25 * (stats.srtt_ms - elapsed).abs();
1303 stats.srtt_ms = 0.875 * stats.srtt_ms + 0.125 * elapsed;
1304 }
1305
1306 async fn record_read_source_failure(&self, source_id: &str) {
1307 let now = Instant::now();
1308 let mut stats = self.read_source_stats.write().await;
1309 let stats = stats
1310 .entry(source_id.to_string())
1311 .or_insert_with(AdaptiveSourceStats::default);
1312 stats.failures += 1;
1313 stats.last_failure_at = Some(now);
1314 Self::apply_source_backoff(stats, now);
1315 }
1316
1317 async fn record_read_source_timeout(&self, source_id: &str) {
1318 let now = Instant::now();
1319 let mut stats = self.read_source_stats.write().await;
1320 let stats = stats
1321 .entry(source_id.to_string())
1322 .or_insert_with(AdaptiveSourceStats::default);
1323 stats.timeouts += 1;
1324 stats.last_failure_at = Some(now);
1325 Self::apply_source_backoff(stats, now);
1326 }
1327
1328 fn apply_source_backoff(stats: &mut AdaptiveSourceStats, now: Instant) {
1329 stats.backoff_level = stats.backoff_level.saturating_add(1);
1330 let backoff_ms = (INITIAL_SOURCE_BACKOFF_MS
1331 .saturating_mul(2u64.saturating_pow(stats.backoff_level.saturating_sub(1))))
1332 .min(MAX_SOURCE_BACKOFF_MS);
1333 stats.backed_off_until = Some(now + Duration::from_millis(backoff_ms));
1334 }
1335
1336 async fn ordered_read_sources(&self) -> Vec<Arc<dyn MeshReadSource>> {
1337 let sources = self.read_sources.read().await;
1338 if sources.is_empty() {
1339 return Vec::new();
1340 }
1341
1342 let mut available: Vec<Arc<dyn MeshReadSource>> = sources
1343 .values()
1344 .filter(|source| source.is_available())
1345 .cloned()
1346 .collect();
1347 if available.is_empty() {
1348 return Vec::new();
1349 }
1350
1351 let now = Instant::now();
1352 let stats = self.read_source_stats.read().await;
1353 let mut healthy: Vec<Arc<dyn MeshReadSource>> = available
1354 .iter()
1355 .filter(|source| {
1356 stats
1357 .get(source.id())
1358 .and_then(|s| s.backed_off_until)
1359 .is_none_or(|until| until <= now)
1360 })
1361 .cloned()
1362 .collect();
1363 if !healthy.is_empty() {
1364 available = std::mem::take(&mut healthy);
1365 }
1366
1367 available.sort_by(|left, right| {
1368 let left_stats = stats.get(left.id()).cloned().unwrap_or_default();
1369 let right_stats = stats.get(right.id()).cloned().unwrap_or_default();
1370 adaptive_source_score(&right_stats, now)
1371 .partial_cmp(&adaptive_source_score(&left_stats, now))
1372 .unwrap_or(std::cmp::Ordering::Equal)
1373 .then_with(|| left.id().cmp(right.id()))
1374 });
1375 available
1376 }
1377
1378 async fn should_probe_multiple_read_sources(
1379 &self,
1380 ordered_sources: &[Arc<dyn MeshReadSource>],
1381 ) -> bool {
1382 if ordered_sources.len() <= 1 {
1383 return false;
1384 }
1385 let stats = self.read_source_stats.read().await;
1386 let best = stats
1387 .get(ordered_sources[0].id())
1388 .cloned()
1389 .unwrap_or_default();
1390 let second = stats
1391 .get(ordered_sources[1].id())
1392 .cloned()
1393 .unwrap_or_default();
1394 if !source_has_history(&best) || !source_has_history(&second) {
1395 return true;
1396 }
1397 let now = Instant::now();
1398 adaptive_source_score(&best, now) - adaptive_source_score(&second, now)
1399 < SOURCE_SCORE_TIE_DELTA
1400 }
1401
1402 async fn source_dispatch_for(&self, source_count: usize) -> RequestDispatchConfig {
1403 if source_count == 0 {
1404 return self.routing.dispatch;
1405 }
1406 let ordered_sources = self.ordered_read_sources().await;
1407 let probe_multiple = self
1408 .should_probe_multiple_read_sources(&ordered_sources)
1409 .await;
1410 RequestDispatchConfig {
1411 initial_fanout: if probe_multiple {
1412 source_count.min(2)
1413 } else {
1414 1
1415 },
1416 hedge_fanout: self.routing.dispatch.hedge_fanout,
1417 max_fanout: self.routing.dispatch.max_fanout.min(source_count),
1418 hedge_interval_ms: self.routing.dispatch.hedge_interval_ms,
1419 }
1420 }
1421
1422 pub async fn peer_count(&self) -> usize {
1424 self.signaling.peer_count().await
1425 }
1426
1427 pub async fn needs_peers(&self) -> bool {
1429 self.signaling.needs_peers().await
1430 }
1431
1432 pub async fn send_hello(&self) -> Result<(), TransportError> {
1434 self.signaling.send_hello(vec![]).await
1435 }
1436
1437 pub async fn drain_available_data_messages(self: &Arc<Self>) -> DataPumpStats {
1442 let mut stats = DataPumpStats::default();
1443 let peer_ids = self.signaling.peer_ids().await;
1444 for peer_id in peer_ids {
1445 let Some(channel) = self.signaling.get_channel(&peer_id).await else {
1446 continue;
1447 };
1448
1449 while let Some(data) = channel.try_recv() {
1450 stats.processed += 1;
1451 stats.processed_bytes += data.len() as u64;
1452 if let Some(msg) = parse_message(&data) {
1453 match msg {
1454 DataMessage::Request(_) => stats.request_messages += 1,
1455 DataMessage::Response(_) => stats.response_messages += 1,
1456 DataMessage::QuoteRequest(_) => stats.quote_request_messages += 1,
1457 DataMessage::QuoteResponse(_) => stats.quote_response_messages += 1,
1458 DataMessage::Payment(_)
1459 | DataMessage::PaymentAck(_)
1460 | DataMessage::Chunk(_) => {}
1461 }
1462 }
1463 self.handle_data_message(&peer_id, &data).await;
1464 }
1465 }
1466 stats
1467 }
1468
1469 pub async fn record_cashu_payment_for_peer(&self, peer_id: &str, amount_sat: u64) {
1471 self.peer_selector
1472 .write()
1473 .await
1474 .record_cashu_payment(peer_id, amount_sat);
1475 }
1476
1477 pub async fn record_cashu_receipt_from_peer(&self, peer_id: &str, amount_sat: u64) {
1479 self.peer_selector
1480 .write()
1481 .await
1482 .record_cashu_receipt(peer_id, amount_sat);
1483 }
1484
1485 pub async fn record_cashu_payment_default_from_peer(&self, peer_id: &str) {
1487 self.peer_selector
1488 .write()
1489 .await
1490 .record_cashu_payment_default(peer_id);
1491 }
1492
1493 pub async fn selector_summary(&self) -> crate::peer_selector::SelectorSummary {
1495 self.peer_selector.read().await.summary()
1496 }
1497
1498 fn should_refuse_requests_from_peer(&self, selector: &PeerSelector, peer_id: &str) -> bool {
1499 selector.is_peer_blocked_for_payment_defaults(
1500 peer_id,
1501 self.routing.cashu_payment_default_block_threshold,
1502 )
1503 }
1504
1505 pub async fn peer_metadata_snapshot(&self) -> PeerMetadataSnapshot {
1507 self.peer_selector
1508 .read()
1509 .await
1510 .export_peer_metadata_snapshot()
1511 }
1512
1513 pub async fn persist_peer_metadata(&self) -> Result<Hash, StoreError> {
1518 let snapshot = self
1519 .peer_selector
1520 .read()
1521 .await
1522 .export_peer_metadata_snapshot();
1523 let bytes = serde_json::to_vec(&snapshot).map_err(|e| {
1524 StoreError::Other(format!("Failed to encode peer metadata snapshot: {e}"))
1525 })?;
1526 let snapshot_hash = hashtree_core::sha256(&bytes);
1527 let _ = self.local_store.put(snapshot_hash, bytes).await?;
1528
1529 let pointer_slot = Self::peer_metadata_pointer_slot_hash();
1530 let pointer_bytes = hex::encode(snapshot_hash).into_bytes();
1531 let _ = self.local_store.delete(&pointer_slot).await?;
1532 let _ = self.local_store.put(pointer_slot, pointer_bytes).await?;
1533
1534 Ok(snapshot_hash)
1535 }
1536
1537 pub async fn load_peer_metadata(&self) -> Result<bool, StoreError> {
1539 let pointer_slot = Self::peer_metadata_pointer_slot_hash();
1540 let Some(pointer_bytes) = self.local_store.get(&pointer_slot).await? else {
1541 return Ok(false);
1542 };
1543 let pointer_hex = std::str::from_utf8(&pointer_bytes).map_err(|e| {
1544 StoreError::Other(format!("Peer metadata pointer is not valid UTF-8: {e}"))
1545 })?;
1546 let snapshot_hash = Self::decode_hash_hex(pointer_hex.trim())?;
1547
1548 let Some(snapshot_bytes) = self.local_store.get(&snapshot_hash).await? else {
1549 return Ok(false);
1550 };
1551 let snapshot: PeerMetadataSnapshot =
1552 serde_json::from_slice(&snapshot_bytes).map_err(|e| {
1553 StoreError::Other(format!("Failed to decode peer metadata snapshot: {e}"))
1554 })?;
1555 self.peer_selector
1556 .write()
1557 .await
1558 .import_peer_metadata_snapshot(&snapshot);
1559 Ok(true)
1560 }
1561
1562 pub async fn get_with_quote(
1567 &self,
1568 hash: &Hash,
1569 payment_sat: u64,
1570 quote_ttl: Duration,
1571 ) -> Result<Option<Vec<u8>>, StoreError> {
1572 if let Some(data) = self.local_store.get(hash).await? {
1573 return Ok(Some(data));
1574 }
1575 Ok(self
1576 .request_from_peers_with_quote(hash, payment_sat, quote_ttl)
1577 .await)
1578 }
1579
1580 async fn request_from_peers_with_quote(
1581 &self,
1582 hash: &Hash,
1583 payment_sat: u64,
1584 quote_ttl: Duration,
1585 ) -> Option<Vec<u8>> {
1586 let ordered_peer_ids = self.ordered_connected_peers(None).await;
1587 if ordered_peer_ids.is_empty() {
1588 return None;
1589 }
1590
1591 if let Some(quote) = self
1592 .request_quote_from_peers(hash, payment_sat, quote_ttl, &ordered_peer_ids)
1593 .await
1594 {
1595 if let Some(data) = self
1596 .request_from_single_peer(hash, "e.peer_id, MAX_HTL, Some(quote.quote_id))
1597 .await
1598 {
1599 return Some(data);
1600 }
1601 }
1602
1603 self.request_from_mesh(hash).await
1604 }
1605
1606 async fn request_quote_from_peers(
1607 &self,
1608 hash: &Hash,
1609 payment_sat: u64,
1610 quote_ttl: Duration,
1611 ordered_peer_ids: &[String],
1612 ) -> Option<NegotiatedQuote> {
1613 if ordered_peer_ids.is_empty() {
1614 return None;
1615 }
1616 let ttl_ms = quote_ttl.as_millis().min(u32::MAX as u128) as u32;
1617 if ttl_ms == 0 {
1618 return None;
1619 }
1620 let requested_mint = self.requested_quote_mint().map(str::to_string);
1621
1622 let hash_key = hash_to_key(hash);
1623 let (tx, rx) = oneshot::channel();
1624 self.pending_quotes.write().await.insert(
1625 hash_key.clone(),
1626 PendingQuoteRequest {
1627 response_tx: tx,
1628 preferred_mint_url: requested_mint.clone(),
1629 offered_payment_sat: payment_sat,
1630 },
1631 );
1632
1633 let rx = Arc::new(Mutex::new(rx));
1634 let result = run_hedged_waves(
1635 ordered_peer_ids.len(),
1636 self.routing.dispatch,
1637 self.request_timeout,
1638 |range| {
1639 let wave_peer_ids = ordered_peer_ids[range].to_vec();
1640 let requested_mint = requested_mint.clone();
1641 let hash = *hash;
1642 async move {
1643 let mut sent = 0usize;
1644 for peer_id in wave_peer_ids {
1645 if self
1646 .send_quote_request_to_peer(
1647 &peer_id,
1648 &hash,
1649 payment_sat,
1650 ttl_ms,
1651 requested_mint.as_deref(),
1652 )
1653 .await
1654 {
1655 sent += 1;
1656 }
1657 }
1658 sent
1659 }
1660 },
1661 |wait| {
1662 let rx = rx.clone();
1663 async move {
1664 let mut rx = rx.lock().await;
1665 match tokio::time::timeout(wait, &mut *rx).await {
1666 Ok(Ok(Some(quote))) => HedgedWaveAction::Success(quote),
1667 Ok(Ok(None)) | Ok(Err(_)) => HedgedWaveAction::Abort,
1668 Err(_) => HedgedWaveAction::Continue,
1669 }
1670 }
1671 },
1672 )
1673 .await;
1674 let _ = self.pending_quotes.write().await.remove(&hash_key);
1675 result
1676 }
1677
1678 async fn request_from_single_peer(
1679 &self,
1680 hash: &Hash,
1681 peer_id: &str,
1682 request_htl: u8,
1683 quote_id: Option<u64>,
1684 ) -> Option<Vec<u8>> {
1685 let hash_key = hash_to_key(hash);
1686 let (tx, rx) = oneshot::channel();
1687 self.pending_requests.write().await.insert(
1688 hash_key.clone(),
1689 PendingRequest {
1690 response_tx: tx,
1691 started_at: Instant::now(),
1692 queried_peers: vec![peer_id.to_string()],
1693 },
1694 );
1695
1696 let mut rx = rx;
1697 if !self
1698 .send_request_to_peer(peer_id, hash, request_htl, quote_id)
1699 .await
1700 {
1701 let _ = self.pending_requests.write().await.remove(&hash_key);
1702 return None;
1703 }
1704 self.reserve_peer_request(peer_id).await;
1705
1706 if let Ok(Ok(Some(data))) = tokio::time::timeout(self.request_timeout, &mut rx).await {
1707 if hashtree_core::sha256(&data) == *hash {
1708 let _ = self.local_store.put(*hash, data.clone()).await;
1709 return Some(data);
1710 }
1711 }
1712
1713 if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
1714 self.release_queried_peer_requests(&pending.queried_peers)
1715 .await;
1716 for peer_id in pending.queried_peers {
1717 self.peer_selector.write().await.record_timeout(&peer_id);
1718 }
1719 }
1720 let _ = self.take_forward_requesters(&hash_key).await;
1721 None
1722 }
1723
1724 async fn request_from_ordered_peers(
1725 &self,
1726 hash: &Hash,
1727 ordered_peer_ids: &[String],
1728 request_htl: u8,
1729 ) -> RouteFetchOutcome {
1730 let hash_key = hash_to_key(hash);
1731 let (tx, rx) = oneshot::channel();
1732 self.pending_requests.write().await.insert(
1733 hash_key.clone(),
1734 PendingRequest {
1735 response_tx: tx,
1736 started_at: Instant::now(),
1737 queried_peers: Vec::new(),
1738 },
1739 );
1740
1741 let rx = Arc::new(Mutex::new(rx));
1742 let result = run_hedged_waves(
1743 ordered_peer_ids.len(),
1744 self.routing.dispatch,
1745 self.request_timeout,
1746 |range| {
1747 let wave_peer_ids = ordered_peer_ids[range].to_vec();
1748 let hash = *hash;
1749 let hash_key = hash_key.clone();
1750 async move {
1751 let mut sent = 0usize;
1752 for peer_id in wave_peer_ids {
1753 if self
1754 .send_request_to_peer(&peer_id, &hash, request_htl, None)
1755 .await
1756 {
1757 sent += 1;
1758 self.reserve_peer_request(&peer_id).await;
1759 if let Some(pending) =
1760 self.pending_requests.write().await.get_mut(&hash_key)
1761 {
1762 pending.queried_peers.push(peer_id);
1763 }
1764 }
1765 }
1766 sent
1767 }
1768 },
1769 |wait| {
1770 let rx = rx.clone();
1771 async move {
1772 let mut rx = rx.lock().await;
1773 match tokio::time::timeout(wait, &mut *rx).await {
1774 Ok(Ok(Some(data))) if hashtree_core::sha256(&data) == *hash => {
1775 HedgedWaveAction::Success(data)
1776 }
1777 Ok(Ok(Some(_))) => HedgedWaveAction::Continue,
1778 Ok(Ok(None)) | Ok(Err(_)) => HedgedWaveAction::Abort,
1779 Err(_) => HedgedWaveAction::Continue,
1780 }
1781 }
1782 },
1783 )
1784 .await;
1785
1786 let Some(data) = result else {
1787 if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
1788 self.release_queried_peer_requests(&pending.queried_peers)
1789 .await;
1790 for peer_id in pending.queried_peers {
1791 self.peer_selector.write().await.record_timeout(&peer_id);
1792 }
1793 }
1794 let _ = self.take_forward_requesters(&hash_key).await;
1795 return RouteFetchOutcome::Timeout;
1796 };
1797
1798 let _ = self.local_store.put(*hash, data.clone()).await;
1799 RouteFetchOutcome::Hit(data)
1800 }
1801
1802 async fn request_from_read_sources_inner(&self, hash: &Hash) -> RouteFetchOutcome {
1803 let ordered_sources = self.ordered_read_sources().await;
1804 if ordered_sources.is_empty() {
1805 return RouteFetchOutcome::Miss;
1806 }
1807
1808 let dispatch = normalize_dispatch_config(
1809 self.source_dispatch_for(ordered_sources.len()).await,
1810 ordered_sources.len(),
1811 );
1812 let wave_plan = build_hedged_wave_plan(ordered_sources.len(), dispatch);
1813 if wave_plan.is_empty() {
1814 return RouteFetchOutcome::Miss;
1815 }
1816
1817 let deadline = Instant::now() + self.request_timeout;
1818 let mut pending = FuturesUnordered::new();
1819 let mut pending_source_ids = HashSet::new();
1820 let mut saw_timeout = false;
1821 let mut next_source_idx = 0usize;
1822
1823 for (wave_idx, wave_size) in wave_plan.iter().copied().enumerate() {
1824 let from = next_source_idx;
1825 let to = (next_source_idx + wave_size).min(ordered_sources.len());
1826 next_source_idx = to;
1827
1828 for source in ordered_sources[from..to].iter().cloned() {
1829 let source_id = source.id().to_string();
1830 self.record_read_source_request(&source_id).await;
1831 pending_source_ids.insert(source_id.clone());
1832 let hash = *hash;
1833 pending.push(tokio::spawn(async move {
1834 let started_at = Instant::now();
1835 let result = std::panic::AssertUnwindSafe(source.get(&hash))
1836 .catch_unwind()
1837 .await;
1838 match result {
1839 Ok(Some(data)) => SourceFetchOutcome::Hit {
1840 source_id,
1841 data,
1842 elapsed_ms: started_at.elapsed().as_millis().max(1) as u64,
1843 },
1844 Ok(None) => SourceFetchOutcome::Miss { source_id },
1845 Err(_) => SourceFetchOutcome::Failure { source_id },
1846 }
1847 }));
1848 }
1849
1850 let is_last_wave =
1851 wave_idx + 1 == wave_plan.len() || next_source_idx >= ordered_sources.len();
1852 let window_end = if is_last_wave {
1853 deadline
1854 } else {
1855 (Instant::now() + Duration::from_millis(dispatch.hedge_interval_ms)).min(deadline)
1856 };
1857
1858 while Instant::now() < window_end {
1859 let remaining = window_end.saturating_duration_since(Instant::now());
1860 let Some(result) = tokio::time::timeout(remaining, pending.next())
1861 .await
1862 .ok()
1863 .flatten()
1864 else {
1865 break;
1866 };
1867 let Ok(outcome) = result else {
1868 continue;
1869 };
1870 match outcome {
1871 SourceFetchOutcome::Hit {
1872 source_id,
1873 data,
1874 elapsed_ms,
1875 } => {
1876 pending_source_ids.remove(&source_id);
1877 self.record_read_source_success(&source_id, elapsed_ms)
1878 .await;
1879 return RouteFetchOutcome::Hit(data);
1880 }
1881 SourceFetchOutcome::Miss { source_id } => {
1882 pending_source_ids.remove(&source_id);
1883 self.record_read_source_miss(&source_id).await;
1884 }
1885 SourceFetchOutcome::Failure { source_id } => {
1886 pending_source_ids.remove(&source_id);
1887 self.record_read_source_failure(&source_id).await;
1888 }
1889 }
1890 }
1891
1892 if Instant::now() >= deadline {
1893 break;
1894 }
1895 }
1896
1897 for source_id in pending_source_ids {
1898 saw_timeout = true;
1899 self.record_read_source_timeout(&source_id).await;
1900 }
1901 if saw_timeout {
1902 RouteFetchOutcome::Timeout
1903 } else {
1904 RouteFetchOutcome::Miss
1905 }
1906 }
1907
1908 async fn request_from_read_sources(&self, hash: &Hash) -> RouteFetchOutcome {
1909 let hash_key = hash_to_key(hash);
1910 let existing_wait = {
1911 let mut inflight = self.inflight_source_fetches.lock().await;
1912 if let Some(existing) = inflight.get_mut(&hash_key) {
1913 let (tx, rx) = oneshot::channel();
1914 existing.waiters.push(tx);
1915 Some(rx)
1916 } else {
1917 inflight.insert(
1918 hash_key.clone(),
1919 InflightSourceFetch {
1920 waiters: Vec::new(),
1921 },
1922 );
1923 None
1924 }
1925 };
1926
1927 if let Some(wait) = existing_wait {
1928 return wait.await.unwrap_or(RouteFetchOutcome::Timeout);
1929 }
1930
1931 let result = self.request_from_read_sources_inner(hash).await;
1932 if let RouteFetchOutcome::Hit(hit) = &result {
1933 let _ = self.local_store.put(*hash, hit.clone()).await;
1934 }
1935
1936 let waiters = self
1937 .inflight_source_fetches
1938 .lock()
1939 .await
1940 .remove(&hash_key)
1941 .map(|inflight| inflight.waiters)
1942 .unwrap_or_default();
1943 for waiter in waiters {
1944 let _ = waiter.send(result.clone());
1945 }
1946
1947 result
1948 }
1949
1950 async fn available_read_routes(&self, context: &MeshReadContext) -> Vec<ReadRoute> {
1951 let mut routes = Vec::new();
1952 let ordered_peers = self
1953 .ordered_connected_peers(context.exclude_peer_id.as_deref())
1954 .await;
1955 if !ordered_peers.is_empty() {
1956 routes.push(ReadRoute::Peers(ordered_peers));
1957 }
1958 if !self.ordered_read_sources().await.is_empty() {
1959 routes.push(ReadRoute::Sources);
1960 }
1961 if routes.len() <= 1 {
1962 return routes;
1963 }
1964
1965 let now = Instant::now();
1966 let stats = self.read_route_stats.read().await;
1967 routes.sort_by(|left, right| {
1968 let left_stats = stats.get(left.id()).cloned().unwrap_or_default();
1969 let right_stats = stats.get(right.id()).cloned().unwrap_or_default();
1970 adaptive_source_score(&right_stats, now)
1971 .partial_cmp(&adaptive_source_score(&left_stats, now))
1972 .unwrap_or(std::cmp::Ordering::Equal)
1973 .then_with(|| left.id().cmp(right.id()))
1974 });
1975 routes
1976 }
1977
1978 async fn should_probe_multiple_routes(&self, routes: &[ReadRoute]) -> bool {
1979 if routes.len() <= 1 {
1980 return false;
1981 }
1982 let stats = self.read_route_stats.read().await;
1983 let best = stats.get(routes[0].id()).cloned().unwrap_or_default();
1984 let second = stats.get(routes[1].id()).cloned().unwrap_or_default();
1985 if !source_has_history(&best) || !source_has_history(&second) {
1986 return true;
1987 }
1988 let now = Instant::now();
1989 adaptive_source_score(&best, now) - adaptive_source_score(&second, now)
1990 < SOURCE_SCORE_TIE_DELTA
1991 }
1992
1993 async fn run_read_route(
1994 &self,
1995 hash: &Hash,
1996 route: &ReadRoute,
1997 context: &MeshReadContext,
1998 ) -> RouteFetchOutcome {
1999 let route_id = route.id();
2000 self.record_route_request(route_id).await;
2001 let started_at = Instant::now();
2002 let result = match route {
2003 ReadRoute::Peers(peer_ids) => {
2004 self.request_from_ordered_peers(hash, peer_ids, context.request_htl)
2005 .await
2006 }
2007 ReadRoute::Sources => self.request_from_read_sources(hash).await,
2008 };
2009 match &result {
2010 RouteFetchOutcome::Hit(_) => {
2011 self.record_route_success(route_id, started_at.elapsed().as_millis().max(1) as u64)
2012 .await;
2013 }
2014 RouteFetchOutcome::Miss => {
2015 self.record_route_miss(route_id).await;
2016 }
2017 RouteFetchOutcome::Timeout => {
2018 self.record_route_timeout(route_id).await;
2019 }
2020 }
2021 result
2022 }
2023
2024 async fn request_from_mesh_with_context(
2025 &self,
2026 hash: &Hash,
2027 context: &MeshReadContext,
2028 ) -> Option<Vec<u8>> {
2029 let routes = self.available_read_routes(context).await;
2030 match routes.as_slice() {
2031 [] => None,
2032 [route] => match self.run_read_route(hash, route, context).await {
2033 RouteFetchOutcome::Hit(data) => Some(data),
2034 RouteFetchOutcome::Miss | RouteFetchOutcome::Timeout => None,
2035 },
2036 [first, second, ..] => {
2037 if self.should_probe_multiple_routes(&routes).await {
2038 let first_fut = self.run_read_route(hash, first, context);
2039 let second_fut = self.run_read_route(hash, second, context);
2040 tokio::pin!(first_fut);
2041 tokio::pin!(second_fut);
2042 let mut first_done = false;
2043 let mut second_done = false;
2044 loop {
2045 tokio::select! {
2046 result = &mut first_fut, if !first_done => {
2047 first_done = true;
2048 if let RouteFetchOutcome::Hit(data) = result {
2049 return Some(data);
2050 }
2051 }
2052 result = &mut second_fut, if !second_done => {
2053 second_done = true;
2054 if let RouteFetchOutcome::Hit(data) = result {
2055 return Some(data);
2056 }
2057 }
2058 else => break,
2059 }
2060 if first_done && second_done {
2061 break;
2062 }
2063 }
2064 None
2065 } else {
2066 match self.run_read_route(hash, first, context).await {
2067 RouteFetchOutcome::Hit(data) => return Some(data),
2068 RouteFetchOutcome::Miss | RouteFetchOutcome::Timeout => {}
2069 }
2070 match self.run_read_route(hash, second, context).await {
2071 RouteFetchOutcome::Hit(data) => Some(data),
2072 RouteFetchOutcome::Miss | RouteFetchOutcome::Timeout => None,
2073 }
2074 }
2075 }
2076 }
2077 }
2078
2079 async fn request_from_mesh(&self, hash: &Hash) -> Option<Vec<u8>> {
2080 self.request_from_mesh_with_context(hash, &MeshReadContext::default())
2081 .await
2082 }
2083
2084 async fn begin_forward_request(&self, hash_key: &str, requester_id: &str) -> bool {
2085 let mut pending = self.pending_forward_requests.write().await;
2086 if let Some(existing) = pending.get_mut(hash_key) {
2087 existing.requester_ids.insert(requester_id.to_string());
2088 return false;
2089 }
2090
2091 let mut requester_ids = HashSet::new();
2092 requester_ids.insert(requester_id.to_string());
2093 pending.insert(
2094 hash_key.to_string(),
2095 PendingForwardRequest { requester_ids },
2096 );
2097 true
2098 }
2099
2100 async fn take_forward_requesters(&self, hash_key: &str) -> Vec<String> {
2101 self.pending_forward_requests
2102 .write()
2103 .await
2104 .remove(hash_key)
2105 .map(|pending| pending.requester_ids.into_iter().collect())
2106 .unwrap_or_default()
2107 }
2108
2109 async fn complete_pending_response(
2110 self: &Arc<Self>,
2111 from_peer: &str,
2112 hash: &Hash,
2113 hash_key: String,
2114 payload: Vec<u8>,
2115 ) {
2116 if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
2117 self.release_queried_peer_requests(&pending.queried_peers)
2118 .await;
2119 let rtt_ms = pending.started_at.elapsed().as_millis() as u64;
2120 self.peer_selector.write().await.record_success(
2121 from_peer,
2122 rtt_ms,
2123 payload.len() as u64,
2124 );
2125 let forward_requesters = self.take_forward_requesters(&hash_key).await;
2126 let response_bytes = if forward_requesters.is_empty() {
2127 None
2128 } else {
2129 Some(encode_response(&create_response(hash, payload.clone())))
2130 };
2131 let _ = pending.response_tx.send(Some(payload));
2132 if let Some(response_bytes) = response_bytes {
2133 for requester_id in forward_requesters {
2134 Arc::clone(&self)
2135 .enqueue_response_send(requester_id, response_bytes.clone(), Instant::now())
2136 .await;
2137 }
2138 }
2139 }
2140 }
2141
2142 async fn handle_quote_response_message(&self, from_peer: &str, res: DataQuoteResponse) {
2143 if !res.a {
2144 return;
2145 }
2146
2147 let Some(quote_id) = res.q else {
2148 return;
2149 };
2150
2151 let hash_key = hash_to_key(&res.h);
2152 let (preferred_mint_url, offered_payment_sat) = {
2153 let pending_quotes = self.pending_quotes.read().await;
2154 let Some(pending) = pending_quotes.get(&hash_key) else {
2155 return;
2156 };
2157 (
2158 pending.preferred_mint_url.clone(),
2159 pending.offered_payment_sat,
2160 )
2161 };
2162 if !self
2163 .should_accept_quote_response(
2164 from_peer,
2165 preferred_mint_url.as_deref(),
2166 offered_payment_sat,
2167 &res,
2168 )
2169 .await
2170 {
2171 return;
2172 }
2173 let mut pending_quotes = self.pending_quotes.write().await;
2174 if let Some(pending) = pending_quotes.remove(&hash_key) {
2175 let _ = pending.response_tx.send(Some(NegotiatedQuote {
2176 peer_id: from_peer.to_string(),
2177 quote_id,
2178 mint_url: res.m,
2179 }));
2180 }
2181 }
2182
2183 async fn handle_response_message(
2184 self: &Arc<Self>,
2185 from_peer: &str,
2186 res: crate::protocol::DataResponse,
2187 ) {
2188 let hash_key = hash_to_key(&res.h);
2189 let hash = match crate::protocol::bytes_to_hash(&res.h) {
2190 Some(h) => h,
2191 None => return,
2192 };
2193
2194 if hashtree_core::sha256(&res.d) != hash {
2196 self.peer_selector.write().await.record_failure(from_peer);
2197 if self.debug {
2198 println!("[MeshStoreCore] Ignoring invalid response payload for {hash_key}");
2199 }
2200 return;
2201 }
2202
2203 self.complete_pending_response(from_peer, &hash, hash_key, res.d)
2204 .await;
2205 }
2206
2207 async fn handle_quote_request_message(&self, from_peer: &str, req: DataQuoteRequest) {
2208 let hash = match crate::protocol::bytes_to_hash(&req.h) {
2209 Some(h) => h,
2210 None => return,
2211 };
2212 let hash_key = hash_to_key(&hash);
2213
2214 {
2215 let selector = self.peer_selector.read().await;
2216 if self.should_refuse_requests_from_peer(&selector, from_peer) {
2217 if self.debug {
2218 println!(
2219 "[MeshStoreCore] Refusing quote request from delinquent peer {}",
2220 from_peer
2221 );
2222 }
2223 return;
2224 }
2225 }
2226
2227 let chosen_mint = self.choose_quote_mint(req.m.as_deref());
2228 let can_serve = self.local_store.has(&hash).await.ok().unwrap_or(false)
2229 && !self.should_drop_response(&hash)
2230 && !self.should_corrupt_response(&hash);
2231
2232 let res = if can_serve {
2233 let quote_id = self
2234 .issue_quote(from_peer, &hash_key, req.p, req.t, chosen_mint.as_deref())
2235 .await;
2236 create_quote_response_available(&hash, quote_id, req.p, req.t, chosen_mint.as_deref())
2237 } else {
2238 create_quote_response_unavailable(&hash)
2239 };
2240 let response_bytes = encode_quote_response(&res);
2241 if let Some(channel) = self.signaling.get_channel(from_peer).await {
2242 if channel.send(response_bytes.clone()).await.is_ok() {
2243 self.record_peer_wire_sent(from_peer, response_bytes.len() as u64)
2244 .await;
2245 }
2246 }
2247 }
2248
2249 async fn handle_request_message(
2250 self: &Arc<Self>,
2251 from_peer: &str,
2252 req: crate::protocol::DataRequest,
2253 ) {
2254 let hash = match crate::protocol::bytes_to_hash(&req.h) {
2255 Some(h) => h,
2256 None => return,
2257 };
2258 let hash_key = hash_to_key(&hash);
2259
2260 if let Some(quote_id) = req.q {
2261 if !self.take_valid_quote(from_peer, &hash_key, quote_id).await {
2262 if self.debug {
2263 println!(
2264 "[MeshStoreCore] Refusing request with invalid or expired quote {} from {}",
2265 quote_id, from_peer
2266 );
2267 }
2268 return;
2269 }
2270 }
2271
2272 let allow_peer_forwarding = {
2273 let selector = self.peer_selector.read().await;
2274 !self.should_refuse_requests_from_peer(&selector, from_peer)
2275 };
2276
2277 if let Ok(Some(mut data)) = self.local_store.get(&hash).await {
2279 if self.should_drop_response(&hash) {
2280 if self.debug {
2281 println!(
2282 "[MeshStoreCore] Dropping response for {} due to actor profile",
2283 hash_to_key(&hash)
2284 );
2285 }
2286 return;
2287 }
2288
2289 let response_delay = self.response_send_delay(&hash, data.len());
2290 if self.should_corrupt_response(&hash) {
2291 if data.is_empty() {
2292 data.push(0x80);
2293 } else {
2294 data[0] ^= 0x80;
2295 }
2296 }
2297
2298 let res = create_response(&hash, data);
2300 let response_bytes = encode_response(&res);
2301 let ready_at = Instant::now() + response_delay;
2302 Arc::clone(self)
2303 .enqueue_response_send(from_peer.to_string(), response_bytes, ready_at)
2304 .await;
2305 return;
2306 }
2307
2308 if self.pending_requests.read().await.contains_key(&hash_key) {
2309 let _ = self.begin_forward_request(&hash_key, from_peer).await;
2310 return;
2311 }
2312
2313 if !self.begin_forward_request(&hash_key, from_peer).await {
2314 return;
2315 }
2316
2317 let from_peer = from_peer.to_string();
2318 let this = Arc::clone(self);
2319 let request_htl = req.htl;
2320 tokio::spawn(async move {
2321 let result = if allow_peer_forwarding {
2322 let context = MeshReadContext {
2323 exclude_peer_id: Some(from_peer.clone()),
2324 request_htl,
2325 };
2326 this.request_from_mesh_with_context(&hash, &context).await
2327 } else {
2328 if this.debug {
2329 println!(
2330 "[MeshStoreCore] Serving request from delinquent peer {} via read sources only",
2331 from_peer
2332 );
2333 }
2334 match this.request_from_read_sources(&hash).await {
2335 RouteFetchOutcome::Hit(data) => Some(data),
2336 RouteFetchOutcome::Miss | RouteFetchOutcome::Timeout => None,
2337 }
2338 };
2339 let requester_ids = this.take_forward_requesters(&hash_key).await;
2340 if let Some(data) = result {
2341 let ready_at = Instant::now() + this.response_send_delay(&hash, data.len());
2342 let res = create_response(&hash, data);
2343 let response_bytes = encode_response(&res);
2344 for requester_id in requester_ids {
2345 Arc::clone(&this)
2346 .enqueue_response_send(requester_id, response_bytes.clone(), ready_at)
2347 .await;
2348 }
2349 }
2350 });
2351 }
2352
2353 pub async fn handle_data_message(self: &Arc<Self>, from_peer: &str, data: &[u8]) {
2355 self.record_peer_wire_received(from_peer, data.len() as u64)
2356 .await;
2357 let parsed = match parse_message(data) {
2358 Some(m) => m,
2359 None => return,
2360 };
2361
2362 match parsed {
2363 DataMessage::Request(req) => {
2364 self.handle_request_message(from_peer, req).await;
2365 }
2366 DataMessage::Response(res) => {
2367 self.handle_response_message(from_peer, res).await;
2368 }
2369 DataMessage::QuoteRequest(req) => {
2370 self.handle_quote_request_message(from_peer, req).await;
2371 }
2372 DataMessage::QuoteResponse(res) => {
2373 self.handle_quote_response_message(from_peer, res).await;
2374 }
2375 DataMessage::Payment(_) | DataMessage::PaymentAck(_) | DataMessage::Chunk(_) => {}
2376 }
2377 }
2378}
2379
2380#[async_trait]
2381impl<S, R, F> Store for MeshStoreCore<S, R, F>
2382where
2383 S: Store + Send + Sync + 'static,
2384 R: SignalingTransport + Send + Sync + 'static,
2385 F: PeerLinkFactory + Send + Sync + 'static,
2386{
2387 async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
2388 self.local_store.put(hash, data).await
2389 }
2390
2391 async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
2392 if let Some(data) = self.local_store.get(hash).await? {
2394 return Ok(Some(data));
2395 }
2396
2397 Ok(self.request_from_mesh(hash).await)
2399 }
2400
2401 async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
2402 self.local_store.has(hash).await
2403 }
2404
2405 async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
2406 self.local_store.delete(hash).await
2407 }
2408}
2409
2410#[cfg(test)]
2411mod tests;
2412
2413pub type SimMeshStore<S> =
2415 MeshStoreCore<S, crate::mock::MockRelayTransport, crate::mock::MockConnectionFactory>;
2416
2417pub type ProductionMeshStore<S> =
2419 MeshStoreCore<S, crate::nostr::NostrRelayTransport, crate::real_factory::WebRtcPeerLinkFactory>;