1use async_trait::async_trait;
8use futures::{stream::FuturesUnordered, FutureExt, StreamExt};
9use std::collections::hash_map::DefaultHasher;
10use std::collections::{HashMap, HashSet, VecDeque};
11use std::future::Future;
12use std::hash::{Hash as _, Hasher};
13use std::ops::Range;
14use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
15use std::sync::Arc;
16use std::time::Duration;
17use tokio::sync::{oneshot, Mutex, RwLock};
18use tokio::time::Instant;
19
20use hashtree_core::{Hash, Store, StoreError};
21
22use crate::peer_selector::{PeerMetadataSnapshot, PeerSelector, SelectionStrategy};
23use crate::protocol::{
24 create_pubsub_frame, create_pubsub_interest, create_pubsub_inventory, create_pubsub_want,
25 create_quote_request, create_quote_response_available, create_quote_response_unavailable,
26 create_request, create_request_with_quote, create_response, encode_pubsub_frame,
27 encode_pubsub_interest, encode_pubsub_inventory, encode_pubsub_want, encode_quote_request,
28 encode_quote_response, encode_request, encode_response, hash_to_key, parse_message,
29 DataMessage, DataQuoteRequest, DataQuoteResponse, PubsubFrame, PubsubInterest, PubsubInventory,
30 PubsubWant,
31};
32use crate::pubsub_strategy::{
33 reciprocal_virtual_finish, select_reciprocal_outbound_job, OutboundJobCandidate,
34 PeerTrafficSnapshot, PubsubCandidate, PubsubSchedulerConfig,
35};
36use crate::signaling::MeshRouter;
37use crate::transport::{PeerLinkFactory, SignalingTransport, TransportError};
38use crate::types::{
39 should_forward_htl, PeerHTLConfig, SignalingMessage, TimedSeenSet, MAX_HTL, MESH_EVENT_POLICY,
40};
41
42const PEER_METADATA_POINTER_SLOT_KEY: &[u8] = b"hashtree-mesh/peer-metadata/latest/v1";
45const RECENT_FORWARD_MISS_CAPACITY: usize = 4096;
46const MIN_RECENT_FORWARD_MISS_TTL_MS: u64 = 250;
47const PUBSUB_SEEN_CAPACITY: usize = 16_384;
48const PUBSUB_INBOX_CAPACITY: usize = 4_096;
49const PUBSUB_FRAME_CACHE_CAPACITY: usize = 4_096;
50const PUBSUB_SEEN_TTL: Duration = Duration::from_secs(120);
51
52struct PendingRequest {
54 response_tx: oneshot::Sender<Option<Vec<u8>>>,
55 started_at: Instant,
56 queried_peers: Vec<String>,
57}
58
59struct PendingQuoteRequest {
60 response_tx: oneshot::Sender<Option<NegotiatedQuote>>,
61 preferred_mint_url: Option<String>,
62 offered_payment_sat: u64,
63}
64
65struct PendingForwardRequest {
66 requester_ids: HashSet<String>,
67}
68
69type PeerWireStats = PeerTrafficSnapshot;
70
71struct PendingResponseSend {
72 job_id: u64,
73 peer_id: String,
74 bytes: Vec<u8>,
75 ready_at: Instant,
76 queue_sequence: u64,
77}
78
79#[async_trait]
80pub trait MeshReadSource: Send + Sync {
81 fn id(&self) -> &str;
82
83 fn is_available(&self) -> bool {
84 true
85 }
86
87 async fn get(&self, hash: &Hash) -> Option<Vec<u8>>;
88}
89
90#[derive(Debug, Clone)]
91struct NegotiatedQuote {
92 peer_id: String,
93 quote_id: u64,
94 #[allow(dead_code)]
95 mint_url: Option<String>,
96}
97
98struct IssuedQuote {
99 expires_at: Instant,
100 #[allow(dead_code)]
101 payment_sat: u64,
102 #[allow(dead_code)]
103 mint_url: Option<String>,
104}
105
106#[derive(Debug, Clone, Default)]
107struct AdaptiveSourceStats {
108 requests: u64,
109 successes: u64,
110 misses: u64,
111 failures: u64,
112 timeouts: u64,
113 srtt_ms: f64,
114 rttvar_ms: f64,
115 backoff_level: u32,
116 backed_off_until: Option<Instant>,
117 last_success_at: Option<Instant>,
118 last_failure_at: Option<Instant>,
119}
120
121#[derive(Debug, Clone)]
122enum RouteFetchOutcome {
123 Hit(Vec<u8>),
124 Miss,
125 Timeout,
126}
127
128struct InflightSourceFetch {
129 waiters: Vec<oneshot::Sender<RouteFetchOutcome>>,
130}
131
132enum SourceFetchOutcome {
133 Hit {
134 source_id: String,
135 data: Vec<u8>,
136 elapsed_ms: u64,
137 },
138 Miss {
139 source_id: String,
140 },
141 Failure {
142 source_id: String,
143 },
144}
145
146const INITIAL_SOURCE_BACKOFF_MS: u64 = 250;
147const MAX_SOURCE_BACKOFF_MS: u64 = 10_000;
148const SOURCE_SCORE_TIE_DELTA: f64 = 0.15;
149const RECENT_SOURCE_SUCCESS_WINDOW: Duration = Duration::from_secs(60);
150const ACTIVE_PEER_REQUEST_RANK_PENALTY: usize = 3;
151
152fn source_reliability_score(stats: &AdaptiveSourceStats) -> f64 {
153 (stats.successes as f64 + 1.0) / (stats.requests as f64 + 2.0)
154}
155
156fn source_latency_score(stats: &AdaptiveSourceStats) -> f64 {
157 if stats.srtt_ms <= 0.0 {
158 return 0.5;
159 }
160 (500.0 / (stats.srtt_ms + 50.0)).min(1.0)
161}
162
163fn source_has_history(stats: &AdaptiveSourceStats) -> bool {
164 stats.requests > 0
165 || stats.successes > 0
166 || stats.misses > 0
167 || stats.failures > 0
168 || stats.timeouts > 0
169}
170
171fn adaptive_source_score(stats: &AdaptiveSourceStats, now: Instant) -> f64 {
172 if let Some(backed_off_until) = stats.backed_off_until {
173 if backed_off_until > now {
174 return f64::NEG_INFINITY;
175 }
176 }
177
178 let miss_penalty = if stats.requests > 0 {
179 (stats.misses as f64 / stats.requests as f64) * 0.15
180 } else {
181 0.0
182 };
183 let failure_penalty = if stats.requests > 0 {
184 ((stats.failures + stats.timeouts) as f64 / stats.requests as f64) * 0.3
185 } else {
186 0.0
187 };
188 let recency_bonus = if stats
189 .last_success_at
190 .is_some_and(|last| now.duration_since(last) < RECENT_SOURCE_SUCCESS_WINDOW)
191 {
192 0.1
193 } else {
194 0.0
195 };
196
197 0.6 * source_reliability_score(stats) + 0.3 * source_latency_score(stats) + recency_bonus
198 - miss_penalty
199 - failure_penalty
200}
201
202fn peer_endpoint_has_history(stats: &crate::peer_selector::PeerStats) -> bool {
203 stats.requests_sent > 0 || stats.successes > 0 || stats.failures > 0 || stats.timeouts > 0
204}
205
206fn peer_endpoint_score(stats: &crate::peer_selector::PeerStats, now: Instant) -> f64 {
207 if stats.backed_off_until.is_some_and(|until| until > now) {
208 return f64::NEG_INFINITY;
209 }
210
211 let miss_penalty = 0.0;
212 let failure_penalty = if stats.requests_sent > 0 {
213 ((stats.failures + stats.timeouts) as f64 / stats.requests_sent as f64) * 0.3
214 } else {
215 0.0
216 };
217 let recency_bonus = if stats
218 .last_success
219 .is_some_and(|last| now.duration_since(last) < RECENT_SOURCE_SUCCESS_WINDOW)
220 {
221 0.1
222 } else {
223 0.0
224 };
225
226 0.6 * stats.success_rate()
227 + 0.3
228 * source_latency_score(&AdaptiveSourceStats {
229 srtt_ms: stats.srtt_ms,
230 ..AdaptiveSourceStats::default()
231 })
232 + recency_bonus
233 - miss_penalty
234 - failure_penalty
235}
236
237#[derive(Clone)]
238enum ReadRoute {
239 Peers(Vec<String>),
240 Sources,
241}
242
243impl ReadRoute {
244 fn id(&self) -> &'static str {
245 match self {
246 Self::Peers(_) => "peers",
247 Self::Sources => "sources",
248 }
249 }
250}
251
252struct RankedReadRoute {
253 route: ReadRoute,
254 best_endpoint_id: String,
255 score: f64,
256 has_history: bool,
257}
258
259fn ranked_route_kind(route: &ReadRoute) -> u8 {
260 match route {
261 ReadRoute::Sources => 0,
262 ReadRoute::Peers(_) => 1,
263 }
264}
265
266#[derive(Debug, Clone)]
267struct MeshReadContext {
268 exclude_peer_id: Option<String>,
269 request_htl: u8,
270}
271
272impl Default for MeshReadContext {
273 fn default() -> Self {
274 Self {
275 exclude_peer_id: None,
276 request_htl: MAX_HTL,
277 }
278 }
279}
280
281#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
283pub struct DataPumpStats {
284 pub processed: usize,
285 pub request_messages: usize,
286 pub response_messages: usize,
287 pub quote_request_messages: u64,
288 pub quote_response_messages: u64,
289 pub pubsub_interest_messages: u64,
290 pub pubsub_frame_messages: u64,
291 pub pubsub_inventory_messages: u64,
292 pub pubsub_want_messages: u64,
293 pub processed_bytes: u64,
294}
295
296#[derive(Debug, Clone, PartialEq, Eq)]
298pub struct PubsubEvent {
299 pub stream_id: String,
300 pub seq: u64,
301 pub origin_peer_id: String,
302 pub from_peer_id: String,
303 pub payload: Vec<u8>,
304}
305
306#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
308pub struct PubsubPublishStats {
309 pub selected_peers: usize,
310 pub sent_peers: usize,
311 pub sent_bytes: u64,
312 pub deferred_peers: usize,
313}
314
315#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
317pub enum PubsubDeliveryMode {
318 InterestPush,
320 #[default]
322 HtlInvWant,
323}
324
325#[derive(Debug, Clone, Copy)]
331pub struct RequestDispatchConfig {
332 pub initial_fanout: usize,
334 pub hedge_fanout: usize,
336 pub max_fanout: usize,
338 pub hedge_interval_ms: u64,
340}
341
342impl Default for RequestDispatchConfig {
343 fn default() -> Self {
344 Self {
345 initial_fanout: usize::MAX,
346 hedge_fanout: usize::MAX,
347 max_fanout: usize::MAX,
348 hedge_interval_ms: 0,
349 }
350 }
351}
352
353pub fn normalize_dispatch_config(
355 dispatch: RequestDispatchConfig,
356 available_peers: usize,
357) -> RequestDispatchConfig {
358 let mut cfg = dispatch;
359 let cap = if cfg.max_fanout == 0 {
360 available_peers
361 } else {
362 cfg.max_fanout.min(available_peers)
363 };
364 cfg.max_fanout = cap;
365 cfg.initial_fanout = if cfg.initial_fanout == 0 {
366 1
367 } else {
368 cfg.initial_fanout.min(cap.max(1))
369 };
370 cfg.hedge_fanout = if cfg.hedge_fanout == 0 {
371 1
372 } else {
373 cfg.hedge_fanout.min(cap.max(1))
374 };
375 cfg
376}
377
378pub fn build_hedged_wave_plan(peer_count: usize, dispatch: RequestDispatchConfig) -> Vec<usize> {
380 if peer_count == 0 {
381 return Vec::new();
382 }
383 let cap = dispatch.max_fanout.min(peer_count);
384 if cap == 0 {
385 return Vec::new();
386 }
387
388 let mut plan = Vec::new();
389 let mut sent = 0usize;
390 let first = dispatch.initial_fanout.min(cap).max(1);
391 plan.push(first);
392 sent += first;
393
394 while sent < cap {
395 let next = dispatch.hedge_fanout.min(cap - sent).max(1);
396 plan.push(next);
397 sent += next;
398 }
399 plan
400}
401
402#[derive(Debug)]
404pub enum HedgedWaveAction<T> {
405 Continue,
406 Success(T),
407 Abort,
408}
409
410pub async fn run_hedged_waves<T, SendWave, SendWaveFut, WaitWave, WaitWaveFut>(
415 peer_count: usize,
416 dispatch: RequestDispatchConfig,
417 request_timeout: Duration,
418 mut send_wave: SendWave,
419 mut wait_wave: WaitWave,
420) -> Option<T>
421where
422 SendWave: FnMut(Range<usize>) -> SendWaveFut,
423 SendWaveFut: Future<Output = usize>,
424 WaitWave: FnMut(Duration) -> WaitWaveFut,
425 WaitWaveFut: Future<Output = HedgedWaveAction<T>>,
426{
427 let dispatch = normalize_dispatch_config(dispatch, peer_count);
428 let wave_plan = build_hedged_wave_plan(peer_count, dispatch);
429 if wave_plan.is_empty() {
430 return None;
431 }
432
433 let deadline = Instant::now() + request_timeout;
434 let mut sent_total = 0usize;
435 let mut next_peer_idx = 0usize;
436
437 for (wave_idx, wave_size) in wave_plan.iter().copied().enumerate() {
438 let from = next_peer_idx;
439 let to = (next_peer_idx + wave_size).min(peer_count);
440 next_peer_idx = to;
441
442 if from == to {
443 continue;
444 }
445
446 sent_total += send_wave(from..to).await;
447 if sent_total == 0 {
448 if next_peer_idx >= peer_count {
449 break;
450 }
451 continue;
452 }
453
454 let now = Instant::now();
455 if now >= deadline {
456 break;
457 }
458 let remaining = deadline.saturating_duration_since(now);
459 let is_last_wave = wave_idx + 1 == wave_plan.len() || next_peer_idx >= peer_count;
460 let wait = if is_last_wave {
461 remaining
462 } else if dispatch.hedge_interval_ms == 0 {
463 Duration::ZERO
464 } else {
465 Duration::from_millis(dispatch.hedge_interval_ms).min(remaining)
466 };
467
468 if wait.is_zero() {
469 continue;
470 }
471
472 match wait_wave(wait).await {
473 HedgedWaveAction::Continue => {}
474 HedgedWaveAction::Success(value) => return Some(value),
475 HedgedWaveAction::Abort => break,
476 }
477 }
478
479 None
480}
481
482pub async fn sync_selector_peers(selector: &RwLock<PeerSelector>, current_peer_ids: &[String]) {
484 let mut selector = selector.write().await;
485 let current: HashSet<&str> = current_peer_ids.iter().map(String::as_str).collect();
486 let known: Vec<String> = selector.all_stats().map(|s| s.peer_id.clone()).collect();
487 for peer_id in known {
488 if !current.contains(peer_id.as_str()) {
489 selector.remove_peer(&peer_id);
490 }
491 }
492 for peer_id in current_peer_ids {
493 selector.add_peer(peer_id.clone());
494 }
495}
496
497#[derive(Debug, Clone, Copy)]
501pub struct ResponseBehaviorConfig {
502 pub drop_response_prob: f64,
504 pub corrupt_response_prob: f64,
506 pub extra_delay_ms: u64,
508 pub first_byte_delay_ms: u64,
510 pub bytes_per_second: u64,
512 pub stall_response_prob: f64,
514 pub stall_delay_ms: u64,
516}
517
518impl Default for ResponseBehaviorConfig {
519 fn default() -> Self {
520 Self {
521 drop_response_prob: 0.0,
522 corrupt_response_prob: 0.0,
523 extra_delay_ms: 0,
524 first_byte_delay_ms: 0,
525 bytes_per_second: 0,
526 stall_response_prob: 0.0,
527 stall_delay_ms: 0,
528 }
529 }
530}
531
532impl ResponseBehaviorConfig {
533 fn normalized(self) -> Self {
534 Self {
535 drop_response_prob: self.drop_response_prob.clamp(0.0, 1.0),
536 corrupt_response_prob: self.corrupt_response_prob.clamp(0.0, 1.0),
537 extra_delay_ms: self.extra_delay_ms,
538 first_byte_delay_ms: self.first_byte_delay_ms,
539 bytes_per_second: self.bytes_per_second,
540 stall_response_prob: self.stall_response_prob.clamp(0.0, 1.0),
541 stall_delay_ms: self.stall_delay_ms,
542 }
543 }
544}
545
546#[derive(Debug, Clone)]
548pub struct MeshRoutingConfig {
549 pub selection_strategy: SelectionStrategy,
550 pub fairness_enabled: bool,
551 pub cashu_payment_weight: f64,
553 pub cashu_payment_default_block_threshold: u64,
556 pub cashu_accepted_mints: Vec<String>,
558 pub cashu_default_mint: Option<String>,
560 pub cashu_peer_suggested_mint_base_cap_sat: u64,
562 pub cashu_peer_suggested_mint_success_step_sat: u64,
564 pub cashu_peer_suggested_mint_receipt_step_sat: u64,
566 pub cashu_peer_suggested_mint_max_cap_sat: u64,
568 pub dispatch: RequestDispatchConfig,
569 pub response_behavior: ResponseBehaviorConfig,
570 pub pubsub_scheduler: PubsubSchedulerConfig,
571 pub pubsub_delivery_mode: PubsubDeliveryMode,
572}
573
574impl Default for MeshRoutingConfig {
575 fn default() -> Self {
576 Self {
577 selection_strategy: SelectionStrategy::Weighted,
578 fairness_enabled: true,
579 cashu_payment_weight: 0.0,
580 cashu_payment_default_block_threshold: 0,
581 cashu_accepted_mints: Vec::new(),
582 cashu_default_mint: None,
583 cashu_peer_suggested_mint_base_cap_sat: 0,
584 cashu_peer_suggested_mint_success_step_sat: 0,
585 cashu_peer_suggested_mint_receipt_step_sat: 0,
586 cashu_peer_suggested_mint_max_cap_sat: 0,
587 dispatch: RequestDispatchConfig::default(),
588 response_behavior: ResponseBehaviorConfig::default(),
589 pubsub_scheduler: PubsubSchedulerConfig::default(),
590 pubsub_delivery_mode: PubsubDeliveryMode::HtlInvWant,
591 }
592 }
593}
594
595pub struct MeshStoreCore<S, R, F>
602where
603 S: Store + Send + Sync + 'static,
604 R: SignalingTransport + Send + Sync + 'static,
605 F: PeerLinkFactory + Send + Sync + 'static,
606{
607 local_store: Arc<S>,
609 signaling: Arc<MeshRouter<R, F>>,
611 htl_configs: RwLock<HashMap<String, PeerHTLConfig>>,
613 pending_requests: RwLock<HashMap<String, PendingRequest>>,
615 pending_quotes: RwLock<HashMap<String, PendingQuoteRequest>>,
617 pending_forward_requests: RwLock<HashMap<String, PendingForwardRequest>>,
619 recent_forward_misses: Mutex<TimedSeenSet>,
621 issued_quotes: RwLock<HashMap<(String, String, u64), IssuedQuote>>,
623 next_quote_id: RwLock<u64>,
625 read_sources: RwLock<HashMap<String, Arc<dyn MeshReadSource>>>,
627 read_source_stats: RwLock<HashMap<String, AdaptiveSourceStats>>,
629 inflight_source_fetches: Mutex<HashMap<String, InflightSourceFetch>>,
631 peer_selector: RwLock<PeerSelector>,
633 peer_active_requests: RwLock<HashMap<String, usize>>,
635 peer_wire_stats: RwLock<HashMap<String, PeerWireStats>>,
637 pubsub_local_interests: RwLock<HashSet<String>>,
639 pubsub_local_interest_versions: RwLock<HashMap<String, u64>>,
641 pubsub_peer_interests: RwLock<HashMap<String, HashSet<String>>>,
643 pubsub_interest_routes: RwLock<HashMap<(String, String), String>>,
645 pubsub_interest_versions: RwLock<HashMap<(String, String), u64>>,
647 pubsub_seen_interests: Mutex<TimedSeenSet>,
649 pubsub_seen_frames: Mutex<TimedSeenSet>,
651 pubsub_seen_inventories: Mutex<TimedSeenSet>,
653 pubsub_seen_wants: Mutex<TimedSeenSet>,
655 pubsub_inventory_routes: RwLock<HashMap<String, String>>,
657 pubsub_want_routes: RwLock<HashMap<String, HashSet<String>>>,
659 pubsub_upstream_wants: Mutex<TimedSeenSet>,
661 pubsub_frame_cache: Mutex<VecDeque<(String, PubsubFrame)>>,
663 pubsub_inbox: Mutex<VecDeque<PubsubEvent>>,
665 pubsub_deferred_counts: RwLock<HashMap<(String, String), u64>>,
667 next_pubsub_interest_seq: AtomicU64,
669 pending_response_sends: Mutex<Vec<PendingResponseSend>>,
671 response_scheduler_running: AtomicBool,
673 next_response_job_id: AtomicU64,
675 routing: MeshRoutingConfig,
677 request_timeout: Duration,
679 debug: bool,
681 running: RwLock<bool>,
683}
684
685impl<S, R, F> MeshStoreCore<S, R, F>
686where
687 S: Store + Send + Sync + 'static,
688 R: SignalingTransport + Send + Sync + 'static,
689 F: PeerLinkFactory + Send + Sync + 'static,
690{
691 pub fn new(
693 local_store: Arc<S>,
694 signaling: Arc<MeshRouter<R, F>>,
695 request_timeout: Duration,
696 debug: bool,
697 ) -> Self {
698 Self::new_with_routing(
699 local_store,
700 signaling,
701 request_timeout,
702 debug,
703 Default::default(),
704 )
705 }
706
707 pub fn new_with_routing(
709 local_store: Arc<S>,
710 signaling: Arc<MeshRouter<R, F>>,
711 request_timeout: Duration,
712 debug: bool,
713 routing: MeshRoutingConfig,
714 ) -> Self {
715 let mut selector = PeerSelector::with_strategy(routing.selection_strategy);
716 selector.set_fairness(routing.fairness_enabled);
717 selector.set_cashu_payment_weight(routing.cashu_payment_weight);
718 Self {
719 local_store,
720 signaling,
721 htl_configs: RwLock::new(HashMap::new()),
722 pending_requests: RwLock::new(HashMap::new()),
723 pending_quotes: RwLock::new(HashMap::new()),
724 pending_forward_requests: RwLock::new(HashMap::new()),
725 recent_forward_misses: Mutex::new(TimedSeenSet::new(
726 RECENT_FORWARD_MISS_CAPACITY,
727 Self::recent_forward_miss_ttl(request_timeout),
728 )),
729 issued_quotes: RwLock::new(HashMap::new()),
730 next_quote_id: RwLock::new(1),
731 read_sources: RwLock::new(HashMap::new()),
732 read_source_stats: RwLock::new(HashMap::new()),
733 inflight_source_fetches: Mutex::new(HashMap::new()),
734 peer_selector: RwLock::new(selector),
735 peer_active_requests: RwLock::new(HashMap::new()),
736 peer_wire_stats: RwLock::new(HashMap::new()),
737 pubsub_local_interests: RwLock::new(HashSet::new()),
738 pubsub_local_interest_versions: RwLock::new(HashMap::new()),
739 pubsub_peer_interests: RwLock::new(HashMap::new()),
740 pubsub_interest_routes: RwLock::new(HashMap::new()),
741 pubsub_interest_versions: RwLock::new(HashMap::new()),
742 pubsub_seen_interests: Mutex::new(TimedSeenSet::new(
743 PUBSUB_SEEN_CAPACITY,
744 PUBSUB_SEEN_TTL,
745 )),
746 pubsub_seen_frames: Mutex::new(TimedSeenSet::new(
747 PUBSUB_SEEN_CAPACITY,
748 PUBSUB_SEEN_TTL,
749 )),
750 pubsub_seen_inventories: Mutex::new(TimedSeenSet::new(
751 PUBSUB_SEEN_CAPACITY,
752 PUBSUB_SEEN_TTL,
753 )),
754 pubsub_seen_wants: Mutex::new(TimedSeenSet::new(PUBSUB_SEEN_CAPACITY, PUBSUB_SEEN_TTL)),
755 pubsub_inventory_routes: RwLock::new(HashMap::new()),
756 pubsub_want_routes: RwLock::new(HashMap::new()),
757 pubsub_upstream_wants: Mutex::new(TimedSeenSet::new(
758 PUBSUB_SEEN_CAPACITY,
759 PUBSUB_SEEN_TTL,
760 )),
761 pubsub_frame_cache: Mutex::new(VecDeque::new()),
762 pubsub_inbox: Mutex::new(VecDeque::new()),
763 pubsub_deferred_counts: RwLock::new(HashMap::new()),
764 next_pubsub_interest_seq: AtomicU64::new(1),
765 pending_response_sends: Mutex::new(Vec::new()),
766 response_scheduler_running: AtomicBool::new(false),
767 next_response_job_id: AtomicU64::new(1),
768 routing,
769 request_timeout,
770 debug,
771 running: RwLock::new(false),
772 }
773 }
774
775 fn recent_forward_miss_ttl(request_timeout: Duration) -> Duration {
776 let ttl_ms = request_timeout
777 .as_millis()
778 .saturating_mul(2)
779 .max(MIN_RECENT_FORWARD_MISS_TTL_MS as u128)
780 .min(u64::MAX as u128) as u64;
781 Duration::from_millis(ttl_ms)
782 }
783
784 pub async fn start(&self) -> Result<(), TransportError> {
786 *self.running.write().await = true;
787
788 self.signaling.send_hello(vec![]).await?;
790
791 Ok(())
792 }
793
794 pub async fn stop(&self) {
796 *self.running.write().await = false;
797 }
798
799 pub async fn process_signaling(&self, msg: SignalingMessage) -> Result<(), TransportError> {
801 let peer_id = msg.peer_id().to_string();
803 {
804 let mut configs = self.htl_configs.write().await;
805 if !configs.contains_key(&peer_id) {
806 configs.insert(peer_id.clone(), PeerHTLConfig::random());
807 }
808 }
809 self.peer_selector.write().await.add_peer(peer_id.clone());
810
811 let result = self.signaling.handle_message(msg).await;
812 if result.is_ok() {
813 self.announce_pubsub_interests_to_peer(&peer_id).await;
814 }
815 result
816 }
817
818 pub fn signaling(&self) -> &Arc<MeshRouter<R, F>> {
820 &self.signaling
821 }
822
823 fn response_behavior(&self) -> ResponseBehaviorConfig {
824 self.routing.response_behavior.normalized()
825 }
826
827 async fn record_peer_wire_sent(&self, peer_id: &str, bytes: u64) {
828 if bytes == 0 {
829 return;
830 }
831 let mut stats = self.peer_wire_stats.write().await;
832 let entry = stats.entry(peer_id.to_string()).or_default();
833 entry.bytes_sent = entry.bytes_sent.saturating_add(bytes);
834 }
835
836 async fn record_peer_wire_received(&self, peer_id: &str, bytes: u64) {
837 if bytes == 0 {
838 return;
839 }
840 let mut stats = self.peer_wire_stats.write().await;
841 let entry = stats.entry(peer_id.to_string()).or_default();
842 entry.bytes_received = entry.bytes_received.saturating_add(bytes);
843 }
844
845 pub async fn record_useful_bytes_received_from_peer(&self, peer_id: &str, bytes: u64) {
850 if bytes == 0 {
851 return;
852 }
853 let mut stats = self.peer_wire_stats.write().await;
854 let entry = stats.entry(peer_id.to_string()).or_default();
855 entry.useful_bytes_received = entry.useful_bytes_received.saturating_add(bytes);
856 }
857
858 pub async fn peer_traffic_snapshot(&self, peer_id: &str) -> PeerTrafficSnapshot {
860 self.peer_wire_stats
861 .read()
862 .await
863 .get(peer_id)
864 .copied()
865 .unwrap_or_default()
866 }
867
868 pub async fn peer_traffic_snapshots(&self) -> HashMap<String, PeerTrafficSnapshot> {
870 self.peer_wire_stats.read().await.clone()
871 }
872
873 fn pubsub_key(origin_peer_id: &str, stream_id: &str, seq: u64) -> String {
874 format!("{origin_peer_id}:{stream_id}:{seq}")
875 }
876
877 fn pubsub_frame_key(frame: &PubsubFrame) -> String {
878 Self::pubsub_key(&frame.origin_peer_id, &frame.stream_id, frame.seq)
879 }
880
881 fn pubsub_interest_key(interest: &PubsubInterest) -> String {
882 format!(
883 "{}:{}:{}:{}",
884 interest.subscriber_peer_id, interest.stream_id, interest.seq, interest.active
885 )
886 }
887
888 fn next_pubsub_interest_seq(&self) -> u64 {
889 self.next_pubsub_interest_seq
890 .fetch_add(1, Ordering::Relaxed)
891 }
892
893 async fn record_peer_pubsub_wire_sent(&self, peer_id: &str, bytes: u64, bandwidth_debt: f64) {
894 if bytes == 0 {
895 return;
896 }
897 let mut stats = self.peer_wire_stats.write().await;
898 let entry = stats.entry(peer_id.to_string()).or_default();
899 entry.bytes_sent = entry.bytes_sent.saturating_add(bytes);
900 entry.bandwidth_debt = bandwidth_debt;
901 }
902
903 async fn send_pubsub_interest_to_peers(
904 &self,
905 interest: &PubsubInterest,
906 exclude_peer_id: Option<&str>,
907 ) -> PubsubPublishStats {
908 if !should_forward_htl(interest.htl) {
909 return PubsubPublishStats::default();
910 }
911
912 let mut peer_ids = self.signaling.peer_ids().await;
913 peer_ids.sort();
914 peer_ids.retain(|peer_id| exclude_peer_id.is_none_or(|exclude| peer_id != exclude));
915
916 let bytes = encode_pubsub_interest(interest);
917 let mut stats = PubsubPublishStats {
918 selected_peers: peer_ids.len(),
919 ..Default::default()
920 };
921 for peer_id in peer_ids {
922 let Some(channel) = self.signaling.get_channel(&peer_id).await else {
923 continue;
924 };
925 if channel.send(bytes.clone()).await.is_ok() {
926 stats.sent_peers += 1;
927 stats.sent_bytes = stats.sent_bytes.saturating_add(bytes.len() as u64);
928 self.record_peer_wire_sent(&peer_id, bytes.len() as u64)
929 .await;
930 }
931 }
932 stats
933 }
934
935 async fn announce_pubsub_interests_to_peer(&self, peer_id: &str) {
936 let mut interests = self
937 .pubsub_local_interests
938 .read()
939 .await
940 .iter()
941 .cloned()
942 .collect::<Vec<_>>();
943 interests.sort();
944 if interests.is_empty() {
945 return;
946 }
947
948 let interests = {
949 let versions = self.pubsub_local_interest_versions.read().await;
950 interests
951 .into_iter()
952 .filter_map(|stream_id| {
953 versions
954 .get(&stream_id)
955 .copied()
956 .map(|seq| (stream_id, seq))
957 })
958 .collect::<Vec<_>>()
959 };
960
961 for (stream_id, seq) in interests {
962 let interest = create_pubsub_interest(
963 stream_id,
964 self.signaling.peer_id().to_string(),
965 seq,
966 true,
967 MAX_HTL,
968 );
969 let Some(channel) = self.signaling.get_channel(peer_id).await else {
970 continue;
971 };
972 let bytes = encode_pubsub_interest(&interest);
973 if channel.send(bytes.clone()).await.is_ok() {
974 self.record_peer_wire_sent(peer_id, bytes.len() as u64)
975 .await;
976 }
977 }
978 }
979
980 fn remove_pubsub_peer_interest(
981 peer_interests: &mut HashMap<String, HashSet<String>>,
982 routes: &HashMap<(String, String), String>,
983 stream_id: &str,
984 peer_id: &str,
985 ) {
986 let still_has_route = routes
987 .iter()
988 .any(|((stream, _subscriber), peer)| stream == stream_id && peer == peer_id);
989 if still_has_route {
990 return;
991 }
992 if let Some(peers) = peer_interests.get_mut(stream_id) {
993 peers.remove(peer_id);
994 if peers.is_empty() {
995 peer_interests.remove(stream_id);
996 }
997 }
998 }
999
1000 async fn apply_pubsub_interest_route(
1001 &self,
1002 from_peer: &str,
1003 interest: &PubsubInterest,
1004 ) -> bool {
1005 if interest.stream_id.is_empty() || interest.subscriber_peer_id.is_empty() {
1006 return false;
1007 }
1008 if interest.subscriber_peer_id == self.signaling.peer_id() {
1009 return false;
1010 }
1011
1012 let interest_key = Self::pubsub_interest_key(interest);
1013 if !self
1014 .pubsub_seen_interests
1015 .lock()
1016 .await
1017 .insert_if_new(interest_key)
1018 {
1019 return false;
1020 }
1021
1022 let route_key = (
1023 interest.stream_id.clone(),
1024 interest.subscriber_peer_id.clone(),
1025 );
1026 {
1027 let mut versions = self.pubsub_interest_versions.write().await;
1028 if versions
1029 .get(&route_key)
1030 .is_some_and(|latest| *latest >= interest.seq)
1031 {
1032 return false;
1033 }
1034 versions.insert(route_key.clone(), interest.seq);
1035 }
1036
1037 let mut peer_interests = self.pubsub_peer_interests.write().await;
1038 let mut routes = self.pubsub_interest_routes.write().await;
1039 if interest.active {
1040 if let Some(previous_peer) = routes.insert(route_key, from_peer.to_string()) {
1041 if previous_peer != from_peer {
1042 Self::remove_pubsub_peer_interest(
1043 &mut peer_interests,
1044 &routes,
1045 &interest.stream_id,
1046 &previous_peer,
1047 );
1048 }
1049 }
1050 peer_interests
1051 .entry(interest.stream_id.clone())
1052 .or_default()
1053 .insert(from_peer.to_string());
1054 } else if let Some(previous_peer) = routes.remove(&route_key) {
1055 Self::remove_pubsub_peer_interest(
1056 &mut peer_interests,
1057 &routes,
1058 &interest.stream_id,
1059 &previous_peer,
1060 );
1061 } else {
1062 Self::remove_pubsub_peer_interest(
1063 &mut peer_interests,
1064 &routes,
1065 &interest.stream_id,
1066 from_peer,
1067 );
1068 }
1069
1070 true
1071 }
1072
1073 async fn interested_pubsub_peers(
1074 &self,
1075 stream_id: &str,
1076 exclude_peer_id: Option<&str>,
1077 ) -> Vec<String> {
1078 let connected = self
1079 .signaling
1080 .peer_ids()
1081 .await
1082 .into_iter()
1083 .collect::<HashSet<_>>();
1084 let mut peers = self
1085 .pubsub_peer_interests
1086 .read()
1087 .await
1088 .get(stream_id)
1089 .map(|peers| peers.iter().cloned().collect::<Vec<_>>())
1090 .unwrap_or_default();
1091 peers.retain(|peer_id| {
1092 connected.contains(peer_id) && exclude_peer_id.is_none_or(|exclude| peer_id != exclude)
1093 });
1094 peers.sort();
1095 peers
1096 }
1097
1098 async fn decrement_pubsub_htl_for_peer(&self, peer_id: &str, htl: u8) -> u8 {
1099 let htl_config = {
1100 let configs = self.htl_configs.read().await;
1101 configs
1102 .get(peer_id)
1103 .cloned()
1104 .unwrap_or_else(PeerHTLConfig::random)
1105 };
1106 htl_config.decrement_with_policy(htl, &MESH_EVENT_POLICY)
1107 }
1108
1109 async fn send_pubsub_inventory_to_peers(
1110 &self,
1111 inv: &PubsubInventory,
1112 peer_ids: &[String],
1113 ) -> PubsubPublishStats {
1114 if peer_ids.is_empty() || !should_forward_htl(inv.htl) {
1115 return PubsubPublishStats::default();
1116 }
1117
1118 let mut stats = PubsubPublishStats {
1119 selected_peers: peer_ids.len(),
1120 ..Default::default()
1121 };
1122 for peer_id in peer_ids {
1123 let send_htl = self.decrement_pubsub_htl_for_peer(peer_id, inv.htl).await;
1124 if !should_forward_htl(send_htl) {
1125 continue;
1126 }
1127 let Some(channel) = self.signaling.get_channel(peer_id).await else {
1128 continue;
1129 };
1130 let mut outgoing = inv.clone();
1131 outgoing.htl = send_htl;
1132 let bytes = encode_pubsub_inventory(&outgoing);
1133 let message_bytes = bytes.len() as u64;
1134 if channel.send(bytes).await.is_ok() {
1135 stats.sent_peers += 1;
1136 stats.sent_bytes = stats.sent_bytes.saturating_add(message_bytes);
1137 self.record_peer_wire_sent(peer_id, message_bytes).await;
1138 }
1139 }
1140 stats
1141 }
1142
1143 async fn flood_pubsub_inventory(
1144 &self,
1145 inv: &PubsubInventory,
1146 exclude_peer_id: Option<&str>,
1147 ) -> PubsubPublishStats {
1148 let mut peer_ids = self.signaling.peer_ids().await;
1149 peer_ids.sort();
1150 peer_ids.retain(|peer_id| exclude_peer_id.is_none_or(|exclude| peer_id != exclude));
1151 self.send_pubsub_inventory_to_peers(inv, &peer_ids).await
1152 }
1153
1154 async fn send_pubsub_want_to_peer(&self, want: &PubsubWant, peer_id: &str) -> bool {
1155 let Some(channel) = self.signaling.get_channel(peer_id).await else {
1156 return false;
1157 };
1158 let bytes = encode_pubsub_want(want);
1159 let message_bytes = bytes.len() as u64;
1160 match channel.send(bytes).await {
1161 Ok(()) => {
1162 self.record_peer_wire_sent(peer_id, message_bytes).await;
1163 true
1164 }
1165 Err(_) => false,
1166 }
1167 }
1168
1169 async fn send_pubsub_want_upstream(
1170 &self,
1171 key: &str,
1172 want: &PubsubWant,
1173 exclude_peer_id: Option<&str>,
1174 ) -> bool {
1175 let upstream = {
1176 let routes = self.pubsub_inventory_routes.read().await;
1177 routes.get(key).cloned()
1178 };
1179 let Some(upstream) = upstream else {
1180 return false;
1181 };
1182 if exclude_peer_id.is_some_and(|exclude| exclude == upstream) {
1183 return false;
1184 }
1185 let want_key = format!("{key}:{upstream}");
1186 if !self
1187 .pubsub_upstream_wants
1188 .lock()
1189 .await
1190 .insert_if_new(want_key)
1191 {
1192 return false;
1193 }
1194 self.send_pubsub_want_to_peer(want, &upstream).await
1195 }
1196
1197 async fn cache_pubsub_frame(&self, key: String, frame: PubsubFrame) {
1198 let mut cache = self.pubsub_frame_cache.lock().await;
1199 if let Some(index) = cache.iter().position(|(cached_key, _)| cached_key == &key) {
1200 cache.remove(index);
1201 }
1202 cache.push_back((key, frame));
1203 while cache.len() > PUBSUB_FRAME_CACHE_CAPACITY {
1204 cache.pop_front();
1205 }
1206 }
1207
1208 async fn cached_pubsub_frame(&self, key: &str) -> Option<PubsubFrame> {
1209 self.pubsub_frame_cache
1210 .lock()
1211 .await
1212 .iter()
1213 .find_map(|(cached_key, frame)| {
1214 if cached_key == key {
1215 Some(frame.clone())
1216 } else {
1217 None
1218 }
1219 })
1220 }
1221
1222 async fn remember_pubsub_want_peer(&self, key: String, from_peer: &str) -> bool {
1223 let mut routes = self.pubsub_want_routes.write().await;
1224 routes.entry(key).or_default().insert(from_peer.to_string())
1225 }
1226
1227 async fn take_pubsub_want_peers(
1228 &self,
1229 key: &str,
1230 exclude_peer_id: Option<&str>,
1231 ) -> Vec<String> {
1232 let connected = self
1233 .signaling
1234 .peer_ids()
1235 .await
1236 .into_iter()
1237 .collect::<HashSet<_>>();
1238 let mut peers = self
1239 .pubsub_want_routes
1240 .write()
1241 .await
1242 .remove(key)
1243 .map(|peers| peers.into_iter().collect::<Vec<_>>())
1244 .unwrap_or_default();
1245 peers.retain(|peer_id| {
1246 connected.contains(peer_id) && exclude_peer_id.is_none_or(|exclude| peer_id != exclude)
1247 });
1248 peers.sort();
1249 peers
1250 }
1251
1252 async fn select_pubsub_peers(
1253 &self,
1254 stream_id: &str,
1255 seq: u64,
1256 message_bytes: u64,
1257 peer_ids: &[String],
1258 ) -> (Vec<String>, Vec<String>) {
1259 let traffic = self.peer_wire_stats.read().await;
1260 let deferred_counts = self.pubsub_deferred_counts.read().await;
1261 let candidates = peer_ids
1262 .iter()
1263 .map(|peer_id| PubsubCandidate {
1264 peer_id: peer_id.clone(),
1265 traffic: traffic.get(peer_id).copied().unwrap_or_default(),
1266 deferred_count: deferred_counts
1267 .get(&(stream_id.to_string(), peer_id.clone()))
1268 .copied()
1269 .unwrap_or_default(),
1270 })
1271 .collect::<Vec<_>>();
1272 drop(deferred_counts);
1273 drop(traffic);
1274
1275 let selection = self.routing.pubsub_scheduler.select(
1276 stream_id,
1277 seq,
1278 self.signaling.peer_id(),
1279 message_bytes,
1280 &candidates,
1281 );
1282
1283 {
1284 let mut deferred_counts = self.pubsub_deferred_counts.write().await;
1285 for peer_id in &selection.deferred {
1286 *deferred_counts
1287 .entry((stream_id.to_string(), peer_id.clone()))
1288 .or_insert(0) += 1;
1289 }
1290 for peer_id in &selection.selected {
1291 deferred_counts.remove(&(stream_id.to_string(), peer_id.clone()));
1292 }
1293 }
1294
1295 (selection.selected, selection.deferred)
1296 }
1297
1298 async fn send_pubsub_frame_to_peers(
1299 &self,
1300 frame: &PubsubFrame,
1301 peer_ids: &[String],
1302 ) -> PubsubPublishStats {
1303 if peer_ids.is_empty() || !should_forward_htl(frame.htl) {
1304 return PubsubPublishStats::default();
1305 }
1306
1307 let bytes = encode_pubsub_frame(frame);
1308 let message_bytes = bytes.len() as u64;
1309 let (selected, deferred) = self
1310 .select_pubsub_peers(&frame.stream_id, frame.seq, message_bytes, peer_ids)
1311 .await;
1312 let mut stats = PubsubPublishStats {
1313 selected_peers: selected.len(),
1314 deferred_peers: deferred.len(),
1315 ..Default::default()
1316 };
1317
1318 for peer_id in selected {
1319 let Some(channel) = self.signaling.get_channel(&peer_id).await else {
1320 continue;
1321 };
1322 let snapshot = self.peer_traffic_snapshot(&peer_id).await;
1323 let bandwidth_debt = reciprocal_virtual_finish(snapshot, message_bytes);
1324 if channel.send(bytes.clone()).await.is_ok() {
1325 stats.sent_peers += 1;
1326 stats.sent_bytes = stats.sent_bytes.saturating_add(message_bytes);
1327 self.record_peer_pubsub_wire_sent(&peer_id, message_bytes, bandwidth_debt)
1328 .await;
1329 }
1330 }
1331
1332 stats
1333 }
1334
1335 async fn enqueue_pubsub_event(&self, event: PubsubEvent) {
1336 let mut inbox = self.pubsub_inbox.lock().await;
1337 inbox.push_back(event);
1338 while inbox.len() > PUBSUB_INBOX_CAPACITY {
1339 inbox.pop_front();
1340 }
1341 }
1342
1343 pub async fn subscribe_pubsub(
1345 self: &Arc<Self>,
1346 stream_id: impl Into<String>,
1347 ) -> PubsubPublishStats {
1348 let stream_id = stream_id.into();
1349 if stream_id.is_empty() {
1350 return PubsubPublishStats::default();
1351 }
1352 self.pubsub_local_interests
1353 .write()
1354 .await
1355 .insert(stream_id.clone());
1356 let seq = {
1357 let mut versions = self.pubsub_local_interest_versions.write().await;
1358 match versions.get(&stream_id).copied() {
1359 Some(seq) => seq,
1360 None => {
1361 let seq = self.next_pubsub_interest_seq();
1362 versions.insert(stream_id.clone(), seq);
1363 seq
1364 }
1365 }
1366 };
1367 let interest = create_pubsub_interest(
1368 stream_id,
1369 self.signaling.peer_id().to_string(),
1370 seq,
1371 true,
1372 MAX_HTL,
1373 );
1374 self.send_pubsub_interest_to_peers(&interest, None).await
1375 }
1376
1377 pub async fn unsubscribe_pubsub(
1379 self: &Arc<Self>,
1380 stream_id: impl Into<String>,
1381 ) -> PubsubPublishStats {
1382 let stream_id = stream_id.into();
1383 if stream_id.is_empty() {
1384 return PubsubPublishStats::default();
1385 }
1386 self.pubsub_local_interests.write().await.remove(&stream_id);
1387 self.pubsub_local_interest_versions
1388 .write()
1389 .await
1390 .remove(&stream_id);
1391 let interest = create_pubsub_interest(
1392 stream_id,
1393 self.signaling.peer_id().to_string(),
1394 self.next_pubsub_interest_seq(),
1395 false,
1396 MAX_HTL,
1397 );
1398 self.send_pubsub_interest_to_peers(&interest, None).await
1399 }
1400
1401 pub async fn publish_pubsub(
1403 self: &Arc<Self>,
1404 stream_id: impl Into<String>,
1405 seq: u64,
1406 payload: Vec<u8>,
1407 ) -> PubsubPublishStats {
1408 let stream_id = stream_id.into();
1409 if stream_id.is_empty() {
1410 return PubsubPublishStats::default();
1411 }
1412 let payload_bytes = payload.len() as u64;
1413 let frame = create_pubsub_frame(
1414 stream_id.clone(),
1415 seq,
1416 self.signaling.peer_id().to_string(),
1417 payload.clone(),
1418 MAX_HTL,
1419 );
1420 let frame_key = Self::pubsub_frame_key(&frame);
1421 self.pubsub_seen_frames
1422 .lock()
1423 .await
1424 .insert_if_new(frame_key.clone());
1425 self.cache_pubsub_frame(frame_key, frame.clone()).await;
1426
1427 if self
1428 .pubsub_local_interests
1429 .read()
1430 .await
1431 .contains(&stream_id)
1432 {
1433 self.enqueue_pubsub_event(PubsubEvent {
1434 stream_id: stream_id.clone(),
1435 seq,
1436 origin_peer_id: self.signaling.peer_id().to_string(),
1437 from_peer_id: self.signaling.peer_id().to_string(),
1438 payload,
1439 })
1440 .await;
1441 }
1442
1443 match self.routing.pubsub_delivery_mode {
1444 PubsubDeliveryMode::InterestPush => {
1445 let peers = self.interested_pubsub_peers(&stream_id, None).await;
1446 self.send_pubsub_frame_to_peers(&frame, &peers).await
1447 }
1448 PubsubDeliveryMode::HtlInvWant => {
1449 let inv = create_pubsub_inventory(
1450 stream_id,
1451 seq,
1452 self.signaling.peer_id().to_string(),
1453 payload_bytes,
1454 MESH_EVENT_POLICY.max_htl,
1455 );
1456 self.flood_pubsub_inventory(&inv, None).await
1457 }
1458 }
1459 }
1460
1461 pub async fn drain_pubsub_events(&self) -> Vec<PubsubEvent> {
1463 self.pubsub_inbox.lock().await.drain(..).collect()
1464 }
1465
1466 pub async fn pubsub_interest_peers(&self, stream_id: &str) -> Vec<String> {
1468 self.interested_pubsub_peers(stream_id, None).await
1469 }
1470
1471 fn choose_ready_response_job(
1472 ready_jobs: &[(u64, String, usize, Instant, u64)],
1473 stats: &HashMap<String, PeerWireStats>,
1474 ) -> Option<(u64, f64)> {
1475 let jobs = ready_jobs
1476 .iter()
1477 .map(|job| OutboundJobCandidate {
1478 job_id: job.0,
1479 peer_id: job.1.clone(),
1480 message_bytes: job.2 as u64,
1481 queue_sequence: job.4,
1482 })
1483 .collect::<Vec<_>>();
1484 select_reciprocal_outbound_job(&jobs, |peer_id| {
1485 stats.get(peer_id).copied().unwrap_or_default()
1486 })
1487 .map(|choice| (choice.job_id, choice.virtual_finish))
1488 }
1489
1490 async fn enqueue_response_send(
1491 self: &Arc<Self>,
1492 peer_id: String,
1493 bytes: Vec<u8>,
1494 ready_at: Instant,
1495 ) {
1496 let job_id = self.next_response_job_id.fetch_add(1, Ordering::Relaxed);
1497 {
1498 let mut queue = self.pending_response_sends.lock().await;
1499 queue.push(PendingResponseSend {
1500 job_id,
1501 peer_id,
1502 bytes,
1503 ready_at,
1504 queue_sequence: job_id,
1505 });
1506 }
1507
1508 if self
1509 .response_scheduler_running
1510 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
1511 .is_ok()
1512 {
1513 let this = Arc::clone(self);
1514 tokio::spawn(async move {
1515 this.run_response_scheduler().await;
1516 });
1517 }
1518 }
1519
1520 async fn run_response_scheduler(self: Arc<Self>) {
1521 loop {
1522 let snapshot = {
1523 let queue = self.pending_response_sends.lock().await;
1524 if queue.is_empty() {
1525 self.response_scheduler_running
1526 .store(false, Ordering::Release);
1527 return;
1528 }
1529 queue
1530 .iter()
1531 .map(|job| {
1532 (
1533 job.job_id,
1534 job.peer_id.clone(),
1535 job.bytes.len(),
1536 job.ready_at,
1537 job.queue_sequence,
1538 )
1539 })
1540 .collect::<Vec<_>>()
1541 };
1542
1543 let now = Instant::now();
1544 let mut earliest_ready_at: Option<Instant> = None;
1545 let mut ready_jobs = Vec::new();
1546 for job in &snapshot {
1547 if job.3 <= now {
1548 ready_jobs.push(job.clone());
1549 } else {
1550 earliest_ready_at = Some(match earliest_ready_at {
1551 Some(current) => current.min(job.3),
1552 None => job.3,
1553 });
1554 }
1555 }
1556
1557 if ready_jobs.is_empty() {
1558 if let Some(ready_at) = earliest_ready_at {
1559 tokio::time::sleep(ready_at.saturating_duration_since(Instant::now())).await;
1560 continue;
1561 }
1562 self.response_scheduler_running
1563 .store(false, Ordering::Release);
1564 return;
1565 }
1566
1567 let (selected_job_id, selected_finish) = {
1568 let stats = self.peer_wire_stats.read().await;
1569 Self::choose_ready_response_job(&ready_jobs, &stats).expect("ready response job")
1570 };
1571
1572 let selected = {
1573 let mut queue = self.pending_response_sends.lock().await;
1574 let Some(index) = queue.iter().position(|job| job.job_id == selected_job_id) else {
1575 continue;
1576 };
1577 queue.swap_remove(index)
1578 };
1579
1580 let sent = if let Some(channel) = self.signaling.get_channel(&selected.peer_id).await {
1581 channel.send(selected.bytes.clone()).await.is_ok()
1582 } else {
1583 false
1584 };
1585
1586 let queued_peers = {
1587 let queue = self.pending_response_sends.lock().await;
1588 queue
1589 .iter()
1590 .map(|job| job.peer_id.clone())
1591 .collect::<HashSet<_>>()
1592 };
1593 let mut stats = self.peer_wire_stats.write().await;
1594 let entry = stats.entry(selected.peer_id.clone()).or_default();
1595 if sent {
1596 entry.bytes_sent = entry.bytes_sent.saturating_add(selected.bytes.len() as u64);
1597 entry.bandwidth_debt = selected_finish;
1598 }
1599 if queued_peers.is_empty() {
1600 for peer_stats in stats.values_mut() {
1601 peer_stats.bandwidth_debt = 0.0;
1602 }
1603 } else {
1604 let floor = queued_peers
1605 .iter()
1606 .filter_map(|peer_id| stats.get(peer_id).map(|peer| peer.bandwidth_debt))
1607 .fold(f64::INFINITY, f64::min);
1608 if floor.is_finite() && floor > 0.0 {
1609 for peer_id in queued_peers {
1610 if let Some(peer_stats) = stats.get_mut(&peer_id) {
1611 peer_stats.bandwidth_debt =
1612 (peer_stats.bandwidth_debt - floor).max(0.0);
1613 }
1614 }
1615 }
1616 }
1617 }
1618 }
1619
1620 fn deterministic_actor_draw_for(peer_id: &str, hash: &Hash, salt: u64) -> f64 {
1621 let mut hasher = DefaultHasher::new();
1622 peer_id.hash(&mut hasher);
1623 hash.hash(&mut hasher);
1624 salt.hash(&mut hasher);
1625 let v = hasher.finish();
1626 (v as f64) / (u64::MAX as f64)
1627 }
1628
1629 fn deterministic_actor_draw(&self, hash: &Hash, salt: u64) -> f64 {
1630 Self::deterministic_actor_draw_for(self.signaling.peer_id(), hash, salt)
1631 }
1632
1633 fn peer_metadata_pointer_slot_hash() -> Hash {
1634 hashtree_core::sha256(PEER_METADATA_POINTER_SLOT_KEY)
1635 }
1636
1637 fn decode_hash_hex(hash_hex: &str) -> Result<Hash, StoreError> {
1638 let bytes = hex::decode(hash_hex)
1639 .map_err(|e| StoreError::Other(format!("Invalid hash hex: {e}")))?;
1640 if bytes.len() != 32 {
1641 return Err(StoreError::Other(format!(
1642 "Invalid hash length {}, expected 32 bytes",
1643 bytes.len()
1644 )));
1645 }
1646 let mut hash = [0u8; 32];
1647 hash.copy_from_slice(&bytes);
1648 Ok(hash)
1649 }
1650
1651 fn should_drop_response(&self, hash: &Hash) -> bool {
1652 let p = self.response_behavior().drop_response_prob;
1653 if p <= 0.0 {
1654 return false;
1655 }
1656 self.deterministic_actor_draw(hash, 0xD0_D0_D0_D0_D0_D0_D0_D0) < p
1657 }
1658
1659 fn should_corrupt_response(&self, hash: &Hash) -> bool {
1660 let p = self.response_behavior().corrupt_response_prob;
1661 if p <= 0.0 {
1662 return false;
1663 }
1664 self.deterministic_actor_draw(hash, 0xC0_C0_C0_C0_C0_C0_C0_C0) < p
1665 }
1666
1667 fn should_stall_response(&self, hash: &Hash) -> bool {
1668 let p = self.response_behavior().stall_response_prob;
1669 if p <= 0.0 {
1670 return false;
1671 }
1672 self.deterministic_actor_draw(hash, 0x5A_11_5A_11_5A_11_5A_11) < p
1673 }
1674
1675 fn response_send_delay(&self, hash: &Hash, payload_len: usize) -> Duration {
1676 let behavior = self.response_behavior();
1677 let mut total_ms = behavior
1678 .extra_delay_ms
1679 .saturating_add(behavior.first_byte_delay_ms);
1680
1681 if behavior.bytes_per_second > 0 && payload_len > 0 {
1682 let throughput_ms = ((payload_len as u128) * 1000)
1683 .div_ceil(behavior.bytes_per_second as u128)
1684 .min(u64::MAX as u128) as u64;
1685 total_ms = total_ms.saturating_add(throughput_ms);
1686 }
1687
1688 if behavior.stall_delay_ms > 0 && self.should_stall_response(hash) {
1689 total_ms = total_ms.saturating_add(behavior.stall_delay_ms);
1690 }
1691
1692 Duration::from_millis(total_ms)
1693 }
1694
1695 async fn ordered_connected_peers(&self, exclude_peer_id: Option<&str>) -> Vec<String> {
1696 let current_peer_ids = self.signaling.peer_ids().await;
1697 if current_peer_ids.is_empty() {
1698 return Vec::new();
1699 }
1700
1701 sync_selector_peers(&self.peer_selector, ¤t_peer_ids).await;
1702 let hash_get_peer_ids: HashSet<String> = self
1703 .signaling
1704 .hash_get_peer_ids()
1705 .await
1706 .into_iter()
1707 .collect();
1708 let mut candidate_peer_ids: Vec<String> = current_peer_ids
1709 .into_iter()
1710 .filter(|peer_id| hash_get_peer_ids.contains(peer_id))
1711 .filter(|peer_id| exclude_peer_id.is_none_or(|exclude| peer_id != exclude))
1712 .collect();
1713 if candidate_peer_ids.is_empty() {
1714 return Vec::new();
1715 }
1716
1717 let current_set: HashSet<&str> = candidate_peer_ids.iter().map(String::as_str).collect();
1718 let mut selector = self.peer_selector.write().await;
1719 let mut selector_order = selector.select_peers();
1720 selector_order.retain(|peer_id| current_set.contains(peer_id.as_str()));
1721 if selector_order.is_empty() {
1722 let mut fallback = candidate_peer_ids;
1723 fallback.sort();
1724 return fallback;
1725 }
1726 let backed_off: HashMap<String, bool> = candidate_peer_ids
1727 .iter()
1728 .map(|peer_id| (peer_id.clone(), selector.is_peer_backed_off(peer_id)))
1729 .collect();
1730 drop(selector);
1731
1732 let rank: HashMap<&str, usize> = selector_order
1733 .iter()
1734 .enumerate()
1735 .map(|(idx, peer_id)| (peer_id.as_str(), idx))
1736 .collect();
1737 let active = self.peer_active_requests.read().await;
1738 candidate_peer_ids.sort_by(|left, right| {
1739 let left_backed_off = backed_off.get(left).copied().unwrap_or(false);
1740 let right_backed_off = backed_off.get(right).copied().unwrap_or(false);
1741 if left_backed_off != right_backed_off {
1742 return if left_backed_off {
1743 std::cmp::Ordering::Greater
1744 } else {
1745 std::cmp::Ordering::Less
1746 };
1747 }
1748 let left_rank = rank.get(left.as_str()).copied().unwrap_or(usize::MAX / 2);
1749 let right_rank = rank.get(right.as_str()).copied().unwrap_or(usize::MAX / 2);
1750 let left_load = active.get(left).copied().unwrap_or(0);
1751 let right_load = active.get(right).copied().unwrap_or(0);
1752 (left_rank + left_load.saturating_mul(ACTIVE_PEER_REQUEST_RANK_PENALTY))
1753 .cmp(&(right_rank + right_load.saturating_mul(ACTIVE_PEER_REQUEST_RANK_PENALTY)))
1754 .then_with(|| left.cmp(right))
1755 });
1756 candidate_peer_ids
1757 }
1758
1759 async fn reserve_peer_request(&self, peer_id: &str) {
1760 let mut active = self.peer_active_requests.write().await;
1761 *active.entry(peer_id.to_string()).or_insert(0) += 1;
1762 }
1763
1764 async fn release_peer_request(&self, peer_id: &str) {
1765 let mut active = self.peer_active_requests.write().await;
1766 let Some(count) = active.get_mut(peer_id) else {
1767 return;
1768 };
1769 if *count <= 1 {
1770 active.remove(peer_id);
1771 } else {
1772 *count -= 1;
1773 }
1774 }
1775
1776 async fn release_queried_peer_requests(&self, peer_ids: &[String]) {
1777 for peer_id in peer_ids {
1778 self.release_peer_request(peer_id).await;
1779 }
1780 }
1781
1782 fn requested_quote_mint(&self) -> Option<&str> {
1783 if let Some(default_mint) = self.routing.cashu_default_mint.as_deref() {
1784 if self.routing.cashu_accepted_mints.is_empty()
1785 || self
1786 .routing
1787 .cashu_accepted_mints
1788 .iter()
1789 .any(|mint| mint == default_mint)
1790 {
1791 return Some(default_mint);
1792 }
1793 }
1794
1795 self.routing
1796 .cashu_accepted_mints
1797 .first()
1798 .map(String::as_str)
1799 }
1800
1801 fn choose_quote_mint(&self, requested_mint: Option<&str>) -> Option<String> {
1802 if let Some(requested_mint) = requested_mint {
1803 if self.accepts_quote_mint(Some(requested_mint)) {
1804 return Some(requested_mint.to_string());
1805 }
1806 }
1807 if let Some(default_mint) = self.routing.cashu_default_mint.as_ref() {
1808 return Some(default_mint.clone());
1809 }
1810 if let Some(first_mint) = self.routing.cashu_accepted_mints.first() {
1811 return Some(first_mint.clone());
1812 }
1813 requested_mint.map(str::to_string)
1814 }
1815
1816 fn accepts_quote_mint(&self, mint_url: Option<&str>) -> bool {
1817 if self.routing.cashu_accepted_mints.is_empty() {
1818 return true;
1819 }
1820
1821 let Some(mint_url) = mint_url else {
1822 return false;
1823 };
1824 self.routing
1825 .cashu_accepted_mints
1826 .iter()
1827 .any(|mint| mint == mint_url)
1828 }
1829
1830 fn trusts_quote_mint(&self, mint_url: Option<&str>) -> bool {
1831 let Some(mint_url) = mint_url else {
1832 return self.routing.cashu_default_mint.is_none()
1833 && self.routing.cashu_accepted_mints.is_empty();
1834 };
1835 self.routing.cashu_default_mint.as_deref() == Some(mint_url)
1836 || self
1837 .routing
1838 .cashu_accepted_mints
1839 .iter()
1840 .any(|mint| mint == mint_url)
1841 }
1842
1843 async fn peer_suggested_mint_cap_sat(&self, peer_id: &str) -> u64 {
1844 let base = self.routing.cashu_peer_suggested_mint_base_cap_sat;
1845 if base == 0 {
1846 return 0;
1847 }
1848
1849 let selector = self.peer_selector.read().await;
1850 let Some(stats) = selector.get_stats(peer_id) else {
1851 let max_cap = self.routing.cashu_peer_suggested_mint_max_cap_sat;
1852 return if max_cap > 0 { base.min(max_cap) } else { base };
1853 };
1854
1855 if stats.cashu_payment_defaults > 0
1856 && stats.cashu_payment_defaults >= stats.cashu_payment_receipts
1857 {
1858 return 0;
1859 }
1860
1861 let success_bonus = stats
1862 .successes
1863 .saturating_mul(self.routing.cashu_peer_suggested_mint_success_step_sat);
1864 let receipt_bonus = stats
1865 .cashu_payment_receipts
1866 .saturating_mul(self.routing.cashu_peer_suggested_mint_receipt_step_sat);
1867 let mut cap = base
1868 .saturating_add(success_bonus)
1869 .saturating_add(receipt_bonus);
1870 let max_cap = self.routing.cashu_peer_suggested_mint_max_cap_sat;
1871 if max_cap > 0 {
1872 cap = cap.min(max_cap);
1873 }
1874 cap
1875 }
1876
1877 async fn should_accept_quote_response(
1878 &self,
1879 from_peer: &str,
1880 preferred_mint_url: Option<&str>,
1881 offered_payment_sat: u64,
1882 res: &DataQuoteResponse,
1883 ) -> bool {
1884 let Some(payment_sat) = res.p else {
1885 return false;
1886 };
1887 if payment_sat > offered_payment_sat {
1888 return false;
1889 }
1890
1891 let response_mint = res.m.as_deref();
1892 if response_mint == preferred_mint_url {
1893 return true;
1894 }
1895 if self.trusts_quote_mint(response_mint) {
1896 return true;
1897 }
1898 if response_mint.is_none() {
1899 return false;
1900 }
1901
1902 payment_sat <= self.peer_suggested_mint_cap_sat(from_peer).await
1903 }
1904
1905 async fn issue_quote(
1906 &self,
1907 peer_id: &str,
1908 hash_key: &str,
1909 payment_sat: u64,
1910 ttl_ms: u32,
1911 mint_url: Option<&str>,
1912 ) -> u64 {
1913 let quote_id = {
1914 let mut next = self.next_quote_id.write().await;
1915 let quote_id = *next;
1916 *next = next.saturating_add(1);
1917 quote_id
1918 };
1919
1920 let expires_at = Instant::now() + Duration::from_millis(ttl_ms as u64);
1921 self.issued_quotes.write().await.insert(
1922 (peer_id.to_string(), hash_key.to_string(), quote_id),
1923 IssuedQuote {
1924 expires_at,
1925 payment_sat,
1926 mint_url: mint_url.map(str::to_string),
1927 },
1928 );
1929 quote_id
1930 }
1931
1932 async fn take_valid_quote(&self, peer_id: &str, hash_key: &str, quote_id: u64) -> bool {
1933 let key = (peer_id.to_string(), hash_key.to_string(), quote_id);
1934 let Some(quote) = self.issued_quotes.write().await.remove(&key) else {
1935 return false;
1936 };
1937 quote.expires_at > Instant::now()
1938 }
1939
1940 async fn send_request_to_peer(
1941 &self,
1942 peer_id: &str,
1943 hash: &Hash,
1944 request_htl: u8,
1945 quote_id: Option<u64>,
1946 ) -> bool {
1947 if !should_forward_htl(request_htl) {
1948 return false;
1949 }
1950
1951 let channel = match self.signaling.get_channel(peer_id).await {
1952 Some(c) => c,
1953 None => return false,
1954 };
1955
1956 let htl_config = {
1957 let configs = self.htl_configs.read().await;
1958 configs
1959 .get(peer_id)
1960 .cloned()
1961 .unwrap_or_else(PeerHTLConfig::random)
1962 };
1963
1964 let send_htl = htl_config.decrement(request_htl);
1965 let req = match quote_id {
1966 Some(quote_id) => create_request_with_quote(hash, send_htl, quote_id),
1967 None => create_request(hash, send_htl),
1968 };
1969 let request_bytes = encode_request(&req);
1970 let request_len = request_bytes.len() as u64;
1971
1972 {
1973 let mut selector = self.peer_selector.write().await;
1974 selector.record_request(peer_id, request_len);
1975 }
1976
1977 match channel.send(request_bytes).await {
1978 Ok(()) => {
1979 self.record_peer_wire_sent(peer_id, request_len).await;
1980 true
1981 }
1982 Err(_) => {
1983 self.peer_selector.write().await.record_failure(peer_id);
1984 false
1985 }
1986 }
1987 }
1988
1989 async fn send_quote_request_to_peer(
1990 &self,
1991 peer_id: &str,
1992 hash: &Hash,
1993 payment_sat: u64,
1994 ttl_ms: u32,
1995 mint_url: Option<&str>,
1996 ) -> bool {
1997 let channel = match self.signaling.get_channel(peer_id).await {
1998 Some(c) => c,
1999 None => return false,
2000 };
2001
2002 let req = create_quote_request(hash, ttl_ms, payment_sat, mint_url);
2003 let request_bytes = encode_quote_request(&req);
2004 let request_len = request_bytes.len() as u64;
2005
2006 match channel.send(request_bytes).await {
2007 Ok(()) => {
2008 self.record_peer_wire_sent(peer_id, request_len).await;
2009 true
2010 }
2011 Err(_) => false,
2012 }
2013 }
2014
2015 pub async fn set_read_sources(&self, sources: Vec<Arc<dyn MeshReadSource>>) {
2016 let mut by_id = HashMap::new();
2017 let mut stats = self.read_source_stats.write().await;
2018 for source in sources {
2019 let source_id = source.id().to_string();
2020 by_id.insert(source_id.clone(), source);
2021 stats
2022 .entry(source_id)
2023 .or_insert_with(AdaptiveSourceStats::default);
2024 }
2025 *self.read_sources.write().await = by_id;
2026 }
2027
2028 async fn record_read_source_request(&self, source_id: &str) {
2029 let mut stats = self.read_source_stats.write().await;
2030 stats
2031 .entry(source_id.to_string())
2032 .or_insert_with(AdaptiveSourceStats::default)
2033 .requests += 1;
2034 }
2035
2036 async fn record_read_source_miss(&self, source_id: &str) {
2037 let mut stats = self.read_source_stats.write().await;
2038 stats
2039 .entry(source_id.to_string())
2040 .or_insert_with(AdaptiveSourceStats::default)
2041 .misses += 1;
2042 }
2043
2044 async fn record_read_source_success(&self, source_id: &str, elapsed_ms: u64) {
2045 let now = Instant::now();
2046 let mut stats = self.read_source_stats.write().await;
2047 let stats = stats
2048 .entry(source_id.to_string())
2049 .or_insert_with(AdaptiveSourceStats::default);
2050 stats.successes += 1;
2051 stats.last_success_at = Some(now);
2052 stats.backoff_level = 0;
2053 stats.backed_off_until = None;
2054 if stats.srtt_ms <= 0.0 {
2055 stats.srtt_ms = elapsed_ms as f64;
2056 stats.rttvar_ms = elapsed_ms as f64 / 2.0;
2057 return;
2058 }
2059 let elapsed = elapsed_ms as f64;
2060 stats.rttvar_ms = 0.75 * stats.rttvar_ms + 0.25 * (stats.srtt_ms - elapsed).abs();
2061 stats.srtt_ms = 0.875 * stats.srtt_ms + 0.125 * elapsed;
2062 }
2063
2064 async fn record_read_source_failure(&self, source_id: &str) {
2065 let now = Instant::now();
2066 let mut stats = self.read_source_stats.write().await;
2067 let stats = stats
2068 .entry(source_id.to_string())
2069 .or_insert_with(AdaptiveSourceStats::default);
2070 stats.failures += 1;
2071 stats.last_failure_at = Some(now);
2072 Self::apply_source_backoff(stats, now);
2073 }
2074
2075 async fn record_read_source_timeout(&self, source_id: &str) {
2076 let now = Instant::now();
2077 let mut stats = self.read_source_stats.write().await;
2078 let stats = stats
2079 .entry(source_id.to_string())
2080 .or_insert_with(AdaptiveSourceStats::default);
2081 stats.timeouts += 1;
2082 stats.last_failure_at = Some(now);
2083 Self::apply_source_backoff(stats, now);
2084 }
2085
2086 fn apply_source_backoff(stats: &mut AdaptiveSourceStats, now: Instant) {
2087 stats.backoff_level = stats.backoff_level.saturating_add(1);
2088 let backoff_ms = (INITIAL_SOURCE_BACKOFF_MS
2089 .saturating_mul(2u64.saturating_pow(stats.backoff_level.saturating_sub(1))))
2090 .min(MAX_SOURCE_BACKOFF_MS);
2091 stats.backed_off_until = Some(now + Duration::from_millis(backoff_ms));
2092 }
2093
2094 async fn ordered_read_sources(&self) -> Vec<Arc<dyn MeshReadSource>> {
2095 let sources = self.read_sources.read().await;
2096 if sources.is_empty() {
2097 return Vec::new();
2098 }
2099
2100 let mut available: Vec<Arc<dyn MeshReadSource>> = sources
2101 .values()
2102 .filter(|source| source.is_available())
2103 .cloned()
2104 .collect();
2105 if available.is_empty() {
2106 return Vec::new();
2107 }
2108
2109 let now = Instant::now();
2110 let stats = self.read_source_stats.read().await;
2111 let mut healthy: Vec<Arc<dyn MeshReadSource>> = available
2112 .iter()
2113 .filter(|source| {
2114 stats
2115 .get(source.id())
2116 .and_then(|s| s.backed_off_until)
2117 .is_none_or(|until| until <= now)
2118 })
2119 .cloned()
2120 .collect();
2121 if !healthy.is_empty() {
2122 available = std::mem::take(&mut healthy);
2123 }
2124
2125 available.sort_by(|left, right| {
2126 let left_stats = stats.get(left.id()).cloned().unwrap_or_default();
2127 let right_stats = stats.get(right.id()).cloned().unwrap_or_default();
2128 adaptive_source_score(&right_stats, now)
2129 .partial_cmp(&adaptive_source_score(&left_stats, now))
2130 .unwrap_or(std::cmp::Ordering::Equal)
2131 .then_with(|| left.id().cmp(right.id()))
2132 });
2133 available
2134 }
2135
2136 async fn should_probe_multiple_read_sources(
2137 &self,
2138 ordered_sources: &[Arc<dyn MeshReadSource>],
2139 ) -> bool {
2140 if ordered_sources.len() <= 1 {
2141 return false;
2142 }
2143 let stats = self.read_source_stats.read().await;
2144 let best = stats
2145 .get(ordered_sources[0].id())
2146 .cloned()
2147 .unwrap_or_default();
2148 let second = stats
2149 .get(ordered_sources[1].id())
2150 .cloned()
2151 .unwrap_or_default();
2152 if !source_has_history(&best) || !source_has_history(&second) {
2153 return false;
2154 }
2155 let now = Instant::now();
2156 adaptive_source_score(&best, now) - adaptive_source_score(&second, now)
2157 < SOURCE_SCORE_TIE_DELTA
2158 }
2159
2160 async fn source_dispatch_for(&self, source_count: usize) -> RequestDispatchConfig {
2161 if source_count == 0 {
2162 return self.routing.dispatch;
2163 }
2164 let ordered_sources = self.ordered_read_sources().await;
2165 let probe_multiple = self
2166 .should_probe_multiple_read_sources(&ordered_sources)
2167 .await;
2168 let initial_fanout = if probe_multiple {
2169 source_count.min(2)
2170 } else {
2171 1
2172 };
2173 RequestDispatchConfig {
2174 initial_fanout,
2175 hedge_fanout: self.routing.dispatch.hedge_fanout,
2176 max_fanout: self.routing.dispatch.max_fanout.min(source_count),
2177 hedge_interval_ms: self.routing.dispatch.hedge_interval_ms,
2178 }
2179 }
2180
2181 pub async fn peer_count(&self) -> usize {
2183 self.signaling.peer_count().await
2184 }
2185
2186 pub async fn peer_ids(&self) -> Vec<String> {
2188 self.signaling.peer_ids().await
2189 }
2190
2191 pub async fn needs_peers(&self) -> bool {
2193 self.signaling.needs_peers().await
2194 }
2195
2196 pub async fn send_hello(&self) -> Result<(), TransportError> {
2198 self.signaling.send_hello(vec![]).await
2199 }
2200
2201 pub async fn drain_available_data_messages(self: &Arc<Self>) -> DataPumpStats {
2206 let mut stats = DataPumpStats::default();
2207 let peer_ids = self.signaling.peer_ids().await;
2208 for peer_id in peer_ids {
2209 let Some(channel) = self.signaling.get_channel(&peer_id).await else {
2210 continue;
2211 };
2212
2213 while let Some(data) = channel.try_recv() {
2214 stats.processed += 1;
2215 stats.processed_bytes += data.len() as u64;
2216 if let Some(msg) = parse_message(&data) {
2217 match msg {
2218 DataMessage::Request(_) => stats.request_messages += 1,
2219 DataMessage::Response(_) => stats.response_messages += 1,
2220 DataMessage::QuoteRequest(_) => stats.quote_request_messages += 1,
2221 DataMessage::QuoteResponse(_) => stats.quote_response_messages += 1,
2222 DataMessage::PubsubInterest(_) => stats.pubsub_interest_messages += 1,
2223 DataMessage::PubsubFrame(_) => stats.pubsub_frame_messages += 1,
2224 DataMessage::PubsubInventory(_) => stats.pubsub_inventory_messages += 1,
2225 DataMessage::PubsubWant(_) => stats.pubsub_want_messages += 1,
2226 DataMessage::Payment(_)
2227 | DataMessage::PaymentAck(_)
2228 | DataMessage::Chunk(_)
2229 | DataMessage::PeerHints(_) => {}
2230 }
2231 }
2232 self.handle_data_message(&peer_id, &data).await;
2233 }
2234 }
2235 stats
2236 }
2237
2238 pub async fn record_cashu_payment_for_peer(&self, peer_id: &str, amount_sat: u64) {
2240 self.peer_selector
2241 .write()
2242 .await
2243 .record_cashu_payment(peer_id, amount_sat);
2244 }
2245
2246 pub async fn record_cashu_receipt_from_peer(&self, peer_id: &str, amount_sat: u64) {
2248 self.peer_selector
2249 .write()
2250 .await
2251 .record_cashu_receipt(peer_id, amount_sat);
2252 }
2253
2254 pub async fn record_cashu_payment_default_from_peer(&self, peer_id: &str) {
2256 self.peer_selector
2257 .write()
2258 .await
2259 .record_cashu_payment_default(peer_id);
2260 }
2261
2262 pub async fn selector_summary(&self) -> crate::peer_selector::SelectorSummary {
2264 self.peer_selector.read().await.summary()
2265 }
2266
2267 fn should_refuse_requests_from_peer(&self, selector: &PeerSelector, peer_id: &str) -> bool {
2268 selector.is_peer_blocked_for_payment_defaults(
2269 peer_id,
2270 self.routing.cashu_payment_default_block_threshold,
2271 )
2272 }
2273
2274 pub async fn peer_metadata_snapshot(&self) -> PeerMetadataSnapshot {
2276 self.peer_selector
2277 .read()
2278 .await
2279 .export_peer_metadata_snapshot()
2280 }
2281
2282 pub async fn persist_peer_metadata(&self) -> Result<Hash, StoreError> {
2287 let snapshot = self
2288 .peer_selector
2289 .read()
2290 .await
2291 .export_peer_metadata_snapshot();
2292 let bytes = serde_json::to_vec(&snapshot).map_err(|e| {
2293 StoreError::Other(format!("Failed to encode peer metadata snapshot: {e}"))
2294 })?;
2295 let snapshot_hash = hashtree_core::sha256(&bytes);
2296 let _ = self.local_store.put(snapshot_hash, bytes).await?;
2297
2298 let pointer_slot = Self::peer_metadata_pointer_slot_hash();
2299 let pointer_bytes = hex::encode(snapshot_hash).into_bytes();
2300 let _ = self.local_store.delete(&pointer_slot).await?;
2301 let _ = self.local_store.put(pointer_slot, pointer_bytes).await?;
2302
2303 Ok(snapshot_hash)
2304 }
2305
2306 pub async fn load_peer_metadata(&self) -> Result<bool, StoreError> {
2308 let pointer_slot = Self::peer_metadata_pointer_slot_hash();
2309 let Some(pointer_bytes) = self.local_store.get(&pointer_slot).await? else {
2310 return Ok(false);
2311 };
2312 let pointer_hex = std::str::from_utf8(&pointer_bytes).map_err(|e| {
2313 StoreError::Other(format!("Peer metadata pointer is not valid UTF-8: {e}"))
2314 })?;
2315 let snapshot_hash = Self::decode_hash_hex(pointer_hex.trim())?;
2316
2317 let Some(snapshot_bytes) = self.local_store.get(&snapshot_hash).await? else {
2318 return Ok(false);
2319 };
2320 let snapshot: PeerMetadataSnapshot =
2321 serde_json::from_slice(&snapshot_bytes).map_err(|e| {
2322 StoreError::Other(format!("Failed to decode peer metadata snapshot: {e}"))
2323 })?;
2324 self.peer_selector
2325 .write()
2326 .await
2327 .import_peer_metadata_snapshot(&snapshot);
2328 Ok(true)
2329 }
2330
2331 pub async fn get_with_quote(
2336 &self,
2337 hash: &Hash,
2338 payment_sat: u64,
2339 quote_ttl: Duration,
2340 ) -> Result<Option<Vec<u8>>, StoreError> {
2341 if let Some(data) = self.local_store.get(hash).await? {
2342 return Ok(Some(data));
2343 }
2344 Ok(self
2345 .request_from_peers_with_quote(hash, payment_sat, quote_ttl)
2346 .await)
2347 }
2348
2349 async fn request_from_peers_with_quote(
2350 &self,
2351 hash: &Hash,
2352 payment_sat: u64,
2353 quote_ttl: Duration,
2354 ) -> Option<Vec<u8>> {
2355 let ordered_peer_ids = self.ordered_connected_peers(None).await;
2356 if ordered_peer_ids.is_empty() {
2357 return None;
2358 }
2359
2360 if let Some(quote) = self
2361 .request_quote_from_peers(hash, payment_sat, quote_ttl, &ordered_peer_ids)
2362 .await
2363 {
2364 if let Some(data) = self
2365 .request_from_single_peer(hash, "e.peer_id, MAX_HTL, Some(quote.quote_id))
2366 .await
2367 {
2368 return Some(data);
2369 }
2370 }
2371
2372 self.request_from_mesh(hash).await
2373 }
2374
2375 async fn request_quote_from_peers(
2376 &self,
2377 hash: &Hash,
2378 payment_sat: u64,
2379 quote_ttl: Duration,
2380 ordered_peer_ids: &[String],
2381 ) -> Option<NegotiatedQuote> {
2382 if ordered_peer_ids.is_empty() {
2383 return None;
2384 }
2385 let ttl_ms = quote_ttl.as_millis().min(u32::MAX as u128) as u32;
2386 if ttl_ms == 0 {
2387 return None;
2388 }
2389 let requested_mint = self.requested_quote_mint().map(str::to_string);
2390
2391 let hash_key = hash_to_key(hash);
2392 let (tx, rx) = oneshot::channel();
2393 self.pending_quotes.write().await.insert(
2394 hash_key.clone(),
2395 PendingQuoteRequest {
2396 response_tx: tx,
2397 preferred_mint_url: requested_mint.clone(),
2398 offered_payment_sat: payment_sat,
2399 },
2400 );
2401
2402 let rx = Arc::new(Mutex::new(rx));
2403 let result = run_hedged_waves(
2404 ordered_peer_ids.len(),
2405 self.routing.dispatch,
2406 self.request_timeout,
2407 |range| {
2408 let wave_peer_ids = ordered_peer_ids[range].to_vec();
2409 let requested_mint = requested_mint.clone();
2410 let hash = *hash;
2411 async move {
2412 let mut sent = 0usize;
2413 for peer_id in wave_peer_ids {
2414 if self
2415 .send_quote_request_to_peer(
2416 &peer_id,
2417 &hash,
2418 payment_sat,
2419 ttl_ms,
2420 requested_mint.as_deref(),
2421 )
2422 .await
2423 {
2424 sent += 1;
2425 }
2426 }
2427 sent
2428 }
2429 },
2430 |wait| {
2431 let rx = rx.clone();
2432 async move {
2433 let mut rx = rx.lock().await;
2434 match tokio::time::timeout(wait, &mut *rx).await {
2435 Ok(Ok(Some(quote))) => HedgedWaveAction::Success(quote),
2436 Ok(Ok(None)) | Ok(Err(_)) => HedgedWaveAction::Abort,
2437 Err(_) => HedgedWaveAction::Continue,
2438 }
2439 }
2440 },
2441 )
2442 .await;
2443 let _ = self.pending_quotes.write().await.remove(&hash_key);
2444 result
2445 }
2446
2447 async fn request_from_single_peer(
2448 &self,
2449 hash: &Hash,
2450 peer_id: &str,
2451 request_htl: u8,
2452 quote_id: Option<u64>,
2453 ) -> Option<Vec<u8>> {
2454 let hash_key = hash_to_key(hash);
2455 let (tx, rx) = oneshot::channel();
2456 self.pending_requests.write().await.insert(
2457 hash_key.clone(),
2458 PendingRequest {
2459 response_tx: tx,
2460 started_at: Instant::now(),
2461 queried_peers: vec![peer_id.to_string()],
2462 },
2463 );
2464
2465 let mut rx = rx;
2466 if !self
2467 .send_request_to_peer(peer_id, hash, request_htl, quote_id)
2468 .await
2469 {
2470 let _ = self.pending_requests.write().await.remove(&hash_key);
2471 return None;
2472 }
2473 self.reserve_peer_request(peer_id).await;
2474
2475 if let Ok(Ok(Some(data))) = tokio::time::timeout(self.request_timeout, &mut rx).await {
2476 if hashtree_core::sha256(&data) == *hash {
2477 let _ = self.local_store.put(*hash, data.clone()).await;
2478 return Some(data);
2479 }
2480 }
2481
2482 if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
2483 self.release_queried_peer_requests(&pending.queried_peers)
2484 .await;
2485 for peer_id in pending.queried_peers {
2486 self.peer_selector.write().await.record_timeout(&peer_id);
2487 }
2488 }
2489 let _ = self.take_forward_requesters(&hash_key).await;
2490 None
2491 }
2492
2493 async fn request_from_ordered_peers(
2494 &self,
2495 hash: &Hash,
2496 ordered_peer_ids: &[String],
2497 request_htl: u8,
2498 ) -> RouteFetchOutcome {
2499 let hash_key = hash_to_key(hash);
2500 let (tx, rx) = oneshot::channel();
2501 self.pending_requests.write().await.insert(
2502 hash_key.clone(),
2503 PendingRequest {
2504 response_tx: tx,
2505 started_at: Instant::now(),
2506 queried_peers: Vec::new(),
2507 },
2508 );
2509
2510 let rx = Arc::new(Mutex::new(rx));
2511 let result = run_hedged_waves(
2512 ordered_peer_ids.len(),
2513 self.routing.dispatch,
2514 self.request_timeout,
2515 |range| {
2516 let wave_peer_ids = ordered_peer_ids[range].to_vec();
2517 let hash = *hash;
2518 let hash_key = hash_key.clone();
2519 async move {
2520 let mut sent = 0usize;
2521 for peer_id in wave_peer_ids {
2522 if self
2523 .send_request_to_peer(&peer_id, &hash, request_htl, None)
2524 .await
2525 {
2526 sent += 1;
2527 self.reserve_peer_request(&peer_id).await;
2528 if let Some(pending) =
2529 self.pending_requests.write().await.get_mut(&hash_key)
2530 {
2531 pending.queried_peers.push(peer_id);
2532 }
2533 }
2534 }
2535 sent
2536 }
2537 },
2538 |wait| {
2539 let rx = rx.clone();
2540 async move {
2541 let mut rx = rx.lock().await;
2542 match tokio::time::timeout(wait, &mut *rx).await {
2543 Ok(Ok(Some(data))) if hashtree_core::sha256(&data) == *hash => {
2544 HedgedWaveAction::Success(data)
2545 }
2546 Ok(Ok(Some(_))) => HedgedWaveAction::Continue,
2547 Ok(Ok(None)) | Ok(Err(_)) => HedgedWaveAction::Abort,
2548 Err(_) => HedgedWaveAction::Continue,
2549 }
2550 }
2551 },
2552 )
2553 .await;
2554
2555 let Some(data) = result else {
2556 if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
2557 self.release_queried_peer_requests(&pending.queried_peers)
2558 .await;
2559 for peer_id in pending.queried_peers {
2560 self.peer_selector.write().await.record_timeout(&peer_id);
2561 }
2562 }
2563 let _ = self.take_forward_requesters(&hash_key).await;
2564 return RouteFetchOutcome::Timeout;
2565 };
2566
2567 let _ = self.local_store.put(*hash, data.clone()).await;
2568 RouteFetchOutcome::Hit(data)
2569 }
2570
2571 async fn request_from_read_sources_inner(&self, hash: &Hash) -> RouteFetchOutcome {
2572 let ordered_sources = self.ordered_read_sources().await;
2573 if ordered_sources.is_empty() {
2574 return RouteFetchOutcome::Miss;
2575 }
2576
2577 let dispatch = normalize_dispatch_config(
2578 self.source_dispatch_for(ordered_sources.len()).await,
2579 ordered_sources.len(),
2580 );
2581 let wave_plan = build_hedged_wave_plan(ordered_sources.len(), dispatch);
2582 if wave_plan.is_empty() {
2583 return RouteFetchOutcome::Miss;
2584 }
2585
2586 let deadline = Instant::now() + self.request_timeout;
2587 let mut pending = FuturesUnordered::new();
2588 let mut pending_source_ids = HashSet::new();
2589 let mut saw_timeout = false;
2590 let mut next_source_idx = 0usize;
2591
2592 for (wave_idx, wave_size) in wave_plan.iter().copied().enumerate() {
2593 let from = next_source_idx;
2594 let to = (next_source_idx + wave_size).min(ordered_sources.len());
2595 next_source_idx = to;
2596
2597 for source in &ordered_sources[from..to] {
2598 let source = Arc::clone(source);
2599 let source_id = source.id().to_string();
2600 self.record_read_source_request(&source_id).await;
2601 pending_source_ids.insert(source_id.clone());
2602 let hash = *hash;
2603 pending.push(tokio::spawn(async move {
2604 let started_at = Instant::now();
2605 let result = std::panic::AssertUnwindSafe(source.get(&hash))
2606 .catch_unwind()
2607 .await;
2608 match result {
2609 Ok(Some(data)) => SourceFetchOutcome::Hit {
2610 source_id,
2611 data,
2612 elapsed_ms: started_at.elapsed().as_millis().max(1) as u64,
2613 },
2614 Ok(None) => SourceFetchOutcome::Miss { source_id },
2615 Err(_) => SourceFetchOutcome::Failure { source_id },
2616 }
2617 }));
2618 }
2619
2620 let is_last_wave =
2621 wave_idx + 1 == wave_plan.len() || next_source_idx >= ordered_sources.len();
2622 let window_end = if is_last_wave {
2623 deadline
2624 } else {
2625 (Instant::now() + Duration::from_millis(dispatch.hedge_interval_ms)).min(deadline)
2626 };
2627
2628 while Instant::now() < window_end {
2629 let remaining = window_end.saturating_duration_since(Instant::now());
2630 let Some(result) = tokio::time::timeout(remaining, pending.next())
2631 .await
2632 .ok()
2633 .flatten()
2634 else {
2635 break;
2636 };
2637 let Ok(outcome) = result else {
2638 continue;
2639 };
2640 match outcome {
2641 SourceFetchOutcome::Hit {
2642 source_id,
2643 data,
2644 elapsed_ms,
2645 } => {
2646 pending_source_ids.remove(&source_id);
2647 self.record_read_source_success(&source_id, elapsed_ms)
2648 .await;
2649 return RouteFetchOutcome::Hit(data);
2650 }
2651 SourceFetchOutcome::Miss { source_id } => {
2652 pending_source_ids.remove(&source_id);
2653 self.record_read_source_miss(&source_id).await;
2654 }
2655 SourceFetchOutcome::Failure { source_id } => {
2656 pending_source_ids.remove(&source_id);
2657 self.record_read_source_failure(&source_id).await;
2658 }
2659 }
2660 }
2661
2662 if Instant::now() >= deadline {
2663 break;
2664 }
2665 }
2666
2667 for source_id in pending_source_ids {
2668 saw_timeout = true;
2669 self.record_read_source_timeout(&source_id).await;
2670 }
2671 if saw_timeout {
2672 RouteFetchOutcome::Timeout
2673 } else {
2674 RouteFetchOutcome::Miss
2675 }
2676 }
2677
2678 async fn request_from_read_sources(&self, hash: &Hash) -> RouteFetchOutcome {
2679 let hash_key = hash_to_key(hash);
2680 let existing_wait = {
2681 let mut inflight = self.inflight_source_fetches.lock().await;
2682 if let Some(existing) = inflight.get_mut(&hash_key) {
2683 let (tx, rx) = oneshot::channel();
2684 existing.waiters.push(tx);
2685 Some(rx)
2686 } else {
2687 inflight.insert(
2688 hash_key.clone(),
2689 InflightSourceFetch {
2690 waiters: Vec::new(),
2691 },
2692 );
2693 None
2694 }
2695 };
2696
2697 if let Some(wait) = existing_wait {
2698 return wait.await.unwrap_or(RouteFetchOutcome::Timeout);
2699 }
2700
2701 let result = self.request_from_read_sources_inner(hash).await;
2702 if let RouteFetchOutcome::Hit(hit) = &result {
2703 let _ = self.local_store.put(*hash, hit.clone()).await;
2704 }
2705 self.complete_inflight_source_fetch(&hash_key, result.clone())
2706 .await;
2707
2708 result
2709 }
2710
2711 async fn complete_inflight_source_fetch(&self, hash_key: &str, result: RouteFetchOutcome) {
2712 let waiters = self
2713 .inflight_source_fetches
2714 .lock()
2715 .await
2716 .remove(hash_key)
2717 .map(|inflight| inflight.waiters)
2718 .unwrap_or_default();
2719 for waiter in waiters {
2720 let _ = waiter.send(result.clone());
2721 }
2722 }
2723
2724 async fn cancel_pending_peer_route(&self, hash: &Hash) {
2725 let hash_key = hash_to_key(hash);
2726 if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
2727 self.release_queried_peer_requests(&pending.queried_peers)
2728 .await;
2729 }
2730 }
2731
2732 async fn cancel_losing_route(&self, hash: &Hash, route: &ReadRoute, winner_data: &[u8]) {
2733 match route {
2734 ReadRoute::Peers(_) => self.cancel_pending_peer_route(hash).await,
2735 ReadRoute::Sources => {
2736 let hash_key = hash_to_key(hash);
2737 self.complete_inflight_source_fetch(
2738 &hash_key,
2739 RouteFetchOutcome::Hit(winner_data.to_vec()),
2740 )
2741 .await;
2742 }
2743 }
2744 }
2745
2746 async fn ranked_read_routes(&self, context: &MeshReadContext) -> Vec<RankedReadRoute> {
2747 let mut routes = Vec::new();
2748 let ordered_peers = if should_forward_htl(context.request_htl) {
2749 self.ordered_connected_peers(context.exclude_peer_id.as_deref())
2750 .await
2751 } else {
2752 Vec::new()
2753 };
2754 if !ordered_peers.is_empty() {
2755 let best_peer_id = ordered_peers[0].clone();
2756 let selector = self.peer_selector.read().await;
2757 let best_peer = selector.get_stats(&best_peer_id).cloned();
2758 let now = Instant::now();
2759 let (score, has_history) = match best_peer.as_ref() {
2760 Some(stats) => (
2761 peer_endpoint_score(stats, now),
2762 peer_endpoint_has_history(stats),
2763 ),
2764 None => (0.0, false),
2765 };
2766 routes.push(RankedReadRoute {
2767 route: ReadRoute::Peers(ordered_peers),
2768 best_endpoint_id: format!("peer:{best_peer_id}"),
2769 score,
2770 has_history,
2771 });
2772 }
2773 let ordered_sources = self.ordered_read_sources().await;
2774 if let Some(best_source) = ordered_sources.first() {
2775 let stats = self.read_source_stats.read().await;
2776 let best_source_stats = stats.get(best_source.id()).cloned().unwrap_or_default();
2777 let now = Instant::now();
2778 routes.push(RankedReadRoute {
2779 route: ReadRoute::Sources,
2780 best_endpoint_id: format!("source:{}", best_source.id()),
2781 score: adaptive_source_score(&best_source_stats, now),
2782 has_history: source_has_history(&best_source_stats),
2783 });
2784 }
2785 if routes.len() <= 1 {
2786 return routes;
2787 }
2788
2789 routes.sort_by(|left, right| {
2790 right
2791 .score
2792 .partial_cmp(&left.score)
2793 .unwrap_or(std::cmp::Ordering::Equal)
2794 .then_with(|| ranked_route_kind(&left.route).cmp(&ranked_route_kind(&right.route)))
2795 .then_with(|| left.best_endpoint_id.cmp(&right.best_endpoint_id))
2796 .then_with(|| left.route.id().cmp(right.route.id()))
2797 });
2798 routes
2799 }
2800
2801 fn should_probe_multiple_routes(&self, routes: &[RankedReadRoute]) -> bool {
2802 if routes.len() <= 1 {
2803 return false;
2804 }
2805 if !routes[0].has_history || !routes[1].has_history {
2806 return false;
2807 }
2808 (routes[0].score - routes[1].score) < SOURCE_SCORE_TIE_DELTA
2809 }
2810
2811 async fn run_read_route(
2812 &self,
2813 hash: &Hash,
2814 route: &ReadRoute,
2815 context: &MeshReadContext,
2816 ) -> RouteFetchOutcome {
2817 match route {
2818 ReadRoute::Peers(peer_ids) => {
2819 self.request_from_ordered_peers(hash, peer_ids, context.request_htl)
2820 .await
2821 }
2822 ReadRoute::Sources => self.request_from_read_sources(hash).await,
2823 }
2824 }
2825
2826 async fn request_from_mesh_with_context(
2827 &self,
2828 hash: &Hash,
2829 context: &MeshReadContext,
2830 ) -> Option<Vec<u8>> {
2831 let routes = self.ranked_read_routes(context).await;
2832 match routes.as_slice() {
2833 [] => None,
2834 [ranked] => match self.run_read_route(hash, &ranked.route, context).await {
2835 RouteFetchOutcome::Hit(data) => Some(data),
2836 RouteFetchOutcome::Miss | RouteFetchOutcome::Timeout => None,
2837 },
2838 [first, second, ..] => {
2839 if self.should_probe_multiple_routes(&routes) {
2840 let first_fut = self.run_read_route(hash, &first.route, context);
2841 let second_fut = self.run_read_route(hash, &second.route, context);
2842 tokio::pin!(first_fut);
2843 tokio::pin!(second_fut);
2844 let mut first_done = false;
2845 let mut second_done = false;
2846 loop {
2847 tokio::select! {
2848 result = &mut first_fut, if !first_done => {
2849 first_done = true;
2850 if let RouteFetchOutcome::Hit(data) = result {
2851 if !second_done {
2852 self.cancel_losing_route(hash, &second.route, &data).await;
2853 }
2854 return Some(data);
2855 }
2856 }
2857 result = &mut second_fut, if !second_done => {
2858 second_done = true;
2859 if let RouteFetchOutcome::Hit(data) = result {
2860 if !first_done {
2861 self.cancel_losing_route(hash, &first.route, &data).await;
2862 }
2863 return Some(data);
2864 }
2865 }
2866 else => break,
2867 }
2868 if first_done && second_done {
2869 break;
2870 }
2871 }
2872 None
2873 } else {
2874 match self.run_read_route(hash, &first.route, context).await {
2875 RouteFetchOutcome::Hit(data) => return Some(data),
2876 RouteFetchOutcome::Miss | RouteFetchOutcome::Timeout => {}
2877 }
2878 for ranked in routes.iter().skip(1) {
2879 match self.run_read_route(hash, &ranked.route, context).await {
2880 RouteFetchOutcome::Hit(data) => return Some(data),
2881 RouteFetchOutcome::Miss | RouteFetchOutcome::Timeout => {}
2882 }
2883 }
2884 None
2885 }
2886 }
2887 }
2888 }
2889
2890 async fn request_from_mesh(&self, hash: &Hash) -> Option<Vec<u8>> {
2891 self.request_from_mesh_with_context(hash, &MeshReadContext::default())
2892 .await
2893 }
2894
2895 async fn begin_forward_request(&self, hash_key: &str, requester_id: &str) -> bool {
2896 let mut pending = self.pending_forward_requests.write().await;
2897 if let Some(existing) = pending.get_mut(hash_key) {
2898 existing.requester_ids.insert(requester_id.to_string());
2899 return false;
2900 }
2901
2902 let mut requester_ids = HashSet::new();
2903 requester_ids.insert(requester_id.to_string());
2904 pending.insert(
2905 hash_key.to_string(),
2906 PendingForwardRequest { requester_ids },
2907 );
2908 true
2909 }
2910
2911 async fn was_recent_forward_miss(&self, hash_key: &str) -> bool {
2912 self.recent_forward_misses.lock().await.contains(hash_key)
2913 }
2914
2915 async fn mark_recent_forward_miss(&self, hash_key: &str) {
2916 let _ = self
2917 .recent_forward_misses
2918 .lock()
2919 .await
2920 .insert_if_new(hash_key.to_string());
2921 }
2922
2923 async fn take_forward_requesters(&self, hash_key: &str) -> Vec<String> {
2924 self.pending_forward_requests
2925 .write()
2926 .await
2927 .remove(hash_key)
2928 .map(|pending| pending.requester_ids.into_iter().collect())
2929 .unwrap_or_default()
2930 }
2931
2932 async fn complete_pending_response(
2933 self: &Arc<Self>,
2934 from_peer: &str,
2935 hash: &Hash,
2936 hash_key: String,
2937 payload: Vec<u8>,
2938 ) {
2939 if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
2940 self.release_queried_peer_requests(&pending.queried_peers)
2941 .await;
2942 let rtt_ms = pending.started_at.elapsed().as_millis() as u64;
2943 self.peer_selector.write().await.record_success(
2944 from_peer,
2945 rtt_ms,
2946 payload.len() as u64,
2947 );
2948 let forward_requesters = self.take_forward_requesters(&hash_key).await;
2949 let response_bytes = if forward_requesters.is_empty() {
2950 None
2951 } else {
2952 Some(encode_response(&create_response(hash, payload.clone())))
2953 };
2954 let _ = pending.response_tx.send(Some(payload));
2955 if let Some(response_bytes) = response_bytes {
2956 for requester_id in forward_requesters {
2957 Arc::clone(self)
2958 .enqueue_response_send(requester_id, response_bytes.clone(), Instant::now())
2959 .await;
2960 }
2961 }
2962 }
2963 }
2964
2965 async fn handle_quote_response_message(&self, from_peer: &str, res: DataQuoteResponse) {
2966 if !res.a {
2967 return;
2968 }
2969
2970 let Some(quote_id) = res.q else {
2971 return;
2972 };
2973
2974 let hash_key = hash_to_key(&res.h);
2975 let (preferred_mint_url, offered_payment_sat) = {
2976 let pending_quotes = self.pending_quotes.read().await;
2977 let Some(pending) = pending_quotes.get(&hash_key) else {
2978 return;
2979 };
2980 (
2981 pending.preferred_mint_url.clone(),
2982 pending.offered_payment_sat,
2983 )
2984 };
2985 if !self
2986 .should_accept_quote_response(
2987 from_peer,
2988 preferred_mint_url.as_deref(),
2989 offered_payment_sat,
2990 &res,
2991 )
2992 .await
2993 {
2994 return;
2995 }
2996 let mut pending_quotes = self.pending_quotes.write().await;
2997 if let Some(pending) = pending_quotes.remove(&hash_key) {
2998 let _ = pending.response_tx.send(Some(NegotiatedQuote {
2999 peer_id: from_peer.to_string(),
3000 quote_id,
3001 mint_url: res.m,
3002 }));
3003 }
3004 }
3005
3006 async fn handle_response_message(
3007 self: &Arc<Self>,
3008 from_peer: &str,
3009 res: crate::protocol::DataResponse,
3010 ) {
3011 let hash_key = hash_to_key(&res.h);
3012 let hash = match crate::protocol::bytes_to_hash(&res.h) {
3013 Some(h) => h,
3014 None => return,
3015 };
3016
3017 if hashtree_core::sha256(&res.d) != hash {
3019 self.peer_selector.write().await.record_failure(from_peer);
3020 if self.debug {
3021 println!("[MeshStoreCore] Ignoring invalid response payload for {hash_key}");
3022 }
3023 return;
3024 }
3025
3026 self.record_useful_bytes_received_from_peer(from_peer, res.d.len() as u64)
3027 .await;
3028 self.complete_pending_response(from_peer, &hash, hash_key, res.d)
3029 .await;
3030 }
3031
3032 async fn handle_quote_request_message(&self, from_peer: &str, req: DataQuoteRequest) {
3033 let hash = match crate::protocol::bytes_to_hash(&req.h) {
3034 Some(h) => h,
3035 None => return,
3036 };
3037 let hash_key = hash_to_key(&hash);
3038
3039 {
3040 let selector = self.peer_selector.read().await;
3041 if self.should_refuse_requests_from_peer(&selector, from_peer) {
3042 if self.debug {
3043 println!(
3044 "[MeshStoreCore] Refusing quote request from delinquent peer {}",
3045 from_peer
3046 );
3047 }
3048 return;
3049 }
3050 }
3051
3052 let chosen_mint = self.choose_quote_mint(req.m.as_deref());
3053 let can_serve = self.local_store.has(&hash).await.ok().unwrap_or(false)
3054 && !self.should_drop_response(&hash)
3055 && !self.should_corrupt_response(&hash);
3056
3057 let res = if can_serve {
3058 let quote_id = self
3059 .issue_quote(from_peer, &hash_key, req.p, req.t, chosen_mint.as_deref())
3060 .await;
3061 create_quote_response_available(&hash, quote_id, req.p, req.t, chosen_mint.as_deref())
3062 } else {
3063 create_quote_response_unavailable(&hash)
3064 };
3065 let response_bytes = encode_quote_response(&res);
3066 if let Some(channel) = self.signaling.get_channel(from_peer).await {
3067 if channel.send(response_bytes.clone()).await.is_ok() {
3068 self.record_peer_wire_sent(from_peer, response_bytes.len() as u64)
3069 .await;
3070 }
3071 }
3072 }
3073
3074 async fn handle_request_message(
3075 self: &Arc<Self>,
3076 from_peer: &str,
3077 req: crate::protocol::DataRequest,
3078 ) {
3079 let hash = match crate::protocol::bytes_to_hash(&req.h) {
3080 Some(h) => h,
3081 None => return,
3082 };
3083 let hash_key = hash_to_key(&hash);
3084
3085 if let Some(quote_id) = req.q {
3086 if !self.take_valid_quote(from_peer, &hash_key, quote_id).await {
3087 if self.debug {
3088 println!(
3089 "[MeshStoreCore] Refusing request with invalid or expired quote {} from {}",
3090 quote_id, from_peer
3091 );
3092 }
3093 return;
3094 }
3095 }
3096
3097 let allow_peer_forwarding = {
3098 let selector = self.peer_selector.read().await;
3099 !self.should_refuse_requests_from_peer(&selector, from_peer)
3100 };
3101
3102 if let Ok(Some(mut data)) = self.local_store.get(&hash).await {
3104 if self.should_drop_response(&hash) {
3105 if self.debug {
3106 println!(
3107 "[MeshStoreCore] Dropping response for {} due to actor profile",
3108 hash_to_key(&hash)
3109 );
3110 }
3111 return;
3112 }
3113
3114 let response_delay = self.response_send_delay(&hash, data.len());
3115 if self.should_corrupt_response(&hash) {
3116 if data.is_empty() {
3117 data.push(0x80);
3118 } else {
3119 data[0] ^= 0x80;
3120 }
3121 }
3122
3123 let res = create_response(&hash, data);
3125 let response_bytes = encode_response(&res);
3126 let ready_at = Instant::now() + response_delay;
3127 Arc::clone(self)
3128 .enqueue_response_send(from_peer.to_string(), response_bytes, ready_at)
3129 .await;
3130 return;
3131 }
3132
3133 if self.pending_requests.read().await.contains_key(&hash_key) {
3134 let _ = self.begin_forward_request(&hash_key, from_peer).await;
3135 return;
3136 }
3137
3138 if self.was_recent_forward_miss(&hash_key).await {
3139 if self.debug {
3140 println!(
3141 "[MeshStoreCore] Suppressing recently missed forwarded request for {}",
3142 hash_key
3143 );
3144 }
3145 return;
3146 }
3147
3148 if !self.begin_forward_request(&hash_key, from_peer).await {
3149 return;
3150 }
3151
3152 let from_peer = from_peer.to_string();
3153 let this = Arc::clone(self);
3154 let request_htl = req.htl;
3155 tokio::spawn(async move {
3156 let result = if allow_peer_forwarding {
3157 let context = MeshReadContext {
3158 exclude_peer_id: Some(from_peer.clone()),
3159 request_htl,
3160 };
3161 this.request_from_mesh_with_context(&hash, &context).await
3162 } else {
3163 if this.debug {
3164 println!(
3165 "[MeshStoreCore] Serving request from delinquent peer {} via read sources only",
3166 from_peer
3167 );
3168 }
3169 match this.request_from_read_sources(&hash).await {
3170 RouteFetchOutcome::Hit(data) => Some(data),
3171 RouteFetchOutcome::Miss | RouteFetchOutcome::Timeout => None,
3172 }
3173 };
3174 let requester_ids = this.take_forward_requesters(&hash_key).await;
3175 if let Some(data) = result {
3176 let ready_at = Instant::now() + this.response_send_delay(&hash, data.len());
3177 let res = create_response(&hash, data);
3178 let response_bytes = encode_response(&res);
3179 for requester_id in requester_ids {
3180 Arc::clone(&this)
3181 .enqueue_response_send(requester_id, response_bytes.clone(), ready_at)
3182 .await;
3183 }
3184 } else {
3185 this.mark_recent_forward_miss(&hash_key).await;
3186 }
3187 });
3188 }
3189
3190 async fn handle_pubsub_interest_message(
3191 self: &Arc<Self>,
3192 from_peer: &str,
3193 mut interest: PubsubInterest,
3194 ) {
3195 if !self.apply_pubsub_interest_route(from_peer, &interest).await {
3196 return;
3197 }
3198
3199 if interest.htl <= 1 {
3200 return;
3201 }
3202 interest.htl = interest.htl.saturating_sub(1);
3203 let _ = self
3204 .send_pubsub_interest_to_peers(&interest, Some(from_peer))
3205 .await;
3206 }
3207
3208 async fn handle_pubsub_frame_message(
3209 self: &Arc<Self>,
3210 from_peer: &str,
3211 mut frame: PubsubFrame,
3212 wire_bytes: usize,
3213 ) {
3214 if frame.stream_id.is_empty() || frame.origin_peer_id.is_empty() {
3215 return;
3216 }
3217 if frame.origin_peer_id == self.signaling.peer_id() {
3218 return;
3219 }
3220
3221 let frame_key = Self::pubsub_frame_key(&frame);
3222 if !self
3223 .pubsub_seen_frames
3224 .lock()
3225 .await
3226 .insert_if_new(frame_key.clone())
3227 {
3228 return;
3229 }
3230 self.cache_pubsub_frame(frame_key.clone(), frame.clone())
3231 .await;
3232
3233 let local_interested = self
3234 .pubsub_local_interests
3235 .read()
3236 .await
3237 .contains(&frame.stream_id);
3238 let mut downstream_peers = if frame.htl > 1 {
3239 match self.routing.pubsub_delivery_mode {
3240 PubsubDeliveryMode::InterestPush => {
3241 let mut peers = self
3242 .interested_pubsub_peers(&frame.stream_id, Some(from_peer))
3243 .await;
3244 peers.extend(
3245 self.take_pubsub_want_peers(&frame_key, Some(from_peer))
3246 .await,
3247 );
3248 peers.sort();
3249 peers.dedup();
3250 peers
3251 }
3252 PubsubDeliveryMode::HtlInvWant => {
3253 self.take_pubsub_want_peers(&frame_key, Some(from_peer))
3254 .await
3255 }
3256 }
3257 } else {
3258 Vec::new()
3259 };
3260 downstream_peers.retain(|peer_id| peer_id != from_peer);
3261
3262 if local_interested || !downstream_peers.is_empty() {
3263 self.record_useful_bytes_received_from_peer(from_peer, wire_bytes as u64)
3264 .await;
3265 }
3266
3267 if local_interested {
3268 self.enqueue_pubsub_event(PubsubEvent {
3269 stream_id: frame.stream_id.clone(),
3270 seq: frame.seq,
3271 origin_peer_id: frame.origin_peer_id.clone(),
3272 from_peer_id: from_peer.to_string(),
3273 payload: frame.payload.clone(),
3274 })
3275 .await;
3276 }
3277
3278 if downstream_peers.is_empty() {
3279 return;
3280 }
3281
3282 frame.htl = frame.htl.saturating_sub(1);
3283 let _ = self
3284 .send_pubsub_frame_to_peers(&frame, &downstream_peers)
3285 .await;
3286 }
3287
3288 async fn handle_pubsub_inventory_message(
3289 self: &Arc<Self>,
3290 from_peer: &str,
3291 inv: PubsubInventory,
3292 wire_bytes: usize,
3293 ) {
3294 if inv.stream_id.is_empty() || inv.origin_peer_id.is_empty() {
3295 return;
3296 }
3297 if inv.origin_peer_id == self.signaling.peer_id() {
3298 return;
3299 }
3300
3301 let key = Self::pubsub_key(&inv.origin_peer_id, &inv.stream_id, inv.seq);
3302 if !self
3303 .pubsub_seen_inventories
3304 .lock()
3305 .await
3306 .insert_if_new(key.clone())
3307 {
3308 return;
3309 }
3310 {
3311 let mut routes = self.pubsub_inventory_routes.write().await;
3312 routes
3313 .entry(key.clone())
3314 .or_insert_with(|| from_peer.to_string());
3315 }
3316
3317 let local_interested = self
3318 .pubsub_local_interests
3319 .read()
3320 .await
3321 .contains(&inv.stream_id);
3322 let downstream_peers = self
3323 .interested_pubsub_peers(&inv.stream_id, Some(from_peer))
3324 .await;
3325 if local_interested || !downstream_peers.is_empty() {
3326 self.record_useful_bytes_received_from_peer(from_peer, wire_bytes as u64)
3327 .await;
3328 let want =
3329 create_pubsub_want(inv.stream_id.clone(), inv.seq, inv.origin_peer_id.clone());
3330 let _ = self.send_pubsub_want_upstream(&key, &want, None).await;
3331 }
3332
3333 if !should_forward_htl(inv.htl) {
3334 return;
3335 }
3336 let _ = self.flood_pubsub_inventory(&inv, Some(from_peer)).await;
3337 }
3338
3339 async fn handle_pubsub_want_message(
3340 self: &Arc<Self>,
3341 from_peer: &str,
3342 want: PubsubWant,
3343 wire_bytes: usize,
3344 ) {
3345 if want.stream_id.is_empty() || want.origin_peer_id.is_empty() {
3346 return;
3347 }
3348 if want.origin_peer_id == from_peer {
3349 return;
3350 }
3351
3352 let key = Self::pubsub_key(&want.origin_peer_id, &want.stream_id, want.seq);
3353 let want_key = format!("{from_peer}:{key}");
3354 if !self.pubsub_seen_wants.lock().await.insert_if_new(want_key) {
3355 return;
3356 }
3357
3358 if let Some(frame) = self.cached_pubsub_frame(&key).await {
3359 self.record_useful_bytes_received_from_peer(from_peer, wire_bytes as u64)
3360 .await;
3361 let peers = vec![from_peer.to_string()];
3362 let _ = self.send_pubsub_frame_to_peers(&frame, &peers).await;
3363 return;
3364 }
3365
3366 let has_upstream_route = self.pubsub_inventory_routes.read().await.contains_key(&key);
3367 if !has_upstream_route {
3368 return;
3369 }
3370
3371 if self.remember_pubsub_want_peer(key.clone(), from_peer).await {
3372 self.record_useful_bytes_received_from_peer(from_peer, wire_bytes as u64)
3373 .await;
3374 }
3375 let _ = self
3376 .send_pubsub_want_upstream(&key, &want, Some(from_peer))
3377 .await;
3378 }
3379
3380 pub async fn handle_data_message(self: &Arc<Self>, from_peer: &str, data: &[u8]) {
3382 self.record_peer_wire_received(from_peer, data.len() as u64)
3383 .await;
3384 let parsed = match parse_message(data) {
3385 Some(m) => m,
3386 None => return,
3387 };
3388
3389 match parsed {
3390 DataMessage::Request(req) => {
3391 self.handle_request_message(from_peer, req).await;
3392 }
3393 DataMessage::Response(res) => {
3394 self.handle_response_message(from_peer, res).await;
3395 }
3396 DataMessage::QuoteRequest(req) => {
3397 self.handle_quote_request_message(from_peer, req).await;
3398 }
3399 DataMessage::QuoteResponse(res) => {
3400 self.handle_quote_response_message(from_peer, res).await;
3401 }
3402 DataMessage::PubsubInterest(interest) => {
3403 self.handle_pubsub_interest_message(from_peer, interest)
3404 .await;
3405 }
3406 DataMessage::PubsubFrame(frame) => {
3407 self.handle_pubsub_frame_message(from_peer, frame, data.len())
3408 .await;
3409 }
3410 DataMessage::PubsubInventory(inv) => {
3411 self.handle_pubsub_inventory_message(from_peer, inv, data.len())
3412 .await;
3413 }
3414 DataMessage::PubsubWant(want) => {
3415 self.handle_pubsub_want_message(from_peer, want, data.len())
3416 .await;
3417 }
3418 DataMessage::Payment(_)
3419 | DataMessage::PaymentAck(_)
3420 | DataMessage::Chunk(_)
3421 | DataMessage::PeerHints(_) => {}
3422 }
3423 }
3424}
3425
3426#[async_trait]
3427impl<S, R, F> Store for MeshStoreCore<S, R, F>
3428where
3429 S: Store + Send + Sync + 'static,
3430 R: SignalingTransport + Send + Sync + 'static,
3431 F: PeerLinkFactory + Send + Sync + 'static,
3432{
3433 async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
3434 self.local_store.put(hash, data).await
3435 }
3436
3437 async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
3438 if let Some(data) = self.local_store.get(hash).await? {
3440 return Ok(Some(data));
3441 }
3442
3443 Ok(self.request_from_mesh(hash).await)
3445 }
3446
3447 async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
3448 self.local_store.has(hash).await
3449 }
3450
3451 async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
3452 self.local_store.delete(hash).await
3453 }
3454}
3455
3456#[cfg(test)]
3457mod tests;
3458
3459pub type SimMeshStore<S> =
3461 MeshStoreCore<S, crate::mock::MockRelayTransport, crate::mock::MockConnectionFactory>;