1use async_trait::async_trait;
8use futures::{stream::FuturesUnordered, FutureExt, StreamExt};
9use std::collections::hash_map::DefaultHasher;
10use std::collections::{HashMap, HashSet};
11use std::future::Future;
12use std::hash::{Hash as _, Hasher};
13use std::ops::Range;
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16use tokio::sync::{oneshot, Mutex, RwLock};
17
18use hashtree_core::{Hash, Store, StoreError};
19
20use crate::peer_selector::{PeerMetadataSnapshot, PeerSelector, SelectionStrategy};
21use crate::protocol::{
22 create_quote_request, create_quote_response_available, create_quote_response_unavailable,
23 create_request, create_request_with_quote, create_response, encode_quote_request,
24 encode_quote_response, encode_request, encode_response, hash_to_key, parse_message,
25 DataMessage, DataQuoteRequest, DataQuoteResponse,
26};
27use crate::signaling::MeshRouter;
28use crate::transport::{PeerLinkFactory, SignalingTransport, TransportError};
29use crate::types::{PeerHTLConfig, SignalingMessage, MAX_HTL};
30
/// Key bytes that `peer_metadata_pointer_slot_hash` feeds to SHA-256 to derive the
/// slot hash for the peer-metadata pointer.
// NOTE(review): how the derived slot is read or written is not visible in this part of
// the file — confirm against the persistence code before changing the key (the `v1`
// suffix suggests it is versioned).
const PEER_METADATA_POINTER_SLOT_KEY: &[u8] = b"hashtree-webrtc/peer-metadata/latest/v1";
34
/// Bookkeeping for one outstanding data request (values of `pending_requests`).
struct PendingRequest {
    /// One-shot channel used to hand the result back to the waiting caller.
    // presumably `None` means "not found / gave up" — confirm against the resolver code.
    response_tx: oneshot::Sender<Option<Vec<u8>>>,
    /// Timestamp associated with the request.
    // presumably the moment the request was issued — TODO confirm.
    started_at: Instant,
    /// Peer ids recorded for this request.
    // presumably the peers the request has been sent to so far — TODO confirm.
    queried_peers: Vec<String>,
}
41
/// Bookkeeping for one outstanding quote negotiation (values of `pending_quotes`).
struct PendingQuoteRequest {
    /// One-shot channel that receives the negotiated quote, or `None`.
    response_tx: oneshot::Sender<Option<NegotiatedQuote>>,
    /// Mint URL this node asked for, if any.
    // presumably forwarded to `should_accept_quote_response` — TODO confirm.
    preferred_mint_url: Option<String>,
    /// Payment offered in the quote request, in sats.
    offered_payment_sat: u64,
}
47
/// Bookkeeping for a forwarded request (values of `pending_forward_requests`).
struct PendingForwardRequest {
    /// Ids of the requesters recorded for this forwarded request.
    // presumably the peers that should receive the response once it arrives — TODO confirm.
    requester_ids: HashSet<String>,
}
51
/// A pluggable read source that can be asked for the bytes behind a hash.
///
/// Sources are registered through `MeshStoreCore::set_read_sources`, which keys both
/// the source map and the adaptive statistics by [`MeshReadSource::id`].
#[async_trait]
pub trait MeshReadSource: Send + Sync {
    /// Identifier of this source; used as the map key for the source and its stats.
    fn id(&self) -> &str;

    /// Whether the source may currently be queried. Defaults to `true`.
    ///
    /// `ordered_read_sources` skips sources that report `false`.
    fn is_available(&self) -> bool {
        true
    }

