1use async_trait::async_trait;
8use futures::{stream::FuturesUnordered, FutureExt, StreamExt};
9use std::collections::hash_map::DefaultHasher;
10use std::collections::{HashMap, HashSet};
11use std::future::Future;
12use std::hash::{Hash as _, Hasher};
13use std::ops::Range;
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16use tokio::sync::{oneshot, Mutex, RwLock};
17
18use hashtree_core::{Hash, Store, StoreError};
19
20use crate::peer_selector::{PeerMetadataSnapshot, PeerSelector, SelectionStrategy};
21use crate::protocol::{
22 create_quote_request, create_quote_response_available, create_quote_response_unavailable,
23 create_request, create_request_with_quote, create_response, encode_quote_request,
24 encode_quote_response, encode_request, encode_response, hash_to_key, parse_message,
25 DataMessage, DataQuoteRequest, DataQuoteResponse,
26};
27use crate::signaling::MeshRouter;
28use crate::transport::{PeerLinkFactory, SignalingTransport, TransportError};
29use crate::types::{PeerHTLConfig, SignalingMessage, MAX_HTL};
30
/// Fixed byte key that `peer_metadata_pointer_slot_hash` feeds to SHA-256 to
/// derive the slot hash for the peer-metadata pointer. The `v1` suffix is part
/// of the key bytes, so changing it changes the derived hash.
const PEER_METADATA_POINTER_SLOT_KEY: &[u8] = b"hashtree-webrtc/peer-metadata/latest/v1";
34
/// Bookkeeping for one outstanding data request.
struct PendingRequest {
    /// One-shot channel used to deliver the outcome: `Some(bytes)` or `None`.
    response_tx: oneshot::Sender<Option<Vec<u8>>>,
    /// Instant stored with the request (presumably when it was issued; confirm at the creation site).
    started_at: Instant,
    /// Peer ids associated with this request (presumably the peers it was sent to; confirm at the creation site).
    queried_peers: Vec<String>,
}
41
/// Bookkeeping for one outstanding quote negotiation.
struct PendingQuoteRequest {
    /// One-shot channel used to deliver the negotiated quote, or `None`.
    response_tx: oneshot::Sender<Option<NegotiatedQuote>>,
    /// Mint URL stored with the request (presumably the one asked for; confirm at the creation site).
    preferred_mint_url: Option<String>,
    /// Payment amount in sats stored with the request (presumably the offer made to peers; confirm).
    offered_payment_sat: u64,
}
47
/// State for a request that is being forwarded on behalf of other peers.
struct PendingForwardRequest {
    /// Distinct ids recorded for this forward (presumably the peers awaiting the answer; confirm at the use site).
    requester_ids: HashSet<String>,
}
51
/// A pluggable read source that can be asked for the bytes behind a hash.
#[async_trait]
pub trait MeshReadSource: Send + Sync {
    /// Identifier of this source; used as the key for the source map and its statistics.
    fn id(&self) -> &str;

    /// Whether the source may currently be queried. Defaults to `true`.
    fn is_available(&self) -> bool {
        true
    }

    /// Looks up `hash`, returning `Some(bytes)` on a hit and `None` otherwise.
    async fn get(&self, hash: &Hash) -> Option<Vec<u8>>;
}
62
/// A quote obtained from a peer.
#[derive(Debug, Clone)]
struct NegotiatedQuote {
    /// Id of the peer this quote belongs to.
    peer_id: String,
    /// Numeric quote identifier.
    quote_id: u64,
    /// Mint URL attached to the quote, if any. Currently unread, hence the lint allowance.
    #[allow(dead_code)]
    mint_url: Option<String>,
}
70
/// A quote issued by this node, stored until it is redeemed via `take_valid_quote`.
struct IssuedQuote {
    /// Instant after which the quote is no longer honoured.
    expires_at: Instant,
    /// Payment amount in sats recorded for the quote. Currently unread, hence the lint allowance.
    #[allow(dead_code)]
    payment_sat: u64,
    /// Mint URL recorded for the quote, if any. Currently unread, hence the lint allowance.
    #[allow(dead_code)]
    mint_url: Option<String>,
}
78
/// Running statistics used to score a read source or a read route.
#[derive(Debug, Clone, Default)]
struct AdaptiveSourceStats {
    /// Number of requests recorded.
    requests: u64,
    /// Number of successful fetches recorded.
    successes: u64,
    /// Number of misses recorded.
    misses: u64,
    /// Number of failures recorded.
    failures: u64,
    /// Number of timeouts recorded.
    timeouts: u64,
    /// Smoothed round-trip time in milliseconds; `0.0` until the first success sample.
    srtt_ms: f64,
    /// Smoothed round-trip-time deviation in milliseconds.
    rttvar_ms: f64,
    /// Consecutive backoff steps; reset to 0 on success.
    backoff_level: u32,
    /// While this instant is in the future the entry scores as negative infinity.
    backed_off_until: Option<Instant>,
    /// Instant of the most recent success, if any.
    last_success_at: Option<Instant>,
    /// Instant of the most recent failure or timeout, if any.
    last_failure_at: Option<Instant>,
}
93
/// Result of attempting a fetch over one read route.
#[derive(Debug, Clone)]
enum RouteFetchOutcome {
    /// The route produced the data.
    Hit(Vec<u8>),
    /// The route did not produce the data.
    Miss,
    /// The route timed out.
    Timeout,
}
100
/// Entry in the in-flight source fetch map.
struct InflightSourceFetch {
    /// One-shot senders for callers registered on this fetch (presumably notified when it finishes; confirm at the use site).
    waiters: Vec<oneshot::Sender<RouteFetchOutcome>>,
}
104
/// Result of querying a single read source, tagged with the source id.
enum SourceFetchOutcome {
    /// The source returned data.
    Hit {
        source_id: String,
        data: Vec<u8>,
        /// Elapsed time of the fetch in milliseconds.
        elapsed_ms: u64,
    },
    /// The source did not have the data.
    Miss {
        source_id: String,
    },
    /// The fetch from the source failed.
    Failure {
        source_id: String,
    },
}
118
/// Backoff applied after the first failure or timeout; doubled per further level.
const INITIAL_SOURCE_BACKOFF_MS: u64 = 250;
/// Upper bound on a single backoff period.
const MAX_SOURCE_BACKOFF_MS: u64 = 10_000;
/// If the top two sources' scores differ by less than this, both are probed.
const SOURCE_SCORE_TIE_DELTA: f64 = 0.15;
/// A success more recent than this earns the recency bonus in the score.
const RECENT_SOURCE_SUCCESS_WINDOW: Duration = Duration::from_secs(60);
/// Rank positions added per active request when ordering connected peers.
const ACTIVE_PEER_REQUEST_RANK_PENALTY: usize = 3;
124
125fn source_reliability_score(stats: &AdaptiveSourceStats) -> f64 {
126 (stats.successes as f64 + 1.0) / (stats.requests as f64 + 2.0)
127}
128
129fn source_latency_score(stats: &AdaptiveSourceStats) -> f64 {
130 if stats.srtt_ms <= 0.0 {
131 return 0.5;
132 }
133 (500.0 / (stats.srtt_ms + 50.0)).min(1.0)
134}
135
136fn source_has_history(stats: &AdaptiveSourceStats) -> bool {
137 stats.requests > 0
138 || stats.successes > 0
139 || stats.misses > 0
140 || stats.failures > 0
141 || stats.timeouts > 0
142}
143
144fn adaptive_source_score(stats: &AdaptiveSourceStats, now: Instant) -> f64 {
145 if let Some(backed_off_until) = stats.backed_off_until {
146 if backed_off_until > now {
147 return f64::NEG_INFINITY;
148 }
149 }
150
151 let miss_penalty = if stats.requests > 0 {
152 (stats.misses as f64 / stats.requests as f64) * 0.15
153 } else {
154 0.0
155 };
156 let failure_penalty = if stats.requests > 0 {
157 ((stats.failures + stats.timeouts) as f64 / stats.requests as f64) * 0.3
158 } else {
159 0.0
160 };
161 let recency_bonus = if stats
162 .last_success_at
163 .is_some_and(|last| now.duration_since(last) < RECENT_SOURCE_SUCCESS_WINDOW)
164 {
165 0.1
166 } else {
167 0.0
168 };
169
170 0.6 * source_reliability_score(stats) + 0.3 * source_latency_score(stats) + recency_bonus
171 - miss_penalty
172 - failure_penalty
173}
174
/// A way of satisfying a read: either over a list of peers or via the read sources.
#[derive(Clone)]
enum ReadRoute {
    /// Read from the given peer ids.
    Peers(Vec<String>),
    /// Read from the registered read sources.
    Sources,
}

impl ReadRoute {
    /// Stable identifier of the route kind, used as the key for route statistics.
    fn id(&self) -> &'static str {
        match *self {
            ReadRoute::Sources => "sources",
            ReadRoute::Peers(..) => "peers",
        }
    }
}
189
/// Per-read parameters.
#[derive(Debug, Clone)]
struct MeshReadContext {
    /// Peer id to leave out when choosing peers, if any.
    exclude_peer_id: Option<String>,
    /// HTL value carried by the request.
    request_htl: u8,
}

impl Default for MeshReadContext {
    /// No excluded peer and the maximum HTL (`MAX_HTL`).
    fn default() -> Self {
        Self {
            exclude_peer_id: None,
            request_htl: MAX_HTL,
        }
    }
}
204
/// Counters reported by one pass of `drain_available_data_messages`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct DataPumpStats {
    /// Number of messages pulled off peer channels.
    pub processed: usize,
    /// Count of request messages (presumably per parsed message type; confirm in the pump).
    pub request_messages: usize,
    /// Count of response messages (presumably per parsed message type; confirm in the pump).
    pub response_messages: usize,
    /// Count of quote-request messages (presumably per parsed message type; confirm in the pump).
    pub quote_request_messages: u64,
    /// Count of quote-response messages (presumably per parsed message type; confirm in the pump).
    pub quote_response_messages: u64,
    /// Total size in bytes of the messages pulled off peer channels.
    pub processed_bytes: u64,
}
215
/// Controls how many targets are contacted per wave of a hedged request.
#[derive(Debug, Clone, Copy)]
pub struct RequestDispatchConfig {
    /// Size of the first wave.
    pub initial_fanout: usize,
    /// Size of every later wave.
    pub hedge_fanout: usize,
    /// Upper bound on the total number of targets contacted.
    pub max_fanout: usize,
    /// Pause between waves in milliseconds; `0` sends the next wave without waiting.
    pub hedge_interval_ms: u64,
}

