1use async_trait::async_trait;
8use futures::{stream::FuturesUnordered, FutureExt, StreamExt};
9use std::collections::hash_map::DefaultHasher;
10use std::collections::{HashMap, HashSet, VecDeque};
11use std::future::Future;
12use std::hash::{Hash as _, Hasher};
13use std::ops::Range;
14use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
15use std::sync::Arc;
16use std::time::Duration;
17use tokio::sync::{oneshot, Mutex, RwLock};
18use tokio::time::Instant;
19
20use hashtree_core::{Hash, Store, StoreError};
21
22use crate::peer_selector::{PeerMetadataSnapshot, PeerSelector, SelectionStrategy};
23use crate::protocol::{
24 create_pubsub_frame, create_pubsub_interest, create_pubsub_inventory, create_pubsub_want,
25 create_quote_request, create_quote_response_available, create_quote_response_unavailable,
26 create_request, create_request_with_quote, create_response, encode_pubsub_frame,
27 encode_pubsub_interest, encode_pubsub_inventory, encode_pubsub_want, encode_quote_request,
28 encode_quote_response, encode_request, encode_response, hash_to_key, parse_message,
29 DataMessage, DataQuoteRequest, DataQuoteResponse, PubsubFrame, PubsubInterest, PubsubInventory,
30 PubsubWant,
31};
32use crate::pubsub_strategy::{
33 reciprocal_virtual_finish, select_reciprocal_outbound_job, OutboundJobCandidate,
34 PeerTrafficSnapshot, PubsubCandidate, PubsubSchedulerConfig,
35};
36use crate::signaling::MeshRouter;
37use crate::transport::{PeerLinkFactory, SignalingTransport, TransportError};
38use crate::types::{
39 should_forward_htl, PeerHTLConfig, SignalingMessage, TimedSeenSet, MAX_HTL, MESH_EVENT_POLICY,
40};
41
42const PEER_METADATA_POINTER_SLOT_KEY: &[u8] = b"hashtree-webrtc/peer-metadata/latest/v1";
45const RECENT_FORWARD_MISS_CAPACITY: usize = 4096;
46const MIN_RECENT_FORWARD_MISS_TTL_MS: u64 = 250;
47const PUBSUB_SEEN_CAPACITY: usize = 16_384;
48const PUBSUB_INBOX_CAPACITY: usize = 4_096;
49const PUBSUB_FRAME_CACHE_CAPACITY: usize = 4_096;
50const PUBSUB_SEEN_TTL: Duration = Duration::from_secs(120);
51
52struct PendingRequest {
54 response_tx: oneshot::Sender<Option<Vec<u8>>>,
55 started_at: Instant,
56 queried_peers: Vec<String>,
57}
58
59struct PendingQuoteRequest {
60 response_tx: oneshot::Sender<Option<NegotiatedQuote>>,
61 preferred_mint_url: Option<String>,
62 offered_payment_sat: u64,
63}
64
65struct PendingForwardRequest {
66 requester_ids: HashSet<String>,
67}
68
69type PeerWireStats = PeerTrafficSnapshot;
70
71struct PendingResponseSend {
72 job_id: u64,
73 peer_id: String,
74 bytes: Vec<u8>,
75 ready_at: Instant,
76 queue_sequence: u64,
77}
78
79#[async_trait]
80pub trait MeshReadSource: Send + Sync {
81 fn id(&self) -> &str;
82
83 fn is_available(&self) -> bool {
84 true
85 }
86
87 async fn get(&self, hash: &Hash) -> Option<Vec<u8>>;
88}
89
90#[derive(Debug, Clone)]
91struct NegotiatedQuote {
92 peer_id: String,
93 quote_id: u64,
94 #[allow(dead_code)]
95 mint_url: Option<String>,
96}
97
98struct IssuedQuote {
99 expires_at: Instant,
100 #[allow(dead_code)]
101 payment_sat: u64,
102 #[allow(dead_code)]
103 mint_url: Option<String>,
104}
105
106#[derive(Debug, Clone, Default)]
107struct AdaptiveSourceStats {
108 requests: u64,
109 successes: u64,
110 misses: u64,
111 failures: u64,
112 timeouts: u64,
113 srtt_ms: f64,
114 rttvar_ms: f64,
115 backoff_level: u32,
116 backed_off_until: Option<Instant>,
117 last_success_at: Option<Instant>,
118 last_failure_at: Option<Instant>,
119}
120
121#[derive(Debug, Clone)]
122enum RouteFetchOutcome {
123 Hit(Vec<u8>),
124 Miss,
125 Timeout,
126}
127
128struct InflightSourceFetch {
129 waiters: Vec<oneshot::Sender<RouteFetchOutcome>>,
130}
131
132enum SourceFetchOutcome {
133 Hit {
134 source_id: String,
135 data: Vec<u8>,
136 elapsed_ms: u64,
137 },
138 Miss {
139 source_id: String,
140 },
141 Failure {
142 source_id: String,
143 },
144}
145
146const INITIAL_SOURCE_BACKOFF_MS: u64 = 250;
147const MAX_SOURCE_BACKOFF_MS: u64 = 10_000;
148const SOURCE_SCORE_TIE_DELTA: f64 = 0.15;
149const RECENT_SOURCE_SUCCESS_WINDOW: Duration = Duration::from_secs(60);
150const ACTIVE_PEER_REQUEST_RANK_PENALTY: usize = 3;
151
152fn source_reliability_score(stats: &AdaptiveSourceStats) -> f64 {
153 (stats.successes as f64 + 1.0) / (stats.requests as f64 + 2.0)
154}
155
156fn source_latency_score(stats: &AdaptiveSourceStats) -> f64 {
157 if stats.srtt_ms <= 0.0 {
158 return 0.5;
159 }
160 (500.0 / (stats.srtt_ms + 50.0)).min(1.0)
161}
162
163fn source_has_history(stats: &AdaptiveSourceStats) -> bool {
164 stats.requests > 0
165 || stats.successes > 0
166 || stats.misses > 0
167 || stats.failures > 0
168 || stats.timeouts > 0
169}
170
171fn adaptive_source_score(stats: &AdaptiveSourceStats, now: Instant) -> f64 {
172 if let Some(backed_off_until) = stats.backed_off_until {
173 if backed_off_until > now {
174 return f64::NEG_INFINITY;
175 }
176 }
177
178 let miss_penalty = if stats.requests > 0 {
179 (stats.misses as f64 / stats.requests as f64) * 0.15
180 } else {
181 0.0
182 };
183 let failure_penalty = if stats.requests > 0 {
184 ((stats.failures + stats.timeouts) as f64 / stats.requests as f64) * 0.3
185 } else {
186 0.0
187 };
188 let recency_bonus = if stats
189 .last_success_at
190 .is_some_and(|last| now.duration_since(last) < RECENT_SOURCE_SUCCESS_WINDOW)
191 {
192 0.1
193 } else {
194 0.0
195 };
196
197 0.6 * source_reliability_score(stats) + 0.3 * source_latency_score(stats) + recency_bonus
198 - miss_penalty
199 - failure_penalty
200}
201
202fn peer_endpoint_has_history(stats: &crate::peer_selector::PeerStats) -> bool {
203 stats.requests_sent > 0 || stats.successes > 0 || stats.failures > 0 || stats.timeouts > 0
204}
205
206fn peer_endpoint_score(stats: &crate::peer_selector::PeerStats, now: Instant) -> f64 {
207 if stats.backed_off_until.is_some_and(|until| until > now) {
208 return f64::NEG_INFINITY;
209 }
210
211 let miss_penalty = 0.0;
212 let failure_penalty = if stats.requests_sent > 0 {
213 ((stats.failures + stats.timeouts) as f64 / stats.requests_sent as f64) * 0.3
214 } else {
215 0.0
216 };
217 let recency_bonus = if stats
218 .last_success
219 .is_some_and(|last| now.duration_since(last) < RECENT_SOURCE_SUCCESS_WINDOW)
220 {
221 0.1
222 } else {
223 0.0
224 };
225
226 0.6 * stats.success_rate()
227 + 0.3
228 * source_latency_score(&AdaptiveSourceStats {
229 srtt_ms: stats.srtt_ms,
230 ..AdaptiveSourceStats::default()
231 })
232 + recency_bonus
233 - miss_penalty
234 - failure_penalty
235}
236
237#[derive(Clone)]
238enum ReadRoute {
239 Peers(Vec<String>),
240 Sources,
241}
242
243impl ReadRoute {
244 fn id(&self) -> &'static str {
245 match self {
246 Self::Peers(_) => "peers",
247 Self::Sources => "sources",
248 }
249 }
250}
251
252struct RankedReadRoute {
253 route: ReadRoute,
254 best_endpoint_id: String,
255 score: f64,
256 has_history: bool,
257}
258
259fn ranked_route_kind(route: &ReadRoute) -> u8 {
260 match route {
261 ReadRoute::Sources => 0,
262 ReadRoute::Peers(_) => 1,
263 }
264}
265
266#[derive(Debug, Clone)]
267struct MeshReadContext {
268 exclude_peer_id: Option<String>,
269 request_htl: u8,
270}
271
272impl Default for MeshReadContext {
273 fn default() -> Self {
274 Self {
275 exclude_peer_id: None,
276 request_htl: MAX_HTL,
277 }
278 }
279}
280
281#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
283pub struct DataPumpStats {
284 pub processed: usize,
285 pub request_messages: usize,
286 pub response_messages: usize,
287 pub quote_request_messages: u64,
288 pub quote_response_messages: u64,
289 pub pubsub_interest_messages: u64,
290 pub pubsub_frame_messages: u64,
291 pub pubsub_inventory_messages: u64,
292 pub pubsub_want_messages: u64,
293 pub processed_bytes: u64,
294}
295
296#[derive(Debug, Clone, PartialEq, Eq)]
298pub struct PubsubEvent {
299 pub stream_id: String,
300 pub seq: u64,
301 pub origin_peer_id: String,
302 pub from_peer_id: String,
303 pub payload: Vec<u8>,
304}
305
306#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
308pub struct PubsubPublishStats {
309 pub selected_peers: usize,
310 pub sent_peers: usize,
311 pub sent_bytes: u64,
312 pub deferred_peers: usize,
313}
314
315#[derive(Debug, Clone, Copy, PartialEq, Eq)]
317pub enum PubsubDeliveryMode {
318 InterestPush,
320 HtlInvWant,
322}
323
324impl Default for PubsubDeliveryMode {
325 fn default() -> Self {
326 Self::HtlInvWant
327 }
328}
329
330#[derive(Debug, Clone, Copy)]
336pub struct RequestDispatchConfig {
337 pub initial_fanout: usize,
339 pub hedge_fanout: usize,
341 pub max_fanout: usize,
343 pub hedge_interval_ms: u64,
345}
346
347impl Default for RequestDispatchConfig {
348 fn default() -> Self {
349 Self {
350 initial_fanout: usize::MAX,
351 hedge_fanout: usize::MAX,
352 max_fanout: usize::MAX,
353 hedge_interval_ms: 0,
354 }
355 }
356}
357
358pub fn normalize_dispatch_config(
360 dispatch: RequestDispatchConfig,
361 available_peers: usize,
362) -> RequestDispatchConfig {
363 let mut cfg = dispatch;
364 let cap = if cfg.max_fanout == 0 {
365 available_peers
366 } else {
367 cfg.max_fanout.min(available_peers)
368 };
369 cfg.max_fanout = cap;
370 cfg.initial_fanout = if cfg.initial_fanout == 0 {
371 1
372 } else {
373 cfg.initial_fanout.min(cap.max(1))
374 };
375 cfg.hedge_fanout = if cfg.hedge_fanout == 0 {
376 1
377 } else {
378 cfg.hedge_fanout.min(cap.max(1))
379 };
380 cfg
381}
382
383pub fn build_hedged_wave_plan(peer_count: usize, dispatch: RequestDispatchConfig) -> Vec<usize> {
385 if peer_count == 0 {
386 return Vec::new();
387 }
388 let cap = dispatch.max_fanout.min(peer_count);
389 if cap == 0 {
390 return Vec::new();
391 }
392
393 let mut plan = Vec::new();
394 let mut sent = 0usize;
395 let first = dispatch.initial_fanout.min(cap).max(1);
396 plan.push(first);
397 sent += first;
398
399 while sent < cap {
400 let next = dispatch.hedge_fanout.min(cap - sent).max(1);
401 plan.push(next);
402 sent += next;
403 }
404 plan
405}
406
407#[derive(Debug)]
409pub enum HedgedWaveAction<T> {
410 Continue,
411 Success(T),
412 Abort,
413}
414
415pub async fn run_hedged_waves<T, SendWave, SendWaveFut, WaitWave, WaitWaveFut>(
420 peer_count: usize,
421 dispatch: RequestDispatchConfig,
422 request_timeout: Duration,
423 mut send_wave: SendWave,
424 mut wait_wave: WaitWave,
425) -> Option<T>
426where
427 SendWave: FnMut(Range<usize>) -> SendWaveFut,
428 SendWaveFut: Future<Output = usize>,
429 WaitWave: FnMut(Duration) -> WaitWaveFut,
430 WaitWaveFut: Future<Output = HedgedWaveAction<T>>,
431{
432 let dispatch = normalize_dispatch_config(dispatch, peer_count);
433 let wave_plan = build_hedged_wave_plan(peer_count, dispatch);
434 if wave_plan.is_empty() {
435 return None;
436 }
437
438 let deadline = Instant::now() + request_timeout;
439 let mut sent_total = 0usize;
440 let mut next_peer_idx = 0usize;
441
442 for (wave_idx, wave_size) in wave_plan.iter().copied().enumerate() {
443 let from = next_peer_idx;
444 let to = (next_peer_idx + wave_size).min(peer_count);
445 next_peer_idx = to;
446
447 if from == to {
448 continue;
449 }
450
451 sent_total += send_wave(from..to).await;
452 if sent_total == 0 {
453 if next_peer_idx >= peer_count {
454 break;
455 }
456 continue;
457 }
458
459 let now = Instant::now();
460 if now >= deadline {
461 break;
462 }
463 let remaining = deadline.saturating_duration_since(now);
464 let is_last_wave = wave_idx + 1 == wave_plan.len() || next_peer_idx >= peer_count;
465 let wait = if is_last_wave {
466 remaining
467 } else if dispatch.hedge_interval_ms == 0 {
468 Duration::ZERO
469 } else {
470 Duration::from_millis(dispatch.hedge_interval_ms).min(remaining)
471 };
472
473 if wait.is_zero() {
474 continue;
475 }
476
477 match wait_wave(wait).await {
478 HedgedWaveAction::Continue => {}
479 HedgedWaveAction::Success(value) => return Some(value),
480 HedgedWaveAction::Abort => break,
481 }
482 }
483
484 None
485}
486
487pub async fn sync_selector_peers(selector: &RwLock<PeerSelector>, current_peer_ids: &[String]) {
489 let mut selector = selector.write().await;
490 let current: HashSet<&str> = current_peer_ids.iter().map(String::as_str).collect();
491 let known: Vec<String> = selector.all_stats().map(|s| s.peer_id.clone()).collect();
492 for peer_id in known {
493 if !current.contains(peer_id.as_str()) {
494 selector.remove_peer(&peer_id);
495 }
496 }
497 for peer_id in current_peer_ids {
498 selector.add_peer(peer_id.clone());
499 }
500}
501
502#[derive(Debug, Clone, Copy)]
506pub struct ResponseBehaviorConfig {
507 pub drop_response_prob: f64,
509 pub corrupt_response_prob: f64,
511 pub extra_delay_ms: u64,
513 pub first_byte_delay_ms: u64,
515 pub bytes_per_second: u64,
517 pub stall_response_prob: f64,
519 pub stall_delay_ms: u64,
521}
522
523impl Default for ResponseBehaviorConfig {
524 fn default() -> Self {
525 Self {
526 drop_response_prob: 0.0,
527 corrupt_response_prob: 0.0,
528 extra_delay_ms: 0,
529 first_byte_delay_ms: 0,
530 bytes_per_second: 0,
531 stall_response_prob: 0.0,
532 stall_delay_ms: 0,
533 }
534 }
535}
536
537impl ResponseBehaviorConfig {
538 fn normalized(self) -> Self {
539 Self {
540 drop_response_prob: self.drop_response_prob.clamp(0.0, 1.0),
541 corrupt_response_prob: self.corrupt_response_prob.clamp(0.0, 1.0),
542 extra_delay_ms: self.extra_delay_ms,
543 first_byte_delay_ms: self.first_byte_delay_ms,
544 bytes_per_second: self.bytes_per_second,
545 stall_response_prob: self.stall_response_prob.clamp(0.0, 1.0),
546 stall_delay_ms: self.stall_delay_ms,
547 }
548 }
549}
550
551#[derive(Debug, Clone)]
553pub struct MeshRoutingConfig {
554 pub selection_strategy: SelectionStrategy,
555 pub fairness_enabled: bool,
556 pub cashu_payment_weight: f64,
558 pub cashu_payment_default_block_threshold: u64,
561 pub cashu_accepted_mints: Vec<String>,
563 pub cashu_default_mint: Option<String>,
565 pub cashu_peer_suggested_mint_base_cap_sat: u64,
567 pub cashu_peer_suggested_mint_success_step_sat: u64,
569 pub cashu_peer_suggested_mint_receipt_step_sat: u64,
571 pub cashu_peer_suggested_mint_max_cap_sat: u64,
573 pub dispatch: RequestDispatchConfig,
574 pub response_behavior: ResponseBehaviorConfig,
575 pub pubsub_scheduler: PubsubSchedulerConfig,
576 pub pubsub_delivery_mode: PubsubDeliveryMode,
577}
578
579impl Default for MeshRoutingConfig {
580 fn default() -> Self {
581 Self {
582 selection_strategy: SelectionStrategy::Weighted,
583 fairness_enabled: true,
584 cashu_payment_weight: 0.0,
585 cashu_payment_default_block_threshold: 0,
586 cashu_accepted_mints: Vec::new(),
587 cashu_default_mint: None,
588 cashu_peer_suggested_mint_base_cap_sat: 0,
589 cashu_peer_suggested_mint_success_step_sat: 0,
590 cashu_peer_suggested_mint_receipt_step_sat: 0,
591 cashu_peer_suggested_mint_max_cap_sat: 0,
592 dispatch: RequestDispatchConfig::default(),
593 response_behavior: ResponseBehaviorConfig::default(),
594 pubsub_scheduler: PubsubSchedulerConfig::default(),
595 pubsub_delivery_mode: PubsubDeliveryMode::HtlInvWant,
596 }
597 }
598}
599
600pub struct MeshStoreCore<S, R, F>
607where
608 S: Store + Send + Sync + 'static,
609 R: SignalingTransport + Send + Sync + 'static,
610 F: PeerLinkFactory + Send + Sync + 'static,
611{
612 local_store: Arc<S>,
614 signaling: Arc<MeshRouter<R, F>>,
616 htl_configs: RwLock<HashMap<String, PeerHTLConfig>>,
618 pending_requests: RwLock<HashMap<String, PendingRequest>>,
620 pending_quotes: RwLock<HashMap<String, PendingQuoteRequest>>,
622 pending_forward_requests: RwLock<HashMap<String, PendingForwardRequest>>,
624 recent_forward_misses: Mutex<TimedSeenSet>,
626 issued_quotes: RwLock<HashMap<(String, String, u64), IssuedQuote>>,
628 next_quote_id: RwLock<u64>,
630 read_sources: RwLock<HashMap<String, Arc<dyn MeshReadSource>>>,
632 read_source_stats: RwLock<HashMap<String, AdaptiveSourceStats>>,
634 inflight_source_fetches: Mutex<HashMap<String, InflightSourceFetch>>,
636 peer_selector: RwLock<PeerSelector>,
638 peer_active_requests: RwLock<HashMap<String, usize>>,
640 peer_wire_stats: RwLock<HashMap<String, PeerWireStats>>,
642 pubsub_local_interests: RwLock<HashSet<String>>,
644 pubsub_local_interest_versions: RwLock<HashMap<String, u64>>,
646 pubsub_peer_interests: RwLock<HashMap<String, HashSet<String>>>,
648 pubsub_interest_routes: RwLock<HashMap<(String, String), String>>,
650 pubsub_interest_versions: RwLock<HashMap<(String, String), u64>>,
652 pubsub_seen_interests: Mutex<TimedSeenSet>,
654 pubsub_seen_frames: Mutex<TimedSeenSet>,
656 pubsub_seen_inventories: Mutex<TimedSeenSet>,
658 pubsub_seen_wants: Mutex<TimedSeenSet>,
660 pubsub_inventory_routes: RwLock<HashMap<String, String>>,
662 pubsub_want_routes: RwLock<HashMap<String, HashSet<String>>>,
664 pubsub_upstream_wants: Mutex<TimedSeenSet>,
666 pubsub_frame_cache: Mutex<VecDeque<(String, PubsubFrame)>>,
668 pubsub_inbox: Mutex<VecDeque<PubsubEvent>>,
670 pubsub_deferred_counts: RwLock<HashMap<(String, String), u64>>,
672 next_pubsub_interest_seq: AtomicU64,
674 pending_response_sends: Mutex<Vec<PendingResponseSend>>,
676 response_scheduler_running: AtomicBool,
678 next_response_job_id: AtomicU64,
680 routing: MeshRoutingConfig,
682 request_timeout: Duration,
684 debug: bool,
686 running: RwLock<bool>,
688}
689
690impl<S, R, F> MeshStoreCore<S, R, F>
691where
692 S: Store + Send + Sync + 'static,
693 R: SignalingTransport + Send + Sync + 'static,
694 F: PeerLinkFactory + Send + Sync + 'static,
695{
696 pub fn new(
698 local_store: Arc<S>,
699 signaling: Arc<MeshRouter<R, F>>,
700 request_timeout: Duration,
701 debug: bool,
702 ) -> Self {
703 Self::new_with_routing(
704 local_store,
705 signaling,
706 request_timeout,
707 debug,
708 Default::default(),
709 )
710 }
711
712 pub fn new_with_routing(
714 local_store: Arc<S>,
715 signaling: Arc<MeshRouter<R, F>>,
716 request_timeout: Duration,
717 debug: bool,
718 routing: MeshRoutingConfig,
719 ) -> Self {
720 let mut selector = PeerSelector::with_strategy(routing.selection_strategy);
721 selector.set_fairness(routing.fairness_enabled);
722 selector.set_cashu_payment_weight(routing.cashu_payment_weight);
723 Self {
724 local_store,
725 signaling,
726 htl_configs: RwLock::new(HashMap::new()),
727 pending_requests: RwLock::new(HashMap::new()),
728 pending_quotes: RwLock::new(HashMap::new()),
729 pending_forward_requests: RwLock::new(HashMap::new()),
730 recent_forward_misses: Mutex::new(TimedSeenSet::new(
731 RECENT_FORWARD_MISS_CAPACITY,
732 Self::recent_forward_miss_ttl(request_timeout),
733 )),
734 issued_quotes: RwLock::new(HashMap::new()),
735 next_quote_id: RwLock::new(1),
736 read_sources: RwLock::new(HashMap::new()),
737 read_source_stats: RwLock::new(HashMap::new()),
738 inflight_source_fetches: Mutex::new(HashMap::new()),
739 peer_selector: RwLock::new(selector),
740 peer_active_requests: RwLock::new(HashMap::new()),
741 peer_wire_stats: RwLock::new(HashMap::new()),
742 pubsub_local_interests: RwLock::new(HashSet::new()),
743 pubsub_local_interest_versions: RwLock::new(HashMap::new()),
744 pubsub_peer_interests: RwLock::new(HashMap::new()),
745 pubsub_interest_routes: RwLock::new(HashMap::new()),
746 pubsub_interest_versions: RwLock::new(HashMap::new()),
747 pubsub_seen_interests: Mutex::new(TimedSeenSet::new(
748 PUBSUB_SEEN_CAPACITY,
749 PUBSUB_SEEN_TTL,
750 )),
751 pubsub_seen_frames: Mutex::new(TimedSeenSet::new(
752 PUBSUB_SEEN_CAPACITY,
753 PUBSUB_SEEN_TTL,
754 )),
755 pubsub_seen_inventories: Mutex::new(TimedSeenSet::new(
756 PUBSUB_SEEN_CAPACITY,
757 PUBSUB_SEEN_TTL,
758 )),
759 pubsub_seen_wants: Mutex::new(TimedSeenSet::new(PUBSUB_SEEN_CAPACITY, PUBSUB_SEEN_TTL)),
760 pubsub_inventory_routes: RwLock::new(HashMap::new()),
761 pubsub_want_routes: RwLock::new(HashMap::new()),
762 pubsub_upstream_wants: Mutex::new(TimedSeenSet::new(
763 PUBSUB_SEEN_CAPACITY,
764 PUBSUB_SEEN_TTL,
765 )),
766 pubsub_frame_cache: Mutex::new(VecDeque::new()),
767 pubsub_inbox: Mutex::new(VecDeque::new()),
768 pubsub_deferred_counts: RwLock::new(HashMap::new()),
769 next_pubsub_interest_seq: AtomicU64::new(1),
770 pending_response_sends: Mutex::new(Vec::new()),
771 response_scheduler_running: AtomicBool::new(false),
772 next_response_job_id: AtomicU64::new(1),
773 routing,
774 request_timeout,
775 debug,
776 running: RwLock::new(false),
777 }
778 }
779
780 fn recent_forward_miss_ttl(request_timeout: Duration) -> Duration {
781 let ttl_ms = request_timeout
782 .as_millis()
783 .saturating_mul(2)
784 .max(MIN_RECENT_FORWARD_MISS_TTL_MS as u128)
785 .min(u64::MAX as u128) as u64;
786 Duration::from_millis(ttl_ms)
787 }
788
789 pub async fn start(&self) -> Result<(), TransportError> {
791 *self.running.write().await = true;
792
793 self.signaling.send_hello(vec![]).await?;
795
796 Ok(())
797 }
798
799 pub async fn stop(&self) {
801 *self.running.write().await = false;
802 }
803
804 pub async fn process_signaling(&self, msg: SignalingMessage) -> Result<(), TransportError> {
806 let peer_id = msg.peer_id().to_string();
808 {
809 let mut configs = self.htl_configs.write().await;
810 if !configs.contains_key(&peer_id) {
811 configs.insert(peer_id.clone(), PeerHTLConfig::random());
812 }
813 }
814 self.peer_selector.write().await.add_peer(peer_id.clone());
815
816 let result = self.signaling.handle_message(msg).await;
817 if result.is_ok() {
818 self.announce_pubsub_interests_to_peer(&peer_id).await;
819 }
820 result
821 }
822
823 pub fn signaling(&self) -> &Arc<MeshRouter<R, F>> {
825 &self.signaling
826 }
827
828 fn response_behavior(&self) -> ResponseBehaviorConfig {
829 self.routing.response_behavior.normalized()
830 }
831
832 async fn record_peer_wire_sent(&self, peer_id: &str, bytes: u64) {
833 if bytes == 0 {
834 return;
835 }
836 let mut stats = self.peer_wire_stats.write().await;
837 let entry = stats.entry(peer_id.to_string()).or_default();
838 entry.bytes_sent = entry.bytes_sent.saturating_add(bytes);
839 }
840
841 async fn record_peer_wire_received(&self, peer_id: &str, bytes: u64) {
842 if bytes == 0 {
843 return;
844 }
845 let mut stats = self.peer_wire_stats.write().await;
846 let entry = stats.entry(peer_id.to_string()).or_default();
847 entry.bytes_received = entry.bytes_received.saturating_add(bytes);
848 }
849
850 pub async fn record_useful_bytes_received_from_peer(&self, peer_id: &str, bytes: u64) {
855 if bytes == 0 {
856 return;
857 }
858 let mut stats = self.peer_wire_stats.write().await;
859 let entry = stats.entry(peer_id.to_string()).or_default();
860 entry.useful_bytes_received = entry.useful_bytes_received.saturating_add(bytes);
861 }
862
863 pub async fn peer_traffic_snapshot(&self, peer_id: &str) -> PeerTrafficSnapshot {
865 self.peer_wire_stats
866 .read()
867 .await
868 .get(peer_id)
869 .copied()
870 .unwrap_or_default()
871 }
872
873 pub async fn peer_traffic_snapshots(&self) -> HashMap<String, PeerTrafficSnapshot> {
875 self.peer_wire_stats.read().await.clone()
876 }
877
878 fn pubsub_key(origin_peer_id: &str, stream_id: &str, seq: u64) -> String {
879 format!("{origin_peer_id}:{stream_id}:{seq}")
880 }
881
882 fn pubsub_frame_key(frame: &PubsubFrame) -> String {
883 Self::pubsub_key(&frame.origin_peer_id, &frame.stream_id, frame.seq)
884 }
885
886 fn pubsub_interest_key(interest: &PubsubInterest) -> String {
887 format!(
888 "{}:{}:{}:{}",
889 interest.subscriber_peer_id, interest.stream_id, interest.seq, interest.active
890 )
891 }
892
893 fn next_pubsub_interest_seq(&self) -> u64 {
894 self.next_pubsub_interest_seq
895 .fetch_add(1, Ordering::Relaxed)
896 }
897
898 async fn record_peer_pubsub_wire_sent(&self, peer_id: &str, bytes: u64, bandwidth_debt: f64) {
899 if bytes == 0 {
900 return;
901 }
902 let mut stats = self.peer_wire_stats.write().await;
903 let entry = stats.entry(peer_id.to_string()).or_default();
904 entry.bytes_sent = entry.bytes_sent.saturating_add(bytes);
905 entry.bandwidth_debt = bandwidth_debt;
906 }
907
908 async fn send_pubsub_interest_to_peers(
909 &self,
910 interest: &PubsubInterest,
911 exclude_peer_id: Option<&str>,
912 ) -> PubsubPublishStats {
913 if !should_forward_htl(interest.htl) {
914 return PubsubPublishStats::default();
915 }
916
917 let mut peer_ids = self.signaling.peer_ids().await;
918 peer_ids.sort();
919 peer_ids.retain(|peer_id| exclude_peer_id.is_none_or(|exclude| peer_id != exclude));
920
921 let bytes = encode_pubsub_interest(interest);
922 let mut stats = PubsubPublishStats {
923 selected_peers: peer_ids.len(),
924 ..Default::default()
925 };
926 for peer_id in peer_ids {
927 let Some(channel) = self.signaling.get_channel(&peer_id).await else {
928 continue;
929 };
930 if channel.send(bytes.clone()).await.is_ok() {
931 stats.sent_peers += 1;
932 stats.sent_bytes = stats.sent_bytes.saturating_add(bytes.len() as u64);
933 self.record_peer_wire_sent(&peer_id, bytes.len() as u64)
934 .await;
935 }
936 }
937 stats
938 }
939
940 async fn announce_pubsub_interests_to_peer(&self, peer_id: &str) {
941 let mut interests = self
942 .pubsub_local_interests
943 .read()
944 .await
945 .iter()
946 .cloned()
947 .collect::<Vec<_>>();
948 interests.sort();
949 if interests.is_empty() {
950 return;
951 }
952
953 let interests = {
954 let versions = self.pubsub_local_interest_versions.read().await;
955 interests
956 .into_iter()
957 .filter_map(|stream_id| {
958 versions
959 .get(&stream_id)
960 .copied()
961 .map(|seq| (stream_id, seq))
962 })
963 .collect::<Vec<_>>()
964 };
965
966 for (stream_id, seq) in interests {
967 let interest = create_pubsub_interest(
968 stream_id,
969 self.signaling.peer_id().to_string(),
970 seq,
971 true,
972 MAX_HTL,
973 );
974 let Some(channel) = self.signaling.get_channel(peer_id).await else {
975 continue;
976 };
977 let bytes = encode_pubsub_interest(&interest);
978 if channel.send(bytes.clone()).await.is_ok() {
979 self.record_peer_wire_sent(peer_id, bytes.len() as u64)
980 .await;
981 }
982 }
983 }
984
985 fn remove_pubsub_peer_interest(
986 peer_interests: &mut HashMap<String, HashSet<String>>,
987 routes: &HashMap<(String, String), String>,
988 stream_id: &str,
989 peer_id: &str,
990 ) {
991 let still_has_route = routes
992 .iter()
993 .any(|((stream, _subscriber), peer)| stream == stream_id && peer == peer_id);
994 if still_has_route {
995 return;
996 }
997 if let Some(peers) = peer_interests.get_mut(stream_id) {
998 peers.remove(peer_id);
999 if peers.is_empty() {
1000 peer_interests.remove(stream_id);
1001 }
1002 }
1003 }
1004
1005 async fn apply_pubsub_interest_route(
1006 &self,
1007 from_peer: &str,
1008 interest: &PubsubInterest,
1009 ) -> bool {
1010 if interest.stream_id.is_empty() || interest.subscriber_peer_id.is_empty() {
1011 return false;
1012 }
1013 if interest.subscriber_peer_id == self.signaling.peer_id() {
1014 return false;
1015 }
1016
1017 let interest_key = Self::pubsub_interest_key(interest);
1018 if !self
1019 .pubsub_seen_interests
1020 .lock()
1021 .await
1022 .insert_if_new(interest_key)
1023 {
1024 return false;
1025 }
1026
1027 let route_key = (
1028 interest.stream_id.clone(),
1029 interest.subscriber_peer_id.clone(),
1030 );
1031 {
1032 let mut versions = self.pubsub_interest_versions.write().await;
1033 if versions
1034 .get(&route_key)
1035 .is_some_and(|latest| *latest >= interest.seq)
1036 {
1037 return false;
1038 }
1039 versions.insert(route_key.clone(), interest.seq);
1040 }
1041
1042 let mut peer_interests = self.pubsub_peer_interests.write().await;
1043 let mut routes = self.pubsub_interest_routes.write().await;
1044 if interest.active {
1045 if let Some(previous_peer) = routes.insert(route_key, from_peer.to_string()) {
1046 if previous_peer != from_peer {
1047 Self::remove_pubsub_peer_interest(
1048 &mut peer_interests,
1049 &routes,
1050 &interest.stream_id,
1051 &previous_peer,
1052 );
1053 }
1054 }
1055 peer_interests
1056 .entry(interest.stream_id.clone())
1057 .or_default()
1058 .insert(from_peer.to_string());
1059 } else if let Some(previous_peer) = routes.remove(&route_key) {
1060 Self::remove_pubsub_peer_interest(
1061 &mut peer_interests,
1062 &routes,
1063 &interest.stream_id,
1064 &previous_peer,
1065 );
1066 } else {
1067 Self::remove_pubsub_peer_interest(
1068 &mut peer_interests,
1069 &routes,
1070 &interest.stream_id,
1071 from_peer,
1072 );
1073 }
1074
1075 true
1076 }
1077
1078 async fn interested_pubsub_peers(
1079 &self,
1080 stream_id: &str,
1081 exclude_peer_id: Option<&str>,
1082 ) -> Vec<String> {
1083 let connected = self
1084 .signaling
1085 .peer_ids()
1086 .await
1087 .into_iter()
1088 .collect::<HashSet<_>>();
1089 let mut peers = self
1090 .pubsub_peer_interests
1091 .read()
1092 .await
1093 .get(stream_id)
1094 .map(|peers| peers.iter().cloned().collect::<Vec<_>>())
1095 .unwrap_or_default();
1096 peers.retain(|peer_id| {
1097 connected.contains(peer_id) && exclude_peer_id.is_none_or(|exclude| peer_id != exclude)
1098 });
1099 peers.sort();
1100 peers
1101 }
1102
1103 async fn decrement_pubsub_htl_for_peer(&self, peer_id: &str, htl: u8) -> u8 {
1104 let htl_config = {
1105 let configs = self.htl_configs.read().await;
1106 configs
1107 .get(peer_id)
1108 .cloned()
1109 .unwrap_or_else(PeerHTLConfig::random)
1110 };
1111 htl_config.decrement_with_policy(htl, &MESH_EVENT_POLICY)
1112 }
1113
1114 async fn send_pubsub_inventory_to_peers(
1115 &self,
1116 inv: &PubsubInventory,
1117 peer_ids: &[String],
1118 ) -> PubsubPublishStats {
1119 if peer_ids.is_empty() || !should_forward_htl(inv.htl) {
1120 return PubsubPublishStats::default();
1121 }
1122
1123 let mut stats = PubsubPublishStats {
1124 selected_peers: peer_ids.len(),
1125 ..Default::default()
1126 };
1127 for peer_id in peer_ids {
1128 let send_htl = self.decrement_pubsub_htl_for_peer(peer_id, inv.htl).await;
1129 if !should_forward_htl(send_htl) {
1130 continue;
1131 }
1132 let Some(channel) = self.signaling.get_channel(peer_id).await else {
1133 continue;
1134 };
1135 let mut outgoing = inv.clone();
1136 outgoing.htl = send_htl;
1137 let bytes = encode_pubsub_inventory(&outgoing);
1138 let message_bytes = bytes.len() as u64;
1139 if channel.send(bytes).await.is_ok() {
1140 stats.sent_peers += 1;
1141 stats.sent_bytes = stats.sent_bytes.saturating_add(message_bytes);
1142 self.record_peer_wire_sent(peer_id, message_bytes).await;
1143 }
1144 }
1145 stats
1146 }
1147
1148 async fn flood_pubsub_inventory(
1149 &self,
1150 inv: &PubsubInventory,
1151 exclude_peer_id: Option<&str>,
1152 ) -> PubsubPublishStats {
1153 let mut peer_ids = self.signaling.peer_ids().await;
1154 peer_ids.sort();
1155 peer_ids.retain(|peer_id| exclude_peer_id.is_none_or(|exclude| peer_id != exclude));
1156 self.send_pubsub_inventory_to_peers(inv, &peer_ids).await
1157 }
1158
1159 async fn send_pubsub_want_to_peer(&self, want: &PubsubWant, peer_id: &str) -> bool {
1160 let Some(channel) = self.signaling.get_channel(peer_id).await else {
1161 return false;
1162 };
1163 let bytes = encode_pubsub_want(want);
1164 let message_bytes = bytes.len() as u64;
1165 match channel.send(bytes).await {
1166 Ok(()) => {
1167 self.record_peer_wire_sent(peer_id, message_bytes).await;
1168 true
1169 }
1170 Err(_) => false,
1171 }
1172 }
1173
1174 async fn send_pubsub_want_upstream(
1175 &self,
1176 key: &str,
1177 want: &PubsubWant,
1178 exclude_peer_id: Option<&str>,
1179 ) -> bool {
1180 let upstream = {
1181 let routes = self.pubsub_inventory_routes.read().await;
1182 routes.get(key).cloned()
1183 };
1184 let Some(upstream) = upstream else {
1185 return false;
1186 };
1187 if exclude_peer_id.is_some_and(|exclude| exclude == upstream) {
1188 return false;
1189 }
1190 let want_key = format!("{key}:{upstream}");
1191 if !self
1192 .pubsub_upstream_wants
1193 .lock()
1194 .await
1195 .insert_if_new(want_key)
1196 {
1197 return false;
1198 }
1199 self.send_pubsub_want_to_peer(want, &upstream).await
1200 }
1201
1202 async fn cache_pubsub_frame(&self, key: String, frame: PubsubFrame) {
1203 let mut cache = self.pubsub_frame_cache.lock().await;
1204 if let Some(index) = cache.iter().position(|(cached_key, _)| cached_key == &key) {
1205 cache.remove(index);
1206 }
1207 cache.push_back((key, frame));
1208 while cache.len() > PUBSUB_FRAME_CACHE_CAPACITY {
1209 cache.pop_front();
1210 }
1211 }
1212
1213 async fn cached_pubsub_frame(&self, key: &str) -> Option<PubsubFrame> {
1214 self.pubsub_frame_cache
1215 .lock()
1216 .await
1217 .iter()
1218 .find_map(|(cached_key, frame)| {
1219 if cached_key == key {
1220 Some(frame.clone())
1221 } else {
1222 None
1223 }
1224 })
1225 }
1226
1227 async fn remember_pubsub_want_peer(&self, key: String, from_peer: &str) -> bool {
1228 let mut routes = self.pubsub_want_routes.write().await;
1229 routes.entry(key).or_default().insert(from_peer.to_string())
1230 }
1231
1232 async fn take_pubsub_want_peers(
1233 &self,
1234 key: &str,
1235 exclude_peer_id: Option<&str>,
1236 ) -> Vec<String> {
1237 let connected = self
1238 .signaling
1239 .peer_ids()
1240 .await
1241 .into_iter()
1242 .collect::<HashSet<_>>();
1243 let mut peers = self
1244 .pubsub_want_routes
1245 .write()
1246 .await
1247 .remove(key)
1248 .map(|peers| peers.into_iter().collect::<Vec<_>>())
1249 .unwrap_or_default();
1250 peers.retain(|peer_id| {
1251 connected.contains(peer_id) && exclude_peer_id.is_none_or(|exclude| peer_id != exclude)
1252 });
1253 peers.sort();
1254 peers
1255 }
1256
1257 async fn select_pubsub_peers(
1258 &self,
1259 stream_id: &str,
1260 seq: u64,
1261 message_bytes: u64,
1262 peer_ids: &[String],
1263 ) -> (Vec<String>, Vec<String>) {
1264 let traffic = self.peer_wire_stats.read().await;
1265 let deferred_counts = self.pubsub_deferred_counts.read().await;
1266 let candidates = peer_ids
1267 .iter()
1268 .map(|peer_id| PubsubCandidate {
1269 peer_id: peer_id.clone(),
1270 traffic: traffic.get(peer_id).copied().unwrap_or_default(),
1271 deferred_count: deferred_counts
1272 .get(&(stream_id.to_string(), peer_id.clone()))
1273 .copied()
1274 .unwrap_or_default(),
1275 })
1276 .collect::<Vec<_>>();
1277 drop(deferred_counts);
1278 drop(traffic);
1279
1280 let selection = self.routing.pubsub_scheduler.select(
1281 stream_id,
1282 seq,
1283 self.signaling.peer_id(),
1284 message_bytes,
1285 &candidates,
1286 );
1287
1288 {
1289 let mut deferred_counts = self.pubsub_deferred_counts.write().await;
1290 for peer_id in &selection.deferred {
1291 *deferred_counts
1292 .entry((stream_id.to_string(), peer_id.clone()))
1293 .or_insert(0) += 1;
1294 }
1295 for peer_id in &selection.selected {
1296 deferred_counts.remove(&(stream_id.to_string(), peer_id.clone()));
1297 }
1298 }
1299
1300 (selection.selected, selection.deferred)
1301 }
1302
1303 async fn send_pubsub_frame_to_peers(
1304 &self,
1305 frame: &PubsubFrame,
1306 peer_ids: &[String],
1307 ) -> PubsubPublishStats {
1308 if peer_ids.is_empty() || !should_forward_htl(frame.htl) {
1309 return PubsubPublishStats::default();
1310 }
1311
1312 let bytes = encode_pubsub_frame(frame);
1313 let message_bytes = bytes.len() as u64;
1314 let (selected, deferred) = self
1315 .select_pubsub_peers(&frame.stream_id, frame.seq, message_bytes, peer_ids)
1316 .await;
1317 let mut stats = PubsubPublishStats {
1318 selected_peers: selected.len(),
1319 deferred_peers: deferred.len(),
1320 ..Default::default()
1321 };
1322
1323 for peer_id in selected {
1324 let Some(channel) = self.signaling.get_channel(&peer_id).await else {
1325 continue;
1326 };
1327 let snapshot = self.peer_traffic_snapshot(&peer_id).await;
1328 let bandwidth_debt = reciprocal_virtual_finish(snapshot, message_bytes);
1329 if channel.send(bytes.clone()).await.is_ok() {
1330 stats.sent_peers += 1;
1331 stats.sent_bytes = stats.sent_bytes.saturating_add(message_bytes);
1332 self.record_peer_pubsub_wire_sent(&peer_id, message_bytes, bandwidth_debt)
1333 .await;
1334 }
1335 }
1336
1337 stats
1338 }
1339
1340 async fn enqueue_pubsub_event(&self, event: PubsubEvent) {
1341 let mut inbox = self.pubsub_inbox.lock().await;
1342 inbox.push_back(event);
1343 while inbox.len() > PUBSUB_INBOX_CAPACITY {
1344 inbox.pop_front();
1345 }
1346 }
1347
1348 pub async fn subscribe_pubsub(
1350 self: &Arc<Self>,
1351 stream_id: impl Into<String>,
1352 ) -> PubsubPublishStats {
1353 let stream_id = stream_id.into();
1354 if stream_id.is_empty() {
1355 return PubsubPublishStats::default();
1356 }
1357 self.pubsub_local_interests
1358 .write()
1359 .await
1360 .insert(stream_id.clone());
1361 let seq = {
1362 let mut versions = self.pubsub_local_interest_versions.write().await;
1363 match versions.get(&stream_id).copied() {
1364 Some(seq) => seq,
1365 None => {
1366 let seq = self.next_pubsub_interest_seq();
1367 versions.insert(stream_id.clone(), seq);
1368 seq
1369 }
1370 }
1371 };
1372 let interest = create_pubsub_interest(
1373 stream_id,
1374 self.signaling.peer_id().to_string(),
1375 seq,
1376 true,
1377 MAX_HTL,
1378 );
1379 self.send_pubsub_interest_to_peers(&interest, None).await
1380 }
1381
1382 pub async fn unsubscribe_pubsub(
1384 self: &Arc<Self>,
1385 stream_id: impl Into<String>,
1386 ) -> PubsubPublishStats {
1387 let stream_id = stream_id.into();
1388 if stream_id.is_empty() {
1389 return PubsubPublishStats::default();
1390 }
1391 self.pubsub_local_interests.write().await.remove(&stream_id);
1392 self.pubsub_local_interest_versions
1393 .write()
1394 .await
1395 .remove(&stream_id);
1396 let interest = create_pubsub_interest(
1397 stream_id,
1398 self.signaling.peer_id().to_string(),
1399 self.next_pubsub_interest_seq(),
1400 false,
1401 MAX_HTL,
1402 );
1403 self.send_pubsub_interest_to_peers(&interest, None).await
1404 }
1405
1406 pub async fn publish_pubsub(
1408 self: &Arc<Self>,
1409 stream_id: impl Into<String>,
1410 seq: u64,
1411 payload: Vec<u8>,
1412 ) -> PubsubPublishStats {
1413 let stream_id = stream_id.into();
1414 if stream_id.is_empty() {
1415 return PubsubPublishStats::default();
1416 }
1417 let payload_bytes = payload.len() as u64;
1418 let frame = create_pubsub_frame(
1419 stream_id.clone(),
1420 seq,
1421 self.signaling.peer_id().to_string(),
1422 payload.clone(),
1423 MAX_HTL,
1424 );
1425 let frame_key = Self::pubsub_frame_key(&frame);
1426 self.pubsub_seen_frames
1427 .lock()
1428 .await
1429 .insert_if_new(frame_key.clone());
1430 self.cache_pubsub_frame(frame_key, frame.clone()).await;
1431
1432 if self
1433 .pubsub_local_interests
1434 .read()
1435 .await
1436 .contains(&stream_id)
1437 {
1438 self.enqueue_pubsub_event(PubsubEvent {
1439 stream_id: stream_id.clone(),
1440 seq,
1441 origin_peer_id: self.signaling.peer_id().to_string(),
1442 from_peer_id: self.signaling.peer_id().to_string(),
1443 payload,
1444 })
1445 .await;
1446 }
1447
1448 match self.routing.pubsub_delivery_mode {
1449 PubsubDeliveryMode::InterestPush => {
1450 let peers = self.interested_pubsub_peers(&stream_id, None).await;
1451 self.send_pubsub_frame_to_peers(&frame, &peers).await
1452 }
1453 PubsubDeliveryMode::HtlInvWant => {
1454 let inv = create_pubsub_inventory(
1455 stream_id,
1456 seq,
1457 self.signaling.peer_id().to_string(),
1458 payload_bytes,
1459 MESH_EVENT_POLICY.max_htl,
1460 );
1461 self.flood_pubsub_inventory(&inv, None).await
1462 }
1463 }
1464 }
1465
1466 pub async fn drain_pubsub_events(&self) -> Vec<PubsubEvent> {
1468 self.pubsub_inbox.lock().await.drain(..).collect()
1469 }
1470
1471 pub async fn pubsub_interest_peers(&self, stream_id: &str) -> Vec<String> {
1473 self.interested_pubsub_peers(stream_id, None).await
1474 }
1475
1476 fn choose_ready_response_job(
1477 ready_jobs: &[(u64, String, usize, Instant, u64)],
1478 stats: &HashMap<String, PeerWireStats>,
1479 ) -> Option<(u64, f64)> {
1480 let jobs = ready_jobs
1481 .iter()
1482 .map(|job| OutboundJobCandidate {
1483 job_id: job.0,
1484 peer_id: job.1.clone(),
1485 message_bytes: job.2 as u64,
1486 queue_sequence: job.4,
1487 })
1488 .collect::<Vec<_>>();
1489 select_reciprocal_outbound_job(&jobs, |peer_id| {
1490 stats.get(peer_id).copied().unwrap_or_default()
1491 })
1492 .map(|choice| (choice.job_id, choice.virtual_finish))
1493 }
1494
1495 async fn enqueue_response_send(
1496 self: &Arc<Self>,
1497 peer_id: String,
1498 bytes: Vec<u8>,
1499 ready_at: Instant,
1500 ) {
1501 let job_id = self.next_response_job_id.fetch_add(1, Ordering::Relaxed);
1502 {
1503 let mut queue = self.pending_response_sends.lock().await;
1504 queue.push(PendingResponseSend {
1505 job_id,
1506 peer_id,
1507 bytes,
1508 ready_at,
1509 queue_sequence: job_id,
1510 });
1511 }
1512
1513 if self
1514 .response_scheduler_running
1515 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
1516 .is_ok()
1517 {
1518 let this = Arc::clone(self);
1519 tokio::spawn(async move {
1520 this.run_response_scheduler().await;
1521 });
1522 }
1523 }
1524
1525 async fn run_response_scheduler(self: Arc<Self>) {
1526 loop {
1527 let snapshot = {
1528 let queue = self.pending_response_sends.lock().await;
1529 if queue.is_empty() {
1530 self.response_scheduler_running
1531 .store(false, Ordering::Release);
1532 return;
1533 }
1534 queue
1535 .iter()
1536 .map(|job| {
1537 (
1538 job.job_id,
1539 job.peer_id.clone(),
1540 job.bytes.len(),
1541 job.ready_at,
1542 job.queue_sequence,
1543 )
1544 })
1545 .collect::<Vec<_>>()
1546 };
1547
1548 let now = Instant::now();
1549 let mut earliest_ready_at: Option<Instant> = None;
1550 let mut ready_jobs = Vec::new();
1551 for job in &snapshot {
1552 if job.3 <= now {
1553 ready_jobs.push(job.clone());
1554 } else {
1555 earliest_ready_at = Some(match earliest_ready_at {
1556 Some(current) => current.min(job.3),
1557 None => job.3,
1558 });
1559 }
1560 }
1561
1562 if ready_jobs.is_empty() {
1563 if let Some(ready_at) = earliest_ready_at {
1564 tokio::time::sleep(ready_at.saturating_duration_since(Instant::now())).await;
1565 continue;
1566 }
1567 self.response_scheduler_running
1568 .store(false, Ordering::Release);
1569 return;
1570 }
1571
1572 let (selected_job_id, selected_finish) = {
1573 let stats = self.peer_wire_stats.read().await;
1574 Self::choose_ready_response_job(&ready_jobs, &stats).expect("ready response job")
1575 };
1576
1577 let selected = {
1578 let mut queue = self.pending_response_sends.lock().await;
1579 let Some(index) = queue.iter().position(|job| job.job_id == selected_job_id) else {
1580 continue;
1581 };
1582 queue.swap_remove(index)
1583 };
1584
1585 let sent = if let Some(channel) = self.signaling.get_channel(&selected.peer_id).await {
1586 channel.send(selected.bytes.clone()).await.is_ok()
1587 } else {
1588 false
1589 };
1590
1591 let queued_peers = {
1592 let queue = self.pending_response_sends.lock().await;
1593 queue
1594 .iter()
1595 .map(|job| job.peer_id.clone())
1596 .collect::<HashSet<_>>()
1597 };
1598 let mut stats = self.peer_wire_stats.write().await;
1599 let entry = stats.entry(selected.peer_id.clone()).or_default();
1600 if sent {
1601 entry.bytes_sent = entry.bytes_sent.saturating_add(selected.bytes.len() as u64);
1602 entry.bandwidth_debt = selected_finish;
1603 }
1604 if queued_peers.is_empty() {
1605 for peer_stats in stats.values_mut() {
1606 peer_stats.bandwidth_debt = 0.0;
1607 }
1608 } else {
1609 let floor = queued_peers
1610 .iter()
1611 .filter_map(|peer_id| stats.get(peer_id).map(|peer| peer.bandwidth_debt))
1612 .fold(f64::INFINITY, f64::min);
1613 if floor.is_finite() && floor > 0.0 {
1614 for peer_id in queued_peers {
1615 if let Some(peer_stats) = stats.get_mut(&peer_id) {
1616 peer_stats.bandwidth_debt =
1617 (peer_stats.bandwidth_debt - floor).max(0.0);
1618 }
1619 }
1620 }
1621 }
1622 }
1623 }
1624
1625 fn deterministic_actor_draw_for(peer_id: &str, hash: &Hash, salt: u64) -> f64 {
1626 let mut hasher = DefaultHasher::new();
1627 peer_id.hash(&mut hasher);
1628 hash.hash(&mut hasher);
1629 salt.hash(&mut hasher);
1630 let v = hasher.finish();
1631 (v as f64) / (u64::MAX as f64)
1632 }
1633
1634 fn deterministic_actor_draw(&self, hash: &Hash, salt: u64) -> f64 {
1635 Self::deterministic_actor_draw_for(self.signaling.peer_id(), hash, salt)
1636 }
1637
1638 fn peer_metadata_pointer_slot_hash() -> Hash {
1639 hashtree_core::sha256(PEER_METADATA_POINTER_SLOT_KEY)
1640 }
1641
1642 fn decode_hash_hex(hash_hex: &str) -> Result<Hash, StoreError> {
1643 let bytes = hex::decode(hash_hex)
1644 .map_err(|e| StoreError::Other(format!("Invalid hash hex: {e}")))?;
1645 if bytes.len() != 32 {
1646 return Err(StoreError::Other(format!(
1647 "Invalid hash length {}, expected 32 bytes",
1648 bytes.len()
1649 )));
1650 }
1651 let mut hash = [0u8; 32];
1652 hash.copy_from_slice(&bytes);
1653 Ok(hash)
1654 }
1655
1656 fn should_drop_response(&self, hash: &Hash) -> bool {
1657 let p = self.response_behavior().drop_response_prob;
1658 if p <= 0.0 {
1659 return false;
1660 }
1661 self.deterministic_actor_draw(hash, 0xD0_D0_D0_D0_D0_D0_D0_D0) < p
1662 }
1663
1664 fn should_corrupt_response(&self, hash: &Hash) -> bool {
1665 let p = self.response_behavior().corrupt_response_prob;
1666 if p <= 0.0 {
1667 return false;
1668 }
1669 self.deterministic_actor_draw(hash, 0xC0_C0_C0_C0_C0_C0_C0_C0) < p
1670 }
1671
1672 fn should_stall_response(&self, hash: &Hash) -> bool {
1673 let p = self.response_behavior().stall_response_prob;
1674 if p <= 0.0 {
1675 return false;
1676 }
1677 self.deterministic_actor_draw(hash, 0x5A_11_5A_11_5A_11_5A_11) < p
1678 }
1679
1680 fn response_send_delay(&self, hash: &Hash, payload_len: usize) -> Duration {
1681 let behavior = self.response_behavior();
1682 let mut total_ms = behavior
1683 .extra_delay_ms
1684 .saturating_add(behavior.first_byte_delay_ms);
1685
1686 if behavior.bytes_per_second > 0 && payload_len > 0 {
1687 let throughput_ms = ((payload_len as u128) * 1000)
1688 .div_ceil(behavior.bytes_per_second as u128)
1689 .min(u64::MAX as u128) as u64;
1690 total_ms = total_ms.saturating_add(throughput_ms);
1691 }
1692
1693 if behavior.stall_delay_ms > 0 && self.should_stall_response(hash) {
1694 total_ms = total_ms.saturating_add(behavior.stall_delay_ms);
1695 }
1696
1697 Duration::from_millis(total_ms)
1698 }
1699
1700 async fn ordered_connected_peers(&self, exclude_peer_id: Option<&str>) -> Vec<String> {
1701 let current_peer_ids = self.signaling.peer_ids().await;
1702 if current_peer_ids.is_empty() {
1703 return Vec::new();
1704 }
1705
1706 sync_selector_peers(&self.peer_selector, ¤t_peer_ids).await;
1707 let hash_get_peer_ids: HashSet<String> = self
1708 .signaling
1709 .hash_get_peer_ids()
1710 .await
1711 .into_iter()
1712 .collect();
1713 let mut candidate_peer_ids: Vec<String> = current_peer_ids
1714 .into_iter()
1715 .filter(|peer_id| hash_get_peer_ids.contains(peer_id))
1716 .filter(|peer_id| exclude_peer_id.is_none_or(|exclude| peer_id != exclude))
1717 .collect();
1718 if candidate_peer_ids.is_empty() {
1719 return Vec::new();
1720 }
1721
1722 let current_set: HashSet<&str> = candidate_peer_ids.iter().map(String::as_str).collect();
1723 let mut selector = self.peer_selector.write().await;
1724 let mut selector_order = selector.select_peers();
1725 selector_order.retain(|peer_id| current_set.contains(peer_id.as_str()));
1726 if selector_order.is_empty() {
1727 let mut fallback = candidate_peer_ids;
1728 fallback.sort();
1729 return fallback;
1730 }
1731 let backed_off: HashMap<String, bool> = candidate_peer_ids
1732 .iter()
1733 .map(|peer_id| (peer_id.clone(), selector.is_peer_backed_off(peer_id)))
1734 .collect();
1735 drop(selector);
1736
1737 let rank: HashMap<&str, usize> = selector_order
1738 .iter()
1739 .enumerate()
1740 .map(|(idx, peer_id)| (peer_id.as_str(), idx))
1741 .collect();
1742 let active = self.peer_active_requests.read().await;
1743 candidate_peer_ids.sort_by(|left, right| {
1744 let left_backed_off = backed_off.get(left).copied().unwrap_or(false);
1745 let right_backed_off = backed_off.get(right).copied().unwrap_or(false);
1746 if left_backed_off != right_backed_off {
1747 return if left_backed_off {
1748 std::cmp::Ordering::Greater
1749 } else {
1750 std::cmp::Ordering::Less
1751 };
1752 }
1753 let left_rank = rank.get(left.as_str()).copied().unwrap_or(usize::MAX / 2);
1754 let right_rank = rank.get(right.as_str()).copied().unwrap_or(usize::MAX / 2);
1755 let left_load = active.get(left).copied().unwrap_or(0);
1756 let right_load = active.get(right).copied().unwrap_or(0);
1757 (left_rank + left_load.saturating_mul(ACTIVE_PEER_REQUEST_RANK_PENALTY))
1758 .cmp(&(right_rank + right_load.saturating_mul(ACTIVE_PEER_REQUEST_RANK_PENALTY)))
1759 .then_with(|| left.cmp(right))
1760 });
1761 candidate_peer_ids
1762 }
1763
1764 async fn reserve_peer_request(&self, peer_id: &str) {
1765 let mut active = self.peer_active_requests.write().await;
1766 *active.entry(peer_id.to_string()).or_insert(0) += 1;
1767 }
1768
1769 async fn release_peer_request(&self, peer_id: &str) {
1770 let mut active = self.peer_active_requests.write().await;
1771 let Some(count) = active.get_mut(peer_id) else {
1772 return;
1773 };
1774 if *count <= 1 {
1775 active.remove(peer_id);
1776 } else {
1777 *count -= 1;
1778 }
1779 }
1780
1781 async fn release_queried_peer_requests(&self, peer_ids: &[String]) {
1782 for peer_id in peer_ids {
1783 self.release_peer_request(peer_id).await;
1784 }
1785 }
1786
1787 fn requested_quote_mint(&self) -> Option<&str> {
1788 if let Some(default_mint) = self.routing.cashu_default_mint.as_deref() {
1789 if self.routing.cashu_accepted_mints.is_empty()
1790 || self
1791 .routing
1792 .cashu_accepted_mints
1793 .iter()
1794 .any(|mint| mint == default_mint)
1795 {
1796 return Some(default_mint);
1797 }
1798 }
1799
1800 self.routing
1801 .cashu_accepted_mints
1802 .first()
1803 .map(String::as_str)
1804 }
1805
1806 fn choose_quote_mint(&self, requested_mint: Option<&str>) -> Option<String> {
1807 if let Some(requested_mint) = requested_mint {
1808 if self.accepts_quote_mint(Some(requested_mint)) {
1809 return Some(requested_mint.to_string());
1810 }
1811 }
1812 if let Some(default_mint) = self.routing.cashu_default_mint.as_ref() {
1813 return Some(default_mint.clone());
1814 }
1815 if let Some(first_mint) = self.routing.cashu_accepted_mints.first() {
1816 return Some(first_mint.clone());
1817 }
1818 requested_mint.map(str::to_string)
1819 }
1820
1821 fn accepts_quote_mint(&self, mint_url: Option<&str>) -> bool {
1822 if self.routing.cashu_accepted_mints.is_empty() {
1823 return true;
1824 }
1825
1826 let Some(mint_url) = mint_url else {
1827 return false;
1828 };
1829 self.routing
1830 .cashu_accepted_mints
1831 .iter()
1832 .any(|mint| mint == mint_url)
1833 }
1834
1835 fn trusts_quote_mint(&self, mint_url: Option<&str>) -> bool {
1836 let Some(mint_url) = mint_url else {
1837 return self.routing.cashu_default_mint.is_none()
1838 && self.routing.cashu_accepted_mints.is_empty();
1839 };
1840 self.routing.cashu_default_mint.as_deref() == Some(mint_url)
1841 || self
1842 .routing
1843 .cashu_accepted_mints
1844 .iter()
1845 .any(|mint| mint == mint_url)
1846 }
1847
1848 async fn peer_suggested_mint_cap_sat(&self, peer_id: &str) -> u64 {
1849 let base = self.routing.cashu_peer_suggested_mint_base_cap_sat;
1850 if base == 0 {
1851 return 0;
1852 }
1853
1854 let selector = self.peer_selector.read().await;
1855 let Some(stats) = selector.get_stats(peer_id) else {
1856 let max_cap = self.routing.cashu_peer_suggested_mint_max_cap_sat;
1857 return if max_cap > 0 { base.min(max_cap) } else { base };
1858 };
1859
1860 if stats.cashu_payment_defaults > 0
1861 && stats.cashu_payment_defaults >= stats.cashu_payment_receipts
1862 {
1863 return 0;
1864 }
1865
1866 let success_bonus = stats
1867 .successes
1868 .saturating_mul(self.routing.cashu_peer_suggested_mint_success_step_sat);
1869 let receipt_bonus = stats
1870 .cashu_payment_receipts
1871 .saturating_mul(self.routing.cashu_peer_suggested_mint_receipt_step_sat);
1872 let mut cap = base
1873 .saturating_add(success_bonus)
1874 .saturating_add(receipt_bonus);
1875 let max_cap = self.routing.cashu_peer_suggested_mint_max_cap_sat;
1876 if max_cap > 0 {
1877 cap = cap.min(max_cap);
1878 }
1879 cap
1880 }
1881
1882 async fn should_accept_quote_response(
1883 &self,
1884 from_peer: &str,
1885 preferred_mint_url: Option<&str>,
1886 offered_payment_sat: u64,
1887 res: &DataQuoteResponse,
1888 ) -> bool {
1889 let Some(payment_sat) = res.p else {
1890 return false;
1891 };
1892 if payment_sat > offered_payment_sat {
1893 return false;
1894 }
1895
1896 let response_mint = res.m.as_deref();
1897 if response_mint == preferred_mint_url {
1898 return true;
1899 }
1900 if self.trusts_quote_mint(response_mint) {
1901 return true;
1902 }
1903 if response_mint.is_none() {
1904 return false;
1905 }
1906
1907 payment_sat <= self.peer_suggested_mint_cap_sat(from_peer).await
1908 }
1909
1910 async fn issue_quote(
1911 &self,
1912 peer_id: &str,
1913 hash_key: &str,
1914 payment_sat: u64,
1915 ttl_ms: u32,
1916 mint_url: Option<&str>,
1917 ) -> u64 {
1918 let quote_id = {
1919 let mut next = self.next_quote_id.write().await;
1920 let quote_id = *next;
1921 *next = next.saturating_add(1);
1922 quote_id
1923 };
1924
1925 let expires_at = Instant::now() + Duration::from_millis(ttl_ms as u64);
1926 self.issued_quotes.write().await.insert(
1927 (peer_id.to_string(), hash_key.to_string(), quote_id),
1928 IssuedQuote {
1929 expires_at,
1930 payment_sat,
1931 mint_url: mint_url.map(str::to_string),
1932 },
1933 );
1934 quote_id
1935 }
1936
1937 async fn take_valid_quote(&self, peer_id: &str, hash_key: &str, quote_id: u64) -> bool {
1938 let key = (peer_id.to_string(), hash_key.to_string(), quote_id);
1939 let Some(quote) = self.issued_quotes.write().await.remove(&key) else {
1940 return false;
1941 };
1942 quote.expires_at > Instant::now()
1943 }
1944
1945 async fn send_request_to_peer(
1946 &self,
1947 peer_id: &str,
1948 hash: &Hash,
1949 request_htl: u8,
1950 quote_id: Option<u64>,
1951 ) -> bool {
1952 if !should_forward_htl(request_htl) {
1953 return false;
1954 }
1955
1956 let channel = match self.signaling.get_channel(peer_id).await {
1957 Some(c) => c,
1958 None => return false,
1959 };
1960
1961 let htl_config = {
1962 let configs = self.htl_configs.read().await;
1963 configs
1964 .get(peer_id)
1965 .cloned()
1966 .unwrap_or_else(PeerHTLConfig::random)
1967 };
1968
1969 let send_htl = htl_config.decrement(request_htl);
1970 let req = match quote_id {
1971 Some(quote_id) => create_request_with_quote(hash, send_htl, quote_id),
1972 None => create_request(hash, send_htl),
1973 };
1974 let request_bytes = encode_request(&req);
1975 let request_len = request_bytes.len() as u64;
1976
1977 {
1978 let mut selector = self.peer_selector.write().await;
1979 selector.record_request(peer_id, request_len);
1980 }
1981
1982 match channel.send(request_bytes).await {
1983 Ok(()) => {
1984 self.record_peer_wire_sent(peer_id, request_len).await;
1985 true
1986 }
1987 Err(_) => {
1988 self.peer_selector.write().await.record_failure(peer_id);
1989 false
1990 }
1991 }
1992 }
1993
1994 async fn send_quote_request_to_peer(
1995 &self,
1996 peer_id: &str,
1997 hash: &Hash,
1998 payment_sat: u64,
1999 ttl_ms: u32,
2000 mint_url: Option<&str>,
2001 ) -> bool {
2002 let channel = match self.signaling.get_channel(peer_id).await {
2003 Some(c) => c,
2004 None => return false,
2005 };
2006
2007 let req = create_quote_request(hash, ttl_ms, payment_sat, mint_url);
2008 let request_bytes = encode_quote_request(&req);
2009 let request_len = request_bytes.len() as u64;
2010
2011 match channel.send(request_bytes).await {
2012 Ok(()) => {
2013 self.record_peer_wire_sent(peer_id, request_len).await;
2014 true
2015 }
2016 Err(_) => false,
2017 }
2018 }
2019
2020 pub async fn set_read_sources(&self, sources: Vec<Arc<dyn MeshReadSource>>) {
2021 let mut by_id = HashMap::new();
2022 let mut stats = self.read_source_stats.write().await;
2023 for source in sources {
2024 let source_id = source.id().to_string();
2025 by_id.insert(source_id.clone(), source);
2026 stats
2027 .entry(source_id)
2028 .or_insert_with(AdaptiveSourceStats::default);
2029 }
2030 *self.read_sources.write().await = by_id;
2031 }
2032
2033 async fn record_read_source_request(&self, source_id: &str) {
2034 let mut stats = self.read_source_stats.write().await;
2035 stats
2036 .entry(source_id.to_string())
2037 .or_insert_with(AdaptiveSourceStats::default)
2038 .requests += 1;
2039 }
2040
2041 async fn record_read_source_miss(&self, source_id: &str) {
2042 let mut stats = self.read_source_stats.write().await;
2043 stats
2044 .entry(source_id.to_string())
2045 .or_insert_with(AdaptiveSourceStats::default)
2046 .misses += 1;
2047 }
2048
2049 async fn record_read_source_success(&self, source_id: &str, elapsed_ms: u64) {
2050 let now = Instant::now();
2051 let mut stats = self.read_source_stats.write().await;
2052 let stats = stats
2053 .entry(source_id.to_string())
2054 .or_insert_with(AdaptiveSourceStats::default);
2055 stats.successes += 1;
2056 stats.last_success_at = Some(now);
2057 stats.backoff_level = 0;
2058 stats.backed_off_until = None;
2059 if stats.srtt_ms <= 0.0 {
2060 stats.srtt_ms = elapsed_ms as f64;
2061 stats.rttvar_ms = elapsed_ms as f64 / 2.0;
2062 return;
2063 }
2064 let elapsed = elapsed_ms as f64;
2065 stats.rttvar_ms = 0.75 * stats.rttvar_ms + 0.25 * (stats.srtt_ms - elapsed).abs();
2066 stats.srtt_ms = 0.875 * stats.srtt_ms + 0.125 * elapsed;
2067 }
2068
2069 async fn record_read_source_failure(&self, source_id: &str) {
2070 let now = Instant::now();
2071 let mut stats = self.read_source_stats.write().await;
2072 let stats = stats
2073 .entry(source_id.to_string())
2074 .or_insert_with(AdaptiveSourceStats::default);
2075 stats.failures += 1;
2076 stats.last_failure_at = Some(now);
2077 Self::apply_source_backoff(stats, now);
2078 }
2079
2080 async fn record_read_source_timeout(&self, source_id: &str) {
2081 let now = Instant::now();
2082 let mut stats = self.read_source_stats.write().await;
2083 let stats = stats
2084 .entry(source_id.to_string())
2085 .or_insert_with(AdaptiveSourceStats::default);
2086 stats.timeouts += 1;
2087 stats.last_failure_at = Some(now);
2088 Self::apply_source_backoff(stats, now);
2089 }
2090
2091 fn apply_source_backoff(stats: &mut AdaptiveSourceStats, now: Instant) {
2092 stats.backoff_level = stats.backoff_level.saturating_add(1);
2093 let backoff_ms = (INITIAL_SOURCE_BACKOFF_MS
2094 .saturating_mul(2u64.saturating_pow(stats.backoff_level.saturating_sub(1))))
2095 .min(MAX_SOURCE_BACKOFF_MS);
2096 stats.backed_off_until = Some(now + Duration::from_millis(backoff_ms));
2097 }
2098
2099 async fn ordered_read_sources(&self) -> Vec<Arc<dyn MeshReadSource>> {
2100 let sources = self.read_sources.read().await;
2101 if sources.is_empty() {
2102 return Vec::new();
2103 }
2104
2105 let mut available: Vec<Arc<dyn MeshReadSource>> = sources
2106 .values()
2107 .filter(|source| source.is_available())
2108 .cloned()
2109 .collect();
2110 if available.is_empty() {
2111 return Vec::new();
2112 }
2113
2114 let now = Instant::now();
2115 let stats = self.read_source_stats.read().await;
2116 let mut healthy: Vec<Arc<dyn MeshReadSource>> = available
2117 .iter()
2118 .filter(|source| {
2119 stats
2120 .get(source.id())
2121 .and_then(|s| s.backed_off_until)
2122 .is_none_or(|until| until <= now)
2123 })
2124 .cloned()
2125 .collect();
2126 if !healthy.is_empty() {
2127 available = std::mem::take(&mut healthy);
2128 }
2129
2130 available.sort_by(|left, right| {
2131 let left_stats = stats.get(left.id()).cloned().unwrap_or_default();
2132 let right_stats = stats.get(right.id()).cloned().unwrap_or_default();
2133 adaptive_source_score(&right_stats, now)
2134 .partial_cmp(&adaptive_source_score(&left_stats, now))
2135 .unwrap_or(std::cmp::Ordering::Equal)
2136 .then_with(|| left.id().cmp(right.id()))
2137 });
2138 available
2139 }
2140
2141 async fn should_probe_multiple_read_sources(
2142 &self,
2143 ordered_sources: &[Arc<dyn MeshReadSource>],
2144 ) -> bool {
2145 if ordered_sources.len() <= 1 {
2146 return false;
2147 }
2148 let stats = self.read_source_stats.read().await;
2149 let best = stats
2150 .get(ordered_sources[0].id())
2151 .cloned()
2152 .unwrap_or_default();
2153 let second = stats
2154 .get(ordered_sources[1].id())
2155 .cloned()
2156 .unwrap_or_default();
2157 if !source_has_history(&best) || !source_has_history(&second) {
2158 return false;
2159 }
2160 let now = Instant::now();
2161 adaptive_source_score(&best, now) - adaptive_source_score(&second, now)
2162 < SOURCE_SCORE_TIE_DELTA
2163 }
2164
2165 async fn source_dispatch_for(&self, source_count: usize) -> RequestDispatchConfig {
2166 if source_count == 0 {
2167 return self.routing.dispatch;
2168 }
2169 let ordered_sources = self.ordered_read_sources().await;
2170 let probe_multiple = self
2171 .should_probe_multiple_read_sources(&ordered_sources)
2172 .await;
2173 let initial_fanout = if probe_multiple {
2174 source_count.min(2)
2175 } else {
2176 1
2177 };
2178 RequestDispatchConfig {
2179 initial_fanout,
2180 hedge_fanout: self.routing.dispatch.hedge_fanout,
2181 max_fanout: self.routing.dispatch.max_fanout.min(source_count),
2182 hedge_interval_ms: self.routing.dispatch.hedge_interval_ms,
2183 }
2184 }
2185
2186 pub async fn peer_count(&self) -> usize {
2188 self.signaling.peer_count().await
2189 }
2190
2191 pub async fn needs_peers(&self) -> bool {
2193 self.signaling.needs_peers().await
2194 }
2195
2196 pub async fn send_hello(&self) -> Result<(), TransportError> {
2198 self.signaling.send_hello(vec![]).await
2199 }
2200
2201 pub async fn drain_available_data_messages(self: &Arc<Self>) -> DataPumpStats {
2206 let mut stats = DataPumpStats::default();
2207 let peer_ids = self.signaling.peer_ids().await;
2208 for peer_id in peer_ids {
2209 let Some(channel) = self.signaling.get_channel(&peer_id).await else {
2210 continue;
2211 };
2212
2213 while let Some(data) = channel.try_recv() {
2214 stats.processed += 1;
2215 stats.processed_bytes += data.len() as u64;
2216 if let Some(msg) = parse_message(&data) {
2217 match msg {
2218 DataMessage::Request(_) => stats.request_messages += 1,
2219 DataMessage::Response(_) => stats.response_messages += 1,
2220 DataMessage::QuoteRequest(_) => stats.quote_request_messages += 1,
2221 DataMessage::QuoteResponse(_) => stats.quote_response_messages += 1,
2222 DataMessage::PubsubInterest(_) => stats.pubsub_interest_messages += 1,
2223 DataMessage::PubsubFrame(_) => stats.pubsub_frame_messages += 1,
2224 DataMessage::PubsubInventory(_) => stats.pubsub_inventory_messages += 1,
2225 DataMessage::PubsubWant(_) => stats.pubsub_want_messages += 1,
2226 DataMessage::Payment(_)
2227 | DataMessage::PaymentAck(_)
2228 | DataMessage::Chunk(_)
2229 | DataMessage::PeerHints(_) => {}
2230 }
2231 }
2232 self.handle_data_message(&peer_id, &data).await;
2233 }
2234 }
2235 stats
2236 }
2237
2238 pub async fn record_cashu_payment_for_peer(&self, peer_id: &str, amount_sat: u64) {
2240 self.peer_selector
2241 .write()
2242 .await
2243 .record_cashu_payment(peer_id, amount_sat);
2244 }
2245
2246 pub async fn record_cashu_receipt_from_peer(&self, peer_id: &str, amount_sat: u64) {
2248 self.peer_selector
2249 .write()
2250 .await
2251 .record_cashu_receipt(peer_id, amount_sat);
2252 }
2253
2254 pub async fn record_cashu_payment_default_from_peer(&self, peer_id: &str) {
2256 self.peer_selector
2257 .write()
2258 .await
2259 .record_cashu_payment_default(peer_id);
2260 }
2261
2262 pub async fn selector_summary(&self) -> crate::peer_selector::SelectorSummary {
2264 self.peer_selector.read().await.summary()
2265 }
2266
2267 fn should_refuse_requests_from_peer(&self, selector: &PeerSelector, peer_id: &str) -> bool {
2268 selector.is_peer_blocked_for_payment_defaults(
2269 peer_id,
2270 self.routing.cashu_payment_default_block_threshold,
2271 )
2272 }
2273
2274 pub async fn peer_metadata_snapshot(&self) -> PeerMetadataSnapshot {
2276 self.peer_selector
2277 .read()
2278 .await
2279 .export_peer_metadata_snapshot()
2280 }
2281
2282 pub async fn persist_peer_metadata(&self) -> Result<Hash, StoreError> {
2287 let snapshot = self
2288 .peer_selector
2289 .read()
2290 .await
2291 .export_peer_metadata_snapshot();
2292 let bytes = serde_json::to_vec(&snapshot).map_err(|e| {
2293 StoreError::Other(format!("Failed to encode peer metadata snapshot: {e}"))
2294 })?;
2295 let snapshot_hash = hashtree_core::sha256(&bytes);
2296 let _ = self.local_store.put(snapshot_hash, bytes).await?;
2297
2298 let pointer_slot = Self::peer_metadata_pointer_slot_hash();
2299 let pointer_bytes = hex::encode(snapshot_hash).into_bytes();
2300 let _ = self.local_store.delete(&pointer_slot).await?;
2301 let _ = self.local_store.put(pointer_slot, pointer_bytes).await?;
2302
2303 Ok(snapshot_hash)
2304 }
2305
2306 pub async fn load_peer_metadata(&self) -> Result<bool, StoreError> {
2308 let pointer_slot = Self::peer_metadata_pointer_slot_hash();
2309 let Some(pointer_bytes) = self.local_store.get(&pointer_slot).await? else {
2310 return Ok(false);
2311 };
2312 let pointer_hex = std::str::from_utf8(&pointer_bytes).map_err(|e| {
2313 StoreError::Other(format!("Peer metadata pointer is not valid UTF-8: {e}"))
2314 })?;
2315 let snapshot_hash = Self::decode_hash_hex(pointer_hex.trim())?;
2316
2317 let Some(snapshot_bytes) = self.local_store.get(&snapshot_hash).await? else {
2318 return Ok(false);
2319 };
2320 let snapshot: PeerMetadataSnapshot =
2321 serde_json::from_slice(&snapshot_bytes).map_err(|e| {
2322 StoreError::Other(format!("Failed to decode peer metadata snapshot: {e}"))
2323 })?;
2324 self.peer_selector
2325 .write()
2326 .await
2327 .import_peer_metadata_snapshot(&snapshot);
2328 Ok(true)
2329 }
2330
2331 pub async fn get_with_quote(
2336 &self,
2337 hash: &Hash,
2338 payment_sat: u64,
2339 quote_ttl: Duration,
2340 ) -> Result<Option<Vec<u8>>, StoreError> {
2341 if let Some(data) = self.local_store.get(hash).await? {
2342 return Ok(Some(data));
2343 }
2344 Ok(self
2345 .request_from_peers_with_quote(hash, payment_sat, quote_ttl)
2346 .await)
2347 }
2348
2349 async fn request_from_peers_with_quote(
2350 &self,
2351 hash: &Hash,
2352 payment_sat: u64,
2353 quote_ttl: Duration,
2354 ) -> Option<Vec<u8>> {
2355 let ordered_peer_ids = self.ordered_connected_peers(None).await;
2356 if ordered_peer_ids.is_empty() {
2357 return None;
2358 }
2359
2360 if let Some(quote) = self
2361 .request_quote_from_peers(hash, payment_sat, quote_ttl, &ordered_peer_ids)
2362 .await
2363 {
2364 if let Some(data) = self
2365 .request_from_single_peer(hash, "e.peer_id, MAX_HTL, Some(quote.quote_id))
2366 .await
2367 {
2368 return Some(data);
2369 }
2370 }
2371
2372 self.request_from_mesh(hash).await
2373 }
2374
2375 async fn request_quote_from_peers(
2376 &self,
2377 hash: &Hash,
2378 payment_sat: u64,
2379 quote_ttl: Duration,
2380 ordered_peer_ids: &[String],
2381 ) -> Option<NegotiatedQuote> {
2382 if ordered_peer_ids.is_empty() {
2383 return None;
2384 }
2385 let ttl_ms = quote_ttl.as_millis().min(u32::MAX as u128) as u32;
2386 if ttl_ms == 0 {
2387 return None;
2388 }
2389 let requested_mint = self.requested_quote_mint().map(str::to_string);
2390
2391 let hash_key = hash_to_key(hash);
2392 let (tx, rx) = oneshot::channel();
2393 self.pending_quotes.write().await.insert(
2394 hash_key.clone(),
2395 PendingQuoteRequest {
2396 response_tx: tx,
2397 preferred_mint_url: requested_mint.clone(),
2398 offered_payment_sat: payment_sat,
2399 },
2400 );
2401
2402 let rx = Arc::new(Mutex::new(rx));
2403 let result = run_hedged_waves(
2404 ordered_peer_ids.len(),
2405 self.routing.dispatch,
2406 self.request_timeout,
2407 |range| {
2408 let wave_peer_ids = ordered_peer_ids[range].to_vec();
2409 let requested_mint = requested_mint.clone();
2410 let hash = *hash;
2411 async move {
2412 let mut sent = 0usize;
2413 for peer_id in wave_peer_ids {
2414 if self
2415 .send_quote_request_to_peer(
2416 &peer_id,
2417 &hash,
2418 payment_sat,
2419 ttl_ms,
2420 requested_mint.as_deref(),
2421 )
2422 .await
2423 {
2424 sent += 1;
2425 }
2426 }
2427 sent
2428 }
2429 },
2430 |wait| {
2431 let rx = rx.clone();
2432 async move {
2433 let mut rx = rx.lock().await;
2434 match tokio::time::timeout(wait, &mut *rx).await {
2435 Ok(Ok(Some(quote))) => HedgedWaveAction::Success(quote),
2436 Ok(Ok(None)) | Ok(Err(_)) => HedgedWaveAction::Abort,
2437 Err(_) => HedgedWaveAction::Continue,
2438 }
2439 }
2440 },
2441 )
2442 .await;
2443 let _ = self.pending_quotes.write().await.remove(&hash_key);
2444 result
2445 }
2446
2447 async fn request_from_single_peer(
2448 &self,
2449 hash: &Hash,
2450 peer_id: &str,
2451 request_htl: u8,
2452 quote_id: Option<u64>,
2453 ) -> Option<Vec<u8>> {
2454 let hash_key = hash_to_key(hash);
2455 let (tx, rx) = oneshot::channel();
2456 self.pending_requests.write().await.insert(
2457 hash_key.clone(),
2458 PendingRequest {
2459 response_tx: tx,
2460 started_at: Instant::now(),
2461 queried_peers: vec![peer_id.to_string()],
2462 },
2463 );
2464
2465 let mut rx = rx;
2466 if !self
2467 .send_request_to_peer(peer_id, hash, request_htl, quote_id)
2468 .await
2469 {
2470 let _ = self.pending_requests.write().await.remove(&hash_key);
2471 return None;
2472 }
2473 self.reserve_peer_request(peer_id).await;
2474
2475 if let Ok(Ok(Some(data))) = tokio::time::timeout(self.request_timeout, &mut rx).await {
2476 if hashtree_core::sha256(&data) == *hash {
2477 let _ = self.local_store.put(*hash, data.clone()).await;
2478 return Some(data);
2479 }
2480 }
2481
2482 if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
2483 self.release_queried_peer_requests(&pending.queried_peers)
2484 .await;
2485 for peer_id in pending.queried_peers {
2486 self.peer_selector.write().await.record_timeout(&peer_id);
2487 }
2488 }
2489 let _ = self.take_forward_requesters(&hash_key).await;
2490 None
2491 }
2492
2493 async fn request_from_ordered_peers(
2494 &self,
2495 hash: &Hash,
2496 ordered_peer_ids: &[String],
2497 request_htl: u8,
2498 ) -> RouteFetchOutcome {
2499 let hash_key = hash_to_key(hash);
2500 let (tx, rx) = oneshot::channel();
2501 self.pending_requests.write().await.insert(
2502 hash_key.clone(),
2503 PendingRequest {
2504 response_tx: tx,
2505 started_at: Instant::now(),
2506 queried_peers: Vec::new(),
2507 },
2508 );
2509
2510 let rx = Arc::new(Mutex::new(rx));
2511 let result = run_hedged_waves(
2512 ordered_peer_ids.len(),
2513 self.routing.dispatch,
2514 self.request_timeout,
2515 |range| {
2516 let wave_peer_ids = ordered_peer_ids[range].to_vec();
2517 let hash = *hash;
2518 let hash_key = hash_key.clone();
2519 async move {
2520 let mut sent = 0usize;
2521 for peer_id in wave_peer_ids {
2522 if self
2523 .send_request_to_peer(&peer_id, &hash, request_htl, None)
2524 .await
2525 {
2526 sent += 1;
2527 self.reserve_peer_request(&peer_id).await;
2528 if let Some(pending) =
2529 self.pending_requests.write().await.get_mut(&hash_key)
2530 {
2531 pending.queried_peers.push(peer_id);
2532 }
2533 }
2534 }
2535 sent
2536 }
2537 },
2538 |wait| {
2539 let rx = rx.clone();
2540 async move {
2541 let mut rx = rx.lock().await;
2542 match tokio::time::timeout(wait, &mut *rx).await {
2543 Ok(Ok(Some(data))) if hashtree_core::sha256(&data) == *hash => {
2544 HedgedWaveAction::Success(data)
2545 }
2546 Ok(Ok(Some(_))) => HedgedWaveAction::Continue,
2547 Ok(Ok(None)) | Ok(Err(_)) => HedgedWaveAction::Abort,
2548 Err(_) => HedgedWaveAction::Continue,
2549 }
2550 }
2551 },
2552 )
2553 .await;
2554
2555 let Some(data) = result else {
2556 if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
2557 self.release_queried_peer_requests(&pending.queried_peers)
2558 .await;
2559 for peer_id in pending.queried_peers {
2560 self.peer_selector.write().await.record_timeout(&peer_id);
2561 }
2562 }
2563 let _ = self.take_forward_requesters(&hash_key).await;
2564 return RouteFetchOutcome::Timeout;
2565 };
2566
2567 let _ = self.local_store.put(*hash, data.clone()).await;
2568 RouteFetchOutcome::Hit(data)
2569 }
2570
2571 async fn request_from_read_sources_inner(&self, hash: &Hash) -> RouteFetchOutcome {
2572 let ordered_sources = self.ordered_read_sources().await;
2573 if ordered_sources.is_empty() {
2574 return RouteFetchOutcome::Miss;
2575 }
2576
2577 let dispatch = normalize_dispatch_config(
2578 self.source_dispatch_for(ordered_sources.len()).await,
2579 ordered_sources.len(),
2580 );
2581 let wave_plan = build_hedged_wave_plan(ordered_sources.len(), dispatch);
2582 if wave_plan.is_empty() {
2583 return RouteFetchOutcome::Miss;
2584 }
2585
2586 let deadline = Instant::now() + self.request_timeout;
2587 let mut pending = FuturesUnordered::new();
2588 let mut pending_source_ids = HashSet::new();
2589 let mut saw_timeout = false;
2590 let mut next_source_idx = 0usize;
2591
2592 for (wave_idx, wave_size) in wave_plan.iter().copied().enumerate() {
2593 let from = next_source_idx;
2594 let to = (next_source_idx + wave_size).min(ordered_sources.len());
2595 next_source_idx = to;
2596
2597 for source in &ordered_sources[from..to] {
2598 let source = Arc::clone(source);
2599 let source_id = source.id().to_string();
2600 self.record_read_source_request(&source_id).await;
2601 pending_source_ids.insert(source_id.clone());
2602 let hash = *hash;
2603 pending.push(tokio::spawn(async move {
2604 let started_at = Instant::now();
2605 let result = std::panic::AssertUnwindSafe(source.get(&hash))
2606 .catch_unwind()
2607 .await;
2608 match result {
2609 Ok(Some(data)) => SourceFetchOutcome::Hit {
2610 source_id,
2611 data,
2612 elapsed_ms: started_at.elapsed().as_millis().max(1) as u64,
2613 },
2614 Ok(None) => SourceFetchOutcome::Miss { source_id },
2615 Err(_) => SourceFetchOutcome::Failure { source_id },
2616 }
2617 }));
2618 }
2619
2620 let is_last_wave =
2621 wave_idx + 1 == wave_plan.len() || next_source_idx >= ordered_sources.len();
2622 let window_end = if is_last_wave {
2623 deadline
2624 } else {
2625 (Instant::now() + Duration::from_millis(dispatch.hedge_interval_ms)).min(deadline)
2626 };
2627
2628 while Instant::now() < window_end {
2629 let remaining = window_end.saturating_duration_since(Instant::now());
2630 let Some(result) = tokio::time::timeout(remaining, pending.next())
2631 .await
2632 .ok()
2633 .flatten()
2634 else {
2635 break;
2636 };
2637 let Ok(outcome) = result else {
2638 continue;
2639 };
2640 match outcome {
2641 SourceFetchOutcome::Hit {
2642 source_id,
2643 data,
2644 elapsed_ms,
2645 } => {
2646 pending_source_ids.remove(&source_id);
2647 self.record_read_source_success(&source_id, elapsed_ms)
2648 .await;
2649 return RouteFetchOutcome::Hit(data);
2650 }
2651 SourceFetchOutcome::Miss { source_id } => {
2652 pending_source_ids.remove(&source_id);
2653 self.record_read_source_miss(&source_id).await;
2654 }
2655 SourceFetchOutcome::Failure { source_id } => {
2656 pending_source_ids.remove(&source_id);
2657 self.record_read_source_failure(&source_id).await;
2658 }
2659 }
2660 }
2661
2662 if Instant::now() >= deadline {
2663 break;
2664 }
2665 }
2666
2667 for source_id in pending_source_ids {
2668 saw_timeout = true;
2669 self.record_read_source_timeout(&source_id).await;
2670 }
2671 if saw_timeout {
2672 RouteFetchOutcome::Timeout
2673 } else {
2674 RouteFetchOutcome::Miss
2675 }
2676 }
2677
2678 async fn request_from_read_sources(&self, hash: &Hash) -> RouteFetchOutcome {
2679 let hash_key = hash_to_key(hash);
2680 let existing_wait = {
2681 let mut inflight = self.inflight_source_fetches.lock().await;
2682 if let Some(existing) = inflight.get_mut(&hash_key) {
2683 let (tx, rx) = oneshot::channel();
2684 existing.waiters.push(tx);
2685 Some(rx)
2686 } else {
2687 inflight.insert(
2688 hash_key.clone(),
2689 InflightSourceFetch {
2690 waiters: Vec::new(),
2691 },
2692 );
2693 None
2694 }
2695 };
2696
2697 if let Some(wait) = existing_wait {
2698 return wait.await.unwrap_or(RouteFetchOutcome::Timeout);
2699 }
2700
2701 let result = self.request_from_read_sources_inner(hash).await;
2702 if let RouteFetchOutcome::Hit(hit) = &result {
2703 let _ = self.local_store.put(*hash, hit.clone()).await;
2704 }
2705 self.complete_inflight_source_fetch(&hash_key, result.clone())
2706 .await;
2707
2708 result
2709 }
2710
2711 async fn complete_inflight_source_fetch(&self, hash_key: &str, result: RouteFetchOutcome) {
2712 let waiters = self
2713 .inflight_source_fetches
2714 .lock()
2715 .await
2716 .remove(hash_key)
2717 .map(|inflight| inflight.waiters)
2718 .unwrap_or_default();
2719 for waiter in waiters {
2720 let _ = waiter.send(result.clone());
2721 }
2722 }
2723
2724 async fn cancel_pending_peer_route(&self, hash: &Hash) {
2725 let hash_key = hash_to_key(hash);
2726 if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
2727 self.release_queried_peer_requests(&pending.queried_peers)
2728 .await;
2729 }
2730 }
2731
2732 async fn cancel_losing_route(&self, hash: &Hash, route: &ReadRoute, winner_data: &[u8]) {
2733 match route {
2734 ReadRoute::Peers(_) => self.cancel_pending_peer_route(hash).await,
2735 ReadRoute::Sources => {
2736 let hash_key = hash_to_key(hash);
2737 self.complete_inflight_source_fetch(
2738 &hash_key,
2739 RouteFetchOutcome::Hit(winner_data.to_vec()),
2740 )
2741 .await;
2742 }
2743 }
2744 }
2745
2746 async fn ranked_read_routes(&self, context: &MeshReadContext) -> Vec<RankedReadRoute> {
2747 let mut routes = Vec::new();
2748 let ordered_peers = if should_forward_htl(context.request_htl) {
2749 self.ordered_connected_peers(context.exclude_peer_id.as_deref())
2750 .await
2751 } else {
2752 Vec::new()
2753 };
2754 if !ordered_peers.is_empty() {
2755 let best_peer_id = ordered_peers[0].clone();
2756 let selector = self.peer_selector.read().await;
2757 let best_peer = selector.get_stats(&best_peer_id).cloned();
2758 let now = Instant::now();
2759 let (score, has_history) = match best_peer.as_ref() {
2760 Some(stats) => (
2761 peer_endpoint_score(stats, now),
2762 peer_endpoint_has_history(stats),
2763 ),
2764 None => (0.0, false),
2765 };
2766 routes.push(RankedReadRoute {
2767 route: ReadRoute::Peers(ordered_peers),
2768 best_endpoint_id: format!("peer:{best_peer_id}"),
2769 score,
2770 has_history,
2771 });
2772 }
2773 let ordered_sources = self.ordered_read_sources().await;
2774 if let Some(best_source) = ordered_sources.first() {
2775 let stats = self.read_source_stats.read().await;
2776 let best_source_stats = stats.get(best_source.id()).cloned().unwrap_or_default();
2777 let now = Instant::now();
2778 routes.push(RankedReadRoute {
2779 route: ReadRoute::Sources,
2780 best_endpoint_id: format!("source:{}", best_source.id()),
2781 score: adaptive_source_score(&best_source_stats, now),
2782 has_history: source_has_history(&best_source_stats),
2783 });
2784 }
2785 if routes.len() <= 1 {
2786 return routes;
2787 }
2788
2789 routes.sort_by(|left, right| {
2790 right
2791 .score
2792 .partial_cmp(&left.score)
2793 .unwrap_or(std::cmp::Ordering::Equal)
2794 .then_with(|| ranked_route_kind(&left.route).cmp(&ranked_route_kind(&right.route)))
2795 .then_with(|| left.best_endpoint_id.cmp(&right.best_endpoint_id))
2796 .then_with(|| left.route.id().cmp(right.route.id()))
2797 });
2798 routes
2799 }
2800
2801 fn should_probe_multiple_routes(&self, routes: &[RankedReadRoute]) -> bool {
2802 if routes.len() <= 1 {
2803 return false;
2804 }
2805 if !routes[0].has_history || !routes[1].has_history {
2806 return false;
2807 }
2808 (routes[0].score - routes[1].score) < SOURCE_SCORE_TIE_DELTA
2809 }
2810
2811 async fn run_read_route(
2812 &self,
2813 hash: &Hash,
2814 route: &ReadRoute,
2815 context: &MeshReadContext,
2816 ) -> RouteFetchOutcome {
2817 match route {
2818 ReadRoute::Peers(peer_ids) => {
2819 self.request_from_ordered_peers(hash, peer_ids, context.request_htl)
2820 .await
2821 }
2822 ReadRoute::Sources => self.request_from_read_sources(hash).await,
2823 }
2824 }
2825
2826 async fn request_from_mesh_with_context(
2827 &self,
2828 hash: &Hash,
2829 context: &MeshReadContext,
2830 ) -> Option<Vec<u8>> {
2831 let routes = self.ranked_read_routes(context).await;
2832 match routes.as_slice() {
2833 [] => None,
2834 [ranked] => match self.run_read_route(hash, &ranked.route, context).await {
2835 RouteFetchOutcome::Hit(data) => Some(data),
2836 RouteFetchOutcome::Miss | RouteFetchOutcome::Timeout => None,
2837 },
2838 [first, second, ..] => {
2839 if self.should_probe_multiple_routes(&routes) {
2840 let first_fut = self.run_read_route(hash, &first.route, context);
2841 let second_fut = self.run_read_route(hash, &second.route, context);
2842 tokio::pin!(first_fut);
2843 tokio::pin!(second_fut);
2844 let mut first_done = false;
2845 let mut second_done = false;
2846 loop {
2847 tokio::select! {
2848 result = &mut first_fut, if !first_done => {
2849 first_done = true;
2850 if let RouteFetchOutcome::Hit(data) = result {
2851 if !second_done {
2852 self.cancel_losing_route(hash, &second.route, &data).await;
2853 }
2854 return Some(data);
2855 }
2856 }
2857 result = &mut second_fut, if !second_done => {
2858 second_done = true;
2859 if let RouteFetchOutcome::Hit(data) = result {
2860 if !first_done {
2861 self.cancel_losing_route(hash, &first.route, &data).await;
2862 }
2863 return Some(data);
2864 }
2865 }
2866 else => break,
2867 }
2868 if first_done && second_done {
2869 break;
2870 }
2871 }
2872 None
2873 } else {
2874 match self.run_read_route(hash, &first.route, context).await {
2875 RouteFetchOutcome::Hit(data) => return Some(data),
2876 RouteFetchOutcome::Miss | RouteFetchOutcome::Timeout => {}
2877 }
2878 for ranked in routes.iter().skip(1) {
2879 match self.run_read_route(hash, &ranked.route, context).await {
2880 RouteFetchOutcome::Hit(data) => return Some(data),
2881 RouteFetchOutcome::Miss | RouteFetchOutcome::Timeout => {}
2882 }
2883 }
2884 None
2885 }
2886 }
2887 }
2888 }
2889
2890 async fn request_from_mesh(&self, hash: &Hash) -> Option<Vec<u8>> {
2891 self.request_from_mesh_with_context(hash, &MeshReadContext::default())
2892 .await
2893 }
2894
2895 async fn begin_forward_request(&self, hash_key: &str, requester_id: &str) -> bool {
2896 let mut pending = self.pending_forward_requests.write().await;
2897 if let Some(existing) = pending.get_mut(hash_key) {
2898 existing.requester_ids.insert(requester_id.to_string());
2899 return false;
2900 }
2901
2902 let mut requester_ids = HashSet::new();
2903 requester_ids.insert(requester_id.to_string());
2904 pending.insert(
2905 hash_key.to_string(),
2906 PendingForwardRequest { requester_ids },
2907 );
2908 true
2909 }
2910
2911 async fn was_recent_forward_miss(&self, hash_key: &str) -> bool {
2912 self.recent_forward_misses.lock().await.contains(hash_key)
2913 }
2914
2915 async fn mark_recent_forward_miss(&self, hash_key: &str) {
2916 let _ = self
2917 .recent_forward_misses
2918 .lock()
2919 .await
2920 .insert_if_new(hash_key.to_string());
2921 }
2922
2923 async fn take_forward_requesters(&self, hash_key: &str) -> Vec<String> {
2924 self.pending_forward_requests
2925 .write()
2926 .await
2927 .remove(hash_key)
2928 .map(|pending| pending.requester_ids.into_iter().collect())
2929 .unwrap_or_default()
2930 }
2931
2932 async fn complete_pending_response(
2933 self: &Arc<Self>,
2934 from_peer: &str,
2935 hash: &Hash,
2936 hash_key: String,
2937 payload: Vec<u8>,
2938 ) {
2939 if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
2940 self.release_queried_peer_requests(&pending.queried_peers)
2941 .await;
2942 let rtt_ms = pending.started_at.elapsed().as_millis() as u64;
2943 self.peer_selector.write().await.record_success(
2944 from_peer,
2945 rtt_ms,
2946 payload.len() as u64,
2947 );
2948 let forward_requesters = self.take_forward_requesters(&hash_key).await;
2949 let response_bytes = if forward_requesters.is_empty() {
2950 None
2951 } else {
2952 Some(encode_response(&create_response(hash, payload.clone())))
2953 };
2954 let _ = pending.response_tx.send(Some(payload));
2955 if let Some(response_bytes) = response_bytes {
2956 for requester_id in forward_requesters {
2957 Arc::clone(self)
2958 .enqueue_response_send(requester_id, response_bytes.clone(), Instant::now())
2959 .await;
2960 }
2961 }
2962 }
2963 }
2964
2965 async fn handle_quote_response_message(&self, from_peer: &str, res: DataQuoteResponse) {
2966 if !res.a {
2967 return;
2968 }
2969
2970 let Some(quote_id) = res.q else {
2971 return;
2972 };
2973
2974 let hash_key = hash_to_key(&res.h);
2975 let (preferred_mint_url, offered_payment_sat) = {
2976 let pending_quotes = self.pending_quotes.read().await;
2977 let Some(pending) = pending_quotes.get(&hash_key) else {
2978 return;
2979 };
2980 (
2981 pending.preferred_mint_url.clone(),
2982 pending.offered_payment_sat,
2983 )
2984 };
2985 if !self
2986 .should_accept_quote_response(
2987 from_peer,
2988 preferred_mint_url.as_deref(),
2989 offered_payment_sat,
2990 &res,
2991 )
2992 .await
2993 {
2994 return;
2995 }
2996 let mut pending_quotes = self.pending_quotes.write().await;
2997 if let Some(pending) = pending_quotes.remove(&hash_key) {
2998 let _ = pending.response_tx.send(Some(NegotiatedQuote {
2999 peer_id: from_peer.to_string(),
3000 quote_id,
3001 mint_url: res.m,
3002 }));
3003 }
3004 }
3005
3006 async fn handle_response_message(
3007 self: &Arc<Self>,
3008 from_peer: &str,
3009 res: crate::protocol::DataResponse,
3010 ) {
3011 let hash_key = hash_to_key(&res.h);
3012 let hash = match crate::protocol::bytes_to_hash(&res.h) {
3013 Some(h) => h,
3014 None => return,
3015 };
3016
3017 if hashtree_core::sha256(&res.d) != hash {
3019 self.peer_selector.write().await.record_failure(from_peer);
3020 if self.debug {
3021 println!("[MeshStoreCore] Ignoring invalid response payload for {hash_key}");
3022 }
3023 return;
3024 }
3025
3026 self.record_useful_bytes_received_from_peer(from_peer, res.d.len() as u64)
3027 .await;
3028 self.complete_pending_response(from_peer, &hash, hash_key, res.d)
3029 .await;
3030 }
3031
3032 async fn handle_quote_request_message(&self, from_peer: &str, req: DataQuoteRequest) {
3033 let hash = match crate::protocol::bytes_to_hash(&req.h) {
3034 Some(h) => h,
3035 None => return,
3036 };
3037 let hash_key = hash_to_key(&hash);
3038
3039 {
3040 let selector = self.peer_selector.read().await;
3041 if self.should_refuse_requests_from_peer(&selector, from_peer) {
3042 if self.debug {
3043 println!(
3044 "[MeshStoreCore] Refusing quote request from delinquent peer {}",
3045 from_peer
3046 );
3047 }
3048 return;
3049 }
3050 }
3051
3052 let chosen_mint = self.choose_quote_mint(req.m.as_deref());
3053 let can_serve = self.local_store.has(&hash).await.ok().unwrap_or(false)
3054 && !self.should_drop_response(&hash)
3055 && !self.should_corrupt_response(&hash);
3056
3057 let res = if can_serve {
3058 let quote_id = self
3059 .issue_quote(from_peer, &hash_key, req.p, req.t, chosen_mint.as_deref())
3060 .await;
3061 create_quote_response_available(&hash, quote_id, req.p, req.t, chosen_mint.as_deref())
3062 } else {
3063 create_quote_response_unavailable(&hash)
3064 };
3065 let response_bytes = encode_quote_response(&res);
3066 if let Some(channel) = self.signaling.get_channel(from_peer).await {
3067 if channel.send(response_bytes.clone()).await.is_ok() {
3068 self.record_peer_wire_sent(from_peer, response_bytes.len() as u64)
3069 .await;
3070 }
3071 }
3072 }
3073
3074 async fn handle_request_message(
3075 self: &Arc<Self>,
3076 from_peer: &str,
3077 req: crate::protocol::DataRequest,
3078 ) {
3079 let hash = match crate::protocol::bytes_to_hash(&req.h) {
3080 Some(h) => h,
3081 None => return,
3082 };
3083 let hash_key = hash_to_key(&hash);
3084
3085 if let Some(quote_id) = req.q {
3086 if !self.take_valid_quote(from_peer, &hash_key, quote_id).await {
3087 if self.debug {
3088 println!(
3089 "[MeshStoreCore] Refusing request with invalid or expired quote {} from {}",
3090 quote_id, from_peer
3091 );
3092 }
3093 return;
3094 }
3095 }
3096
3097 let allow_peer_forwarding = {
3098 let selector = self.peer_selector.read().await;
3099 !self.should_refuse_requests_from_peer(&selector, from_peer)
3100 };
3101
3102 if let Ok(Some(mut data)) = self.local_store.get(&hash).await {
3104 if self.should_drop_response(&hash) {
3105 if self.debug {
3106 println!(
3107 "[MeshStoreCore] Dropping response for {} due to actor profile",
3108 hash_to_key(&hash)
3109 );
3110 }
3111 return;
3112 }
3113
3114 let response_delay = self.response_send_delay(&hash, data.len());
3115 if self.should_corrupt_response(&hash) {
3116 if data.is_empty() {
3117 data.push(0x80);
3118 } else {
3119 data[0] ^= 0x80;
3120 }
3121 }
3122
3123 let res = create_response(&hash, data);
3125 let response_bytes = encode_response(&res);
3126 let ready_at = Instant::now() + response_delay;
3127 Arc::clone(self)
3128 .enqueue_response_send(from_peer.to_string(), response_bytes, ready_at)
3129 .await;
3130 return;
3131 }
3132
3133 if self.pending_requests.read().await.contains_key(&hash_key) {
3134 let _ = self.begin_forward_request(&hash_key, from_peer).await;
3135 return;
3136 }
3137
3138 if self.was_recent_forward_miss(&hash_key).await {
3139 if self.debug {
3140 println!(
3141 "[MeshStoreCore] Suppressing recently missed forwarded request for {}",
3142 hash_key
3143 );
3144 }
3145 return;
3146 }
3147
3148 if !self.begin_forward_request(&hash_key, from_peer).await {
3149 return;
3150 }
3151
3152 let from_peer = from_peer.to_string();
3153 let this = Arc::clone(self);
3154 let request_htl = req.htl;
3155 tokio::spawn(async move {
3156 let result = if allow_peer_forwarding {
3157 let context = MeshReadContext {
3158 exclude_peer_id: Some(from_peer.clone()),
3159 request_htl,
3160 };
3161 this.request_from_mesh_with_context(&hash, &context).await
3162 } else {
3163 if this.debug {
3164 println!(
3165 "[MeshStoreCore] Serving request from delinquent peer {} via read sources only",
3166 from_peer
3167 );
3168 }
3169 match this.request_from_read_sources(&hash).await {
3170 RouteFetchOutcome::Hit(data) => Some(data),
3171 RouteFetchOutcome::Miss | RouteFetchOutcome::Timeout => None,
3172 }
3173 };
3174 let requester_ids = this.take_forward_requesters(&hash_key).await;
3175 if let Some(data) = result {
3176 let ready_at = Instant::now() + this.response_send_delay(&hash, data.len());
3177 let res = create_response(&hash, data);
3178 let response_bytes = encode_response(&res);
3179 for requester_id in requester_ids {
3180 Arc::clone(&this)
3181 .enqueue_response_send(requester_id, response_bytes.clone(), ready_at)
3182 .await;
3183 }
3184 } else {
3185 this.mark_recent_forward_miss(&hash_key).await;
3186 }
3187 });
3188 }
3189
3190 async fn handle_pubsub_interest_message(
3191 self: &Arc<Self>,
3192 from_peer: &str,
3193 mut interest: PubsubInterest,
3194 ) {
3195 if !self.apply_pubsub_interest_route(from_peer, &interest).await {
3196 return;
3197 }
3198
3199 if interest.htl <= 1 {
3200 return;
3201 }
3202 interest.htl = interest.htl.saturating_sub(1);
3203 let _ = self
3204 .send_pubsub_interest_to_peers(&interest, Some(from_peer))
3205 .await;
3206 }
3207
3208 async fn handle_pubsub_frame_message(
3209 self: &Arc<Self>,
3210 from_peer: &str,
3211 mut frame: PubsubFrame,
3212 wire_bytes: usize,
3213 ) {
3214 if frame.stream_id.is_empty() || frame.origin_peer_id.is_empty() {
3215 return;
3216 }
3217 if frame.origin_peer_id == self.signaling.peer_id() {
3218 return;
3219 }
3220
3221 let frame_key = Self::pubsub_frame_key(&frame);
3222 if !self
3223 .pubsub_seen_frames
3224 .lock()
3225 .await
3226 .insert_if_new(frame_key.clone())
3227 {
3228 return;
3229 }
3230 self.cache_pubsub_frame(frame_key.clone(), frame.clone())
3231 .await;
3232
3233 let local_interested = self
3234 .pubsub_local_interests
3235 .read()
3236 .await
3237 .contains(&frame.stream_id);
3238 let mut downstream_peers = if frame.htl > 1 {
3239 match self.routing.pubsub_delivery_mode {
3240 PubsubDeliveryMode::InterestPush => {
3241 let mut peers = self
3242 .interested_pubsub_peers(&frame.stream_id, Some(from_peer))
3243 .await;
3244 peers.extend(
3245 self.take_pubsub_want_peers(&frame_key, Some(from_peer))
3246 .await,
3247 );
3248 peers.sort();
3249 peers.dedup();
3250 peers
3251 }
3252 PubsubDeliveryMode::HtlInvWant => {
3253 self.take_pubsub_want_peers(&frame_key, Some(from_peer))
3254 .await
3255 }
3256 }
3257 } else {
3258 Vec::new()
3259 };
3260 downstream_peers.retain(|peer_id| peer_id != from_peer);
3261
3262 if local_interested || !downstream_peers.is_empty() {
3263 self.record_useful_bytes_received_from_peer(from_peer, wire_bytes as u64)
3264 .await;
3265 }
3266
3267 if local_interested {
3268 self.enqueue_pubsub_event(PubsubEvent {
3269 stream_id: frame.stream_id.clone(),
3270 seq: frame.seq,
3271 origin_peer_id: frame.origin_peer_id.clone(),
3272 from_peer_id: from_peer.to_string(),
3273 payload: frame.payload.clone(),
3274 })
3275 .await;
3276 }
3277
3278 if downstream_peers.is_empty() {
3279 return;
3280 }
3281
3282 frame.htl = frame.htl.saturating_sub(1);
3283 let _ = self
3284 .send_pubsub_frame_to_peers(&frame, &downstream_peers)
3285 .await;
3286 }
3287
3288 async fn handle_pubsub_inventory_message(
3289 self: &Arc<Self>,
3290 from_peer: &str,
3291 inv: PubsubInventory,
3292 wire_bytes: usize,
3293 ) {
3294 if inv.stream_id.is_empty() || inv.origin_peer_id.is_empty() {
3295 return;
3296 }
3297 if inv.origin_peer_id == self.signaling.peer_id() {
3298 return;
3299 }
3300
3301 let key = Self::pubsub_key(&inv.origin_peer_id, &inv.stream_id, inv.seq);
3302 if !self
3303 .pubsub_seen_inventories
3304 .lock()
3305 .await
3306 .insert_if_new(key.clone())
3307 {
3308 return;
3309 }
3310 {
3311 let mut routes = self.pubsub_inventory_routes.write().await;
3312 routes
3313 .entry(key.clone())
3314 .or_insert_with(|| from_peer.to_string());
3315 }
3316
3317 let local_interested = self
3318 .pubsub_local_interests
3319 .read()
3320 .await
3321 .contains(&inv.stream_id);
3322 let downstream_peers = self
3323 .interested_pubsub_peers(&inv.stream_id, Some(from_peer))
3324 .await;
3325 if local_interested || !downstream_peers.is_empty() {
3326 self.record_useful_bytes_received_from_peer(from_peer, wire_bytes as u64)
3327 .await;
3328 let want =
3329 create_pubsub_want(inv.stream_id.clone(), inv.seq, inv.origin_peer_id.clone());
3330 let _ = self.send_pubsub_want_upstream(&key, &want, None).await;
3331 }
3332
3333 if !should_forward_htl(inv.htl) {
3334 return;
3335 }
3336 let _ = self.flood_pubsub_inventory(&inv, Some(from_peer)).await;
3337 }
3338
3339 async fn handle_pubsub_want_message(
3340 self: &Arc<Self>,
3341 from_peer: &str,
3342 want: PubsubWant,
3343 wire_bytes: usize,
3344 ) {
3345 if want.stream_id.is_empty() || want.origin_peer_id.is_empty() {
3346 return;
3347 }
3348 if want.origin_peer_id == from_peer {
3349 return;
3350 }
3351
3352 let key = Self::pubsub_key(&want.origin_peer_id, &want.stream_id, want.seq);
3353 let want_key = format!("{from_peer}:{key}");
3354 if !self.pubsub_seen_wants.lock().await.insert_if_new(want_key) {
3355 return;
3356 }
3357
3358 if let Some(frame) = self.cached_pubsub_frame(&key).await {
3359 self.record_useful_bytes_received_from_peer(from_peer, wire_bytes as u64)
3360 .await;
3361 let peers = vec![from_peer.to_string()];
3362 let _ = self.send_pubsub_frame_to_peers(&frame, &peers).await;
3363 return;
3364 }
3365
3366 let has_upstream_route = self.pubsub_inventory_routes.read().await.contains_key(&key);
3367 if !has_upstream_route {
3368 return;
3369 }
3370
3371 if self.remember_pubsub_want_peer(key.clone(), from_peer).await {
3372 self.record_useful_bytes_received_from_peer(from_peer, wire_bytes as u64)
3373 .await;
3374 }
3375 let _ = self
3376 .send_pubsub_want_upstream(&key, &want, Some(from_peer))
3377 .await;
3378 }
3379
3380 pub async fn handle_data_message(self: &Arc<Self>, from_peer: &str, data: &[u8]) {
3382 self.record_peer_wire_received(from_peer, data.len() as u64)
3383 .await;
3384 let parsed = match parse_message(data) {
3385 Some(m) => m,
3386 None => return,
3387 };
3388
3389 match parsed {
3390 DataMessage::Request(req) => {
3391 self.handle_request_message(from_peer, req).await;
3392 }
3393 DataMessage::Response(res) => {
3394 self.handle_response_message(from_peer, res).await;
3395 }
3396 DataMessage::QuoteRequest(req) => {
3397 self.handle_quote_request_message(from_peer, req).await;
3398 }
3399 DataMessage::QuoteResponse(res) => {
3400 self.handle_quote_response_message(from_peer, res).await;
3401 }
3402 DataMessage::PubsubInterest(interest) => {
3403 self.handle_pubsub_interest_message(from_peer, interest)
3404 .await;
3405 }
3406 DataMessage::PubsubFrame(frame) => {
3407 self.handle_pubsub_frame_message(from_peer, frame, data.len())
3408 .await;
3409 }
3410 DataMessage::PubsubInventory(inv) => {
3411 self.handle_pubsub_inventory_message(from_peer, inv, data.len())
3412 .await;
3413 }
3414 DataMessage::PubsubWant(want) => {
3415 self.handle_pubsub_want_message(from_peer, want, data.len())
3416 .await;
3417 }
3418 DataMessage::Payment(_)
3419 | DataMessage::PaymentAck(_)
3420 | DataMessage::Chunk(_)
3421 | DataMessage::PeerHints(_) => {}
3422 }
3423 }
3424}
3425
3426#[async_trait]
3427impl<S, R, F> Store for MeshStoreCore<S, R, F>
3428where
3429 S: Store + Send + Sync + 'static,
3430 R: SignalingTransport + Send + Sync + 'static,
3431 F: PeerLinkFactory + Send + Sync + 'static,
3432{
3433 async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
3434 self.local_store.put(hash, data).await
3435 }
3436
3437 async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
3438 if let Some(data) = self.local_store.get(hash).await? {
3440 return Ok(Some(data));
3441 }
3442
3443 Ok(self.request_from_mesh(hash).await)
3445 }
3446
3447 async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
3448 self.local_store.has(hash).await
3449 }
3450
3451 async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
3452 self.local_store.delete(hash).await
3453 }
3454}
3455
3456#[cfg(test)]
3457mod tests;
3458
3459pub type SimMeshStore<S> =
3461 MeshStoreCore<S, crate::mock::MockRelayTransport, crate::mock::MockConnectionFactory>;
3462
3463pub type ProductionMeshStore<S> =
3465 MeshStoreCore<S, crate::nostr::NostrRelayTransport, crate::real_factory::WebRtcPeerLinkFactory>;