    /// Attempts to fetch the data for `hash`, returning `None` when nothing is produced.
    async fn get(&self, hash: &Hash) -> Option<Vec<u8>>;
}
62
/// Result of a quote negotiation, delivered through `PendingQuoteRequest::response_tx`.
#[derive(Debug, Clone)]
struct NegotiatedQuote {
    /// Peer the quote belongs to.
    peer_id: String,
    /// Quote identifier.
    // presumably the id issued by that peer, to be echoed in the follow-up request — confirm.
    quote_id: u64,
    /// Mint URL attached to the quote, if any (currently unused, hence the `allow`).
    #[allow(dead_code)]
    mint_url: Option<String>,
}
70
/// A quote this node issued; stored in `issued_quotes` keyed by
/// `(peer_id, hash_key, quote_id)`.
struct IssuedQuote {
    /// Instant after which `take_valid_quote` treats the quote as expired.
    expires_at: Instant,
    /// Quoted payment in sats (stored by `issue_quote`; not read in the visible code).
    #[allow(dead_code)]
    payment_sat: u64,
    /// Mint URL recorded with the quote (stored by `issue_quote`; not read in the visible code).
    #[allow(dead_code)]
    mint_url: Option<String>,
}
78
/// Counters and timing estimates used to score a read source or read route.
#[derive(Debug, Clone, Default)]
struct AdaptiveSourceStats {
    /// Number of requests recorded.
    requests: u64,
    /// Number of successful fetches recorded.
    successes: u64,
    /// Number of misses recorded.
    misses: u64,
    /// Number of failures recorded.
    failures: u64,
    /// Number of timeouts recorded.
    timeouts: u64,
    /// Smoothed latency in milliseconds; `<= 0.0` means "no sample yet".
    /// Updated on success as `0.875 * srtt + 0.125 * sample`.
    srtt_ms: f64,
    /// Smoothed latency deviation in milliseconds, updated on success as
    /// `0.75 * rttvar + 0.25 * |srtt - sample|`.
    rttvar_ms: f64,
    /// Backoff step counter: incremented by `apply_source_backoff`, reset to 0 on success.
    backoff_level: u32,
    /// While this instant is in the future the entry scores `f64::NEG_INFINITY`.
    backed_off_until: Option<Instant>,
    /// Time of the most recent recorded success.
    last_success_at: Option<Instant>,
    /// Time of the most recent recorded failure or timeout.
    last_failure_at: Option<Instant>,
}
93
/// Payload delivered to waiters of an in-flight source fetch.
#[derive(Debug, Clone)]
struct SourceFetchResult {
    /// The fetched bytes.
    data: Vec<u8>,
}
98
/// State of a source fetch that is currently in progress (values of
/// `inflight_source_fetches`).
struct InflightSourceFetch {
    /// One-shot senders for the callers waiting on this fetch.
    // presumably used to de-duplicate concurrent fetches of the same key — TODO confirm.
    waiters: Vec<oneshot::Sender<Option<SourceFetchResult>>>,
}
102
/// Outcome of querying a single read source.
// NOTE(review): the producer and consumer of this enum are outside the visible part of
// the file; the variant notes below are inferred from names and field shapes.
enum SourceFetchOutcome {
    /// The source returned data.
    Hit {
        /// Id of the source that answered.
        source_id: String,
        /// The returned bytes.
        data: Vec<u8>,
        /// Elapsed time of the fetch in milliseconds.
        elapsed_ms: u64,
    },
    /// The source answered without data (presumably — confirm).
    Miss {
        /// Id of the source that missed.
        source_id: String,
    },
    /// The fetch failed (presumably error or timeout — confirm).
    Failure {
        /// Id of the source that failed.
        source_id: String,
    },
}
116
/// Backoff applied at backoff level 1; `apply_source_backoff` doubles it per further level.
const INITIAL_SOURCE_BACKOFF_MS: u64 = 250;
/// Upper bound on the backoff computed by `apply_source_backoff`.
const MAX_SOURCE_BACKOFF_MS: u64 = 10_000;
/// If the two best sources' scores differ by less than this,
/// `should_probe_multiple_read_sources` reports that more than one should be probed.
const SOURCE_SCORE_TIE_DELTA: f64 = 0.15;
/// A success newer than this earns the recency bonus in `adaptive_source_score`.
const RECENT_SOURCE_SUCCESS_WINDOW: Duration = Duration::from_secs(60);
/// Rank positions added per active request when ordering peers in
/// `ordered_connected_peers`.
const ACTIVE_PEER_REQUEST_RANK_PENALTY: usize = 3;
122
123fn source_reliability_score(stats: &AdaptiveSourceStats) -> f64 {
124 (stats.successes as f64 + 1.0) / (stats.requests as f64 + 2.0)
125}
126
127fn source_latency_score(stats: &AdaptiveSourceStats) -> f64 {
128 if stats.srtt_ms <= 0.0 {
129 return 0.5;
130 }
131 (500.0 / (stats.srtt_ms + 50.0)).min(1.0)
132}
133
134fn source_has_history(stats: &AdaptiveSourceStats) -> bool {
135 stats.requests > 0
136 || stats.successes > 0
137 || stats.misses > 0
138 || stats.failures > 0
139 || stats.timeouts > 0
140}
141
142fn adaptive_source_score(stats: &AdaptiveSourceStats, now: Instant) -> f64 {
143 if let Some(backed_off_until) = stats.backed_off_until {
144 if backed_off_until > now {
145 return f64::NEG_INFINITY;
146 }
147 }
148
149 let miss_penalty = if stats.requests > 0 {
150 (stats.misses as f64 / stats.requests as f64) * 0.15
151 } else {
152 0.0
153 };
154 let failure_penalty = if stats.requests > 0 {
155 ((stats.failures + stats.timeouts) as f64 / stats.requests as f64) * 0.3
156 } else {
157 0.0
158 };
159 let recency_bonus = if stats
160 .last_success_at
161 .is_some_and(|last| now.duration_since(last) < RECENT_SOURCE_SUCCESS_WINDOW)
162 {
163 0.1
164 } else {
165 0.0
166 };
167
168 0.6 * source_reliability_score(stats) + 0.3 * source_latency_score(stats) + recency_bonus
169 - miss_penalty
170 - failure_penalty
171}
172
/// A way of satisfying a read: via mesh peers or via the registered read sources.
#[derive(Clone)]
enum ReadRoute {
    /// Read through peers; carries a list of peer ids.
    // presumably the ordered candidate peers for this read — TODO confirm.
    Peers(Vec<String>),
    /// Read through the registered `MeshReadSource`s.
    Sources,
}
178
179impl ReadRoute {
180 fn id(&self) -> &'static str {
181 match self {
182 Self::Peers(_) => "peers",
183 Self::Sources => "sources",
184 }
185 }
186}
187
/// Per-read parameters.
#[derive(Debug, Clone)]
struct MeshReadContext {
    /// Peer to leave out when choosing whom to query, if any.
    // presumably the peer the request arrived from when forwarding — TODO confirm.
    exclude_peer_id: Option<String>,
    /// HTL value for the request; defaults to `MAX_HTL`.
    request_htl: u8,
}
193
194impl Default for MeshReadContext {
195 fn default() -> Self {
196 Self {
197 exclude_peer_id: None,
198 request_htl: MAX_HTL,
199 }
200 }
201}
202
/// Counters returned by `MeshStoreCore::drain_available_data_messages`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct DataPumpStats {
    /// Number of raw messages taken from peer channels (parsed or not).
    pub processed: usize,
    /// Number of messages that parsed as `DataMessage::Request`.
    pub request_messages: usize,
    /// Number of messages that parsed as `DataMessage::Response`.
    pub response_messages: usize,
    /// Number of messages that parsed as `DataMessage::QuoteRequest`.
    pub quote_request_messages: u64,
    /// Number of messages that parsed as `DataMessage::QuoteResponse`.
    pub quote_response_messages: u64,
    /// Total byte length of all messages counted in `processed`.
    pub processed_bytes: u64,
}
213
/// Controls how a request is fanned out across candidates in hedged waves.
#[derive(Debug, Clone, Copy)]
pub struct RequestDispatchConfig {
    /// Size of the first wave. `0` is treated as `1` by `normalize_dispatch_config`.
    pub initial_fanout: usize,
    /// Size of each follow-up wave. `0` is treated as `1` by `normalize_dispatch_config`.
    pub hedge_fanout: usize,
    /// Maximum number of candidates contacted in total. `normalize_dispatch_config`
    /// treats `0` as "all available candidates".
    pub max_fanout: usize,
    /// Wait between non-final waves in milliseconds; with `0`, `run_hedged_waves`
    /// sends the next wave without waiting.
    pub hedge_interval_ms: u64,
}
230
231impl Default for RequestDispatchConfig {
232 fn default() -> Self {
233 Self {
234 initial_fanout: usize::MAX,
235 hedge_fanout: usize::MAX,
236 max_fanout: usize::MAX,
237 hedge_interval_ms: 0,
238 }
239 }
240}
241
242pub fn normalize_dispatch_config(
244 dispatch: RequestDispatchConfig,
245 available_peers: usize,
246) -> RequestDispatchConfig {
247 let mut cfg = dispatch;
248 let cap = if cfg.max_fanout == 0 {
249 available_peers
250 } else {
251 cfg.max_fanout.min(available_peers)
252 };
253 cfg.max_fanout = cap;
254 cfg.initial_fanout = if cfg.initial_fanout == 0 {
255 1
256 } else {
257 cfg.initial_fanout.min(cap.max(1))
258 };
259 cfg.hedge_fanout = if cfg.hedge_fanout == 0 {
260 1
261 } else {
262 cfg.hedge_fanout.min(cap.max(1))
263 };
264 cfg
265}
266
267pub fn build_hedged_wave_plan(peer_count: usize, dispatch: RequestDispatchConfig) -> Vec<usize> {
269 if peer_count == 0 {
270 return Vec::new();
271 }
272 let cap = dispatch.max_fanout.min(peer_count);
273 if cap == 0 {
274 return Vec::new();
275 }
276
277 let mut plan = Vec::new();
278 let mut sent = 0usize;
279 let first = dispatch.initial_fanout.min(cap).max(1);
280 plan.push(first);
281 sent += first;
282
283 while sent < cap {
284 let next = dispatch.hedge_fanout.min(cap - sent).max(1);
285 plan.push(next);
286 sent += next;
287 }
288 plan
289}
290
/// What `run_hedged_waves` should do after waiting on a wave.
#[derive(Debug)]
pub enum HedgedWaveAction<T> {
    /// Nothing conclusive yet; proceed to the next wave (if any).
    Continue,
    /// A result was obtained; `run_hedged_waves` returns `Some(value)`.
    Success(T),
    /// Stop sending waves; `run_hedged_waves` returns `None`.
    Abort,
}
298
/// Drives a hedged request: sends to candidates in waves and waits between waves.
///
/// * `peer_count` — number of candidates; waves address them by index range.
/// * `dispatch` — fanout settings; normalized against `peer_count` before use.
/// * `request_timeout` — overall budget; the deadline is fixed once, up front.
/// * `send_wave` — called with the index range of the wave; returns how many sends
///   succeeded.
/// * `wait_wave` — called with the time to wait for responses; decides whether to
///   continue, succeed or abort.
///
/// Returns `Some(value)` on `HedgedWaveAction::Success`, otherwise `None` (empty plan,
/// nothing could be sent, deadline reached, abort, or all waves exhausted).
pub async fn run_hedged_waves<T, SendWave, SendWaveFut, WaitWave, WaitWaveFut>(
    peer_count: usize,
    dispatch: RequestDispatchConfig,
    request_timeout: Duration,
    mut send_wave: SendWave,
    mut wait_wave: WaitWave,
) -> Option<T>
where
    SendWave: FnMut(Range<usize>) -> SendWaveFut,
    SendWaveFut: Future<Output = usize>,
    WaitWave: FnMut(Duration) -> WaitWaveFut,
    WaitWaveFut: Future<Output = HedgedWaveAction<T>>,
{
    let dispatch = normalize_dispatch_config(dispatch, peer_count);
    let wave_plan = build_hedged_wave_plan(peer_count, dispatch);
    if wave_plan.is_empty() {
        return None;
    }

    let deadline = Instant::now() + request_timeout;
    // Cumulative count of successful sends across all waves so far.
    let mut sent_total = 0usize;
    let mut next_peer_idx = 0usize;

    for (wave_idx, wave_size) in wave_plan.iter().copied().enumerate() {
        let from = next_peer_idx;
        let to = (next_peer_idx + wave_size).min(peer_count);
        next_peer_idx = to;

        if from == to {
            continue;
        }

        sent_total += send_wave(from..to).await;
        // Nothing has been sent in this or any earlier wave, so there is nothing to
        // wait for: move straight on to the next wave, or give up if none are left.
        if sent_total == 0 {
            if next_peer_idx >= peer_count {
                break;
            }
            continue;
        }

        let now = Instant::now();
        if now >= deadline {
            break;
        }
        let remaining = deadline.saturating_duration_since(now);
        let is_last_wave = wave_idx + 1 == wave_plan.len() || next_peer_idx >= peer_count;
        // The last wave waits for the rest of the budget; earlier waves wait one
        // hedge interval (capped by the budget), or not at all when the interval is 0.
        let wait = if is_last_wave {
            remaining
        } else if dispatch.hedge_interval_ms == 0 {
            Duration::ZERO
        } else {
            Duration::from_millis(dispatch.hedge_interval_ms).min(remaining)
        };

        if wait.is_zero() {
            continue;
        }

        match wait_wave(wait).await {
            HedgedWaveAction::Continue => {}
            HedgedWaveAction::Success(value) => return Some(value),
            HedgedWaveAction::Abort => break,
        }
    }

    None
}
370
371pub async fn sync_selector_peers(selector: &RwLock<PeerSelector>, current_peer_ids: &[String]) {
373 let mut selector = selector.write().await;
374 let current: HashSet<&str> = current_peer_ids.iter().map(String::as_str).collect();
375 let known: Vec<String> = selector.all_stats().map(|s| s.peer_id.clone()).collect();
376 for peer_id in known {
377 if !current.contains(peer_id.as_str()) {
378 selector.remove_peer(&peer_id);
379 }
380 }
381 for peer_id in current_peer_ids {
382 selector.add_peer(peer_id.clone());
383 }
384}
385
/// Knobs that make this node deliberately misbehave when answering requests.
// NOTE(review): presumably intended for simulation / fault-injection testing — confirm.
#[derive(Debug, Clone, Copy)]
pub struct ResponseBehaviorConfig {
    /// Probability of dropping a response; read by `should_drop_response`.
    pub drop_response_prob: f64,
    /// Probability of corrupting a response; read by `should_corrupt_response`.
    pub corrupt_response_prob: f64,
    /// Additional delay in milliseconds.
    // presumably applied before sending a response; the consumer is not visible here.
    pub extra_delay_ms: u64,
}
398
399impl Default for ResponseBehaviorConfig {
400 fn default() -> Self {
401 Self {
402 drop_response_prob: 0.0,
403 corrupt_response_prob: 0.0,
404 extra_delay_ms: 0,
405 }
406 }
407}
408
409impl ResponseBehaviorConfig {
410 fn normalized(self) -> Self {
411 Self {
412 drop_response_prob: self.drop_response_prob.clamp(0.0, 1.0),
413 corrupt_response_prob: self.corrupt_response_prob.clamp(0.0, 1.0),
414 extra_delay_ms: self.extra_delay_ms,
415 }
416 }
417}
418
/// Routing, payment and dispatch settings for a `MeshStoreCore`.
#[derive(Debug, Clone)]
pub struct MeshRoutingConfig {
    /// Strategy handed to `PeerSelector::with_strategy`.
    pub selection_strategy: SelectionStrategy,
    /// Passed to `PeerSelector::set_fairness`.
    pub fairness_enabled: bool,
    /// Passed to `PeerSelector::set_cashu_payment_weight`.
    pub cashu_payment_weight: f64,
    /// Threshold passed to `PeerSelector::is_peer_blocked_for_payment_defaults` when
    /// deciding whether to refuse requests from a peer.
    pub cashu_payment_default_block_threshold: u64,
    /// Mint URLs this node accepts. An empty list makes `accepts_quote_mint` accept
    /// any mint.
    pub cashu_accepted_mints: Vec<String>,
    /// Preferred mint URL, if any.
    pub cashu_default_mint: Option<String>,
    /// Base payment cap (sats) for quotes on a mint that is neither preferred nor
    /// trusted; `0` yields a cap of 0.
    pub cashu_peer_suggested_mint_base_cap_sat: u64,
    /// Sats added to that cap per recorded peer success.
    pub cashu_peer_suggested_mint_success_step_sat: u64,
    /// Sats added to that cap per recorded payment receipt from the peer.
    pub cashu_peer_suggested_mint_receipt_step_sat: u64,
    /// Upper bound for that cap; `0` means no upper bound is applied.
    pub cashu_peer_suggested_mint_max_cap_sat: u64,
    /// Fanout / hedging settings for requests.
    pub dispatch: RequestDispatchConfig,
    /// Response misbehavior settings (normalized on use).
    pub response_behavior: ResponseBehaviorConfig,
}
444
445impl Default for MeshRoutingConfig {
446 fn default() -> Self {
447 Self {
448 selection_strategy: SelectionStrategy::Weighted,
449 fairness_enabled: true,
450 cashu_payment_weight: 0.0,
451 cashu_payment_default_block_threshold: 0,
452 cashu_accepted_mints: Vec::new(),
453 cashu_default_mint: None,
454 cashu_peer_suggested_mint_base_cap_sat: 0,
455 cashu_peer_suggested_mint_success_step_sat: 0,
456 cashu_peer_suggested_mint_receipt_step_sat: 0,
457 cashu_peer_suggested_mint_max_cap_sat: 0,
458 dispatch: RequestDispatchConfig::default(),
459 response_behavior: ResponseBehaviorConfig::default(),
460 }
461 }
462}
463
/// Core state of the mesh store: a local store plus the signaling router, together
/// with the bookkeeping for requests, quotes, read sources and peer selection.
///
/// Type parameters: `S` is the local [`Store`], `R` the signaling transport and `F`
/// the peer-link factory used by the [`MeshRouter`].
pub struct MeshStoreCore<S, R, F>
where
    S: Store + Send + Sync + 'static,
    R: SignalingTransport + Send + Sync + 'static,
    F: PeerLinkFactory + Send + Sync + 'static,
{
    /// The local store.
    local_store: Arc<S>,
    /// Signaling router; provides peer ids, peer channels and hello messages.
    signaling: Arc<MeshRouter<R, F>>,
    /// Per-peer HTL configuration, created randomly the first time a peer is seen in
    /// `process_signaling`.
    htl_configs: RwLock<HashMap<String, PeerHTLConfig>>,
    /// Outstanding data requests.
    // presumably keyed by the hash key string — TODO confirm.
    pending_requests: RwLock<HashMap<String, PendingRequest>>,
    /// Outstanding quote negotiations.
    // presumably keyed by the hash key string — TODO confirm.
    pending_quotes: RwLock<HashMap<String, PendingQuoteRequest>>,
    /// Outstanding forwarded requests.
    // presumably keyed by the hash key string — TODO confirm.
    pending_forward_requests: RwLock<HashMap<String, PendingForwardRequest>>,
    /// Quotes issued by this node, keyed by `(peer_id, hash_key, quote_id)`.
    issued_quotes: RwLock<HashMap<(String, String, u64), IssuedQuote>>,
    /// Next quote id to hand out; starts at 1.
    next_quote_id: RwLock<u64>,
    /// Registered read sources keyed by `MeshReadSource::id`.
    read_sources: RwLock<HashMap<String, Arc<dyn MeshReadSource>>>,
    /// Adaptive statistics per read source id.
    read_source_stats: RwLock<HashMap<String, AdaptiveSourceStats>>,
    /// Source fetches currently in progress.
    // presumably keyed by the hash key string — TODO confirm.
    inflight_source_fetches: Mutex<HashMap<String, InflightSourceFetch>>,
    /// Adaptive statistics per read-route id.
    read_route_stats: RwLock<HashMap<String, AdaptiveSourceStats>>,
    /// Peer selector used to rank peers and record per-peer outcomes.
    peer_selector: RwLock<PeerSelector>,
    /// Number of reserved (in-flight) requests per peer; entries are removed when the
    /// count drops to zero.
    peer_active_requests: RwLock<HashMap<String, usize>>,
    /// Routing configuration supplied at construction.
    routing: MeshRoutingConfig,
    /// Request timeout supplied at construction.
    request_timeout: Duration,
    /// Debug flag supplied at construction.
    debug: bool,
    /// Set to `true` by `start` and to `false` by `stop`.
    running: RwLock<bool>,
}
513
514impl<S, R, F> MeshStoreCore<S, R, F>
515where
516 S: Store + Send + Sync + 'static,
517 R: SignalingTransport + Send + Sync + 'static,
518 F: PeerLinkFactory + Send + Sync + 'static,
519{
520 pub fn new(
522 local_store: Arc<S>,
523 signaling: Arc<MeshRouter<R, F>>,
524 request_timeout: Duration,
525 debug: bool,
526 ) -> Self {
527 Self::new_with_routing(
528 local_store,
529 signaling,
530 request_timeout,
531 debug,
532 Default::default(),
533 )
534 }
535
536 pub fn new_with_routing(
538 local_store: Arc<S>,
539 signaling: Arc<MeshRouter<R, F>>,
540 request_timeout: Duration,
541 debug: bool,
542 routing: MeshRoutingConfig,
543 ) -> Self {
544 let mut selector = PeerSelector::with_strategy(routing.selection_strategy);
545 selector.set_fairness(routing.fairness_enabled);
546 selector.set_cashu_payment_weight(routing.cashu_payment_weight);
547 Self {
548 local_store,
549 signaling,
550 htl_configs: RwLock::new(HashMap::new()),
551 pending_requests: RwLock::new(HashMap::new()),
552 pending_quotes: RwLock::new(HashMap::new()),
553 pending_forward_requests: RwLock::new(HashMap::new()),
554 issued_quotes: RwLock::new(HashMap::new()),
555 next_quote_id: RwLock::new(1),
556 read_sources: RwLock::new(HashMap::new()),
557 read_source_stats: RwLock::new(HashMap::new()),
558 inflight_source_fetches: Mutex::new(HashMap::new()),
559 read_route_stats: RwLock::new(HashMap::new()),
560 peer_selector: RwLock::new(selector),
561 peer_active_requests: RwLock::new(HashMap::new()),
562 routing,
563 request_timeout,
564 debug,
565 running: RwLock::new(false),
566 }
567 }
568
569 pub async fn start(&self) -> Result<(), TransportError> {
571 *self.running.write().await = true;
572
573 self.signaling.send_hello(vec![]).await?;
575
576 Ok(())
577 }
578
579 pub async fn stop(&self) {
581 *self.running.write().await = false;
582 }
583
584 pub async fn process_signaling(&self, msg: SignalingMessage) -> Result<(), TransportError> {
586 let peer_id = msg.peer_id().to_string();
588 {
589 let mut configs = self.htl_configs.write().await;
590 if !configs.contains_key(&peer_id) {
591 configs.insert(peer_id.clone(), PeerHTLConfig::random());
592 }
593 }
594 self.peer_selector.write().await.add_peer(peer_id);
595
596 self.signaling.handle_message(msg).await
597 }
598
    /// Returns the signaling router this core was constructed with.
    pub fn signaling(&self) -> &Arc<MeshRouter<R, F>> {
        &self.signaling
    }
603
    /// Returns the configured response behavior after `normalized()` has been applied.
    fn response_behavior(&self) -> ResponseBehaviorConfig {
        self.routing.response_behavior.normalized()
    }
607
608 fn deterministic_actor_draw_for(peer_id: &str, hash: &Hash, salt: u64) -> f64 {
609 let mut hasher = DefaultHasher::new();
610 peer_id.hash(&mut hasher);
611 hash.hash(&mut hasher);
612 salt.hash(&mut hasher);
613 let v = hasher.finish();
614 (v as f64) / (u64::MAX as f64)
615 }
616
    /// Same as `deterministic_actor_draw_for`, using this node's own peer id (taken
    /// from the signaling router).
    fn deterministic_actor_draw(&self, hash: &Hash, salt: u64) -> f64 {
        Self::deterministic_actor_draw_for(self.signaling.peer_id(), hash, salt)
    }
620
    /// Returns the SHA-256 hash of `PEER_METADATA_POINTER_SLOT_KEY`.
    // presumably the address of the peer-metadata pointer slot — the consumer is not
    // visible in this part of the file.
    fn peer_metadata_pointer_slot_hash() -> Hash {
        hashtree_core::sha256(PEER_METADATA_POINTER_SLOT_KEY)
    }
624
625 fn decode_hash_hex(hash_hex: &str) -> Result<Hash, StoreError> {
626 let bytes = hex::decode(hash_hex)
627 .map_err(|e| StoreError::Other(format!("Invalid hash hex: {e}")))?;
628 if bytes.len() != 32 {
629 return Err(StoreError::Other(format!(
630 "Invalid hash length {}, expected 32 bytes",
631 bytes.len()
632 )));
633 }
634 let mut hash = [0u8; 32];
635 hash.copy_from_slice(&bytes);
636 Ok(hash)
637 }
638
639 fn should_drop_response(&self, hash: &Hash) -> bool {
640 let p = self.response_behavior().drop_response_prob;
641 if p <= 0.0 {
642 return false;
643 }
644 self.deterministic_actor_draw(hash, 0xD0_D0_D0_D0_D0_D0_D0_D0) < p
645 }
646
647 fn should_corrupt_response(&self, hash: &Hash) -> bool {
648 let p = self.response_behavior().corrupt_response_prob;
649 if p <= 0.0 {
650 return false;
651 }
652 self.deterministic_actor_draw(hash, 0xC0_C0_C0_C0_C0_C0_C0_C0) < p
653 }
654
655 async fn ordered_connected_peers(&self, exclude_peer_id: Option<&str>) -> Vec<String> {
656 let current_peer_ids = self.signaling.peer_ids().await;
657 if current_peer_ids.is_empty() {
658 return Vec::new();
659 }
660
661 sync_selector_peers(&self.peer_selector, ¤t_peer_ids).await;
662 let mut candidate_peer_ids: Vec<String> = current_peer_ids
663 .into_iter()
664 .filter(|peer_id| exclude_peer_id.is_none_or(|exclude| peer_id != exclude))
665 .collect();
666 if candidate_peer_ids.is_empty() {
667 return Vec::new();
668 }
669
670 let current_set: HashSet<&str> = candidate_peer_ids.iter().map(String::as_str).collect();
671 let mut selector = self.peer_selector.write().await;
672 let mut selector_order = selector.select_peers();
673 selector_order.retain(|peer_id| current_set.contains(peer_id.as_str()));
674 if selector_order.is_empty() {
675 let mut fallback = candidate_peer_ids;
676 fallback.sort();
677 return fallback;
678 }
679 let backed_off: HashMap<String, bool> = candidate_peer_ids
680 .iter()
681 .map(|peer_id| (peer_id.clone(), selector.is_peer_backed_off(peer_id)))
682 .collect();
683 drop(selector);
684
685 let rank: HashMap<&str, usize> = selector_order
686 .iter()
687 .enumerate()
688 .map(|(idx, peer_id)| (peer_id.as_str(), idx))
689 .collect();
690 let active = self.peer_active_requests.read().await;
691 candidate_peer_ids.sort_by(|left, right| {
692 let left_backed_off = backed_off.get(left).copied().unwrap_or(false);
693 let right_backed_off = backed_off.get(right).copied().unwrap_or(false);
694 if left_backed_off != right_backed_off {
695 return if left_backed_off {
696 std::cmp::Ordering::Greater
697 } else {
698 std::cmp::Ordering::Less
699 };
700 }
701 let left_rank = rank.get(left.as_str()).copied().unwrap_or(usize::MAX / 2);
702 let right_rank = rank.get(right.as_str()).copied().unwrap_or(usize::MAX / 2);
703 let left_load = active.get(left).copied().unwrap_or(0);
704 let right_load = active.get(right).copied().unwrap_or(0);
705 (left_rank + left_load.saturating_mul(ACTIVE_PEER_REQUEST_RANK_PENALTY))
706 .cmp(&(right_rank + right_load.saturating_mul(ACTIVE_PEER_REQUEST_RANK_PENALTY)))
707 .then_with(|| left.cmp(right))
708 });
709 candidate_peer_ids
710 }
711
712 async fn reserve_peer_request(&self, peer_id: &str) {
713 let mut active = self.peer_active_requests.write().await;
714 *active.entry(peer_id.to_string()).or_insert(0) += 1;
715 }
716
717 async fn release_peer_request(&self, peer_id: &str) {
718 let mut active = self.peer_active_requests.write().await;
719 let Some(count) = active.get_mut(peer_id) else {
720 return;
721 };
722 if *count <= 1 {
723 active.remove(peer_id);
724 } else {
725 *count -= 1;
726 }
727 }
728
    /// Releases one active-request reservation for each peer id in `peer_ids`,
    /// in order (an id listed twice is released twice).
    async fn release_queried_peer_requests(&self, peer_ids: &[String]) {
        for peer_id in peer_ids {
            self.release_peer_request(peer_id).await;
        }
    }
734
735 fn requested_quote_mint(&self) -> Option<&str> {
736 if let Some(default_mint) = self.routing.cashu_default_mint.as_deref() {
737 if self.routing.cashu_accepted_mints.is_empty()
738 || self
739 .routing
740 .cashu_accepted_mints
741 .iter()
742 .any(|mint| mint == default_mint)
743 {
744 return Some(default_mint);
745 }
746 }
747
748 self.routing
749 .cashu_accepted_mints
750 .first()
751 .map(String::as_str)
752 }
753
754 fn choose_quote_mint(&self, requested_mint: Option<&str>) -> Option<String> {
755 if let Some(requested_mint) = requested_mint {
756 if self.accepts_quote_mint(Some(requested_mint)) {
757 return Some(requested_mint.to_string());
758 }
759 }
760 if let Some(default_mint) = self.routing.cashu_default_mint.as_ref() {
761 return Some(default_mint.clone());
762 }
763 if let Some(first_mint) = self.routing.cashu_accepted_mints.first() {
764 return Some(first_mint.clone());
765 }
766 requested_mint.map(str::to_string)
767 }
768
769 fn accepts_quote_mint(&self, mint_url: Option<&str>) -> bool {
770 if self.routing.cashu_accepted_mints.is_empty() {
771 return true;
772 }
773
774 let Some(mint_url) = mint_url else {
775 return false;
776 };
777 self.routing
778 .cashu_accepted_mints
779 .iter()
780 .any(|mint| mint == mint_url)
781 }
782
783 fn trusts_quote_mint(&self, mint_url: Option<&str>) -> bool {
784 let Some(mint_url) = mint_url else {
785 return self.routing.cashu_default_mint.is_none()
786 && self.routing.cashu_accepted_mints.is_empty();
787 };
788 self.routing.cashu_default_mint.as_deref() == Some(mint_url)
789 || self
790 .routing
791 .cashu_accepted_mints
792 .iter()
793 .any(|mint| mint == mint_url)
794 }
795
796 async fn peer_suggested_mint_cap_sat(&self, peer_id: &str) -> u64 {
797 let base = self.routing.cashu_peer_suggested_mint_base_cap_sat;
798 if base == 0 {
799 return 0;
800 }
801
802 let selector = self.peer_selector.read().await;
803 let Some(stats) = selector.get_stats(peer_id) else {
804 let max_cap = self.routing.cashu_peer_suggested_mint_max_cap_sat;
805 return if max_cap > 0 { base.min(max_cap) } else { base };
806 };
807
808 if stats.cashu_payment_defaults > 0
809 && stats.cashu_payment_defaults >= stats.cashu_payment_receipts
810 {
811 return 0;
812 }
813
814 let success_bonus = stats
815 .successes
816 .saturating_mul(self.routing.cashu_peer_suggested_mint_success_step_sat);
817 let receipt_bonus = stats
818 .cashu_payment_receipts
819 .saturating_mul(self.routing.cashu_peer_suggested_mint_receipt_step_sat);
820 let mut cap = base
821 .saturating_add(success_bonus)
822 .saturating_add(receipt_bonus);
823 let max_cap = self.routing.cashu_peer_suggested_mint_max_cap_sat;
824 if max_cap > 0 {
825 cap = cap.min(max_cap);
826 }
827 cap
828 }
829
830 async fn should_accept_quote_response(
831 &self,
832 from_peer: &str,
833 preferred_mint_url: Option<&str>,
834 offered_payment_sat: u64,
835 res: &DataQuoteResponse,
836 ) -> bool {
837 let Some(payment_sat) = res.p else {
838 return false;
839 };
840 if payment_sat > offered_payment_sat {
841 return false;
842 }
843
844 let response_mint = res.m.as_deref();
845 if response_mint == preferred_mint_url {
846 return true;
847 }
848 if self.trusts_quote_mint(response_mint) {
849 return true;
850 }
851 if response_mint.is_none() {
852 return false;
853 }
854
855 payment_sat <= self.peer_suggested_mint_cap_sat(from_peer).await
856 }
857
858 async fn issue_quote(
859 &self,
860 peer_id: &str,
861 hash_key: &str,
862 payment_sat: u64,
863 ttl_ms: u32,
864 mint_url: Option<&str>,
865 ) -> u64 {
866 let quote_id = {
867 let mut next = self.next_quote_id.write().await;
868 let quote_id = *next;
869 *next = next.saturating_add(1);
870 quote_id
871 };
872
873 let expires_at = Instant::now() + Duration::from_millis(ttl_ms as u64);
874 self.issued_quotes.write().await.insert(
875 (peer_id.to_string(), hash_key.to_string(), quote_id),
876 IssuedQuote {
877 expires_at,
878 payment_sat,
879 mint_url: mint_url.map(str::to_string),
880 },
881 );
882 quote_id
883 }
884
885 async fn take_valid_quote(&self, peer_id: &str, hash_key: &str, quote_id: u64) -> bool {
886 let key = (peer_id.to_string(), hash_key.to_string(), quote_id);
887 let Some(quote) = self.issued_quotes.write().await.remove(&key) else {
888 return false;
889 };
890 quote.expires_at > Instant::now()
891 }
892
893 async fn send_request_to_peer(
894 &self,
895 peer_id: &str,
896 hash: &Hash,
897 request_htl: u8,
898 quote_id: Option<u64>,
899 ) -> bool {
900 let channel = match self.signaling.get_channel(peer_id).await {
901 Some(c) => c,
902 None => return false,
903 };
904
905 let htl_config = {
906 let configs = self.htl_configs.read().await;
907 configs
908 .get(peer_id)
909 .cloned()
910 .unwrap_or_else(PeerHTLConfig::random)
911 };
912
913 let send_htl = htl_config.decrement(request_htl);
914 let req = match quote_id {
915 Some(quote_id) => create_request_with_quote(hash, send_htl, quote_id),
916 None => create_request(hash, send_htl),
917 };
918 let request_bytes = encode_request(&req);
919
920 {
921 let mut selector = self.peer_selector.write().await;
922 selector.record_request(peer_id, request_bytes.len() as u64);
923 }
924
925 match channel.send(request_bytes).await {
926 Ok(()) => true,
927 Err(_) => {
928 self.peer_selector.write().await.record_failure(peer_id);
929 false
930 }
931 }
932 }
933
934 async fn send_quote_request_to_peer(
935 &self,
936 peer_id: &str,
937 hash: &Hash,
938 payment_sat: u64,
939 ttl_ms: u32,
940 mint_url: Option<&str>,
941 ) -> bool {
942 let channel = match self.signaling.get_channel(peer_id).await {
943 Some(c) => c,
944 None => return false,
945 };
946
947 let req = create_quote_request(hash, ttl_ms, payment_sat, mint_url);
948 let request_bytes = encode_quote_request(&req);
949
950 match channel.send(request_bytes).await {
951 Ok(()) => true,
952 Err(_) => false,
953 }
954 }
955
956 pub async fn set_read_sources(&self, sources: Vec<Arc<dyn MeshReadSource>>) {
957 let mut by_id = HashMap::new();
958 let mut stats = self.read_source_stats.write().await;
959 for source in sources {
960 let source_id = source.id().to_string();
961 by_id.insert(source_id.clone(), source);
962 stats
963 .entry(source_id)
964 .or_insert_with(AdaptiveSourceStats::default);
965 }
966 *self.read_sources.write().await = by_id;
967 }
968
969 fn route_stats_for<'a>(
970 stats: &'a mut HashMap<String, AdaptiveSourceStats>,
971 route_id: &str,
972 ) -> &'a mut AdaptiveSourceStats {
973 stats
974 .entry(route_id.to_string())
975 .or_insert_with(AdaptiveSourceStats::default)
976 }
977
978 async fn record_route_request(&self, route_id: &str) {
979 let mut stats = self.read_route_stats.write().await;
980 Self::route_stats_for(&mut stats, route_id).requests += 1;
981 }
982
983 async fn record_route_success(&self, route_id: &str, elapsed_ms: u64) {
984 let now = Instant::now();
985 let mut stats = self.read_route_stats.write().await;
986 let stats = Self::route_stats_for(&mut stats, route_id);
987 stats.successes += 1;
988 stats.last_success_at = Some(now);
989 stats.backoff_level = 0;
990 stats.backed_off_until = None;
991 if stats.srtt_ms <= 0.0 {
992 stats.srtt_ms = elapsed_ms as f64;
993 stats.rttvar_ms = elapsed_ms as f64 / 2.0;
994 return;
995 }
996 let elapsed = elapsed_ms as f64;
997 stats.rttvar_ms = 0.75 * stats.rttvar_ms + 0.25 * (stats.srtt_ms - elapsed).abs();
998 stats.srtt_ms = 0.875 * stats.srtt_ms + 0.125 * elapsed;
999 }
1000
1001 async fn record_route_miss(&self, route_id: &str) {
1002 let mut stats = self.read_route_stats.write().await;
1003 Self::route_stats_for(&mut stats, route_id).misses += 1;
1004 }
1005
1006 async fn record_read_source_request(&self, source_id: &str) {
1007 let mut stats = self.read_source_stats.write().await;
1008 stats
1009 .entry(source_id.to_string())
1010 .or_insert_with(AdaptiveSourceStats::default)
1011 .requests += 1;
1012 }
1013
1014 async fn record_read_source_miss(&self, source_id: &str) {
1015 let mut stats = self.read_source_stats.write().await;
1016 stats
1017 .entry(source_id.to_string())
1018 .or_insert_with(AdaptiveSourceStats::default)
1019 .misses += 1;
1020 }
1021
1022 async fn record_read_source_success(&self, source_id: &str, elapsed_ms: u64) {
1023 let now = Instant::now();
1024 let mut stats = self.read_source_stats.write().await;
1025 let stats = stats
1026 .entry(source_id.to_string())
1027 .or_insert_with(AdaptiveSourceStats::default);
1028 stats.successes += 1;
1029 stats.last_success_at = Some(now);
1030 stats.backoff_level = 0;
1031 stats.backed_off_until = None;
1032 if stats.srtt_ms <= 0.0 {
1033 stats.srtt_ms = elapsed_ms as f64;
1034 stats.rttvar_ms = elapsed_ms as f64 / 2.0;
1035 return;
1036 }
1037 let elapsed = elapsed_ms as f64;
1038 stats.rttvar_ms = 0.75 * stats.rttvar_ms + 0.25 * (stats.srtt_ms - elapsed).abs();
1039 stats.srtt_ms = 0.875 * stats.srtt_ms + 0.125 * elapsed;
1040 }
1041
1042 async fn record_read_source_failure(&self, source_id: &str) {
1043 let now = Instant::now();
1044 let mut stats = self.read_source_stats.write().await;
1045 let stats = stats
1046 .entry(source_id.to_string())
1047 .or_insert_with(AdaptiveSourceStats::default);
1048 stats.failures += 1;
1049 stats.last_failure_at = Some(now);
1050 Self::apply_source_backoff(stats, now);
1051 }
1052
1053 async fn record_read_source_timeout(&self, source_id: &str) {
1054 let now = Instant::now();
1055 let mut stats = self.read_source_stats.write().await;
1056 let stats = stats
1057 .entry(source_id.to_string())
1058 .or_insert_with(AdaptiveSourceStats::default);
1059 stats.timeouts += 1;
1060 stats.last_failure_at = Some(now);
1061 Self::apply_source_backoff(stats, now);
1062 }
1063
1064 fn apply_source_backoff(stats: &mut AdaptiveSourceStats, now: Instant) {
1065 stats.backoff_level = stats.backoff_level.saturating_add(1);
1066 let backoff_ms = (INITIAL_SOURCE_BACKOFF_MS
1067 .saturating_mul(2u64.saturating_pow(stats.backoff_level.saturating_sub(1))))
1068 .min(MAX_SOURCE_BACKOFF_MS);
1069 stats.backed_off_until = Some(now + Duration::from_millis(backoff_ms));
1070 }
1071
1072 async fn ordered_read_sources(&self) -> Vec<Arc<dyn MeshReadSource>> {
1073 let sources = self.read_sources.read().await;
1074 if sources.is_empty() {
1075 return Vec::new();
1076 }
1077
1078 let mut available: Vec<Arc<dyn MeshReadSource>> = sources
1079 .values()
1080 .filter(|source| source.is_available())
1081 .cloned()
1082 .collect();
1083 if available.is_empty() {
1084 return Vec::new();
1085 }
1086
1087 let now = Instant::now();
1088 let stats = self.read_source_stats.read().await;
1089 let mut healthy: Vec<Arc<dyn MeshReadSource>> = available
1090 .iter()
1091 .filter(|source| {
1092 stats
1093 .get(source.id())
1094 .and_then(|s| s.backed_off_until)
1095 .is_none_or(|until| until <= now)
1096 })
1097 .cloned()
1098 .collect();
1099 if !healthy.is_empty() {
1100 available = std::mem::take(&mut healthy);
1101 }
1102
1103 available.sort_by(|left, right| {
1104 let left_stats = stats.get(left.id()).cloned().unwrap_or_default();
1105 let right_stats = stats.get(right.id()).cloned().unwrap_or_default();
1106 adaptive_source_score(&right_stats, now)
1107 .partial_cmp(&adaptive_source_score(&left_stats, now))
1108 .unwrap_or(std::cmp::Ordering::Equal)
1109 .then_with(|| left.id().cmp(right.id()))
1110 });
1111 available
1112 }
1113
1114 async fn should_probe_multiple_read_sources(
1115 &self,
1116 ordered_sources: &[Arc<dyn MeshReadSource>],
1117 ) -> bool {
1118 if ordered_sources.len() <= 1 {
1119 return false;
1120 }
1121 let stats = self.read_source_stats.read().await;
1122 let best = stats
1123 .get(ordered_sources[0].id())
1124 .cloned()
1125 .unwrap_or_default();
1126 let second = stats
1127 .get(ordered_sources[1].id())
1128 .cloned()
1129 .unwrap_or_default();
1130 if !source_has_history(&best) || !source_has_history(&second) {
1131 return true;
1132 }
1133 let now = Instant::now();
1134 adaptive_source_score(&best, now) - adaptive_source_score(&second, now)
1135 < SOURCE_SCORE_TIE_DELTA
1136 }
1137
1138 async fn source_dispatch_for(&self, source_count: usize) -> RequestDispatchConfig {
1139 if source_count == 0 {
1140 return self.routing.dispatch;
1141 }
1142 let ordered_sources = self.ordered_read_sources().await;
1143 let probe_multiple = self
1144 .should_probe_multiple_read_sources(&ordered_sources)
1145 .await;
1146 RequestDispatchConfig {
1147 initial_fanout: if probe_multiple {
1148 source_count.min(2)
1149 } else {
1150 1
1151 },
1152 hedge_fanout: self.routing.dispatch.hedge_fanout,
1153 max_fanout: self.routing.dispatch.max_fanout.min(source_count),
1154 hedge_interval_ms: self.routing.dispatch.hedge_interval_ms,
1155 }
1156 }
1157
    /// Returns the peer count reported by the signaling router.
    pub async fn peer_count(&self) -> usize {
        self.signaling.peer_count().await
    }