impl Default for RequestDispatchConfig {
    /// Unlimited fanout with no pause between waves.
    fn default() -> Self {
        let unlimited = usize::MAX;
        Self {
            initial_fanout: unlimited,
            hedge_fanout: unlimited,
            max_fanout: unlimited,
            hedge_interval_ms: 0,
        }
    }
}
243
244pub fn normalize_dispatch_config(
246 dispatch: RequestDispatchConfig,
247 available_peers: usize,
248) -> RequestDispatchConfig {
249 let mut cfg = dispatch;
250 let cap = if cfg.max_fanout == 0 {
251 available_peers
252 } else {
253 cfg.max_fanout.min(available_peers)
254 };
255 cfg.max_fanout = cap;
256 cfg.initial_fanout = if cfg.initial_fanout == 0 {
257 1
258 } else {
259 cfg.initial_fanout.min(cap.max(1))
260 };
261 cfg.hedge_fanout = if cfg.hedge_fanout == 0 {
262 1
263 } else {
264 cfg.hedge_fanout.min(cap.max(1))
265 };
266 cfg
267}
268
269pub fn build_hedged_wave_plan(peer_count: usize, dispatch: RequestDispatchConfig) -> Vec<usize> {
271 if peer_count == 0 {
272 return Vec::new();
273 }
274 let cap = dispatch.max_fanout.min(peer_count);
275 if cap == 0 {
276 return Vec::new();
277 }
278
279 let mut plan = Vec::new();
280 let mut sent = 0usize;
281 let first = dispatch.initial_fanout.min(cap).max(1);
282 plan.push(first);
283 sent += first;
284
285 while sent < cap {
286 let next = dispatch.hedge_fanout.min(cap - sent).max(1);
287 plan.push(next);
288 sent += next;
289 }
290 plan
291}
292
/// Decision returned by the wait callback of `run_hedged_waves` after each wave.
#[derive(Debug)]
pub enum HedgedWaveAction<T> {
    /// Nothing conclusive yet; proceed to the next wave (if any).
    Continue,
    /// A result is available; stop and return it.
    Success(T),
    /// Stop without a result.
    Abort,
}
300
/// Drives a hedged request: contacts targets wave by wave until one wave
/// reports success, the plan is exhausted, the callback aborts, or
/// `request_timeout` has elapsed.
///
/// * `peer_count` - number of candidate targets; waves address them by index range.
/// * `dispatch` - fanout settings; normalized against `peer_count` before use.
/// * `request_timeout` - overall deadline measured from the start of this call.
/// * `send_wave` - called with the index range of a wave; resolves to the
///   number of requests actually sent.
/// * `wait_wave` - called with the time to wait after a wave; resolves to the
///   action to take next.
///
/// Returns `Some(value)` if `wait_wave` reported `Success(value)`, otherwise `None`.
pub async fn run_hedged_waves<T, SendWave, SendWaveFut, WaitWave, WaitWaveFut>(
    peer_count: usize,
    dispatch: RequestDispatchConfig,
    request_timeout: Duration,
    mut send_wave: SendWave,
    mut wait_wave: WaitWave,
) -> Option<T>
where
    SendWave: FnMut(Range<usize>) -> SendWaveFut,
    SendWaveFut: Future<Output = usize>,
    WaitWave: FnMut(Duration) -> WaitWaveFut,
    WaitWaveFut: Future<Output = HedgedWaveAction<T>>,
{
    let dispatch = normalize_dispatch_config(dispatch, peer_count);
    let wave_plan = build_hedged_wave_plan(peer_count, dispatch);
    if wave_plan.is_empty() {
        return None;
    }

    let deadline = Instant::now() + request_timeout;
    let mut sent_total = 0usize;
    let mut next_peer_idx = 0usize;

    for (wave_idx, wave_size) in wave_plan.iter().copied().enumerate() {
        let from = next_peer_idx;
        let to = (next_peer_idx + wave_size).min(peer_count);
        next_peer_idx = to;

        if from == to {
            continue;
        }

        sent_total += send_wave(from..to).await;
        // Nothing has gone out so far: there is nothing to wait for, so move
        // straight on to the next wave, or give up once all targets were tried.
        if sent_total == 0 {
            if next_peer_idx >= peer_count {
                break;
            }
            continue;
        }

        let now = Instant::now();
        if now >= deadline {
            break;
        }
        let remaining = deadline.saturating_duration_since(now);
        let is_last_wave = wave_idx + 1 == wave_plan.len() || next_peer_idx >= peer_count;
        // The last wave waits out the whole remaining budget; earlier waves
        // wait one hedge interval (or not at all when the interval is zero).
        let wait = if is_last_wave {
            remaining
        } else if dispatch.hedge_interval_ms == 0 {
            Duration::ZERO
        } else {
            Duration::from_millis(dispatch.hedge_interval_ms).min(remaining)
        };

        if wait.is_zero() {
            continue;
        }

        match wait_wave(wait).await {
            HedgedWaveAction::Continue => {}
            HedgedWaveAction::Success(value) => return Some(value),
            HedgedWaveAction::Abort => break,
        }
    }

    None
}
372
373pub async fn sync_selector_peers(selector: &RwLock<PeerSelector>, current_peer_ids: &[String]) {
375 let mut selector = selector.write().await;
376 let current: HashSet<&str> = current_peer_ids.iter().map(String::as_str).collect();
377 let known: Vec<String> = selector.all_stats().map(|s| s.peer_id.clone()).collect();
378 for peer_id in known {
379 if !current.contains(peer_id.as_str()) {
380 selector.remove_peer(&peer_id);
381 }
382 }
383 for peer_id in current_peer_ids {
384 selector.add_peer(peer_id.clone());
385 }
386}
387
/// Knobs that alter how this node answers data requests (drop, corrupt, delay).
///
/// The default is all zeros, i.e. no alteration.
#[derive(Debug, Clone, Copy, Default)]
pub struct ResponseBehaviorConfig {
    /// Probability in `[0, 1]` of dropping a response.
    pub drop_response_prob: f64,
    /// Probability in `[0, 1]` of corrupting a response.
    pub corrupt_response_prob: f64,
    /// Fixed delay added to every response, in milliseconds.
    pub extra_delay_ms: u64,
    /// Additional fixed delay added to every response, in milliseconds.
    pub first_byte_delay_ms: u64,
    /// Throughput used to derive a size-dependent delay; `0` disables it.
    pub bytes_per_second: u64,
    /// Probability in `[0, 1]` of adding `stall_delay_ms` to a response.
    pub stall_response_prob: f64,
    /// Delay added when a response stalls, in milliseconds.
    pub stall_delay_ms: u64,
}

impl ResponseBehaviorConfig {
    /// Returns a copy with every probability clamped to `[0, 1]`; the other
    /// fields are carried over unchanged.
    fn normalized(self) -> Self {
        let unit_interval = |probability: f64| probability.clamp(0.0, 1.0);
        Self {
            drop_response_prob: unit_interval(self.drop_response_prob),
            corrupt_response_prob: unit_interval(self.corrupt_response_prob),
            stall_response_prob: unit_interval(self.stall_response_prob),
            ..self
        }
    }
}
436
/// Routing, payment and dispatch settings for a `MeshStoreCore`.
#[derive(Debug, Clone)]
pub struct MeshRoutingConfig {
    /// Strategy handed to the peer selector on construction.
    pub selection_strategy: SelectionStrategy,
    /// Fairness flag handed to the peer selector on construction.
    pub fairness_enabled: bool,
    /// Payment weight handed to the peer selector on construction.
    pub cashu_payment_weight: f64,
    /// NOTE(review): not referenced in the visible part of this file; meaning to be confirmed at its use site.
    pub cashu_payment_default_block_threshold: u64,
    /// Mint URLs this node accepts for quotes; an empty list accepts any mint.
    pub cashu_accepted_mints: Vec<String>,
    /// Mint URL preferred by this node, if any.
    pub cashu_default_mint: Option<String>,
    /// Base payment cap (sats) for quotes on a peer-suggested mint; `0` yields a cap of zero.
    pub cashu_peer_suggested_mint_base_cap_sat: u64,
    /// Sats added to the cap per recorded success of the peer.
    pub cashu_peer_suggested_mint_success_step_sat: u64,
    /// Sats added to the cap per recorded payment receipt of the peer.
    pub cashu_peer_suggested_mint_receipt_step_sat: u64,
    /// Upper bound on the cap; `0` means unbounded.
    pub cashu_peer_suggested_mint_max_cap_sat: u64,
    /// Fanout settings for hedged requests.
    pub dispatch: RequestDispatchConfig,
    /// Response alteration settings (drop, corrupt, delay).
    pub response_behavior: ResponseBehaviorConfig,
}

