Skip to main content

hashtree_network/
mesh_store_core.rs

1//! Shared routed mesh store core.
2//!
3//! This module provides a concrete store wrapper that works with any local storage
4//! backend plus any signaling transport and peer-link factory. Both production
5//! (Nostr websockets + WebRTC) and simulation (mocks) use this same code.
6
7use async_trait::async_trait;
8use futures::{stream::FuturesUnordered, FutureExt, StreamExt};
9use std::collections::hash_map::DefaultHasher;
10use std::collections::{HashMap, HashSet};
11use std::future::Future;
12use std::hash::{Hash as _, Hasher};
13use std::ops::Range;
14use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
15use std::sync::Arc;
16use std::time::Duration;
17use tokio::sync::{oneshot, Mutex, RwLock};
18use tokio::time::Instant;
19
20use hashtree_core::{Hash, Store, StoreError};
21
22use crate::peer_selector::{PeerMetadataSnapshot, PeerSelector, SelectionStrategy};
23use crate::protocol::{
24    create_quote_request, create_quote_response_available, create_quote_response_unavailable,
25    create_request, create_request_with_quote, create_response, encode_quote_request,
26    encode_quote_response, encode_request, encode_response, hash_to_key, parse_message,
27    DataMessage, DataQuoteRequest, DataQuoteResponse,
28};
29use crate::signaling::MeshRouter;
30use crate::transport::{PeerLinkFactory, SignalingTransport, TransportError};
31use crate::types::{should_forward_htl, PeerHTLConfig, SignalingMessage, TimedSeenSet, MAX_HTL};
32
33// Keep the on-disk namespace stable across the crate rename so existing peer
34// metadata does not disappear for users upgrading from the old package name.
35const PEER_METADATA_POINTER_SLOT_KEY: &[u8] = b"hashtree-webrtc/peer-metadata/latest/v1";
36const RECENT_FORWARD_MISS_CAPACITY: usize = 4096;
37const MIN_RECENT_FORWARD_MISS_TTL_MS: u64 = 250;
38
39/// Pending request awaiting response
40struct PendingRequest {
41    response_tx: oneshot::Sender<Option<Vec<u8>>>,
42    started_at: Instant,
43    queried_peers: Vec<String>,
44}
45
46struct PendingQuoteRequest {
47    response_tx: oneshot::Sender<Option<NegotiatedQuote>>,
48    preferred_mint_url: Option<String>,
49    offered_payment_sat: u64,
50}
51
52struct PendingForwardRequest {
53    requester_ids: HashSet<String>,
54}
55
56#[derive(Debug, Clone, Default)]
57struct PeerWireStats {
58    bytes_sent: u64,
59    bytes_received: u64,
60    bandwidth_debt: f64,
61}
62
63struct PendingResponseSend {
64    job_id: u64,
65    peer_id: String,
66    bytes: Vec<u8>,
67    ready_at: Instant,
68    queue_sequence: u64,
69}
70
71#[async_trait]
72pub trait MeshReadSource: Send + Sync {
73    fn id(&self) -> &str;
74
75    fn is_available(&self) -> bool {
76        true
77    }
78
79    async fn get(&self, hash: &Hash) -> Option<Vec<u8>>;
80}
81
82#[derive(Debug, Clone)]
83struct NegotiatedQuote {
84    peer_id: String,
85    quote_id: u64,
86    #[allow(dead_code)]
87    mint_url: Option<String>,
88}
89
90struct IssuedQuote {
91    expires_at: Instant,
92    #[allow(dead_code)]
93    payment_sat: u64,
94    #[allow(dead_code)]
95    mint_url: Option<String>,
96}
97
98#[derive(Debug, Clone, Default)]
99struct AdaptiveSourceStats {
100    requests: u64,
101    successes: u64,
102    misses: u64,
103    failures: u64,
104    timeouts: u64,
105    srtt_ms: f64,
106    rttvar_ms: f64,
107    backoff_level: u32,
108    backed_off_until: Option<Instant>,
109    last_success_at: Option<Instant>,
110    last_failure_at: Option<Instant>,
111}
112
113#[derive(Debug, Clone)]
114enum RouteFetchOutcome {
115    Hit(Vec<u8>),
116    Miss,
117    Timeout,
118}
119
120struct InflightSourceFetch {
121    waiters: Vec<oneshot::Sender<RouteFetchOutcome>>,
122}
123
124enum SourceFetchOutcome {
125    Hit {
126        source_id: String,
127        data: Vec<u8>,
128        elapsed_ms: u64,
129    },
130    Miss {
131        source_id: String,
132    },
133    Failure {
134        source_id: String,
135    },
136}
137
138const INITIAL_SOURCE_BACKOFF_MS: u64 = 250;
139const MAX_SOURCE_BACKOFF_MS: u64 = 10_000;
140const SOURCE_SCORE_TIE_DELTA: f64 = 0.15;
141const RECENT_SOURCE_SUCCESS_WINDOW: Duration = Duration::from_secs(60);
142const ACTIVE_PEER_REQUEST_RANK_PENALTY: usize = 3;
143
144fn source_reliability_score(stats: &AdaptiveSourceStats) -> f64 {
145    (stats.successes as f64 + 1.0) / (stats.requests as f64 + 2.0)
146}
147
148fn source_latency_score(stats: &AdaptiveSourceStats) -> f64 {
149    if stats.srtt_ms <= 0.0 {
150        return 0.5;
151    }
152    (500.0 / (stats.srtt_ms + 50.0)).min(1.0)
153}
154
155fn source_has_history(stats: &AdaptiveSourceStats) -> bool {
156    stats.requests > 0
157        || stats.successes > 0
158        || stats.misses > 0
159        || stats.failures > 0
160        || stats.timeouts > 0
161}
162
163fn adaptive_source_score(stats: &AdaptiveSourceStats, now: Instant) -> f64 {
164    if let Some(backed_off_until) = stats.backed_off_until {
165        if backed_off_until > now {
166            return f64::NEG_INFINITY;
167        }
168    }
169
170    let miss_penalty = if stats.requests > 0 {
171        (stats.misses as f64 / stats.requests as f64) * 0.15
172    } else {
173        0.0
174    };
175    let failure_penalty = if stats.requests > 0 {
176        ((stats.failures + stats.timeouts) as f64 / stats.requests as f64) * 0.3
177    } else {
178        0.0
179    };
180    let recency_bonus = if stats
181        .last_success_at
182        .is_some_and(|last| now.duration_since(last) < RECENT_SOURCE_SUCCESS_WINDOW)
183    {
184        0.1
185    } else {
186        0.0
187    };
188
189    0.6 * source_reliability_score(stats) + 0.3 * source_latency_score(stats) + recency_bonus
190        - miss_penalty
191        - failure_penalty
192}
193
194fn peer_endpoint_has_history(stats: &crate::peer_selector::PeerStats) -> bool {
195    stats.requests_sent > 0 || stats.successes > 0 || stats.failures > 0 || stats.timeouts > 0
196}
197
198fn peer_endpoint_score(stats: &crate::peer_selector::PeerStats, now: Instant) -> f64 {
199    if stats.backed_off_until.is_some_and(|until| until > now) {
200        return f64::NEG_INFINITY;
201    }
202
203    let miss_penalty = 0.0;
204    let failure_penalty = if stats.requests_sent > 0 {
205        ((stats.failures + stats.timeouts) as f64 / stats.requests_sent as f64) * 0.3
206    } else {
207        0.0
208    };
209    let recency_bonus = if stats
210        .last_success
211        .is_some_and(|last| now.duration_since(last) < RECENT_SOURCE_SUCCESS_WINDOW)
212    {
213        0.1
214    } else {
215        0.0
216    };
217
218    0.6 * stats.success_rate()
219        + 0.3
220            * source_latency_score(&AdaptiveSourceStats {
221                srtt_ms: stats.srtt_ms,
222                ..AdaptiveSourceStats::default()
223            })
224        + recency_bonus
225        - miss_penalty
226        - failure_penalty
227}
228
229#[derive(Clone)]
230enum ReadRoute {
231    Peers(Vec<String>),
232    Sources,
233}
234
235impl ReadRoute {
236    fn id(&self) -> &'static str {
237        match self {
238            Self::Peers(_) => "peers",
239            Self::Sources => "sources",
240        }
241    }
242}
243
244struct RankedReadRoute {
245    route: ReadRoute,
246    best_endpoint_id: String,
247    score: f64,
248    has_history: bool,
249}
250
251fn ranked_route_kind(route: &ReadRoute) -> u8 {
252    match route {
253        ReadRoute::Sources => 0,
254        ReadRoute::Peers(_) => 1,
255    }
256}
257
258#[derive(Debug, Clone)]
259struct MeshReadContext {
260    exclude_peer_id: Option<String>,
261    request_htl: u8,
262}
263
264impl Default for MeshReadContext {
265    fn default() -> Self {
266        Self {
267            exclude_peer_id: None,
268            request_htl: MAX_HTL,
269        }
270    }
271}
272
273/// Aggregate stats from draining currently available peer-link messages.
274#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
275pub struct DataPumpStats {
276    pub processed: usize,
277    pub request_messages: usize,
278    pub response_messages: usize,
279    pub quote_request_messages: u64,
280    pub quote_response_messages: u64,
281    pub processed_bytes: u64,
282}
283
284/// Request dispatch strategy for peer queries.
285///
286/// `MeshStoreCore` supports two practical retrieval modes:
287/// - Flood (`usize::MAX` fanout): maximize success/latency at bandwidth cost.
288/// - Staged hedging: probe a subset first, then expand.
289#[derive(Debug, Clone, Copy)]
290pub struct RequestDispatchConfig {
291    /// Number of peers queried immediately.
292    pub initial_fanout: usize,
293    /// Number of additional peers to query on each hedge step.
294    pub hedge_fanout: usize,
295    /// Total peers allowed for this request.
296    pub max_fanout: usize,
297    /// Delay between hedge waves (ms). `0` means send all waves immediately.
298    pub hedge_interval_ms: u64,
299}
300
301impl Default for RequestDispatchConfig {
302    fn default() -> Self {
303        Self {
304            initial_fanout: usize::MAX,
305            hedge_fanout: usize::MAX,
306            max_fanout: usize::MAX,
307            hedge_interval_ms: 0,
308        }
309    }
310}
311
312/// Normalize fanout config against current peer availability.
313pub fn normalize_dispatch_config(
314    dispatch: RequestDispatchConfig,
315    available_peers: usize,
316) -> RequestDispatchConfig {
317    let mut cfg = dispatch;
318    let cap = if cfg.max_fanout == 0 {
319        available_peers
320    } else {
321        cfg.max_fanout.min(available_peers)
322    };
323    cfg.max_fanout = cap;
324    cfg.initial_fanout = if cfg.initial_fanout == 0 {
325        1
326    } else {
327        cfg.initial_fanout.min(cap.max(1))
328    };
329    cfg.hedge_fanout = if cfg.hedge_fanout == 0 {
330        1
331    } else {
332        cfg.hedge_fanout.min(cap.max(1))
333    };
334    cfg
335}
336
337/// Build wave sizes for staged hedged dispatch.
338pub fn build_hedged_wave_plan(peer_count: usize, dispatch: RequestDispatchConfig) -> Vec<usize> {
339    if peer_count == 0 {
340        return Vec::new();
341    }
342    let cap = dispatch.max_fanout.min(peer_count);
343    if cap == 0 {
344        return Vec::new();
345    }
346
347    let mut plan = Vec::new();
348    let mut sent = 0usize;
349    let first = dispatch.initial_fanout.min(cap).max(1);
350    plan.push(first);
351    sent += first;
352
353    while sent < cap {
354        let next = dispatch.hedge_fanout.min(cap - sent).max(1);
355        plan.push(next);
356        sent += next;
357    }
358    plan
359}
360
361/// Outcome returned after waiting on a hedged dispatch wave.
362#[derive(Debug)]
363pub enum HedgedWaveAction<T> {
364    Continue,
365    Success(T),
366    Abort,
367}
368
369/// Run a staged hedged dispatch over peer index ranges.
370///
371/// This scheduler is shared by the reusable `MeshStoreCore` and the native
372/// `hashtree-cli` mesh path so tests and production use the same wave timing.
373pub async fn run_hedged_waves<T, SendWave, SendWaveFut, WaitWave, WaitWaveFut>(
374    peer_count: usize,
375    dispatch: RequestDispatchConfig,
376    request_timeout: Duration,
377    mut send_wave: SendWave,
378    mut wait_wave: WaitWave,
379) -> Option<T>
380where
381    SendWave: FnMut(Range<usize>) -> SendWaveFut,
382    SendWaveFut: Future<Output = usize>,
383    WaitWave: FnMut(Duration) -> WaitWaveFut,
384    WaitWaveFut: Future<Output = HedgedWaveAction<T>>,
385{
386    let dispatch = normalize_dispatch_config(dispatch, peer_count);
387    let wave_plan = build_hedged_wave_plan(peer_count, dispatch);
388    if wave_plan.is_empty() {
389        return None;
390    }
391
392    let deadline = Instant::now() + request_timeout;
393    let mut sent_total = 0usize;
394    let mut next_peer_idx = 0usize;
395
396    for (wave_idx, wave_size) in wave_plan.iter().copied().enumerate() {
397        let from = next_peer_idx;
398        let to = (next_peer_idx + wave_size).min(peer_count);
399        next_peer_idx = to;
400
401        if from == to {
402            continue;
403        }
404
405        sent_total += send_wave(from..to).await;
406        if sent_total == 0 {
407            if next_peer_idx >= peer_count {
408                break;
409            }
410            continue;
411        }
412
413        let now = Instant::now();
414        if now >= deadline {
415            break;
416        }
417        let remaining = deadline.saturating_duration_since(now);
418        let is_last_wave = wave_idx + 1 == wave_plan.len() || next_peer_idx >= peer_count;
419        let wait = if is_last_wave {
420            remaining
421        } else if dispatch.hedge_interval_ms == 0 {
422            Duration::ZERO
423        } else {
424            Duration::from_millis(dispatch.hedge_interval_ms).min(remaining)
425        };
426
427        if wait.is_zero() {
428            continue;
429        }
430
431        match wait_wave(wait).await {
432            HedgedWaveAction::Continue => {}
433            HedgedWaveAction::Success(value) => return Some(value),
434            HedgedWaveAction::Abort => break,
435        }
436    }
437
438    None
439}
440
441/// Keep selector membership aligned with currently connected peer IDs.
442pub async fn sync_selector_peers(selector: &RwLock<PeerSelector>, current_peer_ids: &[String]) {
443    let mut selector = selector.write().await;
444    let current: HashSet<&str> = current_peer_ids.iter().map(String::as_str).collect();
445    let known: Vec<String> = selector.all_stats().map(|s| s.peer_id.clone()).collect();
446    for peer_id in known {
447        if !current.contains(peer_id.as_str()) {
448            selector.remove_peer(&peer_id);
449        }
450    }
451    for peer_id in current_peer_ids {
452        selector.add_peer(peer_id.clone());
453    }
454}
455
456/// Response behavior profile for simulation/game-theory actors.
457///
458/// Defaults to honest behavior (always respond correctly, no extra delay).
459#[derive(Debug, Clone, Copy)]
460pub struct ResponseBehaviorConfig {
461    /// Probability that a node drops a response even when it has data.
462    pub drop_response_prob: f64,
463    /// Probability that a node responds with corrupted payload.
464    pub corrupt_response_prob: f64,
465    /// Baseline response delay before a peer starts sending any data.
466    pub extra_delay_ms: u64,
467    /// Additional delay before the first response byte becomes available.
468    pub first_byte_delay_ms: u64,
469    /// Sustained throughput for delivering large payloads. `0` disables size-based slowdown.
470    pub bytes_per_second: u64,
471    /// Probability that an otherwise honest response experiences an extra stall.
472    pub stall_response_prob: f64,
473    /// Extra delay injected when a stall event happens.
474    pub stall_delay_ms: u64,
475}
476
477impl Default for ResponseBehaviorConfig {
478    fn default() -> Self {
479        Self {
480            drop_response_prob: 0.0,
481            corrupt_response_prob: 0.0,
482            extra_delay_ms: 0,
483            first_byte_delay_ms: 0,
484            bytes_per_second: 0,
485            stall_response_prob: 0.0,
486            stall_delay_ms: 0,
487        }
488    }
489}
490
491impl ResponseBehaviorConfig {
492    fn normalized(self) -> Self {
493        Self {
494            drop_response_prob: self.drop_response_prob.clamp(0.0, 1.0),
495            corrupt_response_prob: self.corrupt_response_prob.clamp(0.0, 1.0),
496            extra_delay_ms: self.extra_delay_ms,
497            first_byte_delay_ms: self.first_byte_delay_ms,
498            bytes_per_second: self.bytes_per_second,
499            stall_response_prob: self.stall_response_prob.clamp(0.0, 1.0),
500            stall_delay_ms: self.stall_delay_ms,
501        }
502    }
503}
504
505/// Routing policy for request ordering + dispatch fanout.
506#[derive(Debug, Clone)]
507pub struct MeshRoutingConfig {
508    pub selection_strategy: SelectionStrategy,
509    pub fairness_enabled: bool,
510    /// Blend weight for payment-priority ranking in selector (`0.0` disables).
511    pub cashu_payment_weight: f64,
512    /// Refuse serving peers that have reached this many unpaid post-delivery settlements.
513    /// `0` disables refusal and only keeps metadata/downranking.
514    pub cashu_payment_default_block_threshold: u64,
515    /// Cashu mint URLs this node is willing to use for settlement.
516    pub cashu_accepted_mints: Vec<String>,
517    /// Preferred Cashu mint URL when initiating paid retrieval.
518    pub cashu_default_mint: Option<String>,
519    /// Baseline cap for accepting a peer-suggested mint outside the trusted set.
520    pub cashu_peer_suggested_mint_base_cap_sat: u64,
521    /// Additional sats allowed per successful delivery from that peer.
522    pub cashu_peer_suggested_mint_success_step_sat: u64,
523    /// Additional sats allowed per successful post-delivery payment received from that peer.
524    pub cashu_peer_suggested_mint_receipt_step_sat: u64,
525    /// Hard upper bound for any single peer-suggested mint quote we accept.
526    pub cashu_peer_suggested_mint_max_cap_sat: u64,
527    pub dispatch: RequestDispatchConfig,
528    pub response_behavior: ResponseBehaviorConfig,
529}
530
531impl Default for MeshRoutingConfig {
532    fn default() -> Self {
533        Self {
534            selection_strategy: SelectionStrategy::Weighted,
535            fairness_enabled: true,
536            cashu_payment_weight: 0.0,
537            cashu_payment_default_block_threshold: 0,
538            cashu_accepted_mints: Vec::new(),
539            cashu_default_mint: None,
540            cashu_peer_suggested_mint_base_cap_sat: 0,
541            cashu_peer_suggested_mint_success_step_sat: 0,
542            cashu_peer_suggested_mint_receipt_step_sat: 0,
543            cashu_peer_suggested_mint_max_cap_sat: 0,
544            dispatch: RequestDispatchConfig::default(),
545            response_behavior: ResponseBehaviorConfig::default(),
546        }
547    }
548}
549
550/// Routed mesh store core that works with any storage backend and transport
551/// implementation.
552///
553/// This is the shared code between production and simulation.
554/// - Production: `MeshStoreCore<LmdbStore, NostrRelayTransport, WebRtcPeerLinkFactory>`
555/// - Simulation: `MeshStoreCore<MemoryStore, MockRelayTransport, MockConnectionFactory>`
556pub struct MeshStoreCore<S, R, F>
557where
558    S: Store + Send + Sync + 'static,
559    R: SignalingTransport + Send + Sync + 'static,
560    F: PeerLinkFactory + Send + Sync + 'static,
561{
562    /// Local backing store
563    local_store: Arc<S>,
564    /// Mesh router (handles peer discovery and connection)
565    signaling: Arc<MeshRouter<R, F>>,
566    /// Per-peer HTL config
567    htl_configs: RwLock<HashMap<String, PeerHTLConfig>>,
568    /// Pending requests we sent
569    pending_requests: RwLock<HashMap<String, PendingRequest>>,
570    /// Pending quote negotiations keyed by requested hash.
571    pending_quotes: RwLock<HashMap<String, PendingQuoteRequest>>,
572    /// Forwarded peer requests currently being resolved through the mesh/upstream.
573    pending_forward_requests: RwLock<HashMap<String, PendingForwardRequest>>,
574    /// Bounded negative cache for recently forwarded misses/timeouts.
575    recent_forward_misses: Mutex<TimedSeenSet>,
576    /// Quotes we issued to peers and will accept exactly once until expiry.
577    issued_quotes: RwLock<HashMap<(String, String, u64), IssuedQuote>>,
578    /// Monotonic quote identifier generator.
579    next_quote_id: RwLock<u64>,
580    /// Non-peer read sources such as upstream Blossom servers.
581    read_sources: RwLock<HashMap<String, Arc<dyn MeshReadSource>>>,
582    /// Adaptive health stats for non-peer read sources.
583    read_source_stats: RwLock<HashMap<String, AdaptiveSourceStats>>,
584    /// Shared in-flight upstream reads keyed by hash.
585    inflight_source_fetches: Mutex<HashMap<String, InflightSourceFetch>>,
586    /// Adaptive selector for peer ordering.
587    peer_selector: RwLock<PeerSelector>,
588    /// Active per-peer in-flight reads so concurrent block fetches spread across peers.
589    peer_active_requests: RwLock<HashMap<String, usize>>,
590    /// Actual wire traffic stats used for upload-side reciprocity scheduling.
591    peer_wire_stats: RwLock<HashMap<String, PeerWireStats>>,
592    /// Pending content responses waiting for upload arbitration.
593    pending_response_sends: Mutex<Vec<PendingResponseSend>>,
594    /// Upload response scheduler state.
595    response_scheduler_running: AtomicBool,
596    /// Monotonic id for queued response sends.
597    next_response_job_id: AtomicU64,
598    /// Routing/dispatch configuration.
599    routing: MeshRoutingConfig,
600    /// Request timeout
601    request_timeout: Duration,
602    /// Debug mode
603    debug: bool,
604    /// Running flag
605    running: RwLock<bool>,
606}
607
608impl<S, R, F> MeshStoreCore<S, R, F>
609where
610    S: Store + Send + Sync + 'static,
611    R: SignalingTransport + Send + Sync + 'static,
612    F: PeerLinkFactory + Send + Sync + 'static,
613{
614    /// Create a new routed mesh store core.
615    pub fn new(
616        local_store: Arc<S>,
617        signaling: Arc<MeshRouter<R, F>>,
618        request_timeout: Duration,
619        debug: bool,
620    ) -> Self {
621        Self::new_with_routing(
622            local_store,
623            signaling,
624            request_timeout,
625            debug,
626            Default::default(),
627        )
628    }
629
630    /// Create a new routed mesh store core with explicit routing configuration.
631    pub fn new_with_routing(
632        local_store: Arc<S>,
633        signaling: Arc<MeshRouter<R, F>>,
634        request_timeout: Duration,
635        debug: bool,
636        routing: MeshRoutingConfig,
637    ) -> Self {
638        let mut selector = PeerSelector::with_strategy(routing.selection_strategy);
639        selector.set_fairness(routing.fairness_enabled);
640        selector.set_cashu_payment_weight(routing.cashu_payment_weight);
641        Self {
642            local_store,
643            signaling,
644            htl_configs: RwLock::new(HashMap::new()),
645            pending_requests: RwLock::new(HashMap::new()),
646            pending_quotes: RwLock::new(HashMap::new()),
647            pending_forward_requests: RwLock::new(HashMap::new()),
648            recent_forward_misses: Mutex::new(TimedSeenSet::new(
649                RECENT_FORWARD_MISS_CAPACITY,
650                Self::recent_forward_miss_ttl(request_timeout),
651            )),
652            issued_quotes: RwLock::new(HashMap::new()),
653            next_quote_id: RwLock::new(1),
654            read_sources: RwLock::new(HashMap::new()),
655            read_source_stats: RwLock::new(HashMap::new()),
656            inflight_source_fetches: Mutex::new(HashMap::new()),
657            peer_selector: RwLock::new(selector),
658            peer_active_requests: RwLock::new(HashMap::new()),
659            peer_wire_stats: RwLock::new(HashMap::new()),
660            pending_response_sends: Mutex::new(Vec::new()),
661            response_scheduler_running: AtomicBool::new(false),
662            next_response_job_id: AtomicU64::new(1),
663            routing,
664            request_timeout,
665            debug,
666            running: RwLock::new(false),
667        }
668    }
669
670    fn recent_forward_miss_ttl(request_timeout: Duration) -> Duration {
671        let ttl_ms = request_timeout
672            .as_millis()
673            .saturating_mul(2)
674            .max(MIN_RECENT_FORWARD_MISS_TTL_MS as u128)
675            .min(u64::MAX as u128) as u64;
676        Duration::from_millis(ttl_ms)
677    }
678
679    /// Start the store (begin listening for messages)
680    pub async fn start(&self) -> Result<(), TransportError> {
681        *self.running.write().await = true;
682
683        // Send initial hello
684        self.signaling.send_hello(vec![]).await?;
685
686        Ok(())
687    }
688
689    /// Stop the store
690    pub async fn stop(&self) {
691        *self.running.write().await = false;
692    }
693
694    /// Process incoming signaling message
695    pub async fn process_signaling(&self, msg: SignalingMessage) -> Result<(), TransportError> {
696        // When a new peer connects, initialize their HTL config
697        let peer_id = msg.peer_id().to_string();
698        {
699            let mut configs = self.htl_configs.write().await;
700            if !configs.contains_key(&peer_id) {
701                configs.insert(peer_id.clone(), PeerHTLConfig::random());
702            }
703        }
704        self.peer_selector.write().await.add_peer(peer_id);
705
706        self.signaling.handle_message(msg).await
707    }
708
709    /// Get signaling manager reference
710    pub fn signaling(&self) -> &Arc<MeshRouter<R, F>> {
711        &self.signaling
712    }
713
714    fn response_behavior(&self) -> ResponseBehaviorConfig {
715        self.routing.response_behavior.normalized()
716    }
717
718    async fn record_peer_wire_sent(&self, peer_id: &str, bytes: u64) {
719        if bytes == 0 {
720            return;
721        }
722        let mut stats = self.peer_wire_stats.write().await;
723        let entry = stats.entry(peer_id.to_string()).or_default();
724        entry.bytes_sent = entry.bytes_sent.saturating_add(bytes);
725    }
726
727    async fn record_peer_wire_received(&self, peer_id: &str, bytes: u64) {
728        if bytes == 0 {
729            return;
730        }
731        let mut stats = self.peer_wire_stats.write().await;
732        let entry = stats.entry(peer_id.to_string()).or_default();
733        entry.bytes_received = entry.bytes_received.saturating_add(bytes);
734    }
735
736    fn peer_upload_weight(stats: &PeerWireStats) -> f64 {
737        let raw_ratio = (stats.bytes_received.saturating_add(1024) as f64)
738            / (stats.bytes_sent.saturating_add(1024) as f64);
739        let bounded_ratio = raw_ratio / (1.0 + raw_ratio);
740        0.5 + 1.5 * bounded_ratio
741    }
742
743    fn choose_ready_response_job(
744        ready_jobs: &[(u64, String, usize, Instant, u64)],
745        stats: &HashMap<String, PeerWireStats>,
746    ) -> Option<(u64, f64)> {
747        ready_jobs
748            .iter()
749            .map(|job| {
750                let peer_stats = stats.get(&job.1).cloned().unwrap_or_default();
751                let finish = peer_stats.bandwidth_debt
752                    + (job.2 as f64 / Self::peer_upload_weight(&peer_stats));
753                (job.0, job.1.as_str(), job.4, finish)
754            })
755            .min_by(|left, right| {
756                left.3
757                    .partial_cmp(&right.3)
758                    .unwrap_or(std::cmp::Ordering::Equal)
759                    .then_with(|| left.2.cmp(&right.2))
760                    .then_with(|| left.1.cmp(right.1))
761            })
762            .map(|choice| (choice.0, choice.3))
763    }
764
765    async fn enqueue_response_send(
766        self: &Arc<Self>,
767        peer_id: String,
768        bytes: Vec<u8>,
769        ready_at: Instant,
770    ) {
771        let job_id = self.next_response_job_id.fetch_add(1, Ordering::Relaxed);
772        {
773            let mut queue = self.pending_response_sends.lock().await;
774            queue.push(PendingResponseSend {
775                job_id,
776                peer_id,
777                bytes,
778                ready_at,
779                queue_sequence: job_id,
780            });
781        }
782
783        if self
784            .response_scheduler_running
785            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
786            .is_ok()
787        {
788            let this = Arc::clone(self);
789            tokio::spawn(async move {
790                this.run_response_scheduler().await;
791            });
792        }
793    }
794
795    async fn run_response_scheduler(self: Arc<Self>) {
796        loop {
797            let snapshot = {
798                let queue = self.pending_response_sends.lock().await;
799                if queue.is_empty() {
800                    self.response_scheduler_running
801                        .store(false, Ordering::Release);
802                    return;
803                }
804                queue
805                    .iter()
806                    .map(|job| {
807                        (
808                            job.job_id,
809                            job.peer_id.clone(),
810                            job.bytes.len(),
811                            job.ready_at,
812                            job.queue_sequence,
813                        )
814                    })
815                    .collect::<Vec<_>>()
816            };
817
818            let now = Instant::now();
819            let mut earliest_ready_at: Option<Instant> = None;
820            let mut ready_jobs = Vec::new();
821            for job in &snapshot {
822                if job.3 <= now {
823                    ready_jobs.push(job.clone());
824                } else {
825                    earliest_ready_at = Some(match earliest_ready_at {
826                        Some(current) => current.min(job.3),
827                        None => job.3,
828                    });
829                }
830            }
831
832            if ready_jobs.is_empty() {
833                if let Some(ready_at) = earliest_ready_at {
834                    tokio::time::sleep(ready_at.saturating_duration_since(Instant::now())).await;
835                    continue;
836                }
837                self.response_scheduler_running
838                    .store(false, Ordering::Release);
839                return;
840            }
841
842            let (selected_job_id, selected_finish) = {
843                let stats = self.peer_wire_stats.read().await;
844                Self::choose_ready_response_job(&ready_jobs, &stats).expect("ready response job")
845            };
846
847            let selected = {
848                let mut queue = self.pending_response_sends.lock().await;
849                let Some(index) = queue.iter().position(|job| job.job_id == selected_job_id) else {
850                    continue;
851                };
852                queue.swap_remove(index)
853            };
854
855            let sent = if let Some(channel) = self.signaling.get_channel(&selected.peer_id).await {
856                channel.send(selected.bytes.clone()).await.is_ok()
857            } else {
858                false
859            };
860
861            let queued_peers = {
862                let queue = self.pending_response_sends.lock().await;
863                queue
864                    .iter()
865                    .map(|job| job.peer_id.clone())
866                    .collect::<HashSet<_>>()
867            };
868            let mut stats = self.peer_wire_stats.write().await;
869            let entry = stats.entry(selected.peer_id.clone()).or_default();
870            if sent {
871                entry.bytes_sent = entry.bytes_sent.saturating_add(selected.bytes.len() as u64);
872                entry.bandwidth_debt = selected_finish;
873            }
874            if queued_peers.is_empty() {
875                for peer_stats in stats.values_mut() {
876                    peer_stats.bandwidth_debt = 0.0;
877                }
878            } else {
879                let floor = queued_peers
880                    .iter()
881                    .filter_map(|peer_id| stats.get(peer_id).map(|peer| peer.bandwidth_debt))
882                    .fold(f64::INFINITY, f64::min);
883                if floor.is_finite() && floor > 0.0 {
884                    for peer_id in queued_peers {
885                        if let Some(peer_stats) = stats.get_mut(&peer_id) {
886                            peer_stats.bandwidth_debt =
887                                (peer_stats.bandwidth_debt - floor).max(0.0);
888                        }
889                    }
890                }
891            }
892        }
893    }
894
895    fn deterministic_actor_draw_for(peer_id: &str, hash: &Hash, salt: u64) -> f64 {
896        let mut hasher = DefaultHasher::new();
897        peer_id.hash(&mut hasher);
898        hash.hash(&mut hasher);
899        salt.hash(&mut hasher);
900        let v = hasher.finish();
901        (v as f64) / (u64::MAX as f64)
902    }
903
904    fn deterministic_actor_draw(&self, hash: &Hash, salt: u64) -> f64 {
905        Self::deterministic_actor_draw_for(self.signaling.peer_id(), hash, salt)
906    }
907
908    fn peer_metadata_pointer_slot_hash() -> Hash {
909        hashtree_core::sha256(PEER_METADATA_POINTER_SLOT_KEY)
910    }
911
912    fn decode_hash_hex(hash_hex: &str) -> Result<Hash, StoreError> {
913        let bytes = hex::decode(hash_hex)
914            .map_err(|e| StoreError::Other(format!("Invalid hash hex: {e}")))?;
915        if bytes.len() != 32 {
916            return Err(StoreError::Other(format!(
917                "Invalid hash length {}, expected 32 bytes",
918                bytes.len()
919            )));
920        }
921        let mut hash = [0u8; 32];
922        hash.copy_from_slice(&bytes);
923        Ok(hash)
924    }
925
926    fn should_drop_response(&self, hash: &Hash) -> bool {
927        let p = self.response_behavior().drop_response_prob;
928        if p <= 0.0 {
929            return false;
930        }
931        self.deterministic_actor_draw(hash, 0xD0_D0_D0_D0_D0_D0_D0_D0) < p
932    }
933
934    fn should_corrupt_response(&self, hash: &Hash) -> bool {
935        let p = self.response_behavior().corrupt_response_prob;
936        if p <= 0.0 {
937            return false;
938        }
939        self.deterministic_actor_draw(hash, 0xC0_C0_C0_C0_C0_C0_C0_C0) < p
940    }
941
942    fn should_stall_response(&self, hash: &Hash) -> bool {
943        let p = self.response_behavior().stall_response_prob;
944        if p <= 0.0 {
945            return false;
946        }
947        self.deterministic_actor_draw(hash, 0x5A_11_5A_11_5A_11_5A_11) < p
948    }
949
950    fn response_send_delay(&self, hash: &Hash, payload_len: usize) -> Duration {
951        let behavior = self.response_behavior();
952        let mut total_ms = behavior
953            .extra_delay_ms
954            .saturating_add(behavior.first_byte_delay_ms);
955
956        if behavior.bytes_per_second > 0 && payload_len > 0 {
957            let throughput_ms = ((payload_len as u128) * 1000)
958                .div_ceil(behavior.bytes_per_second as u128)
959                .min(u64::MAX as u128) as u64;
960            total_ms = total_ms.saturating_add(throughput_ms);
961        }
962
963        if behavior.stall_delay_ms > 0 && self.should_stall_response(hash) {
964            total_ms = total_ms.saturating_add(behavior.stall_delay_ms);
965        }
966
967        Duration::from_millis(total_ms)
968    }
969
970    async fn ordered_connected_peers(&self, exclude_peer_id: Option<&str>) -> Vec<String> {
971        let current_peer_ids = self.signaling.peer_ids().await;
972        if current_peer_ids.is_empty() {
973            return Vec::new();
974        }
975
976        sync_selector_peers(&self.peer_selector, &current_peer_ids).await;
977        let hash_get_peer_ids: HashSet<String> = self
978            .signaling
979            .hash_get_peer_ids()
980            .await
981            .into_iter()
982            .collect();
983        let mut candidate_peer_ids: Vec<String> = current_peer_ids
984            .into_iter()
985            .filter(|peer_id| hash_get_peer_ids.contains(peer_id))
986            .filter(|peer_id| exclude_peer_id.is_none_or(|exclude| peer_id != exclude))
987            .collect();
988        if candidate_peer_ids.is_empty() {
989            return Vec::new();
990        }
991
992        let current_set: HashSet<&str> = candidate_peer_ids.iter().map(String::as_str).collect();
993        let mut selector = self.peer_selector.write().await;
994        let mut selector_order = selector.select_peers();
995        selector_order.retain(|peer_id| current_set.contains(peer_id.as_str()));
996        if selector_order.is_empty() {
997            let mut fallback = candidate_peer_ids;
998            fallback.sort();
999            return fallback;
1000        }
1001        let backed_off: HashMap<String, bool> = candidate_peer_ids
1002            .iter()
1003            .map(|peer_id| (peer_id.clone(), selector.is_peer_backed_off(peer_id)))
1004            .collect();
1005        drop(selector);
1006
1007        let rank: HashMap<&str, usize> = selector_order
1008            .iter()
1009            .enumerate()
1010            .map(|(idx, peer_id)| (peer_id.as_str(), idx))
1011            .collect();
1012        let active = self.peer_active_requests.read().await;
1013        candidate_peer_ids.sort_by(|left, right| {
1014            let left_backed_off = backed_off.get(left).copied().unwrap_or(false);
1015            let right_backed_off = backed_off.get(right).copied().unwrap_or(false);
1016            if left_backed_off != right_backed_off {
1017                return if left_backed_off {
1018                    std::cmp::Ordering::Greater
1019                } else {
1020                    std::cmp::Ordering::Less
1021                };
1022            }
1023            let left_rank = rank.get(left.as_str()).copied().unwrap_or(usize::MAX / 2);
1024            let right_rank = rank.get(right.as_str()).copied().unwrap_or(usize::MAX / 2);
1025            let left_load = active.get(left).copied().unwrap_or(0);
1026            let right_load = active.get(right).copied().unwrap_or(0);
1027            (left_rank + left_load.saturating_mul(ACTIVE_PEER_REQUEST_RANK_PENALTY))
1028                .cmp(&(right_rank + right_load.saturating_mul(ACTIVE_PEER_REQUEST_RANK_PENALTY)))
1029                .then_with(|| left.cmp(right))
1030        });
1031        candidate_peer_ids
1032    }
1033
1034    async fn reserve_peer_request(&self, peer_id: &str) {
1035        let mut active = self.peer_active_requests.write().await;
1036        *active.entry(peer_id.to_string()).or_insert(0) += 1;
1037    }
1038
1039    async fn release_peer_request(&self, peer_id: &str) {
1040        let mut active = self.peer_active_requests.write().await;
1041        let Some(count) = active.get_mut(peer_id) else {
1042            return;
1043        };
1044        if *count <= 1 {
1045            active.remove(peer_id);
1046        } else {
1047            *count -= 1;
1048        }
1049    }
1050
1051    async fn release_queried_peer_requests(&self, peer_ids: &[String]) {
1052        for peer_id in peer_ids {
1053            self.release_peer_request(peer_id).await;
1054        }
1055    }
1056
1057    fn requested_quote_mint(&self) -> Option<&str> {
1058        if let Some(default_mint) = self.routing.cashu_default_mint.as_deref() {
1059            if self.routing.cashu_accepted_mints.is_empty()
1060                || self
1061                    .routing
1062                    .cashu_accepted_mints
1063                    .iter()
1064                    .any(|mint| mint == default_mint)
1065            {
1066                return Some(default_mint);
1067            }
1068        }
1069
1070        self.routing
1071            .cashu_accepted_mints
1072            .first()
1073            .map(String::as_str)
1074    }
1075
1076    fn choose_quote_mint(&self, requested_mint: Option<&str>) -> Option<String> {
1077        if let Some(requested_mint) = requested_mint {
1078            if self.accepts_quote_mint(Some(requested_mint)) {
1079                return Some(requested_mint.to_string());
1080            }
1081        }
1082        if let Some(default_mint) = self.routing.cashu_default_mint.as_ref() {
1083            return Some(default_mint.clone());
1084        }
1085        if let Some(first_mint) = self.routing.cashu_accepted_mints.first() {
1086            return Some(first_mint.clone());
1087        }
1088        requested_mint.map(str::to_string)
1089    }
1090
1091    fn accepts_quote_mint(&self, mint_url: Option<&str>) -> bool {
1092        if self.routing.cashu_accepted_mints.is_empty() {
1093            return true;
1094        }
1095
1096        let Some(mint_url) = mint_url else {
1097            return false;
1098        };
1099        self.routing
1100            .cashu_accepted_mints
1101            .iter()
1102            .any(|mint| mint == mint_url)
1103    }
1104
1105    fn trusts_quote_mint(&self, mint_url: Option<&str>) -> bool {
1106        let Some(mint_url) = mint_url else {
1107            return self.routing.cashu_default_mint.is_none()
1108                && self.routing.cashu_accepted_mints.is_empty();
1109        };
1110        self.routing.cashu_default_mint.as_deref() == Some(mint_url)
1111            || self
1112                .routing
1113                .cashu_accepted_mints
1114                .iter()
1115                .any(|mint| mint == mint_url)
1116    }
1117
1118    async fn peer_suggested_mint_cap_sat(&self, peer_id: &str) -> u64 {
1119        let base = self.routing.cashu_peer_suggested_mint_base_cap_sat;
1120        if base == 0 {
1121            return 0;
1122        }
1123
1124        let selector = self.peer_selector.read().await;
1125        let Some(stats) = selector.get_stats(peer_id) else {
1126            let max_cap = self.routing.cashu_peer_suggested_mint_max_cap_sat;
1127            return if max_cap > 0 { base.min(max_cap) } else { base };
1128        };
1129
1130        if stats.cashu_payment_defaults > 0
1131            && stats.cashu_payment_defaults >= stats.cashu_payment_receipts
1132        {
1133            return 0;
1134        }
1135
1136        let success_bonus = stats
1137            .successes
1138            .saturating_mul(self.routing.cashu_peer_suggested_mint_success_step_sat);
1139        let receipt_bonus = stats
1140            .cashu_payment_receipts
1141            .saturating_mul(self.routing.cashu_peer_suggested_mint_receipt_step_sat);
1142        let mut cap = base
1143            .saturating_add(success_bonus)
1144            .saturating_add(receipt_bonus);
1145        let max_cap = self.routing.cashu_peer_suggested_mint_max_cap_sat;
1146        if max_cap > 0 {
1147            cap = cap.min(max_cap);
1148        }
1149        cap
1150    }
1151
1152    async fn should_accept_quote_response(
1153        &self,
1154        from_peer: &str,
1155        preferred_mint_url: Option<&str>,
1156        offered_payment_sat: u64,
1157        res: &DataQuoteResponse,
1158    ) -> bool {
1159        let Some(payment_sat) = res.p else {
1160            return false;
1161        };
1162        if payment_sat > offered_payment_sat {
1163            return false;
1164        }
1165
1166        let response_mint = res.m.as_deref();
1167        if response_mint == preferred_mint_url {
1168            return true;
1169        }
1170        if self.trusts_quote_mint(response_mint) {
1171            return true;
1172        }
1173        if response_mint.is_none() {
1174            return false;
1175        }
1176
1177        payment_sat <= self.peer_suggested_mint_cap_sat(from_peer).await
1178    }
1179
1180    async fn issue_quote(
1181        &self,
1182        peer_id: &str,
1183        hash_key: &str,
1184        payment_sat: u64,
1185        ttl_ms: u32,
1186        mint_url: Option<&str>,
1187    ) -> u64 {
1188        let quote_id = {
1189            let mut next = self.next_quote_id.write().await;
1190            let quote_id = *next;
1191            *next = next.saturating_add(1);
1192            quote_id
1193        };
1194
1195        let expires_at = Instant::now() + Duration::from_millis(ttl_ms as u64);
1196        self.issued_quotes.write().await.insert(
1197            (peer_id.to_string(), hash_key.to_string(), quote_id),
1198            IssuedQuote {
1199                expires_at,
1200                payment_sat,
1201                mint_url: mint_url.map(str::to_string),
1202            },
1203        );
1204        quote_id
1205    }
1206
1207    async fn take_valid_quote(&self, peer_id: &str, hash_key: &str, quote_id: u64) -> bool {
1208        let key = (peer_id.to_string(), hash_key.to_string(), quote_id);
1209        let Some(quote) = self.issued_quotes.write().await.remove(&key) else {
1210            return false;
1211        };
1212        quote.expires_at > Instant::now()
1213    }
1214
1215    async fn send_request_to_peer(
1216        &self,
1217        peer_id: &str,
1218        hash: &Hash,
1219        request_htl: u8,
1220        quote_id: Option<u64>,
1221    ) -> bool {
1222        if !should_forward_htl(request_htl) {
1223            return false;
1224        }
1225
1226        let channel = match self.signaling.get_channel(peer_id).await {
1227            Some(c) => c,
1228            None => return false,
1229        };
1230
1231        let htl_config = {
1232            let configs = self.htl_configs.read().await;
1233            configs
1234                .get(peer_id)
1235                .cloned()
1236                .unwrap_or_else(PeerHTLConfig::random)
1237        };
1238
1239        let send_htl = htl_config.decrement(request_htl);
1240        let req = match quote_id {
1241            Some(quote_id) => create_request_with_quote(hash, send_htl, quote_id),
1242            None => create_request(hash, send_htl),
1243        };
1244        let request_bytes = encode_request(&req);
1245        let request_len = request_bytes.len() as u64;
1246
1247        {
1248            let mut selector = self.peer_selector.write().await;
1249            selector.record_request(peer_id, request_len);
1250        }
1251
1252        match channel.send(request_bytes).await {
1253            Ok(()) => {
1254                self.record_peer_wire_sent(peer_id, request_len).await;
1255                true
1256            }
1257            Err(_) => {
1258                self.peer_selector.write().await.record_failure(peer_id);
1259                false
1260            }
1261        }
1262    }
1263
1264    async fn send_quote_request_to_peer(
1265        &self,
1266        peer_id: &str,
1267        hash: &Hash,
1268        payment_sat: u64,
1269        ttl_ms: u32,
1270        mint_url: Option<&str>,
1271    ) -> bool {
1272        let channel = match self.signaling.get_channel(peer_id).await {
1273            Some(c) => c,
1274            None => return false,
1275        };
1276
1277        let req = create_quote_request(hash, ttl_ms, payment_sat, mint_url);
1278        let request_bytes = encode_quote_request(&req);
1279        let request_len = request_bytes.len() as u64;
1280
1281        match channel.send(request_bytes).await {
1282            Ok(()) => {
1283                self.record_peer_wire_sent(peer_id, request_len).await;
1284                true
1285            }
1286            Err(_) => false,
1287        }
1288    }
1289
1290    pub async fn set_read_sources(&self, sources: Vec<Arc<dyn MeshReadSource>>) {
1291        let mut by_id = HashMap::new();
1292        let mut stats = self.read_source_stats.write().await;
1293        for source in sources {
1294            let source_id = source.id().to_string();
1295            by_id.insert(source_id.clone(), source);
1296            stats
1297                .entry(source_id)
1298                .or_insert_with(AdaptiveSourceStats::default);
1299        }
1300        *self.read_sources.write().await = by_id;
1301    }
1302
1303    async fn record_read_source_request(&self, source_id: &str) {
1304        let mut stats = self.read_source_stats.write().await;
1305        stats
1306            .entry(source_id.to_string())
1307            .or_insert_with(AdaptiveSourceStats::default)
1308            .requests += 1;
1309    }
1310
1311    async fn record_read_source_miss(&self, source_id: &str) {
1312        let mut stats = self.read_source_stats.write().await;
1313        stats
1314            .entry(source_id.to_string())
1315            .or_insert_with(AdaptiveSourceStats::default)
1316            .misses += 1;
1317    }
1318
1319    async fn record_read_source_success(&self, source_id: &str, elapsed_ms: u64) {
1320        let now = Instant::now();
1321        let mut stats = self.read_source_stats.write().await;
1322        let stats = stats
1323            .entry(source_id.to_string())
1324            .or_insert_with(AdaptiveSourceStats::default);
1325        stats.successes += 1;
1326        stats.last_success_at = Some(now);
1327        stats.backoff_level = 0;
1328        stats.backed_off_until = None;
1329        if stats.srtt_ms <= 0.0 {
1330            stats.srtt_ms = elapsed_ms as f64;
1331            stats.rttvar_ms = elapsed_ms as f64 / 2.0;
1332            return;
1333        }
1334        let elapsed = elapsed_ms as f64;
1335        stats.rttvar_ms = 0.75 * stats.rttvar_ms + 0.25 * (stats.srtt_ms - elapsed).abs();
1336        stats.srtt_ms = 0.875 * stats.srtt_ms + 0.125 * elapsed;
1337    }
1338
1339    async fn record_read_source_failure(&self, source_id: &str) {
1340        let now = Instant::now();
1341        let mut stats = self.read_source_stats.write().await;
1342        let stats = stats
1343            .entry(source_id.to_string())
1344            .or_insert_with(AdaptiveSourceStats::default);
1345        stats.failures += 1;
1346        stats.last_failure_at = Some(now);
1347        Self::apply_source_backoff(stats, now);
1348    }
1349
1350    async fn record_read_source_timeout(&self, source_id: &str) {
1351        let now = Instant::now();
1352        let mut stats = self.read_source_stats.write().await;
1353        let stats = stats
1354            .entry(source_id.to_string())
1355            .or_insert_with(AdaptiveSourceStats::default);
1356        stats.timeouts += 1;
1357        stats.last_failure_at = Some(now);
1358        Self::apply_source_backoff(stats, now);
1359    }
1360
1361    fn apply_source_backoff(stats: &mut AdaptiveSourceStats, now: Instant) {
1362        stats.backoff_level = stats.backoff_level.saturating_add(1);
1363        let backoff_ms = (INITIAL_SOURCE_BACKOFF_MS
1364            .saturating_mul(2u64.saturating_pow(stats.backoff_level.saturating_sub(1))))
1365        .min(MAX_SOURCE_BACKOFF_MS);
1366        stats.backed_off_until = Some(now + Duration::from_millis(backoff_ms));
1367    }
1368
1369    async fn ordered_read_sources(&self) -> Vec<Arc<dyn MeshReadSource>> {
1370        let sources = self.read_sources.read().await;
1371        if sources.is_empty() {
1372            return Vec::new();
1373        }
1374
1375        let mut available: Vec<Arc<dyn MeshReadSource>> = sources
1376            .values()
1377            .filter(|source| source.is_available())
1378            .cloned()
1379            .collect();
1380        if available.is_empty() {
1381            return Vec::new();
1382        }
1383
1384        let now = Instant::now();
1385        let stats = self.read_source_stats.read().await;
1386        let mut healthy: Vec<Arc<dyn MeshReadSource>> = available
1387            .iter()
1388            .filter(|source| {
1389                stats
1390                    .get(source.id())
1391                    .and_then(|s| s.backed_off_until)
1392                    .is_none_or(|until| until <= now)
1393            })
1394            .cloned()
1395            .collect();
1396        if !healthy.is_empty() {
1397            available = std::mem::take(&mut healthy);
1398        }
1399
1400        available.sort_by(|left, right| {
1401            let left_stats = stats.get(left.id()).cloned().unwrap_or_default();
1402            let right_stats = stats.get(right.id()).cloned().unwrap_or_default();
1403            adaptive_source_score(&right_stats, now)
1404                .partial_cmp(&adaptive_source_score(&left_stats, now))
1405                .unwrap_or(std::cmp::Ordering::Equal)
1406                .then_with(|| left.id().cmp(right.id()))
1407        });
1408        available
1409    }
1410
1411    async fn should_probe_multiple_read_sources(
1412        &self,
1413        ordered_sources: &[Arc<dyn MeshReadSource>],
1414    ) -> bool {
1415        if ordered_sources.len() <= 1 {
1416            return false;
1417        }
1418        let stats = self.read_source_stats.read().await;
1419        let best = stats
1420            .get(ordered_sources[0].id())
1421            .cloned()
1422            .unwrap_or_default();
1423        let second = stats
1424            .get(ordered_sources[1].id())
1425            .cloned()
1426            .unwrap_or_default();
1427        if !source_has_history(&best) || !source_has_history(&second) {
1428            return false;
1429        }
1430        let now = Instant::now();
1431        adaptive_source_score(&best, now) - adaptive_source_score(&second, now)
1432            < SOURCE_SCORE_TIE_DELTA
1433    }
1434
1435    async fn source_dispatch_for(&self, source_count: usize) -> RequestDispatchConfig {
1436        if source_count == 0 {
1437            return self.routing.dispatch;
1438        }
1439        let ordered_sources = self.ordered_read_sources().await;
1440        let probe_multiple = self
1441            .should_probe_multiple_read_sources(&ordered_sources)
1442            .await;
1443        let initial_fanout = if probe_multiple {
1444            source_count.min(2)
1445        } else {
1446            1
1447        };
1448        RequestDispatchConfig {
1449            initial_fanout,
1450            hedge_fanout: self.routing.dispatch.hedge_fanout,
1451            max_fanout: self.routing.dispatch.max_fanout.min(source_count),
1452            hedge_interval_ms: self.routing.dispatch.hedge_interval_ms,
1453        }
1454    }
1455
1456    /// Get peer count
1457    pub async fn peer_count(&self) -> usize {
1458        self.signaling.peer_count().await
1459    }
1460
1461    /// Check if we need more peers
1462    pub async fn needs_peers(&self) -> bool {
1463        self.signaling.needs_peers().await
1464    }
1465
1466    /// Re-broadcast hello to refresh discovery as topology changes.
1467    pub async fn send_hello(&self) -> Result<(), TransportError> {
1468        self.signaling.send_hello(vec![]).await
1469    }
1470
1471    /// Drain all currently available peer-link messages and handle them.
1472    ///
1473    /// This keeps the message pump logic shared between simulation and the
1474    /// default production wrapper instead of duplicating per-channel loops.
1475    pub async fn drain_available_data_messages(self: &Arc<Self>) -> DataPumpStats {
1476        let mut stats = DataPumpStats::default();
1477        let peer_ids = self.signaling.peer_ids().await;
1478        for peer_id in peer_ids {
1479            let Some(channel) = self.signaling.get_channel(&peer_id).await else {
1480                continue;
1481            };
1482
1483            while let Some(data) = channel.try_recv() {
1484                stats.processed += 1;
1485                stats.processed_bytes += data.len() as u64;
1486                if let Some(msg) = parse_message(&data) {
1487                    match msg {
1488                        DataMessage::Request(_) => stats.request_messages += 1,
1489                        DataMessage::Response(_) => stats.response_messages += 1,
1490                        DataMessage::QuoteRequest(_) => stats.quote_request_messages += 1,
1491                        DataMessage::QuoteResponse(_) => stats.quote_response_messages += 1,
1492                        DataMessage::Payment(_)
1493                        | DataMessage::PaymentAck(_)
1494                        | DataMessage::Chunk(_)
1495                        | DataMessage::PeerHints(_) => {}
1496                    }
1497                }
1498                self.handle_data_message(&peer_id, &data).await;
1499            }
1500        }
1501        stats
1502    }
1503
1504    /// Apply an out-of-band payment credit to a peer's routing priority.
1505    pub async fn record_cashu_payment_for_peer(&self, peer_id: &str, amount_sat: u64) {
1506        self.peer_selector
1507            .write()
1508            .await
1509            .record_cashu_payment(peer_id, amount_sat);
1510    }
1511
1512    /// Record a post-delivery payment we received from a peer.
1513    pub async fn record_cashu_receipt_from_peer(&self, peer_id: &str, amount_sat: u64) {
1514        self.peer_selector
1515            .write()
1516            .await
1517            .record_cashu_receipt(peer_id, amount_sat);
1518    }
1519
1520    /// Record that a peer failed to pay after we delivered successfully.
1521    pub async fn record_cashu_payment_default_from_peer(&self, peer_id: &str) {
1522        self.peer_selector
1523            .write()
1524            .await
1525            .record_cashu_payment_default(peer_id);
1526    }
1527
1528    /// Snapshot routing/selection summary for inspection/debugging.
1529    pub async fn selector_summary(&self) -> crate::peer_selector::SelectorSummary {
1530        self.peer_selector.read().await.summary()
1531    }
1532
1533    fn should_refuse_requests_from_peer(&self, selector: &PeerSelector, peer_id: &str) -> bool {
1534        selector.is_peer_blocked_for_payment_defaults(
1535            peer_id,
1536            self.routing.cashu_payment_default_block_threshold,
1537        )
1538    }
1539
1540    /// Export live peer metadata for inspection/debugging.
1541    pub async fn peer_metadata_snapshot(&self) -> PeerMetadataSnapshot {
1542        self.peer_selector
1543            .read()
1544            .await
1545            .export_peer_metadata_snapshot()
1546    }
1547
1548    /// Snapshot current peer metadata and persist it into `local_store`.
1549    ///
1550    /// Uses content-addressed storage for the snapshot body and a reserved
1551    /// mutable pointer slot for the "latest snapshot hash".
1552    pub async fn persist_peer_metadata(&self) -> Result<Hash, StoreError> {
1553        let snapshot = self
1554            .peer_selector
1555            .read()
1556            .await
1557            .export_peer_metadata_snapshot();
1558        let bytes = serde_json::to_vec(&snapshot).map_err(|e| {
1559            StoreError::Other(format!("Failed to encode peer metadata snapshot: {e}"))
1560        })?;
1561        let snapshot_hash = hashtree_core::sha256(&bytes);
1562        let _ = self.local_store.put(snapshot_hash, bytes).await?;
1563
1564        let pointer_slot = Self::peer_metadata_pointer_slot_hash();
1565        let pointer_bytes = hex::encode(snapshot_hash).into_bytes();
1566        let _ = self.local_store.delete(&pointer_slot).await?;
1567        let _ = self.local_store.put(pointer_slot, pointer_bytes).await?;
1568
1569        Ok(snapshot_hash)
1570    }
1571
1572    /// Load persisted peer metadata from `local_store` if available.
1573    pub async fn load_peer_metadata(&self) -> Result<bool, StoreError> {
1574        let pointer_slot = Self::peer_metadata_pointer_slot_hash();
1575        let Some(pointer_bytes) = self.local_store.get(&pointer_slot).await? else {
1576            return Ok(false);
1577        };
1578        let pointer_hex = std::str::from_utf8(&pointer_bytes).map_err(|e| {
1579            StoreError::Other(format!("Peer metadata pointer is not valid UTF-8: {e}"))
1580        })?;
1581        let snapshot_hash = Self::decode_hash_hex(pointer_hex.trim())?;
1582
1583        let Some(snapshot_bytes) = self.local_store.get(&snapshot_hash).await? else {
1584            return Ok(false);
1585        };
1586        let snapshot: PeerMetadataSnapshot =
1587            serde_json::from_slice(&snapshot_bytes).map_err(|e| {
1588                StoreError::Other(format!("Failed to decode peer metadata snapshot: {e}"))
1589            })?;
1590        self.peer_selector
1591            .write()
1592            .await
1593            .import_peer_metadata_snapshot(&snapshot);
1594        Ok(true)
1595    }
1596
1597    /// Request data from peers after negotiating a paid quote.
1598    ///
1599    /// If quote negotiation fails or the quoted peer does not deliver, the store
1600    /// falls back to the normal unpaid retrieval path to preserve liveness.
1601    pub async fn get_with_quote(
1602        &self,
1603        hash: &Hash,
1604        payment_sat: u64,
1605        quote_ttl: Duration,
1606    ) -> Result<Option<Vec<u8>>, StoreError> {
1607        if let Some(data) = self.local_store.get(hash).await? {
1608            return Ok(Some(data));
1609        }
1610        Ok(self
1611            .request_from_peers_with_quote(hash, payment_sat, quote_ttl)
1612            .await)
1613    }
1614
1615    async fn request_from_peers_with_quote(
1616        &self,
1617        hash: &Hash,
1618        payment_sat: u64,
1619        quote_ttl: Duration,
1620    ) -> Option<Vec<u8>> {
1621        let ordered_peer_ids = self.ordered_connected_peers(None).await;
1622        if ordered_peer_ids.is_empty() {
1623            return None;
1624        }
1625
1626        if let Some(quote) = self
1627            .request_quote_from_peers(hash, payment_sat, quote_ttl, &ordered_peer_ids)
1628            .await
1629        {
1630            if let Some(data) = self
1631                .request_from_single_peer(hash, &quote.peer_id, MAX_HTL, Some(quote.quote_id))
1632                .await
1633            {
1634                return Some(data);
1635            }
1636        }
1637
1638        self.request_from_mesh(hash).await
1639    }
1640
1641    async fn request_quote_from_peers(
1642        &self,
1643        hash: &Hash,
1644        payment_sat: u64,
1645        quote_ttl: Duration,
1646        ordered_peer_ids: &[String],
1647    ) -> Option<NegotiatedQuote> {
1648        if ordered_peer_ids.is_empty() {
1649            return None;
1650        }
1651        let ttl_ms = quote_ttl.as_millis().min(u32::MAX as u128) as u32;
1652        if ttl_ms == 0 {
1653            return None;
1654        }
1655        let requested_mint = self.requested_quote_mint().map(str::to_string);
1656
1657        let hash_key = hash_to_key(hash);
1658        let (tx, rx) = oneshot::channel();
1659        self.pending_quotes.write().await.insert(
1660            hash_key.clone(),
1661            PendingQuoteRequest {
1662                response_tx: tx,
1663                preferred_mint_url: requested_mint.clone(),
1664                offered_payment_sat: payment_sat,
1665            },
1666        );
1667
1668        let rx = Arc::new(Mutex::new(rx));
1669        let result = run_hedged_waves(
1670            ordered_peer_ids.len(),
1671            self.routing.dispatch,
1672            self.request_timeout,
1673            |range| {
1674                let wave_peer_ids = ordered_peer_ids[range].to_vec();
1675                let requested_mint = requested_mint.clone();
1676                let hash = *hash;
1677                async move {
1678                    let mut sent = 0usize;
1679                    for peer_id in wave_peer_ids {
1680                        if self
1681                            .send_quote_request_to_peer(
1682                                &peer_id,
1683                                &hash,
1684                                payment_sat,
1685                                ttl_ms,
1686                                requested_mint.as_deref(),
1687                            )
1688                            .await
1689                        {
1690                            sent += 1;
1691                        }
1692                    }
1693                    sent
1694                }
1695            },
1696            |wait| {
1697                let rx = rx.clone();
1698                async move {
1699                    let mut rx = rx.lock().await;
1700                    match tokio::time::timeout(wait, &mut *rx).await {
1701                        Ok(Ok(Some(quote))) => HedgedWaveAction::Success(quote),
1702                        Ok(Ok(None)) | Ok(Err(_)) => HedgedWaveAction::Abort,
1703                        Err(_) => HedgedWaveAction::Continue,
1704                    }
1705                }
1706            },
1707        )
1708        .await;
1709        let _ = self.pending_quotes.write().await.remove(&hash_key);
1710        result
1711    }
1712
1713    async fn request_from_single_peer(
1714        &self,
1715        hash: &Hash,
1716        peer_id: &str,
1717        request_htl: u8,
1718        quote_id: Option<u64>,
1719    ) -> Option<Vec<u8>> {
1720        let hash_key = hash_to_key(hash);
1721        let (tx, rx) = oneshot::channel();
1722        self.pending_requests.write().await.insert(
1723            hash_key.clone(),
1724            PendingRequest {
1725                response_tx: tx,
1726                started_at: Instant::now(),
1727                queried_peers: vec![peer_id.to_string()],
1728            },
1729        );
1730
1731        let mut rx = rx;
1732        if !self
1733            .send_request_to_peer(peer_id, hash, request_htl, quote_id)
1734            .await
1735        {
1736            let _ = self.pending_requests.write().await.remove(&hash_key);
1737            return None;
1738        }
1739        self.reserve_peer_request(peer_id).await;
1740
1741        if let Ok(Ok(Some(data))) = tokio::time::timeout(self.request_timeout, &mut rx).await {
1742            if hashtree_core::sha256(&data) == *hash {
1743                let _ = self.local_store.put(*hash, data.clone()).await;
1744                return Some(data);
1745            }
1746        }
1747
1748        if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
1749            self.release_queried_peer_requests(&pending.queried_peers)
1750                .await;
1751            for peer_id in pending.queried_peers {
1752                self.peer_selector.write().await.record_timeout(&peer_id);
1753            }
1754        }
1755        let _ = self.take_forward_requesters(&hash_key).await;
1756        None
1757    }
1758
1759    async fn request_from_ordered_peers(
1760        &self,
1761        hash: &Hash,
1762        ordered_peer_ids: &[String],
1763        request_htl: u8,
1764    ) -> RouteFetchOutcome {
1765        let hash_key = hash_to_key(hash);
1766        let (tx, rx) = oneshot::channel();
1767        self.pending_requests.write().await.insert(
1768            hash_key.clone(),
1769            PendingRequest {
1770                response_tx: tx,
1771                started_at: Instant::now(),
1772                queried_peers: Vec::new(),
1773            },
1774        );
1775
1776        let rx = Arc::new(Mutex::new(rx));
1777        let result = run_hedged_waves(
1778            ordered_peer_ids.len(),
1779            self.routing.dispatch,
1780            self.request_timeout,
1781            |range| {
1782                let wave_peer_ids = ordered_peer_ids[range].to_vec();
1783                let hash = *hash;
1784                let hash_key = hash_key.clone();
1785                async move {
1786                    let mut sent = 0usize;
1787                    for peer_id in wave_peer_ids {
1788                        if self
1789                            .send_request_to_peer(&peer_id, &hash, request_htl, None)
1790                            .await
1791                        {
1792                            sent += 1;
1793                            self.reserve_peer_request(&peer_id).await;
1794                            if let Some(pending) =
1795                                self.pending_requests.write().await.get_mut(&hash_key)
1796                            {
1797                                pending.queried_peers.push(peer_id);
1798                            }
1799                        }
1800                    }
1801                    sent
1802                }
1803            },
1804            |wait| {
1805                let rx = rx.clone();
1806                async move {
1807                    let mut rx = rx.lock().await;
1808                    match tokio::time::timeout(wait, &mut *rx).await {
1809                        Ok(Ok(Some(data))) if hashtree_core::sha256(&data) == *hash => {
1810                            HedgedWaveAction::Success(data)
1811                        }
1812                        Ok(Ok(Some(_))) => HedgedWaveAction::Continue,
1813                        Ok(Ok(None)) | Ok(Err(_)) => HedgedWaveAction::Abort,
1814                        Err(_) => HedgedWaveAction::Continue,
1815                    }
1816                }
1817            },
1818        )
1819        .await;
1820
1821        let Some(data) = result else {
1822            if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
1823                self.release_queried_peer_requests(&pending.queried_peers)
1824                    .await;
1825                for peer_id in pending.queried_peers {
1826                    self.peer_selector.write().await.record_timeout(&peer_id);
1827                }
1828            }
1829            let _ = self.take_forward_requesters(&hash_key).await;
1830            return RouteFetchOutcome::Timeout;
1831        };
1832
1833        let _ = self.local_store.put(*hash, data.clone()).await;
1834        RouteFetchOutcome::Hit(data)
1835    }
1836
1837    async fn request_from_read_sources_inner(&self, hash: &Hash) -> RouteFetchOutcome {
1838        let ordered_sources = self.ordered_read_sources().await;
1839        if ordered_sources.is_empty() {
1840            return RouteFetchOutcome::Miss;
1841        }
1842
1843        let dispatch = normalize_dispatch_config(
1844            self.source_dispatch_for(ordered_sources.len()).await,
1845            ordered_sources.len(),
1846        );
1847        let wave_plan = build_hedged_wave_plan(ordered_sources.len(), dispatch);
1848        if wave_plan.is_empty() {
1849            return RouteFetchOutcome::Miss;
1850        }
1851
1852        let deadline = Instant::now() + self.request_timeout;
1853        let mut pending = FuturesUnordered::new();
1854        let mut pending_source_ids = HashSet::new();
1855        let mut saw_timeout = false;
1856        let mut next_source_idx = 0usize;
1857
1858        for (wave_idx, wave_size) in wave_plan.iter().copied().enumerate() {
1859            let from = next_source_idx;
1860            let to = (next_source_idx + wave_size).min(ordered_sources.len());
1861            next_source_idx = to;
1862
1863            for source in &ordered_sources[from..to] {
1864                let source = Arc::clone(source);
1865                let source_id = source.id().to_string();
1866                self.record_read_source_request(&source_id).await;
1867                pending_source_ids.insert(source_id.clone());
1868                let hash = *hash;
1869                pending.push(tokio::spawn(async move {
1870                    let started_at = Instant::now();
1871                    let result = std::panic::AssertUnwindSafe(source.get(&hash))
1872                        .catch_unwind()
1873                        .await;
1874                    match result {
1875                        Ok(Some(data)) => SourceFetchOutcome::Hit {
1876                            source_id,
1877                            data,
1878                            elapsed_ms: started_at.elapsed().as_millis().max(1) as u64,
1879                        },
1880                        Ok(None) => SourceFetchOutcome::Miss { source_id },
1881                        Err(_) => SourceFetchOutcome::Failure { source_id },
1882                    }
1883                }));
1884            }
1885
1886            let is_last_wave =
1887                wave_idx + 1 == wave_plan.len() || next_source_idx >= ordered_sources.len();
1888            let window_end = if is_last_wave {
1889                deadline
1890            } else {
1891                (Instant::now() + Duration::from_millis(dispatch.hedge_interval_ms)).min(deadline)
1892            };
1893
1894            while Instant::now() < window_end {
1895                let remaining = window_end.saturating_duration_since(Instant::now());
1896                let Some(result) = tokio::time::timeout(remaining, pending.next())
1897                    .await
1898                    .ok()
1899                    .flatten()
1900                else {
1901                    break;
1902                };
1903                let Ok(outcome) = result else {
1904                    continue;
1905                };
1906                match outcome {
1907                    SourceFetchOutcome::Hit {
1908                        source_id,
1909                        data,
1910                        elapsed_ms,
1911                    } => {
1912                        pending_source_ids.remove(&source_id);
1913                        self.record_read_source_success(&source_id, elapsed_ms)
1914                            .await;
1915                        return RouteFetchOutcome::Hit(data);
1916                    }
1917                    SourceFetchOutcome::Miss { source_id } => {
1918                        pending_source_ids.remove(&source_id);
1919                        self.record_read_source_miss(&source_id).await;
1920                    }
1921                    SourceFetchOutcome::Failure { source_id } => {
1922                        pending_source_ids.remove(&source_id);
1923                        self.record_read_source_failure(&source_id).await;
1924                    }
1925                }
1926            }
1927
1928            if Instant::now() >= deadline {
1929                break;
1930            }
1931        }
1932
1933        for source_id in pending_source_ids {
1934            saw_timeout = true;
1935            self.record_read_source_timeout(&source_id).await;
1936        }
1937        if saw_timeout {
1938            RouteFetchOutcome::Timeout
1939        } else {
1940            RouteFetchOutcome::Miss
1941        }
1942    }
1943
1944    async fn request_from_read_sources(&self, hash: &Hash) -> RouteFetchOutcome {
1945        let hash_key = hash_to_key(hash);
1946        let existing_wait = {
1947            let mut inflight = self.inflight_source_fetches.lock().await;
1948            if let Some(existing) = inflight.get_mut(&hash_key) {
1949                let (tx, rx) = oneshot::channel();
1950                existing.waiters.push(tx);
1951                Some(rx)
1952            } else {
1953                inflight.insert(
1954                    hash_key.clone(),
1955                    InflightSourceFetch {
1956                        waiters: Vec::new(),
1957                    },
1958                );
1959                None
1960            }
1961        };
1962
1963        if let Some(wait) = existing_wait {
1964            return wait.await.unwrap_or(RouteFetchOutcome::Timeout);
1965        }
1966
1967        let result = self.request_from_read_sources_inner(hash).await;
1968        if let RouteFetchOutcome::Hit(hit) = &result {
1969            let _ = self.local_store.put(*hash, hit.clone()).await;
1970        }
1971        self.complete_inflight_source_fetch(&hash_key, result.clone())
1972            .await;
1973
1974        result
1975    }
1976
1977    async fn complete_inflight_source_fetch(&self, hash_key: &str, result: RouteFetchOutcome) {
1978        let waiters = self
1979            .inflight_source_fetches
1980            .lock()
1981            .await
1982            .remove(hash_key)
1983            .map(|inflight| inflight.waiters)
1984            .unwrap_or_default();
1985        for waiter in waiters {
1986            let _ = waiter.send(result.clone());
1987        }
1988    }
1989
1990    async fn cancel_pending_peer_route(&self, hash: &Hash) {
1991        let hash_key = hash_to_key(hash);
1992        if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
1993            self.release_queried_peer_requests(&pending.queried_peers)
1994                .await;
1995        }
1996    }
1997
1998    async fn cancel_losing_route(&self, hash: &Hash, route: &ReadRoute, winner_data: &[u8]) {
1999        match route {
2000            ReadRoute::Peers(_) => self.cancel_pending_peer_route(hash).await,
2001            ReadRoute::Sources => {
2002                let hash_key = hash_to_key(hash);
2003                self.complete_inflight_source_fetch(
2004                    &hash_key,
2005                    RouteFetchOutcome::Hit(winner_data.to_vec()),
2006                )
2007                .await;
2008            }
2009        }
2010    }
2011
2012    async fn ranked_read_routes(&self, context: &MeshReadContext) -> Vec<RankedReadRoute> {
2013        let mut routes = Vec::new();
2014        let ordered_peers = if should_forward_htl(context.request_htl) {
2015            self.ordered_connected_peers(context.exclude_peer_id.as_deref())
2016                .await
2017        } else {
2018            Vec::new()
2019        };
2020        if !ordered_peers.is_empty() {
2021            let best_peer_id = ordered_peers[0].clone();
2022            let selector = self.peer_selector.read().await;
2023            let best_peer = selector.get_stats(&best_peer_id).cloned();
2024            let now = Instant::now();
2025            let (score, has_history) = match best_peer.as_ref() {
2026                Some(stats) => (
2027                    peer_endpoint_score(stats, now),
2028                    peer_endpoint_has_history(stats),
2029                ),
2030                None => (0.0, false),
2031            };
2032            routes.push(RankedReadRoute {
2033                route: ReadRoute::Peers(ordered_peers),
2034                best_endpoint_id: format!("peer:{best_peer_id}"),
2035                score,
2036                has_history,
2037            });
2038        }
2039        let ordered_sources = self.ordered_read_sources().await;
2040        if let Some(best_source) = ordered_sources.first() {
2041            let stats = self.read_source_stats.read().await;
2042            let best_source_stats = stats.get(best_source.id()).cloned().unwrap_or_default();
2043            let now = Instant::now();
2044            routes.push(RankedReadRoute {
2045                route: ReadRoute::Sources,
2046                best_endpoint_id: format!("source:{}", best_source.id()),
2047                score: adaptive_source_score(&best_source_stats, now),
2048                has_history: source_has_history(&best_source_stats),
2049            });
2050        }
2051        if routes.len() <= 1 {
2052            return routes;
2053        }
2054
2055        routes.sort_by(|left, right| {
2056            right
2057                .score
2058                .partial_cmp(&left.score)
2059                .unwrap_or(std::cmp::Ordering::Equal)
2060                .then_with(|| ranked_route_kind(&left.route).cmp(&ranked_route_kind(&right.route)))
2061                .then_with(|| left.best_endpoint_id.cmp(&right.best_endpoint_id))
2062                .then_with(|| left.route.id().cmp(right.route.id()))
2063        });
2064        routes
2065    }
2066
2067    fn should_probe_multiple_routes(&self, routes: &[RankedReadRoute]) -> bool {
2068        if routes.len() <= 1 {
2069            return false;
2070        }
2071        if !routes[0].has_history || !routes[1].has_history {
2072            return false;
2073        }
2074        (routes[0].score - routes[1].score) < SOURCE_SCORE_TIE_DELTA
2075    }
2076
2077    async fn run_read_route(
2078        &self,
2079        hash: &Hash,
2080        route: &ReadRoute,
2081        context: &MeshReadContext,
2082    ) -> RouteFetchOutcome {
2083        match route {
2084            ReadRoute::Peers(peer_ids) => {
2085                self.request_from_ordered_peers(hash, peer_ids, context.request_htl)
2086                    .await
2087            }
2088            ReadRoute::Sources => self.request_from_read_sources(hash).await,
2089        }
2090    }
2091
2092    async fn request_from_mesh_with_context(
2093        &self,
2094        hash: &Hash,
2095        context: &MeshReadContext,
2096    ) -> Option<Vec<u8>> {
2097        let routes = self.ranked_read_routes(context).await;
2098        match routes.as_slice() {
2099            [] => None,
2100            [ranked] => match self.run_read_route(hash, &ranked.route, context).await {
2101                RouteFetchOutcome::Hit(data) => Some(data),
2102                RouteFetchOutcome::Miss | RouteFetchOutcome::Timeout => None,
2103            },
2104            [first, second, ..] => {
2105                if self.should_probe_multiple_routes(&routes) {
2106                    let first_fut = self.run_read_route(hash, &first.route, context);
2107                    let second_fut = self.run_read_route(hash, &second.route, context);
2108                    tokio::pin!(first_fut);
2109                    tokio::pin!(second_fut);
2110                    let mut first_done = false;
2111                    let mut second_done = false;
2112                    loop {
2113                        tokio::select! {
2114                            result = &mut first_fut, if !first_done => {
2115                                first_done = true;
2116                                if let RouteFetchOutcome::Hit(data) = result {
2117                                    if !second_done {
2118                                        self.cancel_losing_route(hash, &second.route, &data).await;
2119                                    }
2120                                    return Some(data);
2121                                }
2122                            }
2123                            result = &mut second_fut, if !second_done => {
2124                                second_done = true;
2125                                if let RouteFetchOutcome::Hit(data) = result {
2126                                    if !first_done {
2127                                        self.cancel_losing_route(hash, &first.route, &data).await;
2128                                    }
2129                                    return Some(data);
2130                                }
2131                            }
2132                            else => break,
2133                        }
2134                        if first_done && second_done {
2135                            break;
2136                        }
2137                    }
2138                    None
2139                } else {
2140                    match self.run_read_route(hash, &first.route, context).await {
2141                        RouteFetchOutcome::Hit(data) => return Some(data),
2142                        RouteFetchOutcome::Miss | RouteFetchOutcome::Timeout => {}
2143                    }
2144                    for ranked in routes.iter().skip(1) {
2145                        match self.run_read_route(hash, &ranked.route, context).await {
2146                            RouteFetchOutcome::Hit(data) => return Some(data),
2147                            RouteFetchOutcome::Miss | RouteFetchOutcome::Timeout => {}
2148                        }
2149                    }
2150                    None
2151                }
2152            }
2153        }
2154    }
2155
2156    async fn request_from_mesh(&self, hash: &Hash) -> Option<Vec<u8>> {
2157        self.request_from_mesh_with_context(hash, &MeshReadContext::default())
2158            .await
2159    }
2160
2161    async fn begin_forward_request(&self, hash_key: &str, requester_id: &str) -> bool {
2162        let mut pending = self.pending_forward_requests.write().await;
2163        if let Some(existing) = pending.get_mut(hash_key) {
2164            existing.requester_ids.insert(requester_id.to_string());
2165            return false;
2166        }
2167
2168        let mut requester_ids = HashSet::new();
2169        requester_ids.insert(requester_id.to_string());
2170        pending.insert(
2171            hash_key.to_string(),
2172            PendingForwardRequest { requester_ids },
2173        );
2174        true
2175    }
2176
2177    async fn was_recent_forward_miss(&self, hash_key: &str) -> bool {
2178        self.recent_forward_misses.lock().await.contains(hash_key)
2179    }
2180
2181    async fn mark_recent_forward_miss(&self, hash_key: &str) {
2182        let _ = self
2183            .recent_forward_misses
2184            .lock()
2185            .await
2186            .insert_if_new(hash_key.to_string());
2187    }
2188
2189    async fn take_forward_requesters(&self, hash_key: &str) -> Vec<String> {
2190        self.pending_forward_requests
2191            .write()
2192            .await
2193            .remove(hash_key)
2194            .map(|pending| pending.requester_ids.into_iter().collect())
2195            .unwrap_or_default()
2196    }
2197
2198    async fn complete_pending_response(
2199        self: &Arc<Self>,
2200        from_peer: &str,
2201        hash: &Hash,
2202        hash_key: String,
2203        payload: Vec<u8>,
2204    ) {
2205        if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
2206            self.release_queried_peer_requests(&pending.queried_peers)
2207                .await;
2208            let rtt_ms = pending.started_at.elapsed().as_millis() as u64;
2209            self.peer_selector.write().await.record_success(
2210                from_peer,
2211                rtt_ms,
2212                payload.len() as u64,
2213            );
2214            let forward_requesters = self.take_forward_requesters(&hash_key).await;
2215            let response_bytes = if forward_requesters.is_empty() {
2216                None
2217            } else {
2218                Some(encode_response(&create_response(hash, payload.clone())))
2219            };
2220            let _ = pending.response_tx.send(Some(payload));
2221            if let Some(response_bytes) = response_bytes {
2222                for requester_id in forward_requesters {
2223                    Arc::clone(self)
2224                        .enqueue_response_send(requester_id, response_bytes.clone(), Instant::now())
2225                        .await;
2226                }
2227            }
2228        }
2229    }
2230
2231    async fn handle_quote_response_message(&self, from_peer: &str, res: DataQuoteResponse) {
2232        if !res.a {
2233            return;
2234        }
2235
2236        let Some(quote_id) = res.q else {
2237            return;
2238        };
2239
2240        let hash_key = hash_to_key(&res.h);
2241        let (preferred_mint_url, offered_payment_sat) = {
2242            let pending_quotes = self.pending_quotes.read().await;
2243            let Some(pending) = pending_quotes.get(&hash_key) else {
2244                return;
2245            };
2246            (
2247                pending.preferred_mint_url.clone(),
2248                pending.offered_payment_sat,
2249            )
2250        };
2251        if !self
2252            .should_accept_quote_response(
2253                from_peer,
2254                preferred_mint_url.as_deref(),
2255                offered_payment_sat,
2256                &res,
2257            )
2258            .await
2259        {
2260            return;
2261        }
2262        let mut pending_quotes = self.pending_quotes.write().await;
2263        if let Some(pending) = pending_quotes.remove(&hash_key) {
2264            let _ = pending.response_tx.send(Some(NegotiatedQuote {
2265                peer_id: from_peer.to_string(),
2266                quote_id,
2267                mint_url: res.m,
2268            }));
2269        }
2270    }
2271
2272    async fn handle_response_message(
2273        self: &Arc<Self>,
2274        from_peer: &str,
2275        res: crate::protocol::DataResponse,
2276    ) {
2277        let hash_key = hash_to_key(&res.h);
2278        let hash = match crate::protocol::bytes_to_hash(&res.h) {
2279            Some(h) => h,
2280            None => return,
2281        };
2282
2283        // Ignore malformed/corrupt payload and keep waiting for a valid response.
2284        if hashtree_core::sha256(&res.d) != hash {
2285            self.peer_selector.write().await.record_failure(from_peer);
2286            if self.debug {
2287                println!("[MeshStoreCore] Ignoring invalid response payload for {hash_key}");
2288            }
2289            return;
2290        }
2291
2292        self.complete_pending_response(from_peer, &hash, hash_key, res.d)
2293            .await;
2294    }
2295
2296    async fn handle_quote_request_message(&self, from_peer: &str, req: DataQuoteRequest) {
2297        let hash = match crate::protocol::bytes_to_hash(&req.h) {
2298            Some(h) => h,
2299            None => return,
2300        };
2301        let hash_key = hash_to_key(&hash);
2302
2303        {
2304            let selector = self.peer_selector.read().await;
2305            if self.should_refuse_requests_from_peer(&selector, from_peer) {
2306                if self.debug {
2307                    println!(
2308                        "[MeshStoreCore] Refusing quote request from delinquent peer {}",
2309                        from_peer
2310                    );
2311                }
2312                return;
2313            }
2314        }
2315
2316        let chosen_mint = self.choose_quote_mint(req.m.as_deref());
2317        let can_serve = self.local_store.has(&hash).await.ok().unwrap_or(false)
2318            && !self.should_drop_response(&hash)
2319            && !self.should_corrupt_response(&hash);
2320
2321        let res = if can_serve {
2322            let quote_id = self
2323                .issue_quote(from_peer, &hash_key, req.p, req.t, chosen_mint.as_deref())
2324                .await;
2325            create_quote_response_available(&hash, quote_id, req.p, req.t, chosen_mint.as_deref())
2326        } else {
2327            create_quote_response_unavailable(&hash)
2328        };
2329        let response_bytes = encode_quote_response(&res);
2330        if let Some(channel) = self.signaling.get_channel(from_peer).await {
2331            if channel.send(response_bytes.clone()).await.is_ok() {
2332                self.record_peer_wire_sent(from_peer, response_bytes.len() as u64)
2333                    .await;
2334            }
2335        }
2336    }
2337
2338    async fn handle_request_message(
2339        self: &Arc<Self>,
2340        from_peer: &str,
2341        req: crate::protocol::DataRequest,
2342    ) {
2343        let hash = match crate::protocol::bytes_to_hash(&req.h) {
2344            Some(h) => h,
2345            None => return,
2346        };
2347        let hash_key = hash_to_key(&hash);
2348
2349        if let Some(quote_id) = req.q {
2350            if !self.take_valid_quote(from_peer, &hash_key, quote_id).await {
2351                if self.debug {
2352                    println!(
2353                        "[MeshStoreCore] Refusing request with invalid or expired quote {} from {}",
2354                        quote_id, from_peer
2355                    );
2356                }
2357                return;
2358            }
2359        }
2360
2361        let allow_peer_forwarding = {
2362            let selector = self.peer_selector.read().await;
2363            !self.should_refuse_requests_from_peer(&selector, from_peer)
2364        };
2365
2366        // Check local store
2367        if let Ok(Some(mut data)) = self.local_store.get(&hash).await {
2368            if self.should_drop_response(&hash) {
2369                if self.debug {
2370                    println!(
2371                        "[MeshStoreCore] Dropping response for {} due to actor profile",
2372                        hash_to_key(&hash)
2373                    );
2374                }
2375                return;
2376            }
2377
2378            let response_delay = self.response_send_delay(&hash, data.len());
2379            if self.should_corrupt_response(&hash) {
2380                if data.is_empty() {
2381                    data.push(0x80);
2382                } else {
2383                    data[0] ^= 0x80;
2384                }
2385            }
2386
2387            // Send response
2388            let res = create_response(&hash, data);
2389            let response_bytes = encode_response(&res);
2390            let ready_at = Instant::now() + response_delay;
2391            Arc::clone(self)
2392                .enqueue_response_send(from_peer.to_string(), response_bytes, ready_at)
2393                .await;
2394            return;
2395        }
2396
2397        if self.pending_requests.read().await.contains_key(&hash_key) {
2398            let _ = self.begin_forward_request(&hash_key, from_peer).await;
2399            return;
2400        }
2401
2402        if self.was_recent_forward_miss(&hash_key).await {
2403            if self.debug {
2404                println!(
2405                    "[MeshStoreCore] Suppressing recently missed forwarded request for {}",
2406                    hash_key
2407                );
2408            }
2409            return;
2410        }
2411
2412        if !self.begin_forward_request(&hash_key, from_peer).await {
2413            return;
2414        }
2415
2416        let from_peer = from_peer.to_string();
2417        let this = Arc::clone(self);
2418        let request_htl = req.htl;
2419        tokio::spawn(async move {
2420            let result = if allow_peer_forwarding {
2421                let context = MeshReadContext {
2422                    exclude_peer_id: Some(from_peer.clone()),
2423                    request_htl,
2424                };
2425                this.request_from_mesh_with_context(&hash, &context).await
2426            } else {
2427                if this.debug {
2428                    println!(
2429                        "[MeshStoreCore] Serving request from delinquent peer {} via read sources only",
2430                        from_peer
2431                    );
2432                }
2433                match this.request_from_read_sources(&hash).await {
2434                    RouteFetchOutcome::Hit(data) => Some(data),
2435                    RouteFetchOutcome::Miss | RouteFetchOutcome::Timeout => None,
2436                }
2437            };
2438            let requester_ids = this.take_forward_requesters(&hash_key).await;
2439            if let Some(data) = result {
2440                let ready_at = Instant::now() + this.response_send_delay(&hash, data.len());
2441                let res = create_response(&hash, data);
2442                let response_bytes = encode_response(&res);
2443                for requester_id in requester_ids {
2444                    Arc::clone(&this)
2445                        .enqueue_response_send(requester_id, response_bytes.clone(), ready_at)
2446                        .await;
2447                }
2448            } else {
2449                this.mark_recent_forward_miss(&hash_key).await;
2450            }
2451        });
2452    }
2453
2454    /// Handle incoming data message
2455    pub async fn handle_data_message(self: &Arc<Self>, from_peer: &str, data: &[u8]) {
2456        self.record_peer_wire_received(from_peer, data.len() as u64)
2457            .await;
2458        let parsed = match parse_message(data) {
2459            Some(m) => m,
2460            None => return,
2461        };
2462
2463        match parsed {
2464            DataMessage::Request(req) => {
2465                self.handle_request_message(from_peer, req).await;
2466            }
2467            DataMessage::Response(res) => {
2468                self.handle_response_message(from_peer, res).await;
2469            }
2470            DataMessage::QuoteRequest(req) => {
2471                self.handle_quote_request_message(from_peer, req).await;
2472            }
2473            DataMessage::QuoteResponse(res) => {
2474                self.handle_quote_response_message(from_peer, res).await;
2475            }
2476            DataMessage::Payment(_)
2477            | DataMessage::PaymentAck(_)
2478            | DataMessage::Chunk(_)
2479            | DataMessage::PeerHints(_) => {}
2480        }
2481    }
2482}
2483
2484#[async_trait]
2485impl<S, R, F> Store for MeshStoreCore<S, R, F>
2486where
2487    S: Store + Send + Sync + 'static,
2488    R: SignalingTransport + Send + Sync + 'static,
2489    F: PeerLinkFactory + Send + Sync + 'static,
2490{
2491    async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
2492        self.local_store.put(hash, data).await
2493    }
2494
2495    async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
2496        // Try local first
2497        if let Some(data) = self.local_store.get(hash).await? {
2498            return Ok(Some(data));
2499        }
2500
2501        // Try peers
2502        Ok(self.request_from_mesh(hash).await)
2503    }
2504
2505    async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
2506        self.local_store.has(hash).await
2507    }
2508
2509    async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
2510        self.local_store.delete(hash).await
2511    }
2512}
2513
2514#[cfg(test)]
2515mod tests;
2516
2517/// Type alias for simulation store.
2518pub type SimMeshStore<S> =
2519    MeshStoreCore<S, crate::mock::MockRelayTransport, crate::mock::MockConnectionFactory>;
2520
2521/// Type alias for the default production core (Nostr signaling + WebRTC links).
2522pub type ProductionMeshStore<S> =
2523    MeshStoreCore<S, crate::nostr::NostrRelayTransport, crate::real_factory::WebRtcPeerLinkFactory>;