1162
    /// Returns whether the signaling router reports that it needs more peers.
    pub async fn needs_peers(&self) -> bool {
        self.signaling.needs_peers().await
    }
1167
    /// Sends a hello message with an empty list through the signaling router.
    ///
    /// # Errors
    /// Propagates the transport error returned by the router.
    pub async fn send_hello(&self) -> Result<(), TransportError> {
        self.signaling.send_hello(vec![]).await
    }
1172
1173 pub async fn drain_available_data_messages(self: &Arc<Self>) -> DataPumpStats {
1178 let mut stats = DataPumpStats::default();
1179 let peer_ids = self.signaling.peer_ids().await;
1180 for peer_id in peer_ids {
1181 let Some(channel) = self.signaling.get_channel(&peer_id).await else {
1182 continue;
1183 };
1184
1185 while let Some(data) = channel.try_recv() {
1186 stats.processed += 1;
1187 stats.processed_bytes += data.len() as u64;
1188 if let Some(msg) = parse_message(&data) {
1189 match msg {
1190 DataMessage::Request(_) => stats.request_messages += 1,
1191 DataMessage::Response(_) => stats.response_messages += 1,
1192 DataMessage::QuoteRequest(_) => stats.quote_request_messages += 1,
1193 DataMessage::QuoteResponse(_) => stats.quote_response_messages += 1,
1194 DataMessage::Payment(_)
1195 | DataMessage::PaymentAck(_)
1196 | DataMessage::Chunk(_) => {}
1197 }
1198 }
1199 self.handle_data_message(&peer_id, &data).await;
1200 }
1201 }
1202 stats
1203 }
1204
1205 pub async fn record_cashu_payment_for_peer(&self, peer_id: &str, amount_sat: u64) {
1207 self.peer_selector
1208 .write()
1209 .await
1210 .record_cashu_payment(peer_id, amount_sat);
1211 }
1212
1213 pub async fn record_cashu_receipt_from_peer(&self, peer_id: &str, amount_sat: u64) {
1215 self.peer_selector
1216 .write()
1217 .await
1218 .record_cashu_receipt(peer_id, amount_sat);
1219 }
1220
1221 pub async fn record_cashu_payment_default_from_peer(&self, peer_id: &str) {
1223 self.peer_selector
1224 .write()
1225 .await
1226 .record_cashu_payment_default(peer_id);
1227 }
1228
1229 pub async fn selector_summary(&self) -> crate::peer_selector::SelectorSummary {
1231 self.peer_selector.read().await.summary()
1232 }
1233
1234 fn should_refuse_requests_from_peer(&self, selector: &PeerSelector, peer_id: &str) -> bool {
1235 selector.is_peer_blocked_for_payment_defaults(
1236 peer_id,
1237 self.routing.cashu_payment_default_block_threshold,
1238 )
1239 }
1240
1241 pub async fn peer_metadata_snapshot(&self) -> PeerMetadataSnapshot {
1243 self.peer_selector
1244 .read()
1245 .await
1246 .export_peer_metadata_snapshot()
1247 }
1248
1249 pub async fn persist_peer_metadata(&self) -> Result<Hash, StoreError> {
1254 let snapshot = self
1255 .peer_selector
1256 .read()
1257 .await
1258 .export_peer_metadata_snapshot();
1259 let bytes = serde_json::to_vec(&snapshot).map_err(|e| {
1260 StoreError::Other(format!("Failed to encode peer metadata snapshot: {e}"))
1261 })?;
1262 let snapshot_hash = hashtree_core::sha256(&bytes);
1263 let _ = self.local_store.put(snapshot_hash, bytes).await?;
1264
1265 let pointer_slot = Self::peer_metadata_pointer_slot_hash();
1266 let pointer_bytes = hex::encode(snapshot_hash).into_bytes();
1267 let _ = self.local_store.delete(&pointer_slot).await?;
1268 let _ = self.local_store.put(pointer_slot, pointer_bytes).await?;
1269
1270 Ok(snapshot_hash)
1271 }
1272
1273 pub async fn load_peer_metadata(&self) -> Result<bool, StoreError> {
1275 let pointer_slot = Self::peer_metadata_pointer_slot_hash();
1276 let Some(pointer_bytes) = self.local_store.get(&pointer_slot).await? else {
1277 return Ok(false);
1278 };
1279 let pointer_hex = std::str::from_utf8(&pointer_bytes).map_err(|e| {
1280 StoreError::Other(format!("Peer metadata pointer is not valid UTF-8: {e}"))
1281 })?;
1282 let snapshot_hash = Self::decode_hash_hex(pointer_hex.trim())?;
1283
1284 let Some(snapshot_bytes) = self.local_store.get(&snapshot_hash).await? else {
1285 return Ok(false);
1286 };
1287 let snapshot: PeerMetadataSnapshot =
1288 serde_json::from_slice(&snapshot_bytes).map_err(|e| {
1289 StoreError::Other(format!("Failed to decode peer metadata snapshot: {e}"))
1290 })?;
1291 self.peer_selector
1292 .write()
1293 .await
1294 .import_peer_metadata_snapshot(&snapshot);
1295 Ok(true)
1296 }
1297
1298 pub async fn get_with_quote(
1303 &self,
1304 hash: &Hash,
1305 payment_sat: u64,
1306 quote_ttl: Duration,
1307 ) -> Result<Option<Vec<u8>>, StoreError> {
1308 if let Some(data) = self.local_store.get(hash).await? {
1309 return Ok(Some(data));
1310 }
1311 Ok(self
1312 .request_from_peers_with_quote(hash, payment_sat, quote_ttl)
1313 .await)
1314 }
1315
1316 async fn request_from_peers_with_quote(
1317 &self,
1318 hash: &Hash,
1319 payment_sat: u64,
1320 quote_ttl: Duration,
1321 ) -> Option<Vec<u8>> {
1322 let ordered_peer_ids = self.ordered_connected_peers(None).await;
1323 if ordered_peer_ids.is_empty() {
1324 return None;
1325 }
1326
1327 if let Some(quote) = self
1328 .request_quote_from_peers(hash, payment_sat, quote_ttl, &ordered_peer_ids)
1329 .await
1330 {
1331 if let Some(data) = self
1332 .request_from_single_peer(hash, "e.peer_id, MAX_HTL, Some(quote.quote_id))
1333 .await
1334 {
1335 return Some(data);
1336 }
1337 }
1338
1339 self.request_from_mesh(hash).await
1340 }
1341
1342 async fn request_quote_from_peers(
1343 &self,
1344 hash: &Hash,
1345 payment_sat: u64,
1346 quote_ttl: Duration,
1347 ordered_peer_ids: &[String],
1348 ) -> Option<NegotiatedQuote> {
1349 if ordered_peer_ids.is_empty() {
1350 return None;
1351 }
1352 let ttl_ms = quote_ttl.as_millis().min(u32::MAX as u128) as u32;
1353 if ttl_ms == 0 {
1354 return None;
1355 }
1356 let requested_mint = self.requested_quote_mint().map(str::to_string);
1357
1358 let hash_key = hash_to_key(hash);
1359 let (tx, rx) = oneshot::channel();
1360 self.pending_quotes.write().await.insert(
1361 hash_key.clone(),
1362 PendingQuoteRequest {
1363 response_tx: tx,
1364 preferred_mint_url: requested_mint.clone(),
1365 offered_payment_sat: payment_sat,
1366 },
1367 );
1368
1369 let rx = Arc::new(Mutex::new(rx));
1370 let result = run_hedged_waves(
1371 ordered_peer_ids.len(),
1372 self.routing.dispatch,
1373 self.request_timeout,
1374 |range| {
1375 let wave_peer_ids = ordered_peer_ids[range].to_vec();
1376 let requested_mint = requested_mint.clone();
1377 let hash = *hash;
1378 async move {
1379 let mut sent = 0usize;
1380 for peer_id in wave_peer_ids {
1381 if self
1382 .send_quote_request_to_peer(
1383 &peer_id,
1384 &hash,
1385 payment_sat,
1386 ttl_ms,
1387 requested_mint.as_deref(),
1388 )
1389 .await
1390 {
1391 sent += 1;
1392 }
1393 }
1394 sent
1395 }
1396 },
1397 |wait| {
1398 let rx = rx.clone();
1399 async move {
1400 let mut rx = rx.lock().await;
1401 match tokio::time::timeout(wait, &mut *rx).await {
1402 Ok(Ok(Some(quote))) => HedgedWaveAction::Success(quote),
1403 Ok(Ok(None)) | Ok(Err(_)) => HedgedWaveAction::Abort,
1404 Err(_) => HedgedWaveAction::Continue,
1405 }
1406 }
1407 },
1408 )
1409 .await;
1410 let _ = self.pending_quotes.write().await.remove(&hash_key);
1411 result
1412 }
1413
1414 async fn request_from_single_peer(
1415 &self,
1416 hash: &Hash,
1417 peer_id: &str,
1418 request_htl: u8,
1419 quote_id: Option<u64>,
1420 ) -> Option<Vec<u8>> {
1421 let hash_key = hash_to_key(hash);
1422 let (tx, rx) = oneshot::channel();
1423 self.pending_requests.write().await.insert(
1424 hash_key.clone(),
1425 PendingRequest {
1426 response_tx: tx,
1427 started_at: Instant::now(),
1428 queried_peers: vec![peer_id.to_string()],
1429 },
1430 );
1431
1432 let mut rx = rx;
1433 if !self
1434 .send_request_to_peer(peer_id, hash, request_htl, quote_id)
1435 .await
1436 {
1437 let _ = self.pending_requests.write().await.remove(&hash_key);
1438 return None;
1439 }
1440 self.reserve_peer_request(peer_id).await;
1441
1442 if let Ok(Ok(Some(data))) = tokio::time::timeout(self.request_timeout, &mut rx).await {
1443 if hashtree_core::sha256(&data) == *hash {
1444 let _ = self.local_store.put(*hash, data.clone()).await;
1445 return Some(data);
1446 }
1447 }
1448
1449 if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
1450 self.release_queried_peer_requests(&pending.queried_peers)
1451 .await;
1452 for peer_id in pending.queried_peers {
1453 self.peer_selector.write().await.record_timeout(&peer_id);
1454 }
1455 }
1456 let _ = self.take_forward_requesters(&hash_key).await;
1457 None
1458 }
1459
1460 async fn request_from_ordered_peers(
1461 &self,
1462 hash: &Hash,
1463 ordered_peer_ids: &[String],
1464 request_htl: u8,
1465 ) -> Option<Vec<u8>> {
1466 let hash_key = hash_to_key(hash);
1467 let (tx, rx) = oneshot::channel();
1468 self.pending_requests.write().await.insert(
1469 hash_key.clone(),
1470 PendingRequest {
1471 response_tx: tx,
1472 started_at: Instant::now(),
1473 queried_peers: Vec::new(),
1474 },
1475 );
1476
1477 let rx = Arc::new(Mutex::new(rx));
1478 let result = run_hedged_waves(
1479 ordered_peer_ids.len(),
1480 self.routing.dispatch,
1481 self.request_timeout,
1482 |range| {
1483 let wave_peer_ids = ordered_peer_ids[range].to_vec();
1484 let hash = *hash;
1485 let hash_key = hash_key.clone();
1486 async move {
1487 let mut sent = 0usize;
1488 for peer_id in wave_peer_ids {
1489 if self
1490 .send_request_to_peer(&peer_id, &hash, request_htl, None)
1491 .await
1492 {
1493 sent += 1;
1494 self.reserve_peer_request(&peer_id).await;
1495 if let Some(pending) =
1496 self.pending_requests.write().await.get_mut(&hash_key)
1497 {
1498 pending.queried_peers.push(peer_id);
1499 }
1500 }
1501 }
1502 sent
1503 }
1504 },
1505 |wait| {
1506 let rx = rx.clone();
1507 async move {
1508 let mut rx = rx.lock().await;
1509 match tokio::time::timeout(wait, &mut *rx).await {
1510 Ok(Ok(Some(data))) if hashtree_core::sha256(&data) == *hash => {
1511 HedgedWaveAction::Success(data)
1512 }
1513 Ok(Ok(Some(_))) => HedgedWaveAction::Continue,
1514 Ok(Ok(None)) | Ok(Err(_)) => HedgedWaveAction::Abort,
1515 Err(_) => HedgedWaveAction::Continue,
1516 }
1517 }
1518 },
1519 )
1520 .await;
1521
1522 let Some(data) = result else {
1523 if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
1524 self.release_queried_peer_requests(&pending.queried_peers)
1525 .await;
1526 for peer_id in pending.queried_peers {
1527 self.peer_selector.write().await.record_timeout(&peer_id);
1528 }
1529 }
1530 let _ = self.take_forward_requesters(&hash_key).await;
1531 return None;
1532 };
1533
1534 let _ = self.local_store.put(*hash, data.clone()).await;
1535 Some(data)
1536 }
1537
1538 async fn request_from_read_sources_inner(&self, hash: &Hash) -> Option<SourceFetchResult> {
1539 let ordered_sources = self.ordered_read_sources().await;
1540 if ordered_sources.is_empty() {
1541 return None;
1542 }
1543
1544 let dispatch = normalize_dispatch_config(
1545 self.source_dispatch_for(ordered_sources.len()).await,
1546 ordered_sources.len(),
1547 );
1548 let wave_plan = build_hedged_wave_plan(ordered_sources.len(), dispatch);
1549 if wave_plan.is_empty() {
1550 return None;
1551 }
1552
1553 let deadline = Instant::now() + self.request_timeout;
1554 let mut pending = FuturesUnordered::new();
1555 let mut pending_source_ids = HashSet::new();
1556 let mut next_source_idx = 0usize;
1557
1558 for (wave_idx, wave_size) in wave_plan.iter().copied().enumerate() {
1559 let from = next_source_idx;
1560 let to = (next_source_idx + wave_size).min(ordered_sources.len());
1561 next_source_idx = to;
1562
1563 for source in ordered_sources[from..to].iter().cloned() {
1564 let source_id = source.id().to_string();
1565 self.record_read_source_request(&source_id).await;
1566 pending_source_ids.insert(source_id.clone());
1567 let hash = *hash;
1568 pending.push(tokio::spawn(async move {
1569 let started_at = Instant::now();
1570 let result = std::panic::AssertUnwindSafe(source.get(&hash))
1571 .catch_unwind()
1572 .await;
1573 match result {
1574 Ok(Some(data)) => SourceFetchOutcome::Hit {
1575 source_id,
1576 data,
1577 elapsed_ms: started_at.elapsed().as_millis().max(1) as u64,
1578 },
1579 Ok(None) => SourceFetchOutcome::Miss { source_id },
1580 Err(_) => SourceFetchOutcome::Failure { source_id },
1581 }
1582 }));
1583 }
1584
1585 let is_last_wave =
1586 wave_idx + 1 == wave_plan.len() || next_source_idx >= ordered_sources.len();
1587 let window_end = if is_last_wave {
1588 deadline
1589 } else {
1590 (Instant::now() + Duration::from_millis(dispatch.hedge_interval_ms)).min(deadline)
1591 };
1592
1593 while Instant::now() < window_end {
1594 let remaining = window_end.saturating_duration_since(Instant::now());
1595 let Some(result) = tokio::time::timeout(remaining, pending.next())
1596 .await
1597 .ok()
1598 .flatten()
1599 else {
1600 break;
1601 };
1602 let Ok(outcome) = result else {
1603 continue;
1604 };
1605 match outcome {
1606 SourceFetchOutcome::Hit {
1607 source_id,
1608 data,
1609 elapsed_ms,
1610 } => {
1611 pending_source_ids.remove(&source_id);
1612 self.record_read_source_success(&source_id, elapsed_ms)
1613 .await;
1614 return Some(SourceFetchResult { data });
1615 }
1616 SourceFetchOutcome::Miss { source_id } => {
1617 pending_source_ids.remove(&source_id);
1618 self.record_read_source_miss(&source_id).await;
1619 }
1620 SourceFetchOutcome::Failure { source_id } => {
1621 pending_source_ids.remove(&source_id);
1622 self.record_read_source_failure(&source_id).await;
1623 }
1624 }
1625 }
1626
1627 if Instant::now() >= deadline {
1628 break;
1629 }
1630 }
1631
1632 for source_id in pending_source_ids {
1633 self.record_read_source_timeout(&source_id).await;
1634 }
1635 None
1636 }
1637
1638 async fn request_from_read_sources(&self, hash: &Hash) -> Option<Vec<u8>> {
1639 let hash_key = hash_to_key(hash);
1640 let existing_wait = {
1641 let mut inflight = self.inflight_source_fetches.lock().await;
1642 if let Some(existing) = inflight.get_mut(&hash_key) {
1643 let (tx, rx) = oneshot::channel();
1644 existing.waiters.push(tx);
1645 Some(rx)
1646 } else {
1647 inflight.insert(
1648 hash_key.clone(),
1649 InflightSourceFetch {
1650 waiters: Vec::new(),
1651 },
1652 );
1653 None
1654 }
1655 };
1656
1657 if let Some(wait) = existing_wait {
1658 return wait.await.ok().flatten().map(|result| result.data);
1659 }
1660
1661 let result = self.request_from_read_sources_inner(hash).await;
1662 if let Some(hit) = result.as_ref() {
1663 let _ = self.local_store.put(*hash, hit.data.clone()).await;
1664 }
1665
1666 let waiters = self
1667 .inflight_source_fetches
1668 .lock()
1669 .await
1670 .remove(&hash_key)
1671 .map(|inflight| inflight.waiters)
1672 .unwrap_or_default();
1673 for waiter in waiters {
1674 let _ = waiter.send(result.clone());
1675 }
1676
1677 result.map(|hit| hit.data)
1678 }
1679
1680 async fn available_read_routes(&self, context: &MeshReadContext) -> Vec<ReadRoute> {
1681 let mut routes = Vec::new();
1682 let ordered_peers = self
1683 .ordered_connected_peers(context.exclude_peer_id.as_deref())
1684 .await;
1685 if !ordered_peers.is_empty() {
1686 routes.push(ReadRoute::Peers(ordered_peers));
1687 }
1688 if !self.ordered_read_sources().await.is_empty() {
1689 routes.push(ReadRoute::Sources);
1690 }
1691 if routes.len() <= 1 {
1692 return routes;
1693 }
1694
1695 let now = Instant::now();
1696 let stats = self.read_route_stats.read().await;
1697 routes.sort_by(|left, right| {
1698 let left_stats = stats.get(left.id()).cloned().unwrap_or_default();
1699 let right_stats = stats.get(right.id()).cloned().unwrap_or_default();
1700 adaptive_source_score(&right_stats, now)
1701 .partial_cmp(&adaptive_source_score(&left_stats, now))
1702 .unwrap_or(std::cmp::Ordering::Equal)
1703 .then_with(|| left.id().cmp(right.id()))
1704 });
1705 routes
1706 }
1707
1708 async fn should_probe_multiple_routes(&self, routes: &[ReadRoute]) -> bool {
1709 if routes.len() <= 1 {
1710 return false;
1711 }
1712 let stats = self.read_route_stats.read().await;
1713 let best = stats.get(routes[0].id()).cloned().unwrap_or_default();
1714 let second = stats.get(routes[1].id()).cloned().unwrap_or_default();
1715 if !source_has_history(&best) || !source_has_history(&second) {
1716 return true;
1717 }
1718 let now = Instant::now();
1719 adaptive_source_score(&best, now) - adaptive_source_score(&second, now)
1720 < SOURCE_SCORE_TIE_DELTA
1721 }
1722
1723 async fn run_read_route(
1724 &self,
1725 hash: &Hash,
1726 route: &ReadRoute,
1727 context: &MeshReadContext,
1728 ) -> Option<Vec<u8>> {
1729 let route_id = route.id();
1730 self.record_route_request(route_id).await;
1731 let started_at = Instant::now();
1732 let result = match route {
1733 ReadRoute::Peers(peer_ids) => {
1734 self.request_from_ordered_peers(hash, peer_ids, context.request_htl)
1735 .await
1736 }
1737 ReadRoute::Sources => self.request_from_read_sources(hash).await,
1738 };
1739 if result.is_some() {
1740 self.record_route_success(route_id, started_at.elapsed().as_millis().max(1) as u64)
1741 .await;
1742 } else {
1743 self.record_route_miss(route_id).await;
1744 }
1745 result
1746 }
1747
1748 async fn request_from_mesh_with_context(
1749 &self,
1750 hash: &Hash,
1751 context: &MeshReadContext,
1752 ) -> Option<Vec<u8>> {
1753 let routes = self.available_read_routes(context).await;
1754 match routes.as_slice() {
1755 [] => None,
1756 [route] => self.run_read_route(hash, route, context).await,
1757 [first, second, ..] => {
1758 if self.should_probe_multiple_routes(&routes).await {
1759 let first_fut = self.run_read_route(hash, first, context);
1760 let second_fut = self.run_read_route(hash, second, context);
1761 tokio::pin!(first_fut);
1762 tokio::pin!(second_fut);
1763 let mut first_done = false;
1764 let mut second_done = false;
1765 loop {
1766 tokio::select! {
1767 result = &mut first_fut, if !first_done => {
1768 first_done = true;
1769 if result.is_some() {
1770 return result;
1771 }
1772 }
1773 result = &mut second_fut, if !second_done => {
1774 second_done = true;
1775 if result.is_some() {
1776 return result;
1777 }
1778 }
1779 else => break,
1780 }
1781 if first_done && second_done {
1782 break;
1783 }
1784 }
1785 None
1786 } else {
1787 if let Some(data) = self.run_read_route(hash, first, context).await {
1788 return Some(data);
1789 }
1790 self.run_read_route(hash, second, context).await
1791 }
1792 }
1793 }
1794 }
1795
1796 async fn request_from_mesh(&self, hash: &Hash) -> Option<Vec<u8>> {
1797 self.request_from_mesh_with_context(hash, &MeshReadContext::default())
1798 .await
1799 }
1800
1801 async fn begin_forward_request(&self, hash_key: &str, requester_id: &str) -> bool {
1802 let mut pending = self.pending_forward_requests.write().await;
1803 if let Some(existing) = pending.get_mut(hash_key) {
1804 existing.requester_ids.insert(requester_id.to_string());
1805 return false;
1806 }
1807
1808 let mut requester_ids = HashSet::new();
1809 requester_ids.insert(requester_id.to_string());
1810 pending.insert(
1811 hash_key.to_string(),
1812 PendingForwardRequest { requester_ids },
1813 );
1814 true
1815 }
1816
1817 async fn take_forward_requesters(&self, hash_key: &str) -> Vec<String> {
1818 self.pending_forward_requests
1819 .write()
1820 .await
1821 .remove(hash_key)
1822 .map(|pending| pending.requester_ids.into_iter().collect())
1823 .unwrap_or_default()
1824 }
1825
1826 async fn complete_pending_response(
1827 &self,
1828 from_peer: &str,
1829 hash: &Hash,
1830 hash_key: String,
1831 payload: Vec<u8>,
1832 ) {
1833 if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
1834 self.release_queried_peer_requests(&pending.queried_peers)
1835 .await;
1836 let rtt_ms = pending.started_at.elapsed().as_millis() as u64;
1837 self.peer_selector.write().await.record_success(
1838 from_peer,
1839 rtt_ms,
1840 payload.len() as u64,
1841 );
1842 let forward_requesters = self.take_forward_requesters(&hash_key).await;
1843 let response_bytes = if forward_requesters.is_empty() {
1844 None
1845 } else {
1846 Some(encode_response(&create_response(hash, payload.clone())))
1847 };
1848 let _ = pending.response_tx.send(Some(payload));
1849 if let Some(response_bytes) = response_bytes {
1850 for requester_id in forward_requesters {
1851 if let Some(channel) = self.signaling.get_channel(&requester_id).await {
1852 let _ = channel.send(response_bytes.clone()).await;
1853 }
1854 }
1855 }
1856 }
1857 }
1858
1859 async fn handle_quote_response_message(&self, from_peer: &str, res: DataQuoteResponse) {
1860 if !res.a {
1861 return;
1862 }
1863
1864 let Some(quote_id) = res.q else {
1865 return;
1866 };
1867
1868 let hash_key = hash_to_key(&res.h);
1869 let (preferred_mint_url, offered_payment_sat) = {
1870 let pending_quotes = self.pending_quotes.read().await;
1871 let Some(pending) = pending_quotes.get(&hash_key) else {
1872 return;
1873 };
1874 (
1875 pending.preferred_mint_url.clone(),
1876 pending.offered_payment_sat,
1877 )
1878 };
1879 if !self
1880 .should_accept_quote_response(
1881 from_peer,
1882 preferred_mint_url.as_deref(),
1883 offered_payment_sat,
1884 &res,
1885 )
1886 .await
1887 {
1888 return;
1889 }
1890 let mut pending_quotes = self.pending_quotes.write().await;
1891 if let Some(pending) = pending_quotes.remove(&hash_key) {
1892 let _ = pending.response_tx.send(Some(NegotiatedQuote {
1893 peer_id: from_peer.to_string(),
1894 quote_id,
1895 mint_url: res.m,
1896 }));
1897 }
1898 }
1899
1900 async fn handle_response_message(&self, from_peer: &str, res: crate::protocol::DataResponse) {
1901 let hash_key = hash_to_key(&res.h);
1902 let hash = match crate::protocol::bytes_to_hash(&res.h) {
1903 Some(h) => h,
1904 None => return,
1905 };
1906
1907 if hashtree_core::sha256(&res.d) != hash {
1909 self.peer_selector.write().await.record_failure(from_peer);
1910 if self.debug {
1911 println!("[MeshStoreCore] Ignoring invalid response payload for {hash_key}");
1912 }
1913 return;
1914 }
1915
1916 self.complete_pending_response(from_peer, &hash, hash_key, res.d)
1917 .await;
1918 }
1919
1920 async fn handle_quote_request_message(&self, from_peer: &str, req: DataQuoteRequest) {
1921 let hash = match crate::protocol::bytes_to_hash(&req.h) {
1922 Some(h) => h,
1923 None => return,
1924 };
1925 let hash_key = hash_to_key(&hash);
1926
1927 {
1928 let selector = self.peer_selector.read().await;
1929 if self.should_refuse_requests_from_peer(&selector, from_peer) {
1930 if self.debug {
1931 println!(
1932 "[MeshStoreCore] Refusing quote request from delinquent peer {}",
1933 from_peer
1934 );
1935 }
1936 return;
1937 }
1938 }
1939
1940 let chosen_mint = self.choose_quote_mint(req.m.as_deref());
1941 let can_serve = self.local_store.has(&hash).await.ok().unwrap_or(false)
1942 && !self.should_drop_response(&hash)
1943 && !self.should_corrupt_response(&hash);
1944
1945 let res = if can_serve {
1946 let quote_id = self
1947 .issue_quote(from_peer, &hash_key, req.p, req.t, chosen_mint.as_deref())
1948 .await;
1949 create_quote_response_available(&hash, quote_id, req.p, req.t, chosen_mint.as_deref())
1950 } else {
1951 create_quote_response_unavailable(&hash)
1952 };
1953 let response_bytes = encode_quote_response(&res);
1954 if let Some(channel) = self.signaling.get_channel(from_peer).await {
1955 let _ = channel.send(response_bytes).await;
1956 }
1957 }
1958
1959 async fn handle_request_message(
1960 self: &Arc<Self>,
1961 from_peer: &str,
1962 req: crate::protocol::DataRequest,
1963 ) {
1964 let hash = match crate::protocol::bytes_to_hash(&req.h) {
1965 Some(h) => h,
1966 None => return,
1967 };
1968 let hash_key = hash_to_key(&hash);
1969
1970 {
1971 let selector = self.peer_selector.read().await;
1972 if self.should_refuse_requests_from_peer(&selector, from_peer) {
1973 if self.debug {
1974 println!(
1975 "[MeshStoreCore] Refusing request from delinquent peer {}",
1976 from_peer
1977 );
1978 }
1979 return;
1980 }
1981 }
1982
1983 if let Some(quote_id) = req.q {
1984 if !self.take_valid_quote(from_peer, &hash_key, quote_id).await {
1985 if self.debug {
1986 println!(
1987 "[MeshStoreCore] Refusing request with invalid or expired quote {} from {}",
1988 quote_id, from_peer
1989 );
1990 }
1991 return;
1992 }
1993 }
1994
1995 if let Ok(Some(mut data)) = self.local_store.get(&hash).await {
1997 if self.should_drop_response(&hash) {
1998 if self.debug {
1999 println!(
2000 "[MeshStoreCore] Dropping response for {} due to actor profile",
2001 hash_to_key(&hash)
2002 );
2003 }
2004 return;
2005 }
2006
2007 let behavior = self.response_behavior();
2008 if behavior.extra_delay_ms > 0 {
2009 tokio::time::sleep(Duration::from_millis(behavior.extra_delay_ms)).await;
2010 }
2011
2012 if self.should_corrupt_response(&hash) {
2013 if data.is_empty() {
2014 data.push(0x80);
2015 } else {
2016 data[0] ^= 0x80;
2017 }
2018 }
2019
2020 let res = create_response(&hash, data);
2022 let response_bytes = encode_response(&res);
2023 if let Some(channel) = self.signaling.get_channel(from_peer).await {
2024 let _ = channel.send(response_bytes).await;
2025 }
2026 return;
2027 }
2028
2029 if self.pending_requests.read().await.contains_key(&hash_key) {
2030 let _ = self.begin_forward_request(&hash_key, from_peer).await;
2031 return;
2032 }
2033
2034 if !self.begin_forward_request(&hash_key, from_peer).await {
2035 return;
2036 }
2037
2038 let from_peer = from_peer.to_string();
2039 let this = Arc::clone(self);
2040 tokio::spawn(async move {
2041 let context = MeshReadContext {
2042 exclude_peer_id: Some(from_peer.clone()),
2043 request_htl: req.htl,
2044 };
2045 let result = this.request_from_mesh_with_context(&hash, &context).await;
2046 let requester_ids = this.take_forward_requesters(&hash_key).await;
2047 if let Some(data) = result {
2048 let res = create_response(&hash, data);
2049 let response_bytes = encode_response(&res);
2050 for requester_id in requester_ids {
2051 if let Some(channel) = this.signaling.get_channel(&requester_id).await {
2052 let _ = channel.send(response_bytes.clone()).await;
2053 }
2054 }
2055 }
2056 });
2057 }
2058
2059 pub async fn handle_data_message(self: &Arc<Self>, from_peer: &str, data: &[u8]) {
2061 let parsed = match parse_message(data) {
2062 Some(m) => m,
2063 None => return,
2064 };
2065
2066 match parsed {
2067 DataMessage::Request(req) => {
2068 self.handle_request_message(from_peer, req).await;
2069 }
2070 DataMessage::Response(res) => {
2071 self.handle_response_message(from_peer, res).await;
2072 }
2073 DataMessage::QuoteRequest(req) => {
2074 self.handle_quote_request_message(from_peer, req).await;
2075 }
2076 DataMessage::QuoteResponse(res) => {
2077 self.handle_quote_response_message(from_peer, res).await;
2078 }
2079 DataMessage::Payment(_) | DataMessage::PaymentAck(_) | DataMessage::Chunk(_) => {}
2080 }
2081 }
2082}
2083
2084#[async_trait]
2085impl<S, R, F> Store for MeshStoreCore<S, R, F>
2086where
2087 S: Store + Send + Sync + 'static,
2088 R: SignalingTransport + Send + Sync + 'static,
2089 F: PeerLinkFactory + Send + Sync + 'static,
2090{
2091 async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
2092 self.local_store.put(hash, data).await
2093 }
2094
2095 async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
2096 if let Some(data) = self.local_store.get(hash).await? {
2098 return Ok(Some(data));
2099 }
2100
2101 Ok(self.request_from_mesh(hash).await)
2103 }
2104
2105 async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
2106 self.local_store.has(hash).await
2107 }
2108
2109 async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
2110 self.local_store.delete(hash).await
2111 }
2112}
2113
2114#[cfg(test)]
2115mod tests;
2116
2117pub type SimMeshStore<S> =
2119 MeshStoreCore<S, crate::mock::MockRelayTransport, crate::mock::MockConnectionFactory>;
2120
2121pub type ProductionMeshStore<S> =
2123 MeshStoreCore<S, crate::nostr::NostrRelayTransport, crate::real_factory::WebRtcPeerLinkFactory>;