impl Default for MeshRoutingConfig {
    /// Weighted selection with fairness on, all Cashu settings zero/empty, and
    /// default dispatch and response behaviour.
    fn default() -> Self {
        Self {
            selection_strategy: SelectionStrategy::Weighted,
            fairness_enabled: true,
            cashu_payment_weight: 0.0,
            cashu_payment_default_block_threshold: 0,
            cashu_accepted_mints: Vec::new(),
            cashu_default_mint: None,
            cashu_peer_suggested_mint_base_cap_sat: 0,
            cashu_peer_suggested_mint_success_step_sat: 0,
            cashu_peer_suggested_mint_receipt_step_sat: 0,
            cashu_peer_suggested_mint_max_cap_sat: 0,
            dispatch: RequestDispatchConfig::default(),
            response_behavior: ResponseBehaviorConfig::default(),
        }
    }
}
481
/// Core state of the mesh store: a local store, a signaling router, and the
/// bookkeeping for requests, quotes, read sources and peer selection.
pub struct MeshStoreCore<S, R, F>
where
    S: Store + Send + Sync + 'static,
    R: SignalingTransport + Send + Sync + 'static,
    F: PeerLinkFactory + Send + Sync + 'static,
{
    /// Local backing store.
    local_store: Arc<S>,
    /// Signaling router used to reach peers and their data channels.
    signaling: Arc<MeshRouter<R, F>>,
    /// HTL configuration per peer id; an entry is created on a peer's first signaling message.
    htl_configs: RwLock<HashMap<String, PeerHTLConfig>>,
    /// Outstanding data requests (presumably keyed by hash key; confirm at the use site).
    pending_requests: RwLock<HashMap<String, PendingRequest>>,
    /// Outstanding quote negotiations (presumably keyed by hash key; confirm at the use site).
    pending_quotes: RwLock<HashMap<String, PendingQuoteRequest>>,
    /// Requests being forwarded for other peers (presumably keyed by hash key; confirm at the use site).
    pending_forward_requests: RwLock<HashMap<String, PendingForwardRequest>>,
    /// Quotes issued by this node, keyed by `(peer id, hash key, quote id)`.
    issued_quotes: RwLock<HashMap<(String, String, u64), IssuedQuote>>,
    /// Next quote id to hand out; starts at 1.
    next_quote_id: RwLock<u64>,
    /// Registered read sources keyed by source id.
    read_sources: RwLock<HashMap<String, Arc<dyn MeshReadSource>>>,
    /// Statistics per read source id.
    read_source_stats: RwLock<HashMap<String, AdaptiveSourceStats>>,
    /// Source fetches currently in flight (presumably keyed by hash key; confirm at the use site).
    inflight_source_fetches: Mutex<HashMap<String, InflightSourceFetch>>,
    /// Statistics per read route id.
    read_route_stats: RwLock<HashMap<String, AdaptiveSourceStats>>,
    /// Peer selector used to rank connected peers.
    peer_selector: RwLock<PeerSelector>,
    /// Number of reserved (active) requests per peer id.
    peer_active_requests: RwLock<HashMap<String, usize>>,
    /// Routing configuration supplied at construction.
    routing: MeshRoutingConfig,
    /// Timeout supplied at construction.
    request_timeout: Duration,
    /// Debug flag supplied at construction.
    debug: bool,
    /// Set by `start`, cleared by `stop`.
    running: RwLock<bool>,
}
531
532impl<S, R, F> MeshStoreCore<S, R, F>
533where
534 S: Store + Send + Sync + 'static,
535 R: SignalingTransport + Send + Sync + 'static,
536 F: PeerLinkFactory + Send + Sync + 'static,
537{
538 pub fn new(
540 local_store: Arc<S>,
541 signaling: Arc<MeshRouter<R, F>>,
542 request_timeout: Duration,
543 debug: bool,
544 ) -> Self {
545 Self::new_with_routing(
546 local_store,
547 signaling,
548 request_timeout,
549 debug,
550 Default::default(),
551 )
552 }
553
    /// Creates a core with an explicit routing configuration.
    ///
    /// The peer selector is built from `routing` (strategy, fairness, Cashu
    /// payment weight). All maps start empty, quote ids start at 1 and the
    /// core starts in the not-running state.
    pub fn new_with_routing(
        local_store: Arc<S>,
        signaling: Arc<MeshRouter<R, F>>,
        request_timeout: Duration,
        debug: bool,
        routing: MeshRoutingConfig,
    ) -> Self {
        let mut selector = PeerSelector::with_strategy(routing.selection_strategy);
        selector.set_fairness(routing.fairness_enabled);
        selector.set_cashu_payment_weight(routing.cashu_payment_weight);
        Self {
            local_store,
            signaling,
            htl_configs: RwLock::new(HashMap::new()),
            pending_requests: RwLock::new(HashMap::new()),
            pending_quotes: RwLock::new(HashMap::new()),
            pending_forward_requests: RwLock::new(HashMap::new()),
            issued_quotes: RwLock::new(HashMap::new()),
            next_quote_id: RwLock::new(1),
            read_sources: RwLock::new(HashMap::new()),
            read_source_stats: RwLock::new(HashMap::new()),
            inflight_source_fetches: Mutex::new(HashMap::new()),
            read_route_stats: RwLock::new(HashMap::new()),
            peer_selector: RwLock::new(selector),
            peer_active_requests: RwLock::new(HashMap::new()),
            routing,
            request_timeout,
            debug,
            running: RwLock::new(false),
        }
    }
586
587 pub async fn start(&self) -> Result<(), TransportError> {
589 *self.running.write().await = true;
590
591 self.signaling.send_hello(vec![]).await?;
593
594 Ok(())
595 }
596
597 pub async fn stop(&self) {
599 *self.running.write().await = false;
600 }
601
602 pub async fn process_signaling(&self, msg: SignalingMessage) -> Result<(), TransportError> {
604 let peer_id = msg.peer_id().to_string();
606 {
607 let mut configs = self.htl_configs.write().await;
608 if !configs.contains_key(&peer_id) {
609 configs.insert(peer_id.clone(), PeerHTLConfig::random());
610 }
611 }
612 self.peer_selector.write().await.add_peer(peer_id);
613
614 self.signaling.handle_message(msg).await
615 }
616
    /// Returns the signaling router this core was constructed with.
    pub fn signaling(&self) -> &Arc<MeshRouter<R, F>> {
        &self.signaling
    }
621
    /// Returns the configured response behaviour with probabilities clamped to `[0, 1]`.
    fn response_behavior(&self) -> ResponseBehaviorConfig {
        self.routing.response_behavior.normalized()
    }
625
626 fn deterministic_actor_draw_for(peer_id: &str, hash: &Hash, salt: u64) -> f64 {
627 let mut hasher = DefaultHasher::new();
628 peer_id.hash(&mut hasher);
629 hash.hash(&mut hasher);
630 salt.hash(&mut hasher);
631 let v = hasher.finish();
632 (v as f64) / (u64::MAX as f64)
633 }
634
    /// Same as `deterministic_actor_draw_for`, using this node's own peer id.
    fn deterministic_actor_draw(&self, hash: &Hash, salt: u64) -> f64 {
        Self::deterministic_actor_draw_for(self.signaling.peer_id(), hash, salt)
    }
638
    /// SHA-256 of `PEER_METADATA_POINTER_SLOT_KEY`.
    fn peer_metadata_pointer_slot_hash() -> Hash {
        hashtree_core::sha256(PEER_METADATA_POINTER_SLOT_KEY)
    }
642
643 fn decode_hash_hex(hash_hex: &str) -> Result<Hash, StoreError> {
644 let bytes = hex::decode(hash_hex)
645 .map_err(|e| StoreError::Other(format!("Invalid hash hex: {e}")))?;
646 if bytes.len() != 32 {
647 return Err(StoreError::Other(format!(
648 "Invalid hash length {}, expected 32 bytes",
649 bytes.len()
650 )));
651 }
652 let mut hash = [0u8; 32];
653 hash.copy_from_slice(&bytes);
654 Ok(hash)
655 }
656
657 fn should_drop_response(&self, hash: &Hash) -> bool {
658 let p = self.response_behavior().drop_response_prob;
659 if p <= 0.0 {
660 return false;
661 }
662 self.deterministic_actor_draw(hash, 0xD0_D0_D0_D0_D0_D0_D0_D0) < p
663 }
664
665 fn should_corrupt_response(&self, hash: &Hash) -> bool {
666 let p = self.response_behavior().corrupt_response_prob;
667 if p <= 0.0 {
668 return false;
669 }
670 self.deterministic_actor_draw(hash, 0xC0_C0_C0_C0_C0_C0_C0_C0) < p
671 }
672
673 fn should_stall_response(&self, hash: &Hash) -> bool {
674 let p = self.response_behavior().stall_response_prob;
675 if p <= 0.0 {
676 return false;
677 }
678 self.deterministic_actor_draw(hash, 0x5A_11_5A_11_5A_11_5A_11) < p
679 }
680
681 fn response_send_delay(&self, hash: &Hash, payload_len: usize) -> Duration {
682 let behavior = self.response_behavior();
683 let mut total_ms = behavior
684 .extra_delay_ms
685 .saturating_add(behavior.first_byte_delay_ms);
686
687 if behavior.bytes_per_second > 0 && payload_len > 0 {
688 let throughput_ms = ((payload_len as u128) * 1000)
689 .div_ceil(behavior.bytes_per_second as u128)
690 .min(u64::MAX as u128) as u64;
691 total_ms = total_ms.saturating_add(throughput_ms);
692 }
693
694 if behavior.stall_delay_ms > 0 && self.should_stall_response(hash) {
695 total_ms = total_ms.saturating_add(behavior.stall_delay_ms);
696 }
697
698 Duration::from_millis(total_ms)
699 }
700
701 async fn ordered_connected_peers(&self, exclude_peer_id: Option<&str>) -> Vec<String> {
702 let current_peer_ids = self.signaling.peer_ids().await;
703 if current_peer_ids.is_empty() {
704 return Vec::new();
705 }
706
707 sync_selector_peers(&self.peer_selector, ¤t_peer_ids).await;
708 let mut candidate_peer_ids: Vec<String> = current_peer_ids
709 .into_iter()
710 .filter(|peer_id| exclude_peer_id.is_none_or(|exclude| peer_id != exclude))
711 .collect();
712 if candidate_peer_ids.is_empty() {
713 return Vec::new();
714 }
715
716 let current_set: HashSet<&str> = candidate_peer_ids.iter().map(String::as_str).collect();
717 let mut selector = self.peer_selector.write().await;
718 let mut selector_order = selector.select_peers();
719 selector_order.retain(|peer_id| current_set.contains(peer_id.as_str()));
720 if selector_order.is_empty() {
721 let mut fallback = candidate_peer_ids;
722 fallback.sort();
723 return fallback;
724 }
725 let backed_off: HashMap<String, bool> = candidate_peer_ids
726 .iter()
727 .map(|peer_id| (peer_id.clone(), selector.is_peer_backed_off(peer_id)))
728 .collect();
729 drop(selector);
730
731 let rank: HashMap<&str, usize> = selector_order
732 .iter()
733 .enumerate()
734 .map(|(idx, peer_id)| (peer_id.as_str(), idx))
735 .collect();
736 let active = self.peer_active_requests.read().await;
737 candidate_peer_ids.sort_by(|left, right| {
738 let left_backed_off = backed_off.get(left).copied().unwrap_or(false);
739 let right_backed_off = backed_off.get(right).copied().unwrap_or(false);
740 if left_backed_off != right_backed_off {
741 return if left_backed_off {
742 std::cmp::Ordering::Greater
743 } else {
744 std::cmp::Ordering::Less
745 };
746 }
747 let left_rank = rank.get(left.as_str()).copied().unwrap_or(usize::MAX / 2);
748 let right_rank = rank.get(right.as_str()).copied().unwrap_or(usize::MAX / 2);
749 let left_load = active.get(left).copied().unwrap_or(0);
750 let right_load = active.get(right).copied().unwrap_or(0);
751 (left_rank + left_load.saturating_mul(ACTIVE_PEER_REQUEST_RANK_PENALTY))
752 .cmp(&(right_rank + right_load.saturating_mul(ACTIVE_PEER_REQUEST_RANK_PENALTY)))
753 .then_with(|| left.cmp(right))
754 });
755 candidate_peer_ids
756 }
757
758 async fn reserve_peer_request(&self, peer_id: &str) {
759 let mut active = self.peer_active_requests.write().await;
760 *active.entry(peer_id.to_string()).or_insert(0) += 1;
761 }
762
763 async fn release_peer_request(&self, peer_id: &str) {
764 let mut active = self.peer_active_requests.write().await;
765 let Some(count) = active.get_mut(peer_id) else {
766 return;
767 };
768 if *count <= 1 {
769 active.remove(peer_id);
770 } else {
771 *count -= 1;
772 }
773 }
774
    /// Releases one active-request reservation for each peer in `peer_ids`,
    /// one after another.
    async fn release_queried_peer_requests(&self, peer_ids: &[String]) {
        for peer_id in peer_ids {
            self.release_peer_request(peer_id).await;
        }
    }
780
781 fn requested_quote_mint(&self) -> Option<&str> {
782 if let Some(default_mint) = self.routing.cashu_default_mint.as_deref() {
783 if self.routing.cashu_accepted_mints.is_empty()
784 || self
785 .routing
786 .cashu_accepted_mints
787 .iter()
788 .any(|mint| mint == default_mint)
789 {
790 return Some(default_mint);
791 }
792 }
793
794 self.routing
795 .cashu_accepted_mints
796 .first()
797 .map(String::as_str)
798 }
799
800 fn choose_quote_mint(&self, requested_mint: Option<&str>) -> Option<String> {
801 if let Some(requested_mint) = requested_mint {
802 if self.accepts_quote_mint(Some(requested_mint)) {
803 return Some(requested_mint.to_string());
804 }
805 }
806 if let Some(default_mint) = self.routing.cashu_default_mint.as_ref() {
807 return Some(default_mint.clone());
808 }
809 if let Some(first_mint) = self.routing.cashu_accepted_mints.first() {
810 return Some(first_mint.clone());
811 }
812 requested_mint.map(str::to_string)
813 }
814
815 fn accepts_quote_mint(&self, mint_url: Option<&str>) -> bool {
816 if self.routing.cashu_accepted_mints.is_empty() {
817 return true;
818 }
819
820 let Some(mint_url) = mint_url else {
821 return false;
822 };
823 self.routing
824 .cashu_accepted_mints
825 .iter()
826 .any(|mint| mint == mint_url)
827 }
828
829 fn trusts_quote_mint(&self, mint_url: Option<&str>) -> bool {
830 let Some(mint_url) = mint_url else {
831 return self.routing.cashu_default_mint.is_none()
832 && self.routing.cashu_accepted_mints.is_empty();
833 };
834 self.routing.cashu_default_mint.as_deref() == Some(mint_url)
835 || self
836 .routing
837 .cashu_accepted_mints
838 .iter()
839 .any(|mint| mint == mint_url)
840 }
841
842 async fn peer_suggested_mint_cap_sat(&self, peer_id: &str) -> u64 {
843 let base = self.routing.cashu_peer_suggested_mint_base_cap_sat;
844 if base == 0 {
845 return 0;
846 }
847
848 let selector = self.peer_selector.read().await;
849 let Some(stats) = selector.get_stats(peer_id) else {
850 let max_cap = self.routing.cashu_peer_suggested_mint_max_cap_sat;
851 return if max_cap > 0 { base.min(max_cap) } else { base };
852 };
853
854 if stats.cashu_payment_defaults > 0
855 && stats.cashu_payment_defaults >= stats.cashu_payment_receipts
856 {
857 return 0;
858 }
859
860 let success_bonus = stats
861 .successes
862 .saturating_mul(self.routing.cashu_peer_suggested_mint_success_step_sat);
863 let receipt_bonus = stats
864 .cashu_payment_receipts
865 .saturating_mul(self.routing.cashu_peer_suggested_mint_receipt_step_sat);
866 let mut cap = base
867 .saturating_add(success_bonus)
868 .saturating_add(receipt_bonus);
869 let max_cap = self.routing.cashu_peer_suggested_mint_max_cap_sat;
870 if max_cap > 0 {
871 cap = cap.min(max_cap);
872 }
873 cap
874 }
875
876 async fn should_accept_quote_response(
877 &self,
878 from_peer: &str,
879 preferred_mint_url: Option<&str>,
880 offered_payment_sat: u64,
881 res: &DataQuoteResponse,
882 ) -> bool {
883 let Some(payment_sat) = res.p else {
884 return false;
885 };
886 if payment_sat > offered_payment_sat {
887 return false;
888 }
889
890 let response_mint = res.m.as_deref();
891 if response_mint == preferred_mint_url {
892 return true;
893 }
894 if self.trusts_quote_mint(response_mint) {
895 return true;
896 }
897 if response_mint.is_none() {
898 return false;
899 }
900
901 payment_sat <= self.peer_suggested_mint_cap_sat(from_peer).await
902 }
903
904 async fn issue_quote(
905 &self,
906 peer_id: &str,
907 hash_key: &str,
908 payment_sat: u64,
909 ttl_ms: u32,
910 mint_url: Option<&str>,
911 ) -> u64 {
912 let quote_id = {
913 let mut next = self.next_quote_id.write().await;
914 let quote_id = *next;
915 *next = next.saturating_add(1);
916 quote_id
917 };
918
919 let expires_at = Instant::now() + Duration::from_millis(ttl_ms as u64);
920 self.issued_quotes.write().await.insert(
921 (peer_id.to_string(), hash_key.to_string(), quote_id),
922 IssuedQuote {
923 expires_at,
924 payment_sat,
925 mint_url: mint_url.map(str::to_string),
926 },
927 );
928 quote_id
929 }
930
931 async fn take_valid_quote(&self, peer_id: &str, hash_key: &str, quote_id: u64) -> bool {
932 let key = (peer_id.to_string(), hash_key.to_string(), quote_id);
933 let Some(quote) = self.issued_quotes.write().await.remove(&key) else {
934 return false;
935 };
936 quote.expires_at > Instant::now()
937 }
938
939 async fn send_request_to_peer(
940 &self,
941 peer_id: &str,
942 hash: &Hash,
943 request_htl: u8,
944 quote_id: Option<u64>,
945 ) -> bool {
946 let channel = match self.signaling.get_channel(peer_id).await {
947 Some(c) => c,
948 None => return false,
949 };
950
951 let htl_config = {
952 let configs = self.htl_configs.read().await;
953 configs
954 .get(peer_id)
955 .cloned()
956 .unwrap_or_else(PeerHTLConfig::random)
957 };
958
959 let send_htl = htl_config.decrement(request_htl);
960 let req = match quote_id {
961 Some(quote_id) => create_request_with_quote(hash, send_htl, quote_id),
962 None => create_request(hash, send_htl),
963 };
964 let request_bytes = encode_request(&req);
965
966 {
967 let mut selector = self.peer_selector.write().await;
968 selector.record_request(peer_id, request_bytes.len() as u64);
969 }
970
971 match channel.send(request_bytes).await {
972 Ok(()) => true,
973 Err(_) => {
974 self.peer_selector.write().await.record_failure(peer_id);
975 false
976 }
977 }
978 }
979
980 async fn send_quote_request_to_peer(
981 &self,
982 peer_id: &str,
983 hash: &Hash,
984 payment_sat: u64,
985 ttl_ms: u32,
986 mint_url: Option<&str>,
987 ) -> bool {
988 let channel = match self.signaling.get_channel(peer_id).await {
989 Some(c) => c,
990 None => return false,
991 };
992
993 let req = create_quote_request(hash, ttl_ms, payment_sat, mint_url);
994 let request_bytes = encode_quote_request(&req);
995
996 match channel.send(request_bytes).await {
997 Ok(()) => true,
998 Err(_) => false,
999 }
1000 }
1001
1002 pub async fn set_read_sources(&self, sources: Vec<Arc<dyn MeshReadSource>>) {
1003 let mut by_id = HashMap::new();
1004 let mut stats = self.read_source_stats.write().await;
1005 for source in sources {
1006 let source_id = source.id().to_string();
1007 by_id.insert(source_id.clone(), source);
1008 stats
1009 .entry(source_id)
1010 .or_insert_with(AdaptiveSourceStats::default);
1011 }
1012 *self.read_sources.write().await = by_id;
1013 }
1014
1015 fn route_stats_for<'a>(
1016 stats: &'a mut HashMap<String, AdaptiveSourceStats>,
1017 route_id: &str,
1018 ) -> &'a mut AdaptiveSourceStats {
1019 stats
1020 .entry(route_id.to_string())
1021 .or_insert_with(AdaptiveSourceStats::default)
1022 }
1023
1024 async fn record_route_request(&self, route_id: &str) {
1025 let mut stats = self.read_route_stats.write().await;
1026 Self::route_stats_for(&mut stats, route_id).requests += 1;
1027 }
1028
1029 async fn record_route_success(&self, route_id: &str, elapsed_ms: u64) {
1030 let now = Instant::now();
1031 let mut stats = self.read_route_stats.write().await;
1032 let stats = Self::route_stats_for(&mut stats, route_id);
1033 stats.successes += 1;
1034 stats.last_success_at = Some(now);
1035 stats.backoff_level = 0;
1036 stats.backed_off_until = None;
1037 if stats.srtt_ms <= 0.0 {
1038 stats.srtt_ms = elapsed_ms as f64;
1039 stats.rttvar_ms = elapsed_ms as f64 / 2.0;
1040 return;
1041 }
1042 let elapsed = elapsed_ms as f64;
1043 stats.rttvar_ms = 0.75 * stats.rttvar_ms + 0.25 * (stats.srtt_ms - elapsed).abs();
1044 stats.srtt_ms = 0.875 * stats.srtt_ms + 0.125 * elapsed;
1045 }
1046
1047 async fn record_route_miss(&self, route_id: &str) {
1048 let mut stats = self.read_route_stats.write().await;
1049 Self::route_stats_for(&mut stats, route_id).misses += 1;
1050 }
1051
1052 async fn record_route_timeout(&self, route_id: &str) {
1053 let mut stats = self.read_route_stats.write().await;
1054 Self::route_stats_for(&mut stats, route_id).timeouts += 1;
1055 }
1056
1057 async fn record_read_source_request(&self, source_id: &str) {
1058 let mut stats = self.read_source_stats.write().await;
1059 stats
1060 .entry(source_id.to_string())
1061 .or_insert_with(AdaptiveSourceStats::default)
1062 .requests += 1;
1063 }
1064
1065 async fn record_read_source_miss(&self, source_id: &str) {
1066 let mut stats = self.read_source_stats.write().await;
1067 stats
1068 .entry(source_id.to_string())
1069 .or_insert_with(AdaptiveSourceStats::default)
1070 .misses += 1;
1071 }
1072
1073 async fn record_read_source_success(&self, source_id: &str, elapsed_ms: u64) {
1074 let now = Instant::now();
1075 let mut stats = self.read_source_stats.write().await;
1076 let stats = stats
1077 .entry(source_id.to_string())
1078 .or_insert_with(AdaptiveSourceStats::default);
1079 stats.successes += 1;
1080 stats.last_success_at = Some(now);
1081 stats.backoff_level = 0;
1082 stats.backed_off_until = None;
1083 if stats.srtt_ms <= 0.0 {
1084 stats.srtt_ms = elapsed_ms as f64;
1085 stats.rttvar_ms = elapsed_ms as f64 / 2.0;
1086 return;
1087 }
1088 let elapsed = elapsed_ms as f64;
1089 stats.rttvar_ms = 0.75 * stats.rttvar_ms + 0.25 * (stats.srtt_ms - elapsed).abs();
1090 stats.srtt_ms = 0.875 * stats.srtt_ms + 0.125 * elapsed;
1091 }
1092
1093 async fn record_read_source_failure(&self, source_id: &str) {
1094 let now = Instant::now();
1095 let mut stats = self.read_source_stats.write().await;
1096 let stats = stats
1097 .entry(source_id.to_string())
1098 .or_insert_with(AdaptiveSourceStats::default);
1099 stats.failures += 1;
1100 stats.last_failure_at = Some(now);
1101 Self::apply_source_backoff(stats, now);
1102 }
1103
1104 async fn record_read_source_timeout(&self, source_id: &str) {
1105 let now = Instant::now();
1106 let mut stats = self.read_source_stats.write().await;
1107 let stats = stats
1108 .entry(source_id.to_string())
1109 .or_insert_with(AdaptiveSourceStats::default);
1110 stats.timeouts += 1;
1111 stats.last_failure_at = Some(now);
1112 Self::apply_source_backoff(stats, now);
1113 }
1114
1115 fn apply_source_backoff(stats: &mut AdaptiveSourceStats, now: Instant) {
1116 stats.backoff_level = stats.backoff_level.saturating_add(1);
1117 let backoff_ms = (INITIAL_SOURCE_BACKOFF_MS
1118 .saturating_mul(2u64.saturating_pow(stats.backoff_level.saturating_sub(1))))
1119 .min(MAX_SOURCE_BACKOFF_MS);
1120 stats.backed_off_until = Some(now + Duration::from_millis(backoff_ms));
1121 }
1122
1123 async fn ordered_read_sources(&self) -> Vec<Arc<dyn MeshReadSource>> {
1124 let sources = self.read_sources.read().await;
1125 if sources.is_empty() {
1126 return Vec::new();
1127 }
1128
1129 let mut available: Vec<Arc<dyn MeshReadSource>> = sources
1130 .values()
1131 .filter(|source| source.is_available())
1132 .cloned()
1133 .collect();
1134 if available.is_empty() {
1135 return Vec::new();
1136 }
1137
1138 let now = Instant::now();
1139 let stats = self.read_source_stats.read().await;
1140 let mut healthy: Vec<Arc<dyn MeshReadSource>> = available
1141 .iter()
1142 .filter(|source| {
1143 stats
1144 .get(source.id())
1145 .and_then(|s| s.backed_off_until)
1146 .is_none_or(|until| until <= now)
1147 })
1148 .cloned()
1149 .collect();
1150 if !healthy.is_empty() {
1151 available = std::mem::take(&mut healthy);
1152 }
1153
1154 available.sort_by(|left, right| {
1155 let left_stats = stats.get(left.id()).cloned().unwrap_or_default();
1156 let right_stats = stats.get(right.id()).cloned().unwrap_or_default();
1157 adaptive_source_score(&right_stats, now)
1158 .partial_cmp(&adaptive_source_score(&left_stats, now))
1159 .unwrap_or(std::cmp::Ordering::Equal)
1160 .then_with(|| left.id().cmp(right.id()))
1161 });
1162 available
1163 }
1164
1165 async fn should_probe_multiple_read_sources(
1166 &self,
1167 ordered_sources: &[Arc<dyn MeshReadSource>],
1168 ) -> bool {
1169 if ordered_sources.len() <= 1 {
1170 return false;
1171 }
1172 let stats = self.read_source_stats.read().await;
1173 let best = stats
1174 .get(ordered_sources[0].id())
1175 .cloned()
1176 .unwrap_or_default();
1177 let second = stats
1178 .get(ordered_sources[1].id())
1179 .cloned()
1180 .unwrap_or_default();
1181 if !source_has_history(&best) || !source_has_history(&second) {
1182 return true;
1183 }
1184 let now = Instant::now();
1185 adaptive_source_score(&best, now) - adaptive_source_score(&second, now)
1186 < SOURCE_SCORE_TIE_DELTA
1187 }
1188
1189 async fn source_dispatch_for(&self, source_count: usize) -> RequestDispatchConfig {
1190 if source_count == 0 {
1191 return self.routing.dispatch;
1192 }
1193 let ordered_sources = self.ordered_read_sources().await;
1194 let probe_multiple = self
1195 .should_probe_multiple_read_sources(&ordered_sources)
1196 .await;
1197 RequestDispatchConfig {
1198 initial_fanout: if probe_multiple {
1199 source_count.min(2)
1200 } else {
1201 1
1202 },
1203 hedge_fanout: self.routing.dispatch.hedge_fanout,
1204 max_fanout: self.routing.dispatch.max_fanout.min(source_count),
1205 hedge_interval_ms: self.routing.dispatch.hedge_interval_ms,
1206 }
1207 }
1208
1209 pub async fn peer_count(&self) -> usize {
1211 self.signaling.peer_count().await
1212 }
1213
1214 pub async fn needs_peers(&self) -> bool {
1216 self.signaling.needs_peers().await
1217 }
1218
1219 pub async fn send_hello(&self) -> Result<(), TransportError> {
1221 self.signaling.send_hello(vec![]).await
1222 }
1223
1224 pub async fn drain_available_data_messages(self: &Arc<Self>) -> DataPumpStats {
1229 let mut stats = DataPumpStats::default();
1230 let peer_ids = self.signaling.peer_ids().await;
1231 for peer_id in peer_ids {
1232 let Some(channel) = self.signaling.get_channel(&peer_id).await else {
1233 continue;
1234 };
1235
1236 while let Some(data) = channel.try_recv() {
1237 stats.processed += 1;
1238 stats.processed_bytes += data.len() as u64;
1239 if let Some(msg) = parse_message(&data) {
1240 match msg {
1241 DataMessage::Request(_) => stats.request_messages += 1,
1242 DataMessage::Response(_) => stats.response_messages += 1,
1243 DataMessage::QuoteRequest(_) => stats.quote_request_messages += 1,
1244 DataMessage::QuoteResponse(_) => stats.quote_response_messages += 1,
1245 DataMessage::Payment(_)
1246 | DataMessage::PaymentAck(_)
1247 | DataMessage::Chunk(_) => {}
1248 }
1249 }
1250 self.handle_data_message(&peer_id, &data).await;
1251 }
1252 }
1253 stats
1254 }
1255
1256 pub async fn record_cashu_payment_for_peer(&self, peer_id: &str, amount_sat: u64) {
1258 self.peer_selector
1259 .write()
1260 .await
1261 .record_cashu_payment(peer_id, amount_sat);
1262 }
1263
1264 pub async fn record_cashu_receipt_from_peer(&self, peer_id: &str, amount_sat: u64) {
1266 self.peer_selector
1267 .write()
1268 .await
1269 .record_cashu_receipt(peer_id, amount_sat);
1270 }
1271
1272 pub async fn record_cashu_payment_default_from_peer(&self, peer_id: &str) {
1274 self.peer_selector
1275 .write()
1276 .await
1277 .record_cashu_payment_default(peer_id);
1278 }
1279
1280 pub async fn selector_summary(&self) -> crate::peer_selector::SelectorSummary {
1282 self.peer_selector.read().await.summary()
1283 }
1284
1285 fn should_refuse_requests_from_peer(&self, selector: &PeerSelector, peer_id: &str) -> bool {
1286 selector.is_peer_blocked_for_payment_defaults(
1287 peer_id,
1288 self.routing.cashu_payment_default_block_threshold,
1289 )
1290 }
1291
1292 pub async fn peer_metadata_snapshot(&self) -> PeerMetadataSnapshot {
1294 self.peer_selector
1295 .read()
1296 .await
1297 .export_peer_metadata_snapshot()
1298 }
1299
1300 pub async fn persist_peer_metadata(&self) -> Result<Hash, StoreError> {
1305 let snapshot = self
1306 .peer_selector
1307 .read()
1308 .await
1309 .export_peer_metadata_snapshot();
1310 let bytes = serde_json::to_vec(&snapshot).map_err(|e| {
1311 StoreError::Other(format!("Failed to encode peer metadata snapshot: {e}"))
1312 })?;
1313 let snapshot_hash = hashtree_core::sha256(&bytes);
1314 let _ = self.local_store.put(snapshot_hash, bytes).await?;
1315
1316 let pointer_slot = Self::peer_metadata_pointer_slot_hash();
1317 let pointer_bytes = hex::encode(snapshot_hash).into_bytes();
1318 let _ = self.local_store.delete(&pointer_slot).await?;
1319 let _ = self.local_store.put(pointer_slot, pointer_bytes).await?;
1320
1321 Ok(snapshot_hash)
1322 }
1323
1324 pub async fn load_peer_metadata(&self) -> Result<bool, StoreError> {
1326 let pointer_slot = Self::peer_metadata_pointer_slot_hash();
1327 let Some(pointer_bytes) = self.local_store.get(&pointer_slot).await? else {
1328 return Ok(false);
1329 };
1330 let pointer_hex = std::str::from_utf8(&pointer_bytes).map_err(|e| {
1331 StoreError::Other(format!("Peer metadata pointer is not valid UTF-8: {e}"))
1332 })?;
1333 let snapshot_hash = Self::decode_hash_hex(pointer_hex.trim())?;
1334
1335 let Some(snapshot_bytes) = self.local_store.get(&snapshot_hash).await? else {
1336 return Ok(false);
1337 };
1338 let snapshot: PeerMetadataSnapshot =
1339 serde_json::from_slice(&snapshot_bytes).map_err(|e| {
1340 StoreError::Other(format!("Failed to decode peer metadata snapshot: {e}"))
1341 })?;
1342 self.peer_selector
1343 .write()
1344 .await
1345 .import_peer_metadata_snapshot(&snapshot);
1346 Ok(true)
1347 }
1348
1349 pub async fn get_with_quote(
1354 &self,
1355 hash: &Hash,
1356 payment_sat: u64,
1357 quote_ttl: Duration,
1358 ) -> Result<Option<Vec<u8>>, StoreError> {
1359 if let Some(data) = self.local_store.get(hash).await? {
1360 return Ok(Some(data));
1361 }
1362 Ok(self
1363 .request_from_peers_with_quote(hash, payment_sat, quote_ttl)
1364 .await)
1365 }
1366
1367 async fn request_from_peers_with_quote(
1368 &self,
1369 hash: &Hash,
1370 payment_sat: u64,
1371 quote_ttl: Duration,
1372 ) -> Option<Vec<u8>> {
1373 let ordered_peer_ids = self.ordered_connected_peers(None).await;
1374 if ordered_peer_ids.is_empty() {
1375 return None;
1376 }
1377
1378 if let Some(quote) = self
1379 .request_quote_from_peers(hash, payment_sat, quote_ttl, &ordered_peer_ids)
1380 .await
1381 {
1382 if let Some(data) = self
1383 .request_from_single_peer(hash, "e.peer_id, MAX_HTL, Some(quote.quote_id))
1384 .await
1385 {
1386 return Some(data);
1387 }
1388 }
1389
1390 self.request_from_mesh(hash).await
1391 }
1392
/// Asks the given peers for a quote on `hash` and returns the first accepted one.
///
/// Quote requests are sent in hedged waves over `ordered_peer_ids` using the
/// routing dispatch configuration and `request_timeout`. Returns `None` when
/// the peer list is empty, when `quote_ttl` rounds down to zero milliseconds,
/// or when no quote is delivered through the pending entry.
async fn request_quote_from_peers(
    &self,
    hash: &Hash,
    payment_sat: u64,
    quote_ttl: Duration,
    ordered_peer_ids: &[String],
) -> Option<NegotiatedQuote> {
    if ordered_peer_ids.is_empty() {
        return None;
    }
    // The TTL is sent as u32 milliseconds; longer durations are clamped.
    let ttl_ms = quote_ttl.as_millis().min(u32::MAX as u128) as u32;
    if ttl_ms == 0 {
        return None;
    }
    let requested_mint = self.requested_quote_mint().map(str::to_string);

    // Register the pending quote before sending anything so that
    // `handle_quote_response_message` can find it and deliver the result.
    // NOTE(review): the entry is keyed by hash only; presumably a concurrent
    // quote request for the same hash would replace it — confirm.
    let hash_key = hash_to_key(hash);
    let (tx, rx) = oneshot::channel();
    self.pending_quotes.write().await.insert(
        hash_key.clone(),
        PendingQuoteRequest {
            response_tx: tx,
            preferred_mint_url: requested_mint.clone(),
            offered_payment_sat: payment_sat,
        },
    );

    // The receiver is shared so each invocation of the wait closure can poll it.
    let rx = Arc::new(Mutex::new(rx));
    let result = run_hedged_waves(
        ordered_peer_ids.len(),
        self.routing.dispatch,
        self.request_timeout,
        // Send step: contact the peers in `range` and report how many sends succeeded.
        |range| {
            let wave_peer_ids = ordered_peer_ids[range].to_vec();
            let requested_mint = requested_mint.clone();
            let hash = *hash;
            async move {
                let mut sent = 0usize;
                for peer_id in wave_peer_ids {
                    if self
                        .send_quote_request_to_peer(
                            &peer_id,
                            &hash,
                            payment_sat,
                            ttl_ms,
                            requested_mint.as_deref(),
                        )
                        .await
                    {
                        sent += 1;
                    }
                }
                sent
            }
        },
        // Wait step: a quote ends the run, a `None` or a dropped sender aborts
        // it, and an elapsed `wait` lets the run continue.
        |wait| {
            let rx = rx.clone();
            async move {
                let mut rx = rx.lock().await;
                match tokio::time::timeout(wait, &mut *rx).await {
                    Ok(Ok(Some(quote))) => HedgedWaveAction::Success(quote),
                    Ok(Ok(None)) | Ok(Err(_)) => HedgedWaveAction::Abort,
                    Err(_) => HedgedWaveAction::Continue,
                }
            }
        },
    )
    .await;
    // Always drop the pending entry; it is already gone if a response consumed it.
    let _ = self.pending_quotes.write().await.remove(&hash_key);
    result
}
1464
1465 async fn request_from_single_peer(
1466 &self,
1467 hash: &Hash,
1468 peer_id: &str,
1469 request_htl: u8,
1470 quote_id: Option<u64>,
1471 ) -> Option<Vec<u8>> {
1472 let hash_key = hash_to_key(hash);
1473 let (tx, rx) = oneshot::channel();
1474 self.pending_requests.write().await.insert(
1475 hash_key.clone(),
1476 PendingRequest {
1477 response_tx: tx,
1478 started_at: Instant::now(),
1479 queried_peers: vec![peer_id.to_string()],
1480 },
1481 );
1482
1483 let mut rx = rx;
1484 if !self
1485 .send_request_to_peer(peer_id, hash, request_htl, quote_id)
1486 .await
1487 {
1488 let _ = self.pending_requests.write().await.remove(&hash_key);
1489 return None;
1490 }
1491 self.reserve_peer_request(peer_id).await;
1492
1493 if let Ok(Ok(Some(data))) = tokio::time::timeout(self.request_timeout, &mut rx).await {
1494 if hashtree_core::sha256(&data) == *hash {
1495 let _ = self.local_store.put(*hash, data.clone()).await;
1496 return Some(data);
1497 }
1498 }
1499
1500 if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
1501 self.release_queried_peer_requests(&pending.queried_peers)
1502 .await;
1503 for peer_id in pending.queried_peers {
1504 self.peer_selector.write().await.record_timeout(&peer_id);
1505 }
1506 }
1507 let _ = self.take_forward_requesters(&hash_key).await;
1508 None
1509 }
1510
1511 async fn request_from_ordered_peers(
1512 &self,
1513 hash: &Hash,
1514 ordered_peer_ids: &[String],
1515 request_htl: u8,
1516 ) -> RouteFetchOutcome {
1517 let hash_key = hash_to_key(hash);
1518 let (tx, rx) = oneshot::channel();
1519 self.pending_requests.write().await.insert(
1520 hash_key.clone(),
1521 PendingRequest {
1522 response_tx: tx,
1523 started_at: Instant::now(),
1524 queried_peers: Vec::new(),
1525 },
1526 );
1527
1528 let rx = Arc::new(Mutex::new(rx));
1529 let result = run_hedged_waves(
1530 ordered_peer_ids.len(),
1531 self.routing.dispatch,
1532 self.request_timeout,
1533 |range| {
1534 let wave_peer_ids = ordered_peer_ids[range].to_vec();
1535 let hash = *hash;
1536 let hash_key = hash_key.clone();
1537 async move {
1538 let mut sent = 0usize;
1539 for peer_id in wave_peer_ids {
1540 if self
1541 .send_request_to_peer(&peer_id, &hash, request_htl, None)
1542 .await
1543 {
1544 sent += 1;
1545 self.reserve_peer_request(&peer_id).await;
1546 if let Some(pending) =
1547 self.pending_requests.write().await.get_mut(&hash_key)
1548 {
1549 pending.queried_peers.push(peer_id);
1550 }
1551 }
1552 }
1553 sent
1554 }
1555 },
1556 |wait| {
1557 let rx = rx.clone();
1558 async move {
1559 let mut rx = rx.lock().await;
1560 match tokio::time::timeout(wait, &mut *rx).await {
1561 Ok(Ok(Some(data))) if hashtree_core::sha256(&data) == *hash => {
1562 HedgedWaveAction::Success(data)
1563 }
1564 Ok(Ok(Some(_))) => HedgedWaveAction::Continue,
1565 Ok(Ok(None)) | Ok(Err(_)) => HedgedWaveAction::Abort,
1566 Err(_) => HedgedWaveAction::Continue,
1567 }
1568 }
1569 },
1570 )
1571 .await;
1572
1573 let Some(data) = result else {
1574 if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
1575 self.release_queried_peer_requests(&pending.queried_peers)
1576 .await;
1577 for peer_id in pending.queried_peers {
1578 self.peer_selector.write().await.record_timeout(&peer_id);
1579 }
1580 }
1581 let _ = self.take_forward_requesters(&hash_key).await;
1582 return RouteFetchOutcome::Timeout;
1583 };
1584
1585 let _ = self.local_store.put(*hash, data.clone()).await;
1586 RouteFetchOutcome::Hit(data)
1587 }
1588
/// Fetches `hash` from the registered read sources using hedged waves.
///
/// Sources are taken in the order given by `ordered_read_sources` and started
/// wave by wave according to the hedged wave plan. The first hit is returned
/// immediately. If no source hits, the result is `Timeout` when at least one
/// started source had not reported by the end, and `Miss` otherwise (also
/// when there are no sources or the plan is empty). Per-source request,
/// success, miss, failure and timeout statistics are recorded along the way.
async fn request_from_read_sources_inner(&self, hash: &Hash) -> RouteFetchOutcome {
    let ordered_sources = self.ordered_read_sources().await;
    if ordered_sources.is_empty() {
        return RouteFetchOutcome::Miss;
    }

    let dispatch = normalize_dispatch_config(
        self.source_dispatch_for(ordered_sources.len()).await,
        ordered_sources.len(),
    );
    let wave_plan = build_hedged_wave_plan(ordered_sources.len(), dispatch);
    if wave_plan.is_empty() {
        return RouteFetchOutcome::Miss;
    }

    // One overall deadline bounds all waves together.
    let deadline = Instant::now() + self.request_timeout;
    let mut pending = FuturesUnordered::new();
    // Ids of sources that were started and have not reported an outcome yet.
    let mut pending_source_ids = HashSet::new();
    let mut saw_timeout = false;
    let mut next_source_idx = 0usize;

    for (wave_idx, wave_size) in wave_plan.iter().copied().enumerate() {
        let from = next_source_idx;
        let to = (next_source_idx + wave_size).min(ordered_sources.len());
        next_source_idx = to;

        // Start every source of this wave as its own task.
        // NOTE(review): the join handles live in `pending`; dropping them on an
        // early return appears to detach rather than cancel the tasks — confirm
        // this is intended.
        for source in ordered_sources[from..to].iter().cloned() {
            let source_id = source.id().to_string();
            self.record_read_source_request(&source_id).await;
            pending_source_ids.insert(source_id.clone());
            let hash = *hash;
            pending.push(tokio::spawn(async move {
                let started_at = Instant::now();
                // A panic inside the source is caught and reported as a failure.
                let result = std::panic::AssertUnwindSafe(source.get(&hash))
                    .catch_unwind()
                    .await;
                match result {
                    Ok(Some(data)) => SourceFetchOutcome::Hit {
                        source_id,
                        data,
                        // Reported latency is clamped to at least 1 ms.
                        elapsed_ms: started_at.elapsed().as_millis().max(1) as u64,
                    },
                    Ok(None) => SourceFetchOutcome::Miss { source_id },
                    Err(_) => SourceFetchOutcome::Failure { source_id },
                }
            }));
        }

        // The last wave waits until the overall deadline; earlier waves wait at
        // most one hedge interval before the next wave is started.
        let is_last_wave =
            wave_idx + 1 == wave_plan.len() || next_source_idx >= ordered_sources.len();
        let window_end = if is_last_wave {
            deadline
        } else {
            (Instant::now() + Duration::from_millis(dispatch.hedge_interval_ms)).min(deadline)
        };

        while Instant::now() < window_end {
            let remaining = window_end.saturating_duration_since(Instant::now());
            // Leave the window when it elapses or when no task is outstanding.
            let Some(result) = tokio::time::timeout(remaining, pending.next())
                .await
                .ok()
                .flatten()
            else {
                break;
            };
            // A join error leaves the source id in `pending_source_ids`, so it
            // is recorded as a timeout after the waves finish.
            let Ok(outcome) = result else {
                continue;
            };
            match outcome {
                SourceFetchOutcome::Hit {
                    source_id,
                    data,
                    elapsed_ms,
                } => {
                    pending_source_ids.remove(&source_id);
                    self.record_read_source_success(&source_id, elapsed_ms)
                        .await;
                    return RouteFetchOutcome::Hit(data);
                }
                SourceFetchOutcome::Miss { source_id } => {
                    pending_source_ids.remove(&source_id);
                    self.record_read_source_miss(&source_id).await;
                }
                SourceFetchOutcome::Failure { source_id } => {
                    pending_source_ids.remove(&source_id);
                    self.record_read_source_failure(&source_id).await;
                }
            }
        }

        if Instant::now() >= deadline {
            break;
        }
    }

    // Every source that never reported counts as a timeout.
    for source_id in pending_source_ids {
        saw_timeout = true;
        self.record_read_source_timeout(&source_id).await;
    }
    if saw_timeout {
        RouteFetchOutcome::Timeout
    } else {
        RouteFetchOutcome::Miss
    }
}
1694
/// Fetches `hash` from read sources, sharing one fetch among concurrent callers.
///
/// The first caller for a hash registers an in-flight entry and runs the
/// fetch. Callers arriving while that entry exists register a oneshot waiter
/// and receive a clone of the first caller's result. A hit is written to the
/// local store (best effort) before waiters are notified.
async fn request_from_read_sources(&self, hash: &Hash) -> RouteFetchOutcome {
    let hash_key = hash_to_key(hash);
    // Under the in-flight lock: either join an existing fetch as a waiter or
    // register this call as the one performing the fetch.
    let existing_wait = {
        let mut inflight = self.inflight_source_fetches.lock().await;
        if let Some(existing) = inflight.get_mut(&hash_key) {
            let (tx, rx) = oneshot::channel();
            existing.waiters.push(tx);
            Some(rx)
        } else {
            inflight.insert(
                hash_key.clone(),
                InflightSourceFetch {
                    waiters: Vec::new(),
                },
            );
            None
        }
    };

    if let Some(wait) = existing_wait {
        // A sender dropped without sending is reported as a timeout.
        return wait.await.unwrap_or(RouteFetchOutcome::Timeout);
    }

    // NOTE(review): if this future is dropped between the insert above and the
    // removal below, the in-flight entry appears to stay registered and its
    // waiters are never signalled. `request_from_mesh_with_context` returns
    // early from a `select!` over route futures — confirm this cannot strand
    // an entry.
    let result = self.request_from_read_sources_inner(hash).await;
    if let RouteFetchOutcome::Hit(hit) = &result {
        let _ = self.local_store.put(*hash, hit.clone()).await;
    }

    // Unregister the fetch and hand every waiter its own copy of the result.
    let waiters = self
        .inflight_source_fetches
        .lock()
        .await
        .remove(&hash_key)
        .map(|inflight| inflight.waiters)
        .unwrap_or_default();
    for waiter in waiters {
        let _ = waiter.send(result.clone());
    }

    result
}
1736
1737 async fn available_read_routes(&self, context: &MeshReadContext) -> Vec<ReadRoute> {
1738 let mut routes = Vec::new();
1739 let ordered_peers = self
1740 .ordered_connected_peers(context.exclude_peer_id.as_deref())
1741 .await;
1742 if !ordered_peers.is_empty() {
1743 routes.push(ReadRoute::Peers(ordered_peers));
1744 }
1745 if !self.ordered_read_sources().await.is_empty() {
1746 routes.push(ReadRoute::Sources);
1747 }
1748 if routes.len() <= 1 {
1749 return routes;
1750 }
1751
1752 let now = Instant::now();
1753 let stats = self.read_route_stats.read().await;
1754 routes.sort_by(|left, right| {
1755 let left_stats = stats.get(left.id()).cloned().unwrap_or_default();
1756 let right_stats = stats.get(right.id()).cloned().unwrap_or_default();
1757 adaptive_source_score(&right_stats, now)
1758 .partial_cmp(&adaptive_source_score(&left_stats, now))
1759 .unwrap_or(std::cmp::Ordering::Equal)
1760 .then_with(|| left.id().cmp(right.id()))
1761 });
1762 routes
1763 }
1764
1765 async fn should_probe_multiple_routes(&self, routes: &[ReadRoute]) -> bool {
1766 if routes.len() <= 1 {
1767 return false;
1768 }
1769 let stats = self.read_route_stats.read().await;
1770 let best = stats.get(routes[0].id()).cloned().unwrap_or_default();
1771 let second = stats.get(routes[1].id()).cloned().unwrap_or_default();
1772 if !source_has_history(&best) || !source_has_history(&second) {
1773 return true;
1774 }
1775 let now = Instant::now();
1776 adaptive_source_score(&best, now) - adaptive_source_score(&second, now)
1777 < SOURCE_SCORE_TIE_DELTA
1778 }
1779
1780 async fn run_read_route(
1781 &self,
1782 hash: &Hash,
1783 route: &ReadRoute,
1784 context: &MeshReadContext,
1785 ) -> RouteFetchOutcome {
1786 let route_id = route.id();
1787 self.record_route_request(route_id).await;
1788 let started_at = Instant::now();
1789 let result = match route {
1790 ReadRoute::Peers(peer_ids) => {
1791 self.request_from_ordered_peers(hash, peer_ids, context.request_htl)
1792 .await
1793 }
1794 ReadRoute::Sources => self.request_from_read_sources(hash).await,
1795 };
1796 match &result {
1797 RouteFetchOutcome::Hit(_) => {
1798 self.record_route_success(route_id, started_at.elapsed().as_millis().max(1) as u64)
1799 .await;
1800 }
1801 RouteFetchOutcome::Miss => {
1802 self.record_route_miss(route_id).await;
1803 }
1804 RouteFetchOutcome::Timeout => {
1805 self.record_route_timeout(route_id).await;
1806 }
1807 }
1808 result
1809 }
1810
/// Fetches `hash` through the available read routes.
///
/// With no route the result is `None`; with one route that route is run.
/// With two or more, the first two are either raced concurrently (when
/// `should_probe_multiple_routes` says so) or tried one after the other.
/// Returns the data of the first route that hits, or `None` if none does.
async fn request_from_mesh_with_context(
    &self,
    hash: &Hash,
    context: &MeshReadContext,
) -> Option<Vec<u8>> {
    let routes = self.available_read_routes(context).await;
    match routes.as_slice() {
        [] => None,
        [route] => match self.run_read_route(hash, route, context).await {
            RouteFetchOutcome::Hit(data) => Some(data),
            RouteFetchOutcome::Miss | RouteFetchOutcome::Timeout => None,
        },
        // Only the first two routes are ever used.
        [first, second, ..] => {
            if self.should_probe_multiple_routes(&routes).await {
                let first_fut = self.run_read_route(hash, first, context);
                let second_fut = self.run_read_route(hash, second, context);
                tokio::pin!(first_fut);
                tokio::pin!(second_fut);
                // The flags keep a finished future from being polled again.
                let mut first_done = false;
                let mut second_done = false;
                loop {
                    tokio::select! {
                        result = &mut first_fut, if !first_done => {
                            first_done = true;
                            // Returning here drops the other, still-running route future.
                            if let RouteFetchOutcome::Hit(data) = result {
                                return Some(data);
                            }
                        }
                        result = &mut second_fut, if !second_done => {
                            second_done = true;
                            if let RouteFetchOutcome::Hit(data) = result {
                                return Some(data);
                            }
                        }
                        else => break,
                    }
                    if first_done && second_done {
                        break;
                    }
                }
                None
            } else {
                // Sequential mode: fall through to the second route only when
                // the first one did not hit.
                match self.run_read_route(hash, first, context).await {
                    RouteFetchOutcome::Hit(data) => return Some(data),
                    RouteFetchOutcome::Miss | RouteFetchOutcome::Timeout => {}
                }
                match self.run_read_route(hash, second, context).await {
                    RouteFetchOutcome::Hit(data) => Some(data),
                    RouteFetchOutcome::Miss | RouteFetchOutcome::Timeout => None,
                }
            }
        }
    }
}
1865
1866 async fn request_from_mesh(&self, hash: &Hash) -> Option<Vec<u8>> {
1867 self.request_from_mesh_with_context(hash, &MeshReadContext::default())
1868 .await
1869 }
1870
1871 async fn begin_forward_request(&self, hash_key: &str, requester_id: &str) -> bool {
1872 let mut pending = self.pending_forward_requests.write().await;
1873 if let Some(existing) = pending.get_mut(hash_key) {
1874 existing.requester_ids.insert(requester_id.to_string());
1875 return false;
1876 }
1877
1878 let mut requester_ids = HashSet::new();
1879 requester_ids.insert(requester_id.to_string());
1880 pending.insert(
1881 hash_key.to_string(),
1882 PendingForwardRequest { requester_ids },
1883 );
1884 true
1885 }
1886
1887 async fn take_forward_requesters(&self, hash_key: &str) -> Vec<String> {
1888 self.pending_forward_requests
1889 .write()
1890 .await
1891 .remove(hash_key)
1892 .map(|pending| pending.requester_ids.into_iter().collect())
1893 .unwrap_or_default()
1894 }
1895
1896 async fn complete_pending_response(
1897 &self,
1898 from_peer: &str,
1899 hash: &Hash,
1900 hash_key: String,
1901 payload: Vec<u8>,
1902 ) {
1903 if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
1904 self.release_queried_peer_requests(&pending.queried_peers)
1905 .await;
1906 let rtt_ms = pending.started_at.elapsed().as_millis() as u64;
1907 self.peer_selector.write().await.record_success(
1908 from_peer,
1909 rtt_ms,
1910 payload.len() as u64,
1911 );
1912 let forward_requesters = self.take_forward_requesters(&hash_key).await;
1913 let response_bytes = if forward_requesters.is_empty() {
1914 None
1915 } else {
1916 Some(encode_response(&create_response(hash, payload.clone())))
1917 };
1918 let _ = pending.response_tx.send(Some(payload));
1919 if let Some(response_bytes) = response_bytes {
1920 for requester_id in forward_requesters {
1921 if let Some(channel) = self.signaling.get_channel(&requester_id).await {
1922 let _ = channel.send(response_bytes.clone()).await;
1923 }
1924 }
1925 }
1926 }
1927 }
1928
1929 async fn handle_quote_response_message(&self, from_peer: &str, res: DataQuoteResponse) {
1930 if !res.a {
1931 return;
1932 }
1933
1934 let Some(quote_id) = res.q else {
1935 return;
1936 };
1937
1938 let hash_key = hash_to_key(&res.h);
1939 let (preferred_mint_url, offered_payment_sat) = {
1940 let pending_quotes = self.pending_quotes.read().await;
1941 let Some(pending) = pending_quotes.get(&hash_key) else {
1942 return;
1943 };
1944 (
1945 pending.preferred_mint_url.clone(),
1946 pending.offered_payment_sat,
1947 )
1948 };
1949 if !self
1950 .should_accept_quote_response(
1951 from_peer,
1952 preferred_mint_url.as_deref(),
1953 offered_payment_sat,
1954 &res,
1955 )
1956 .await
1957 {
1958 return;
1959 }
1960 let mut pending_quotes = self.pending_quotes.write().await;
1961 if let Some(pending) = pending_quotes.remove(&hash_key) {
1962 let _ = pending.response_tx.send(Some(NegotiatedQuote {
1963 peer_id: from_peer.to_string(),
1964 quote_id,
1965 mint_url: res.m,
1966 }));
1967 }
1968 }
1969
1970 async fn handle_response_message(&self, from_peer: &str, res: crate::protocol::DataResponse) {
1971 let hash_key = hash_to_key(&res.h);
1972 let hash = match crate::protocol::bytes_to_hash(&res.h) {
1973 Some(h) => h,
1974 None => return,
1975 };
1976
1977 if hashtree_core::sha256(&res.d) != hash {
1979 self.peer_selector.write().await.record_failure(from_peer);
1980 if self.debug {
1981 println!("[MeshStoreCore] Ignoring invalid response payload for {hash_key}");
1982 }
1983 return;
1984 }
1985
1986 self.complete_pending_response(from_peer, &hash, hash_key, res.d)
1987 .await;
1988 }
1989
1990 async fn handle_quote_request_message(&self, from_peer: &str, req: DataQuoteRequest) {
1991 let hash = match crate::protocol::bytes_to_hash(&req.h) {
1992 Some(h) => h,
1993 None => return,
1994 };
1995 let hash_key = hash_to_key(&hash);
1996
1997 {
1998 let selector = self.peer_selector.read().await;
1999 if self.should_refuse_requests_from_peer(&selector, from_peer) {
2000 if self.debug {
2001 println!(
2002 "[MeshStoreCore] Refusing quote request from delinquent peer {}",
2003 from_peer
2004 );
2005 }
2006 return;
2007 }
2008 }
2009
2010 let chosen_mint = self.choose_quote_mint(req.m.as_deref());
2011 let can_serve = self.local_store.has(&hash).await.ok().unwrap_or(false)
2012 && !self.should_drop_response(&hash)
2013 && !self.should_corrupt_response(&hash);
2014
2015 let res = if can_serve {
2016 let quote_id = self
2017 .issue_quote(from_peer, &hash_key, req.p, req.t, chosen_mint.as_deref())
2018 .await;
2019 create_quote_response_available(&hash, quote_id, req.p, req.t, chosen_mint.as_deref())
2020 } else {
2021 create_quote_response_unavailable(&hash)
2022 };
2023 let response_bytes = encode_quote_response(&res);
2024 if let Some(channel) = self.signaling.get_channel(from_peer).await {
2025 let _ = channel.send(response_bytes).await;
2026 }
2027 }
2028
/// Handles a data request from `from_peer`.
///
/// A request carrying a quote id is refused unless the quote can be taken as
/// valid. Data held in the local store is answered directly, subject to the
/// drop / delay / corrupt behaviour hooks. Otherwise the requester is
/// registered as a forward requester; if no fetch for the hash is already
/// under way, a background task fetches the data and answers every
/// registered requester.
async fn handle_request_message(
    self: &Arc<Self>,
    from_peer: &str,
    req: crate::protocol::DataRequest,
) {
    let hash = match crate::protocol::bytes_to_hash(&req.h) {
        Some(h) => h,
        None => return,
    };
    let hash_key = hash_to_key(&hash);

    if let Some(quote_id) = req.q {
        if !self.take_valid_quote(from_peer, &hash_key, quote_id).await {
            if self.debug {
                println!(
                    "[MeshStoreCore] Refusing request with invalid or expired quote {} from {}",
                    quote_id, from_peer
                );
            }
            return;
        }
    }

    // Refused peers are still served, but only from read sources (see the
    // spawned task below), never by forwarding to other peers.
    let allow_peer_forwarding = {
        let selector = self.peer_selector.read().await;
        !self.should_refuse_requests_from_peer(&selector, from_peer)
    };

    if let Ok(Some(mut data)) = self.local_store.get(&hash).await {
        if self.should_drop_response(&hash) {
            if self.debug {
                println!(
                    "[MeshStoreCore] Dropping response for {} due to actor profile",
                    hash_to_key(&hash)
                );
            }
            return;
        }

        let response_delay = self.response_send_delay(&hash, data.len());
        if !response_delay.is_zero() {
            tokio::time::sleep(response_delay).await;
        }

        // Deliberate corruption: flip the top bit of the first byte, or add a
        // byte when the payload is empty, so the payload no longer matches.
        if self.should_corrupt_response(&hash) {
            if data.is_empty() {
                data.push(0x80);
            } else {
                data[0] ^= 0x80;
            }
        }

        let res = create_response(&hash, data);
        let response_bytes = encode_response(&res);
        if let Some(channel) = self.signaling.get_channel(from_peer).await {
            let _ = channel.send(response_bytes).await;
        }
        return;
    }

    // A local request for this hash is already pending: just register the
    // requester so `complete_pending_response` forwards the answer to it.
    if self.pending_requests.read().await.contains_key(&hash_key) {
        let _ = self.begin_forward_request(&hash_key, from_peer).await;
        return;
    }

    // Another forward for this hash is already running; the requester was
    // added to it, so there is nothing more to start.
    if !self.begin_forward_request(&hash_key, from_peer).await {
        return;
    }

    let from_peer = from_peer.to_string();
    let this = Arc::clone(self);
    let request_htl = req.htl;
    // Fetch in the background so handling this message does not wait on it.
    tokio::spawn(async move {
        let result = if allow_peer_forwarding {
            // Never ask the requesting peer for the data it just asked for.
            let context = MeshReadContext {
                exclude_peer_id: Some(from_peer.clone()),
                request_htl,
            };
            this.request_from_mesh_with_context(&hash, &context).await
        } else {
            if this.debug {
                println!(
                    "[MeshStoreCore] Serving request from delinquent peer {} via read sources only",
                    from_peer
                );
            }
            match this.request_from_read_sources(&hash).await {
                RouteFetchOutcome::Hit(data) => Some(data),
                RouteFetchOutcome::Miss | RouteFetchOutcome::Timeout => None,
            }
        };
        // Requesters are taken whether or not data was found; on a miss they
        // simply receive no response.
        let requester_ids = this.take_forward_requesters(&hash_key).await;
        if let Some(data) = result {
            let res = create_response(&hash, data);
            let response_bytes = encode_response(&res);
            for requester_id in requester_ids {
                if let Some(channel) = this.signaling.get_channel(&requester_id).await {
                    let _ = channel.send(response_bytes.clone()).await;
                }
            }
        }
    });
}
2134
2135 pub async fn handle_data_message(self: &Arc<Self>, from_peer: &str, data: &[u8]) {
2137 let parsed = match parse_message(data) {
2138 Some(m) => m,
2139 None => return,
2140 };
2141
2142 match parsed {
2143 DataMessage::Request(req) => {
2144 self.handle_request_message(from_peer, req).await;
2145 }
2146 DataMessage::Response(res) => {
2147 self.handle_response_message(from_peer, res).await;
2148 }
2149 DataMessage::QuoteRequest(req) => {
2150 self.handle_quote_request_message(from_peer, req).await;
2151 }
2152 DataMessage::QuoteResponse(res) => {
2153 self.handle_quote_response_message(from_peer, res).await;
2154 }
2155 DataMessage::Payment(_) | DataMessage::PaymentAck(_) | DataMessage::Chunk(_) => {}
2156 }
2157 }
2158}
2159
2160#[async_trait]
2161impl<S, R, F> Store for MeshStoreCore<S, R, F>
2162where
2163 S: Store + Send + Sync + 'static,
2164 R: SignalingTransport + Send + Sync + 'static,
2165 F: PeerLinkFactory + Send + Sync + 'static,
2166{
2167 async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
2168 self.local_store.put(hash, data).await
2169 }
2170
2171 async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
2172 if let Some(data) = self.local_store.get(hash).await? {
2174 return Ok(Some(data));
2175 }
2176
2177 Ok(self.request_from_mesh(hash).await)
2179 }
2180
2181 async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
2182 self.local_store.has(hash).await
2183 }
2184
2185 async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
2186 self.local_store.delete(hash).await
2187 }
2188}
2189
2190#[cfg(test)]
2191mod tests;
2192
/// `MeshStoreCore` wired to the mock relay transport and mock connection
/// factory from `crate::mock` (presumably for simulation and tests).
pub type SimMeshStore<S> =
    MeshStoreCore<S, crate::mock::MockRelayTransport, crate::mock::MockConnectionFactory>;
2196
/// `MeshStoreCore` wired to the Nostr relay transport and the WebRTC peer
/// link factory.
pub type ProductionMeshStore<S> =
    MeshStoreCore<S, crate::nostr::NostrRelayTransport, crate::real_factory::WebRtcPeerLinkFactory>;