Skip to main content

hashtree_network/
mesh_store_core.rs

1//! Shared routed mesh store core.
2//!
3//! This module provides a concrete store wrapper that works with any local storage
4//! backend plus any signaling transport and peer-link factory. Both production
5//! (Nostr websockets + WebRTC) and simulation (mocks) use this same code.
6
7use async_trait::async_trait;
8use futures::{stream::FuturesUnordered, FutureExt, StreamExt};
9use std::collections::hash_map::DefaultHasher;
10use std::collections::{HashMap, HashSet};
11use std::future::Future;
12use std::hash::{Hash as _, Hasher};
13use std::ops::Range;
14use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
15use std::sync::Arc;
16use std::time::Duration;
17use tokio::sync::{oneshot, Mutex, RwLock};
18use tokio::time::Instant;
19
20use hashtree_core::{Hash, Store, StoreError};
21
22use crate::peer_selector::{PeerMetadataSnapshot, PeerSelector, SelectionStrategy};
23use crate::protocol::{
24    create_quote_request, create_quote_response_available, create_quote_response_unavailable,
25    create_request, create_request_with_quote, create_response, encode_quote_request,
26    encode_quote_response, encode_request, encode_response, hash_to_key, parse_message,
27    DataMessage, DataQuoteRequest, DataQuoteResponse,
28};
29use crate::signaling::MeshRouter;
30use crate::transport::{PeerLinkFactory, SignalingTransport, TransportError};
31use crate::types::{should_forward_htl, PeerHTLConfig, SignalingMessage, TimedSeenSet, MAX_HTL};
32
33// Keep the on-disk namespace stable across the crate rename so existing peer
34// metadata does not disappear for users upgrading from the old package name.
35const PEER_METADATA_POINTER_SLOT_KEY: &[u8] = b"hashtree-webrtc/peer-metadata/latest/v1";
36const RECENT_FORWARD_MISS_CAPACITY: usize = 4096;
37const MIN_RECENT_FORWARD_MISS_TTL_MS: u64 = 250;
38
39/// Pending request awaiting response
40struct PendingRequest {
41    response_tx: oneshot::Sender<Option<Vec<u8>>>,
42    started_at: Instant,
43    queried_peers: Vec<String>,
44}
45
46struct PendingQuoteRequest {
47    response_tx: oneshot::Sender<Option<NegotiatedQuote>>,
48    preferred_mint_url: Option<String>,
49    offered_payment_sat: u64,
50}
51
52struct PendingForwardRequest {
53    requester_ids: HashSet<String>,
54}
55
56#[derive(Debug, Clone, Default)]
57struct PeerWireStats {
58    bytes_sent: u64,
59    bytes_received: u64,
60    bandwidth_debt: f64,
61}
62
63struct PendingResponseSend {
64    job_id: u64,
65    peer_id: String,
66    bytes: Vec<u8>,
67    ready_at: Instant,
68    queue_sequence: u64,
69}
70
71#[async_trait]
72pub trait MeshReadSource: Send + Sync {
73    fn id(&self) -> &str;
74
75    fn is_available(&self) -> bool {
76        true
77    }
78
79    async fn get(&self, hash: &Hash) -> Option<Vec<u8>>;
80}
81
82#[derive(Debug, Clone)]
83struct NegotiatedQuote {
84    peer_id: String,
85    quote_id: u64,
86    #[allow(dead_code)]
87    mint_url: Option<String>,
88}
89
90struct IssuedQuote {
91    expires_at: Instant,
92    #[allow(dead_code)]
93    payment_sat: u64,
94    #[allow(dead_code)]
95    mint_url: Option<String>,
96}
97
98#[derive(Debug, Clone, Default)]
99struct AdaptiveSourceStats {
100    requests: u64,
101    successes: u64,
102    misses: u64,
103    failures: u64,
104    timeouts: u64,
105    srtt_ms: f64,
106    rttvar_ms: f64,
107    backoff_level: u32,
108    backed_off_until: Option<Instant>,
109    last_success_at: Option<Instant>,
110    last_failure_at: Option<Instant>,
111}
112
113#[derive(Debug, Clone)]
114enum RouteFetchOutcome {
115    Hit(Vec<u8>),
116    Miss,
117    Timeout,
118}
119
120struct InflightSourceFetch {
121    waiters: Vec<oneshot::Sender<RouteFetchOutcome>>,
122}
123
124enum SourceFetchOutcome {
125    Hit {
126        source_id: String,
127        data: Vec<u8>,
128        elapsed_ms: u64,
129    },
130    Miss {
131        source_id: String,
132    },
133    Failure {
134        source_id: String,
135    },
136}
137
138const INITIAL_SOURCE_BACKOFF_MS: u64 = 250;
139const MAX_SOURCE_BACKOFF_MS: u64 = 10_000;
140const SOURCE_SCORE_TIE_DELTA: f64 = 0.15;
141const RECENT_SOURCE_SUCCESS_WINDOW: Duration = Duration::from_secs(60);
142const ACTIVE_PEER_REQUEST_RANK_PENALTY: usize = 3;
143
144fn source_reliability_score(stats: &AdaptiveSourceStats) -> f64 {
145    (stats.successes as f64 + 1.0) / (stats.requests as f64 + 2.0)
146}
147
148fn source_latency_score(stats: &AdaptiveSourceStats) -> f64 {
149    if stats.srtt_ms <= 0.0 {
150        return 0.5;
151    }
152    (500.0 / (stats.srtt_ms + 50.0)).min(1.0)
153}
154
155fn source_has_history(stats: &AdaptiveSourceStats) -> bool {
156    stats.requests > 0
157        || stats.successes > 0
158        || stats.misses > 0
159        || stats.failures > 0
160        || stats.timeouts > 0
161}
162
163fn adaptive_source_score(stats: &AdaptiveSourceStats, now: Instant) -> f64 {
164    if let Some(backed_off_until) = stats.backed_off_until {
165        if backed_off_until > now {
166            return f64::NEG_INFINITY;
167        }
168    }
169
170    let miss_penalty = if stats.requests > 0 {
171        (stats.misses as f64 / stats.requests as f64) * 0.15
172    } else {
173        0.0
174    };
175    let failure_penalty = if stats.requests > 0 {
176        ((stats.failures + stats.timeouts) as f64 / stats.requests as f64) * 0.3
177    } else {
178        0.0
179    };
180    let recency_bonus = if stats
181        .last_success_at
182        .is_some_and(|last| now.duration_since(last) < RECENT_SOURCE_SUCCESS_WINDOW)
183    {
184        0.1
185    } else {
186        0.0
187    };
188
189    0.6 * source_reliability_score(stats) + 0.3 * source_latency_score(stats) + recency_bonus
190        - miss_penalty
191        - failure_penalty
192}
193
194fn peer_endpoint_has_history(stats: &crate::peer_selector::PeerStats) -> bool {
195    stats.requests_sent > 0 || stats.successes > 0 || stats.failures > 0 || stats.timeouts > 0
196}
197
198fn peer_endpoint_score(stats: &crate::peer_selector::PeerStats, now: Instant) -> f64 {
199    if stats.backed_off_until.is_some_and(|until| until > now) {
200        return f64::NEG_INFINITY;
201    }
202
203    let miss_penalty = 0.0;
204    let failure_penalty = if stats.requests_sent > 0 {
205        ((stats.failures + stats.timeouts) as f64 / stats.requests_sent as f64) * 0.3
206    } else {
207        0.0
208    };
209    let recency_bonus = if stats
210        .last_success
211        .is_some_and(|last| now.duration_since(last) < RECENT_SOURCE_SUCCESS_WINDOW)
212    {
213        0.1
214    } else {
215        0.0
216    };
217
218    0.6 * stats.success_rate()
219        + 0.3
220            * source_latency_score(&AdaptiveSourceStats {
221                srtt_ms: stats.srtt_ms,
222                ..AdaptiveSourceStats::default()
223            })
224        + recency_bonus
225        - miss_penalty
226        - failure_penalty
227}
228
229#[derive(Clone)]
230enum ReadRoute {
231    Peers(Vec<String>),
232    Sources,
233}
234
235impl ReadRoute {
236    fn id(&self) -> &'static str {
237        match self {
238            Self::Peers(_) => "peers",
239            Self::Sources => "sources",
240        }
241    }
242}
243
244struct RankedReadRoute {
245    route: ReadRoute,
246    best_endpoint_id: String,
247    score: f64,
248    has_history: bool,
249}
250
251fn ranked_route_kind(route: &ReadRoute) -> u8 {
252    match route {
253        ReadRoute::Sources => 0,
254        ReadRoute::Peers(_) => 1,
255    }
256}
257
258#[derive(Debug, Clone)]
259struct MeshReadContext {
260    exclude_peer_id: Option<String>,
261    request_htl: u8,
262}
263
264impl Default for MeshReadContext {
265    fn default() -> Self {
266        Self {
267            exclude_peer_id: None,
268            request_htl: MAX_HTL,
269        }
270    }
271}
272
273/// Aggregate stats from draining currently available peer-link messages.
274#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
275pub struct DataPumpStats {
276    pub processed: usize,
277    pub request_messages: usize,
278    pub response_messages: usize,
279    pub quote_request_messages: u64,
280    pub quote_response_messages: u64,
281    pub processed_bytes: u64,
282}
283
284/// Request dispatch strategy for peer queries.
285///
286/// `MeshStoreCore` supports two practical retrieval modes:
287/// - Flood (`usize::MAX` fanout): maximize success/latency at bandwidth cost.
288/// - Staged hedging: probe a subset first, then expand.
289#[derive(Debug, Clone, Copy)]
290pub struct RequestDispatchConfig {
291    /// Number of peers queried immediately.
292    pub initial_fanout: usize,
293    /// Number of additional peers to query on each hedge step.
294    pub hedge_fanout: usize,
295    /// Total peers allowed for this request.
296    pub max_fanout: usize,
297    /// Delay between hedge waves (ms). `0` means send all waves immediately.
298    pub hedge_interval_ms: u64,
299}
300
301impl Default for RequestDispatchConfig {
302    fn default() -> Self {
303        Self {
304            initial_fanout: usize::MAX,
305            hedge_fanout: usize::MAX,
306            max_fanout: usize::MAX,
307            hedge_interval_ms: 0,
308        }
309    }
310}
311
312/// Normalize fanout config against current peer availability.
313pub fn normalize_dispatch_config(
314    dispatch: RequestDispatchConfig,
315    available_peers: usize,
316) -> RequestDispatchConfig {
317    let mut cfg = dispatch;
318    let cap = if cfg.max_fanout == 0 {
319        available_peers
320    } else {
321        cfg.max_fanout.min(available_peers)
322    };
323    cfg.max_fanout = cap;
324    cfg.initial_fanout = if cfg.initial_fanout == 0 {
325        1
326    } else {
327        cfg.initial_fanout.min(cap.max(1))
328    };
329    cfg.hedge_fanout = if cfg.hedge_fanout == 0 {
330        1
331    } else {
332        cfg.hedge_fanout.min(cap.max(1))
333    };
334    cfg
335}
336
337/// Build wave sizes for staged hedged dispatch.
338pub fn build_hedged_wave_plan(peer_count: usize, dispatch: RequestDispatchConfig) -> Vec<usize> {
339    if peer_count == 0 {
340        return Vec::new();
341    }
342    let cap = dispatch.max_fanout.min(peer_count);
343    if cap == 0 {
344        return Vec::new();
345    }
346
347    let mut plan = Vec::new();
348    let mut sent = 0usize;
349    let first = dispatch.initial_fanout.min(cap).max(1);
350    plan.push(first);
351    sent += first;
352
353    while sent < cap {
354        let next = dispatch.hedge_fanout.min(cap - sent).max(1);
355        plan.push(next);
356        sent += next;
357    }
358    plan
359}
360
361/// Outcome returned after waiting on a hedged dispatch wave.
362#[derive(Debug)]
363pub enum HedgedWaveAction<T> {
364    Continue,
365    Success(T),
366    Abort,
367}
368
369/// Run a staged hedged dispatch over peer index ranges.
370///
371/// This scheduler is shared by the reusable `MeshStoreCore` and the native
372/// `hashtree-cli` mesh path so tests and production use the same wave timing.
373pub async fn run_hedged_waves<T, SendWave, SendWaveFut, WaitWave, WaitWaveFut>(
374    peer_count: usize,
375    dispatch: RequestDispatchConfig,
376    request_timeout: Duration,
377    mut send_wave: SendWave,
378    mut wait_wave: WaitWave,
379) -> Option<T>
380where
381    SendWave: FnMut(Range<usize>) -> SendWaveFut,
382    SendWaveFut: Future<Output = usize>,
383    WaitWave: FnMut(Duration) -> WaitWaveFut,
384    WaitWaveFut: Future<Output = HedgedWaveAction<T>>,
385{
386    let dispatch = normalize_dispatch_config(dispatch, peer_count);
387    let wave_plan = build_hedged_wave_plan(peer_count, dispatch);
388    if wave_plan.is_empty() {
389        return None;
390    }
391
392    let deadline = Instant::now() + request_timeout;
393    let mut sent_total = 0usize;
394    let mut next_peer_idx = 0usize;
395
396    for (wave_idx, wave_size) in wave_plan.iter().copied().enumerate() {
397        let from = next_peer_idx;
398        let to = (next_peer_idx + wave_size).min(peer_count);
399        next_peer_idx = to;
400
401        if from == to {
402            continue;
403        }
404
405        sent_total += send_wave(from..to).await;
406        if sent_total == 0 {
407            if next_peer_idx >= peer_count {
408                break;
409            }
410            continue;
411        }
412
413        let now = Instant::now();
414        if now >= deadline {
415            break;
416        }
417        let remaining = deadline.saturating_duration_since(now);
418        let is_last_wave = wave_idx + 1 == wave_plan.len() || next_peer_idx >= peer_count;
419        let wait = if is_last_wave {
420            remaining
421        } else if dispatch.hedge_interval_ms == 0 {
422            Duration::ZERO
423        } else {
424            Duration::from_millis(dispatch.hedge_interval_ms).min(remaining)
425        };
426
427        if wait.is_zero() {
428            continue;
429        }
430
431        match wait_wave(wait).await {
432            HedgedWaveAction::Continue => {}
433            HedgedWaveAction::Success(value) => return Some(value),
434            HedgedWaveAction::Abort => break,
435        }
436    }
437
438    None
439}
440
441/// Keep selector membership aligned with currently connected peer IDs.
442pub async fn sync_selector_peers(selector: &RwLock<PeerSelector>, current_peer_ids: &[String]) {
443    let mut selector = selector.write().await;
444    let current: HashSet<&str> = current_peer_ids.iter().map(String::as_str).collect();
445    let known: Vec<String> = selector.all_stats().map(|s| s.peer_id.clone()).collect();
446    for peer_id in known {
447        if !current.contains(peer_id.as_str()) {
448            selector.remove_peer(&peer_id);
449        }
450    }
451    for peer_id in current_peer_ids {
452        selector.add_peer(peer_id.clone());
453    }
454}
455
456/// Response behavior profile for simulation/game-theory actors.
457///
458/// Defaults to honest behavior (always respond correctly, no extra delay).
459#[derive(Debug, Clone, Copy)]
460pub struct ResponseBehaviorConfig {
461    /// Probability that a node drops a response even when it has data.
462    pub drop_response_prob: f64,
463    /// Probability that a node responds with corrupted payload.
464    pub corrupt_response_prob: f64,
465    /// Baseline response delay before a peer starts sending any data.
466    pub extra_delay_ms: u64,
467    /// Additional delay before the first response byte becomes available.
468    pub first_byte_delay_ms: u64,
469    /// Sustained throughput for delivering large payloads. `0` disables size-based slowdown.
470    pub bytes_per_second: u64,
471    /// Probability that an otherwise honest response experiences an extra stall.
472    pub stall_response_prob: f64,
473    /// Extra delay injected when a stall event happens.
474    pub stall_delay_ms: u64,
475}
476
477impl Default for ResponseBehaviorConfig {
478    fn default() -> Self {
479        Self {
480            drop_response_prob: 0.0,
481            corrupt_response_prob: 0.0,
482            extra_delay_ms: 0,
483            first_byte_delay_ms: 0,
484            bytes_per_second: 0,
485            stall_response_prob: 0.0,
486            stall_delay_ms: 0,
487        }
488    }
489}
490
491impl ResponseBehaviorConfig {
492    fn normalized(self) -> Self {
493        Self {
494            drop_response_prob: self.drop_response_prob.clamp(0.0, 1.0),
495            corrupt_response_prob: self.corrupt_response_prob.clamp(0.0, 1.0),
496            extra_delay_ms: self.extra_delay_ms,
497            first_byte_delay_ms: self.first_byte_delay_ms,
498            bytes_per_second: self.bytes_per_second,
499            stall_response_prob: self.stall_response_prob.clamp(0.0, 1.0),
500            stall_delay_ms: self.stall_delay_ms,
501        }
502    }
503}
504
505/// Routing policy for request ordering + dispatch fanout.
506#[derive(Debug, Clone)]
507pub struct MeshRoutingConfig {
508    pub selection_strategy: SelectionStrategy,
509    pub fairness_enabled: bool,
510    /// Blend weight for payment-priority ranking in selector (`0.0` disables).
511    pub cashu_payment_weight: f64,
512    /// Refuse serving peers that have reached this many unpaid post-delivery settlements.
513    /// `0` disables refusal and only keeps metadata/downranking.
514    pub cashu_payment_default_block_threshold: u64,
515    /// Cashu mint URLs this node is willing to use for settlement.
516    pub cashu_accepted_mints: Vec<String>,
517    /// Preferred Cashu mint URL when initiating paid retrieval.
518    pub cashu_default_mint: Option<String>,
519    /// Baseline cap for accepting a peer-suggested mint outside the trusted set.
520    pub cashu_peer_suggested_mint_base_cap_sat: u64,
521    /// Additional sats allowed per successful delivery from that peer.
522    pub cashu_peer_suggested_mint_success_step_sat: u64,
523    /// Additional sats allowed per successful post-delivery payment received from that peer.
524    pub cashu_peer_suggested_mint_receipt_step_sat: u64,
525    /// Hard upper bound for any single peer-suggested mint quote we accept.
526    pub cashu_peer_suggested_mint_max_cap_sat: u64,
527    pub dispatch: RequestDispatchConfig,
528    pub response_behavior: ResponseBehaviorConfig,
529}
530
531impl Default for MeshRoutingConfig {
532    fn default() -> Self {
533        Self {
534            selection_strategy: SelectionStrategy::Weighted,
535            fairness_enabled: true,
536            cashu_payment_weight: 0.0,
537            cashu_payment_default_block_threshold: 0,
538            cashu_accepted_mints: Vec::new(),
539            cashu_default_mint: None,
540            cashu_peer_suggested_mint_base_cap_sat: 0,
541            cashu_peer_suggested_mint_success_step_sat: 0,
542            cashu_peer_suggested_mint_receipt_step_sat: 0,
543            cashu_peer_suggested_mint_max_cap_sat: 0,
544            dispatch: RequestDispatchConfig::default(),
545            response_behavior: ResponseBehaviorConfig::default(),
546        }
547    }
548}
549
550/// Routed mesh store core that works with any storage backend and transport
551/// implementation.
552///
553/// This is the shared code between production and simulation.
554/// - Production: `MeshStoreCore<LmdbStore, NostrRelayTransport, WebRtcPeerLinkFactory>`
555/// - Simulation: `MeshStoreCore<MemoryStore, MockRelayTransport, MockConnectionFactory>`
556pub struct MeshStoreCore<S, R, F>
557where
558    S: Store + Send + Sync + 'static,
559    R: SignalingTransport + Send + Sync + 'static,
560    F: PeerLinkFactory + Send + Sync + 'static,
561{
562    /// Local backing store
563    local_store: Arc<S>,
564    /// Mesh router (handles peer discovery and connection)
565    signaling: Arc<MeshRouter<R, F>>,
566    /// Per-peer HTL config
567    htl_configs: RwLock<HashMap<String, PeerHTLConfig>>,
568    /// Pending requests we sent
569    pending_requests: RwLock<HashMap<String, PendingRequest>>,
570    /// Pending quote negotiations keyed by requested hash.
571    pending_quotes: RwLock<HashMap<String, PendingQuoteRequest>>,
572    /// Forwarded peer requests currently being resolved through the mesh/upstream.
573    pending_forward_requests: RwLock<HashMap<String, PendingForwardRequest>>,
574    /// Bounded negative cache for recently forwarded misses/timeouts.
575    recent_forward_misses: Mutex<TimedSeenSet>,
576    /// Quotes we issued to peers and will accept exactly once until expiry.
577    issued_quotes: RwLock<HashMap<(String, String, u64), IssuedQuote>>,
578    /// Monotonic quote identifier generator.
579    next_quote_id: RwLock<u64>,
580    /// Non-peer read sources such as upstream Blossom servers.
581    read_sources: RwLock<HashMap<String, Arc<dyn MeshReadSource>>>,
582    /// Adaptive health stats for non-peer read sources.
583    read_source_stats: RwLock<HashMap<String, AdaptiveSourceStats>>,
584    /// Shared in-flight upstream reads keyed by hash.
585    inflight_source_fetches: Mutex<HashMap<String, InflightSourceFetch>>,
586    /// Adaptive selector for peer ordering.
587    peer_selector: RwLock<PeerSelector>,
588    /// Active per-peer in-flight reads so concurrent block fetches spread across peers.
589    peer_active_requests: RwLock<HashMap<String, usize>>,
590    /// Actual wire traffic stats used for upload-side reciprocity scheduling.
591    peer_wire_stats: RwLock<HashMap<String, PeerWireStats>>,
592    /// Pending content responses waiting for upload arbitration.
593    pending_response_sends: Mutex<Vec<PendingResponseSend>>,
594    /// Upload response scheduler state.
595    response_scheduler_running: AtomicBool,
596    /// Monotonic id for queued response sends.
597    next_response_job_id: AtomicU64,
598    /// Routing/dispatch configuration.
599    routing: MeshRoutingConfig,
600    /// Request timeout
601    request_timeout: Duration,
602    /// Debug mode
603    debug: bool,
604    /// Running flag
605    running: RwLock<bool>,
606}
607
608impl<S, R, F> MeshStoreCore<S, R, F>
609where
610    S: Store + Send + Sync + 'static,
611    R: SignalingTransport + Send + Sync + 'static,
612    F: PeerLinkFactory + Send + Sync + 'static,
613{
614    /// Create a new routed mesh store core.
615    pub fn new(
616        local_store: Arc<S>,
617        signaling: Arc<MeshRouter<R, F>>,
618        request_timeout: Duration,
619        debug: bool,
620    ) -> Self {
621        Self::new_with_routing(
622            local_store,
623            signaling,
624            request_timeout,
625            debug,
626            Default::default(),
627        )
628    }
629
630    /// Create a new routed mesh store core with explicit routing configuration.
631    pub fn new_with_routing(
632        local_store: Arc<S>,
633        signaling: Arc<MeshRouter<R, F>>,
634        request_timeout: Duration,
635        debug: bool,
636        routing: MeshRoutingConfig,
637    ) -> Self {
638        let mut selector = PeerSelector::with_strategy(routing.selection_strategy);
639        selector.set_fairness(routing.fairness_enabled);
640        selector.set_cashu_payment_weight(routing.cashu_payment_weight);
641        Self {
642            local_store,
643            signaling,
644            htl_configs: RwLock::new(HashMap::new()),
645            pending_requests: RwLock::new(HashMap::new()),
646            pending_quotes: RwLock::new(HashMap::new()),
647            pending_forward_requests: RwLock::new(HashMap::new()),
648            recent_forward_misses: Mutex::new(TimedSeenSet::new(
649                RECENT_FORWARD_MISS_CAPACITY,
650                Self::recent_forward_miss_ttl(request_timeout),
651            )),
652            issued_quotes: RwLock::new(HashMap::new()),
653            next_quote_id: RwLock::new(1),
654            read_sources: RwLock::new(HashMap::new()),
655            read_source_stats: RwLock::new(HashMap::new()),
656            inflight_source_fetches: Mutex::new(HashMap::new()),
657            peer_selector: RwLock::new(selector),
658            peer_active_requests: RwLock::new(HashMap::new()),
659            peer_wire_stats: RwLock::new(HashMap::new()),
660            pending_response_sends: Mutex::new(Vec::new()),
661            response_scheduler_running: AtomicBool::new(false),
662            next_response_job_id: AtomicU64::new(1),
663            routing,
664            request_timeout,
665            debug,
666            running: RwLock::new(false),
667        }
668    }
669
670    fn recent_forward_miss_ttl(request_timeout: Duration) -> Duration {
671        let ttl_ms = request_timeout
672            .as_millis()
673            .saturating_mul(2)
674            .max(MIN_RECENT_FORWARD_MISS_TTL_MS as u128)
675            .min(u64::MAX as u128) as u64;
676        Duration::from_millis(ttl_ms)
677    }
678
679    /// Start the store (begin listening for messages)
680    pub async fn start(&self) -> Result<(), TransportError> {
681        *self.running.write().await = true;
682
683        // Send initial hello
684        self.signaling.send_hello(vec![]).await?;
685
686        Ok(())
687    }
688
689    /// Stop the store
690    pub async fn stop(&self) {
691        *self.running.write().await = false;
692    }
693
694    /// Process incoming signaling message
695    pub async fn process_signaling(&self, msg: SignalingMessage) -> Result<(), TransportError> {
696        // When a new peer connects, initialize their HTL config
697        let peer_id = msg.peer_id().to_string();
698        {
699            let mut configs = self.htl_configs.write().await;
700            if !configs.contains_key(&peer_id) {
701                configs.insert(peer_id.clone(), PeerHTLConfig::random());
702            }
703        }
704        self.peer_selector.write().await.add_peer(peer_id);
705
706        self.signaling.handle_message(msg).await
707    }
708
709    /// Get signaling manager reference
710    pub fn signaling(&self) -> &Arc<MeshRouter<R, F>> {
711        &self.signaling
712    }
713
714    fn response_behavior(&self) -> ResponseBehaviorConfig {
715        self.routing.response_behavior.normalized()
716    }
717
718    async fn record_peer_wire_sent(&self, peer_id: &str, bytes: u64) {
719        if bytes == 0 {
720            return;
721        }
722        let mut stats = self.peer_wire_stats.write().await;
723        let entry = stats.entry(peer_id.to_string()).or_default();
724        entry.bytes_sent = entry.bytes_sent.saturating_add(bytes);
725    }
726
727    async fn record_peer_wire_received(&self, peer_id: &str, bytes: u64) {
728        if bytes == 0 {
729            return;
730        }
731        let mut stats = self.peer_wire_stats.write().await;
732        let entry = stats.entry(peer_id.to_string()).or_default();
733        entry.bytes_received = entry.bytes_received.saturating_add(bytes);
734    }
735
736    fn peer_upload_weight(stats: &PeerWireStats) -> f64 {
737        let raw_ratio = (stats.bytes_received.saturating_add(1024) as f64)
738            / (stats.bytes_sent.saturating_add(1024) as f64);
739        let bounded_ratio = raw_ratio / (1.0 + raw_ratio);
740        0.5 + 1.5 * bounded_ratio
741    }
742
743    fn choose_ready_response_job(
744        ready_jobs: &[(u64, String, usize, Instant, u64)],
745        stats: &HashMap<String, PeerWireStats>,
746    ) -> Option<(u64, f64)> {
747        ready_jobs
748            .iter()
749            .map(|job| {
750                let peer_stats = stats.get(&job.1).cloned().unwrap_or_default();
751                let finish = peer_stats.bandwidth_debt
752                    + (job.2 as f64 / Self::peer_upload_weight(&peer_stats));
753                (job.0, job.1.as_str(), job.4, finish)
754            })
755            .min_by(|left, right| {
756                left.3
757                    .partial_cmp(&right.3)
758                    .unwrap_or(std::cmp::Ordering::Equal)
759                    .then_with(|| left.2.cmp(&right.2))
760                    .then_with(|| left.1.cmp(right.1))
761            })
762            .map(|choice| (choice.0, choice.3))
763    }
764
765    async fn enqueue_response_send(
766        self: &Arc<Self>,
767        peer_id: String,
768        bytes: Vec<u8>,
769        ready_at: Instant,
770    ) {
771        let job_id = self.next_response_job_id.fetch_add(1, Ordering::Relaxed);
772        {
773            let mut queue = self.pending_response_sends.lock().await;
774            queue.push(PendingResponseSend {
775                job_id,
776                peer_id,
777                bytes,
778                ready_at,
779                queue_sequence: job_id,
780            });
781        }
782
783        if self
784            .response_scheduler_running
785            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
786            .is_ok()
787        {
788            let this = Arc::clone(self);
789            tokio::spawn(async move {
790                this.run_response_scheduler().await;
791            });
792        }
793    }
794
795    async fn run_response_scheduler(self: Arc<Self>) {
796        loop {
797            let snapshot = {
798                let queue = self.pending_response_sends.lock().await;
799                if queue.is_empty() {
800                    self.response_scheduler_running
801                        .store(false, Ordering::Release);
802                    return;
803                }
804                queue
805                    .iter()
806                    .map(|job| {
807                        (
808                            job.job_id,
809                            job.peer_id.clone(),
810                            job.bytes.len(),
811                            job.ready_at,
812                            job.queue_sequence,
813                        )
814                    })
815                    .collect::<Vec<_>>()
816            };
817
818            let now = Instant::now();
819            let mut earliest_ready_at: Option<Instant> = None;
820            let mut ready_jobs = Vec::new();
821            for job in &snapshot {
822                if job.3 <= now {
823                    ready_jobs.push(job.clone());
824                } else {
825                    earliest_ready_at = Some(match earliest_ready_at {
826                        Some(current) => current.min(job.3),
827                        None => job.3,
828                    });
829                }
830            }
831
832            if ready_jobs.is_empty() {
833                if let Some(ready_at) = earliest_ready_at {
834                    tokio::time::sleep(ready_at.saturating_duration_since(Instant::now())).await;
835                    continue;
836                }
837                self.response_scheduler_running
838                    .store(false, Ordering::Release);
839                return;
840            }
841
842            let (selected_job_id, selected_finish) = {
843                let stats = self.peer_wire_stats.read().await;
844                Self::choose_ready_response_job(&ready_jobs, &stats).expect("ready response job")
845            };
846
847            let selected = {
848                let mut queue = self.pending_response_sends.lock().await;
849                let Some(index) = queue.iter().position(|job| job.job_id == selected_job_id) else {
850                    continue;
851                };
852                queue.swap_remove(index)
853            };
854
855            let sent = if let Some(channel) = self.signaling.get_channel(&selected.peer_id).await {
856                channel.send(selected.bytes.clone()).await.is_ok()
857            } else {
858                false
859            };
860
861            let queued_peers = {
862                let queue = self.pending_response_sends.lock().await;
863                queue
864                    .iter()
865                    .map(|job| job.peer_id.clone())
866                    .collect::<HashSet<_>>()
867            };
868            let mut stats = self.peer_wire_stats.write().await;
869            let entry = stats.entry(selected.peer_id.clone()).or_default();
870            if sent {
871                entry.bytes_sent = entry.bytes_sent.saturating_add(selected.bytes.len() as u64);
872                entry.bandwidth_debt = selected_finish;
873            }
874            if queued_peers.is_empty() {
875                for peer_stats in stats.values_mut() {
876                    peer_stats.bandwidth_debt = 0.0;
877                }
878            } else {
879                let floor = queued_peers
880                    .iter()
881                    .filter_map(|peer_id| stats.get(peer_id).map(|peer| peer.bandwidth_debt))
882                    .fold(f64::INFINITY, f64::min);
883                if floor.is_finite() && floor > 0.0 {
884                    for peer_id in queued_peers {
885                        if let Some(peer_stats) = stats.get_mut(&peer_id) {
886                            peer_stats.bandwidth_debt =
887                                (peer_stats.bandwidth_debt - floor).max(0.0);
888                        }
889                    }
890                }
891            }
892        }
893    }
894
895    fn deterministic_actor_draw_for(peer_id: &str, hash: &Hash, salt: u64) -> f64 {
896        let mut hasher = DefaultHasher::new();
897        peer_id.hash(&mut hasher);
898        hash.hash(&mut hasher);
899        salt.hash(&mut hasher);
900        let v = hasher.finish();
901        (v as f64) / (u64::MAX as f64)
902    }
903
904    fn deterministic_actor_draw(&self, hash: &Hash, salt: u64) -> f64 {
905        Self::deterministic_actor_draw_for(self.signaling.peer_id(), hash, salt)
906    }
907
908    fn peer_metadata_pointer_slot_hash() -> Hash {
909        hashtree_core::sha256(PEER_METADATA_POINTER_SLOT_KEY)
910    }
911
912    fn decode_hash_hex(hash_hex: &str) -> Result<Hash, StoreError> {
913        let bytes = hex::decode(hash_hex)
914            .map_err(|e| StoreError::Other(format!("Invalid hash hex: {e}")))?;
915        if bytes.len() != 32 {
916            return Err(StoreError::Other(format!(
917                "Invalid hash length {}, expected 32 bytes",
918                bytes.len()
919            )));
920        }
921        let mut hash = [0u8; 32];
922        hash.copy_from_slice(&bytes);
923        Ok(hash)
924    }
925
926    fn should_drop_response(&self, hash: &Hash) -> bool {
927        let p = self.response_behavior().drop_response_prob;
928        if p <= 0.0 {
929            return false;
930        }
931        self.deterministic_actor_draw(hash, 0xD0_D0_D0_D0_D0_D0_D0_D0) < p
932    }
933
934    fn should_corrupt_response(&self, hash: &Hash) -> bool {
935        let p = self.response_behavior().corrupt_response_prob;
936        if p <= 0.0 {
937            return false;
938        }
939        self.deterministic_actor_draw(hash, 0xC0_C0_C0_C0_C0_C0_C0_C0) < p
940    }
941
942    fn should_stall_response(&self, hash: &Hash) -> bool {
943        let p = self.response_behavior().stall_response_prob;
944        if p <= 0.0 {
945            return false;
946        }
947        self.deterministic_actor_draw(hash, 0x5A_11_5A_11_5A_11_5A_11) < p
948    }
949
950    fn response_send_delay(&self, hash: &Hash, payload_len: usize) -> Duration {
951        let behavior = self.response_behavior();
952        let mut total_ms = behavior
953            .extra_delay_ms
954            .saturating_add(behavior.first_byte_delay_ms);
955
956        if behavior.bytes_per_second > 0 && payload_len > 0 {
957            let throughput_ms = ((payload_len as u128) * 1000)
958                .div_ceil(behavior.bytes_per_second as u128)
959                .min(u64::MAX as u128) as u64;
960            total_ms = total_ms.saturating_add(throughput_ms);
961        }
962
963        if behavior.stall_delay_ms > 0 && self.should_stall_response(hash) {
964            total_ms = total_ms.saturating_add(behavior.stall_delay_ms);
965        }
966
967        Duration::from_millis(total_ms)
968    }
969
970    async fn ordered_connected_peers(&self, exclude_peer_id: Option<&str>) -> Vec<String> {
971        let current_peer_ids = self.signaling.peer_ids().await;
972        if current_peer_ids.is_empty() {
973            return Vec::new();
974        }
975
976        sync_selector_peers(&self.peer_selector, &current_peer_ids).await;
977        let hash_get_peer_ids: HashSet<String> = self
978            .signaling
979            .hash_get_peer_ids()
980            .await
981            .into_iter()
982            .collect();
983        let mut candidate_peer_ids: Vec<String> = current_peer_ids
984            .into_iter()
985            .filter(|peer_id| hash_get_peer_ids.contains(peer_id))
986            .filter(|peer_id| exclude_peer_id.is_none_or(|exclude| peer_id != exclude))
987            .collect();
988        if candidate_peer_ids.is_empty() {
989            return Vec::new();
990        }
991
992        let current_set: HashSet<&str> = candidate_peer_ids.iter().map(String::as_str).collect();
993        let mut selector = self.peer_selector.write().await;
994        let mut selector_order = selector.select_peers();
995        selector_order.retain(|peer_id| current_set.contains(peer_id.as_str()));
996        if selector_order.is_empty() {
997            let mut fallback = candidate_peer_ids;
998            fallback.sort();
999            return fallback;
1000        }
1001        let backed_off: HashMap<String, bool> = candidate_peer_ids
1002            .iter()
1003            .map(|peer_id| (peer_id.clone(), selector.is_peer_backed_off(peer_id)))
1004            .collect();
1005        drop(selector);
1006
1007        let rank: HashMap<&str, usize> = selector_order
1008            .iter()
1009            .enumerate()
1010            .map(|(idx, peer_id)| (peer_id.as_str(), idx))
1011            .collect();
1012        let active = self.peer_active_requests.read().await;
1013        candidate_peer_ids.sort_by(|left, right| {
1014            let left_backed_off = backed_off.get(left).copied().unwrap_or(false);
1015            let right_backed_off = backed_off.get(right).copied().unwrap_or(false);
1016            if left_backed_off != right_backed_off {
1017                return if left_backed_off {
1018                    std::cmp::Ordering::Greater
1019                } else {
1020                    std::cmp::Ordering::Less
1021                };
1022            }
1023            let left_rank = rank.get(left.as_str()).copied().unwrap_or(usize::MAX / 2);
1024            let right_rank = rank.get(right.as_str()).copied().unwrap_or(usize::MAX / 2);
1025            let left_load = active.get(left).copied().unwrap_or(0);
1026            let right_load = active.get(right).copied().unwrap_or(0);
1027            (left_rank + left_load.saturating_mul(ACTIVE_PEER_REQUEST_RANK_PENALTY))
1028                .cmp(&(right_rank + right_load.saturating_mul(ACTIVE_PEER_REQUEST_RANK_PENALTY)))
1029                .then_with(|| left.cmp(right))
1030        });
1031        candidate_peer_ids
1032    }
1033
1034    async fn reserve_peer_request(&self, peer_id: &str) {
1035        let mut active = self.peer_active_requests.write().await;
1036        *active.entry(peer_id.to_string()).or_insert(0) += 1;
1037    }
1038
1039    async fn release_peer_request(&self, peer_id: &str) {
1040        let mut active = self.peer_active_requests.write().await;
1041        let Some(count) = active.get_mut(peer_id) else {
1042            return;
1043        };
1044        if *count <= 1 {
1045            active.remove(peer_id);
1046        } else {
1047            *count -= 1;
1048        }
1049    }
1050
1051    async fn release_queried_peer_requests(&self, peer_ids: &[String]) {
1052        for peer_id in peer_ids {
1053            self.release_peer_request(peer_id).await;
1054        }
1055    }
1056
1057    fn requested_quote_mint(&self) -> Option<&str> {
1058        if let Some(default_mint) = self.routing.cashu_default_mint.as_deref() {
1059            if self.routing.cashu_accepted_mints.is_empty()
1060                || self
1061                    .routing
1062                    .cashu_accepted_mints
1063                    .iter()
1064                    .any(|mint| mint == default_mint)
1065            {
1066                return Some(default_mint);
1067            }
1068        }
1069
1070        self.routing
1071            .cashu_accepted_mints
1072            .first()
1073            .map(String::as_str)
1074    }
1075
1076    fn choose_quote_mint(&self, requested_mint: Option<&str>) -> Option<String> {
1077        if let Some(requested_mint) = requested_mint {
1078            if self.accepts_quote_mint(Some(requested_mint)) {
1079                return Some(requested_mint.to_string());
1080            }
1081        }
1082        if let Some(default_mint) = self.routing.cashu_default_mint.as_ref() {
1083            return Some(default_mint.clone());
1084        }
1085        if let Some(first_mint) = self.routing.cashu_accepted_mints.first() {
1086            return Some(first_mint.clone());
1087        }
1088        requested_mint.map(str::to_string)
1089    }
1090
1091    fn accepts_quote_mint(&self, mint_url: Option<&str>) -> bool {
1092        if self.routing.cashu_accepted_mints.is_empty() {
1093            return true;
1094        }
1095
1096        let Some(mint_url) = mint_url else {
1097            return false;
1098        };
1099        self.routing
1100            .cashu_accepted_mints
1101            .iter()
1102            .any(|mint| mint == mint_url)
1103    }
1104
1105    fn trusts_quote_mint(&self, mint_url: Option<&str>) -> bool {
1106        let Some(mint_url) = mint_url else {
1107            return self.routing.cashu_default_mint.is_none()
1108                && self.routing.cashu_accepted_mints.is_empty();
1109        };
1110        self.routing.cashu_default_mint.as_deref() == Some(mint_url)
1111            || self
1112                .routing
1113                .cashu_accepted_mints
1114                .iter()
1115                .any(|mint| mint == mint_url)
1116    }
1117
1118    async fn peer_suggested_mint_cap_sat(&self, peer_id: &str) -> u64 {
1119        let base = self.routing.cashu_peer_suggested_mint_base_cap_sat;
1120        if base == 0 {
1121            return 0;
1122        }
1123
1124        let selector = self.peer_selector.read().await;
1125        let Some(stats) = selector.get_stats(peer_id) else {
1126            let max_cap = self.routing.cashu_peer_suggested_mint_max_cap_sat;
1127            return if max_cap > 0 { base.min(max_cap) } else { base };
1128        };
1129
1130        if stats.cashu_payment_defaults > 0
1131            && stats.cashu_payment_defaults >= stats.cashu_payment_receipts
1132        {
1133            return 0;
1134        }
1135
1136        let success_bonus = stats
1137            .successes
1138            .saturating_mul(self.routing.cashu_peer_suggested_mint_success_step_sat);
1139        let receipt_bonus = stats
1140            .cashu_payment_receipts
1141            .saturating_mul(self.routing.cashu_peer_suggested_mint_receipt_step_sat);
1142        let mut cap = base
1143            .saturating_add(success_bonus)
1144            .saturating_add(receipt_bonus);
1145        let max_cap = self.routing.cashu_peer_suggested_mint_max_cap_sat;
1146        if max_cap > 0 {
1147            cap = cap.min(max_cap);
1148        }
1149        cap
1150    }
1151
1152    async fn should_accept_quote_response(
1153        &self,
1154        from_peer: &str,
1155        preferred_mint_url: Option<&str>,
1156        offered_payment_sat: u64,
1157        res: &DataQuoteResponse,
1158    ) -> bool {
1159        let Some(payment_sat) = res.p else {
1160            return false;
1161        };
1162        if payment_sat > offered_payment_sat {
1163            return false;
1164        }
1165
1166        let response_mint = res.m.as_deref();
1167        if response_mint == preferred_mint_url {
1168            return true;
1169        }
1170        if self.trusts_quote_mint(response_mint) {
1171            return true;
1172        }
1173        if response_mint.is_none() {
1174            return false;
1175        }
1176
1177        payment_sat <= self.peer_suggested_mint_cap_sat(from_peer).await
1178    }
1179
1180    async fn issue_quote(
1181        &self,
1182        peer_id: &str,
1183        hash_key: &str,
1184        payment_sat: u64,
1185        ttl_ms: u32,
1186        mint_url: Option<&str>,
1187    ) -> u64 {
1188        let quote_id = {
1189            let mut next = self.next_quote_id.write().await;
1190            let quote_id = *next;
1191            *next = next.saturating_add(1);
1192            quote_id
1193        };
1194
1195        let expires_at = Instant::now() + Duration::from_millis(ttl_ms as u64);
1196        self.issued_quotes.write().await.insert(
1197            (peer_id.to_string(), hash_key.to_string(), quote_id),
1198            IssuedQuote {
1199                expires_at,
1200                payment_sat,
1201                mint_url: mint_url.map(str::to_string),
1202            },
1203        );
1204        quote_id
1205    }
1206
1207    async fn take_valid_quote(&self, peer_id: &str, hash_key: &str, quote_id: u64) -> bool {
1208        let key = (peer_id.to_string(), hash_key.to_string(), quote_id);
1209        let Some(quote) = self.issued_quotes.write().await.remove(&key) else {
1210            return false;
1211        };
1212        quote.expires_at > Instant::now()
1213    }
1214
1215    async fn send_request_to_peer(
1216        &self,
1217        peer_id: &str,
1218        hash: &Hash,
1219        request_htl: u8,
1220        quote_id: Option<u64>,
1221    ) -> bool {
1222        if !should_forward_htl(request_htl) {
1223            return false;
1224        }
1225
1226        let channel = match self.signaling.get_channel(peer_id).await {
1227            Some(c) => c,
1228            None => return false,
1229        };
1230
1231        let htl_config = {
1232            let configs = self.htl_configs.read().await;
1233            configs
1234                .get(peer_id)
1235                .cloned()
1236                .unwrap_or_else(PeerHTLConfig::random)
1237        };
1238
1239        let send_htl = htl_config.decrement(request_htl);
1240        let req = match quote_id {
1241            Some(quote_id) => create_request_with_quote(hash, send_htl, quote_id),
1242            None => create_request(hash, send_htl),
1243        };
1244        let request_bytes = encode_request(&req);
1245        let request_len = request_bytes.len() as u64;
1246
1247        {
1248            let mut selector = self.peer_selector.write().await;
1249            selector.record_request(peer_id, request_len);
1250        }
1251
1252        match channel.send(request_bytes).await {
1253            Ok(()) => {
1254                self.record_peer_wire_sent(peer_id, request_len).await;
1255                true
1256            }
1257            Err(_) => {
1258                self.peer_selector.write().await.record_failure(peer_id);
1259                false
1260            }
1261        }
1262    }
1263
1264    async fn send_quote_request_to_peer(
1265        &self,
1266        peer_id: &str,
1267        hash: &Hash,
1268        payment_sat: u64,
1269        ttl_ms: u32,
1270        mint_url: Option<&str>,
1271    ) -> bool {
1272        let channel = match self.signaling.get_channel(peer_id).await {
1273            Some(c) => c,
1274            None => return false,
1275        };
1276
1277        let req = create_quote_request(hash, ttl_ms, payment_sat, mint_url);
1278        let request_bytes = encode_quote_request(&req);
1279        let request_len = request_bytes.len() as u64;
1280
1281        match channel.send(request_bytes).await {
1282            Ok(()) => {
1283                self.record_peer_wire_sent(peer_id, request_len).await;
1284                true
1285            }
1286            Err(_) => false,
1287        }
1288    }
1289
1290    pub async fn set_read_sources(&self, sources: Vec<Arc<dyn MeshReadSource>>) {
1291        let mut by_id = HashMap::new();
1292        let mut stats = self.read_source_stats.write().await;
1293        for source in sources {
1294            let source_id = source.id().to_string();
1295            by_id.insert(source_id.clone(), source);
1296            stats
1297                .entry(source_id)
1298                .or_insert_with(AdaptiveSourceStats::default);
1299        }
1300        *self.read_sources.write().await = by_id;
1301    }
1302
1303    async fn record_read_source_request(&self, source_id: &str) {
1304        let mut stats = self.read_source_stats.write().await;
1305        stats
1306            .entry(source_id.to_string())
1307            .or_insert_with(AdaptiveSourceStats::default)
1308            .requests += 1;
1309    }
1310
1311    async fn record_read_source_miss(&self, source_id: &str) {
1312        let mut stats = self.read_source_stats.write().await;
1313        stats
1314            .entry(source_id.to_string())
1315            .or_insert_with(AdaptiveSourceStats::default)
1316            .misses += 1;
1317    }
1318
1319    async fn record_read_source_success(&self, source_id: &str, elapsed_ms: u64) {
1320        let now = Instant::now();
1321        let mut stats = self.read_source_stats.write().await;
1322        let stats = stats
1323            .entry(source_id.to_string())
1324            .or_insert_with(AdaptiveSourceStats::default);
1325        stats.successes += 1;
1326        stats.last_success_at = Some(now);
1327        stats.backoff_level = 0;
1328        stats.backed_off_until = None;
1329        if stats.srtt_ms <= 0.0 {
1330            stats.srtt_ms = elapsed_ms as f64;
1331            stats.rttvar_ms = elapsed_ms as f64 / 2.0;
1332            return;
1333        }
1334        let elapsed = elapsed_ms as f64;
1335        stats.rttvar_ms = 0.75 * stats.rttvar_ms + 0.25 * (stats.srtt_ms - elapsed).abs();
1336        stats.srtt_ms = 0.875 * stats.srtt_ms + 0.125 * elapsed;
1337    }
1338
1339    async fn record_read_source_failure(&self, source_id: &str) {
1340        let now = Instant::now();
1341        let mut stats = self.read_source_stats.write().await;
1342        let stats = stats
1343            .entry(source_id.to_string())
1344            .or_insert_with(AdaptiveSourceStats::default);
1345        stats.failures += 1;
1346        stats.last_failure_at = Some(now);
1347        Self::apply_source_backoff(stats, now);
1348    }
1349
1350    async fn record_read_source_timeout(&self, source_id: &str) {
1351        let now = Instant::now();
1352        let mut stats = self.read_source_stats.write().await;
1353        let stats = stats
1354            .entry(source_id.to_string())
1355            .or_insert_with(AdaptiveSourceStats::default);
1356        stats.timeouts += 1;
1357        stats.last_failure_at = Some(now);
1358        Self::apply_source_backoff(stats, now);
1359    }
1360
1361    fn apply_source_backoff(stats: &mut AdaptiveSourceStats, now: Instant) {
1362        stats.backoff_level = stats.backoff_level.saturating_add(1);
1363        let backoff_ms = (INITIAL_SOURCE_BACKOFF_MS
1364            .saturating_mul(2u64.saturating_pow(stats.backoff_level.saturating_sub(1))))
1365        .min(MAX_SOURCE_BACKOFF_MS);
1366        stats.backed_off_until = Some(now + Duration::from_millis(backoff_ms));
1367    }
1368
1369    async fn ordered_read_sources(&self) -> Vec<Arc<dyn MeshReadSource>> {
1370        let sources = self.read_sources.read().await;
1371        if sources.is_empty() {
1372            return Vec::new();
1373        }
1374
1375        let mut available: Vec<Arc<dyn MeshReadSource>> = sources
1376            .values()
1377            .filter(|source| source.is_available())
1378            .cloned()
1379            .collect();
1380        if available.is_empty() {
1381            return Vec::new();
1382        }
1383
1384        let now = Instant::now();
1385        let stats = self.read_source_stats.read().await;
1386        let mut healthy: Vec<Arc<dyn MeshReadSource>> = available
1387            .iter()
1388            .filter(|source| {
1389                stats
1390                    .get(source.id())
1391                    .and_then(|s| s.backed_off_until)
1392                    .is_none_or(|until| until <= now)
1393            })
1394            .cloned()
1395            .collect();
1396        if !healthy.is_empty() {
1397            available = std::mem::take(&mut healthy);
1398        }
1399
1400        available.sort_by(|left, right| {
1401            let left_stats = stats.get(left.id()).cloned().unwrap_or_default();
1402            let right_stats = stats.get(right.id()).cloned().unwrap_or_default();
1403            adaptive_source_score(&right_stats, now)
1404                .partial_cmp(&adaptive_source_score(&left_stats, now))
1405                .unwrap_or(std::cmp::Ordering::Equal)
1406                .then_with(|| left.id().cmp(right.id()))
1407        });
1408        available
1409    }
1410
1411    async fn should_probe_multiple_read_sources(
1412        &self,
1413        ordered_sources: &[Arc<dyn MeshReadSource>],
1414    ) -> bool {
1415        if ordered_sources.len() <= 1 {
1416            return false;
1417        }
1418        let stats = self.read_source_stats.read().await;
1419        let best = stats
1420            .get(ordered_sources[0].id())
1421            .cloned()
1422            .unwrap_or_default();
1423        let second = stats
1424            .get(ordered_sources[1].id())
1425            .cloned()
1426            .unwrap_or_default();
1427        if !source_has_history(&best) || !source_has_history(&second) {
1428            return false;
1429        }
1430        let now = Instant::now();
1431        adaptive_source_score(&best, now) - adaptive_source_score(&second, now)
1432            < SOURCE_SCORE_TIE_DELTA
1433    }
1434
1435    async fn source_dispatch_for(&self, source_count: usize) -> RequestDispatchConfig {
1436        if source_count == 0 {
1437            return self.routing.dispatch;
1438        }
1439        let ordered_sources = self.ordered_read_sources().await;
1440        let probe_multiple = self
1441            .should_probe_multiple_read_sources(&ordered_sources)
1442            .await;
1443        let initial_fanout = if probe_multiple {
1444            source_count.min(2)
1445        } else {
1446            1
1447        };
1448        RequestDispatchConfig {
1449            initial_fanout,
1450            hedge_fanout: self.routing.dispatch.hedge_fanout,
1451            max_fanout: self.routing.dispatch.max_fanout.min(source_count),
1452            hedge_interval_ms: self.routing.dispatch.hedge_interval_ms,
1453        }
1454    }
1455
1456    /// Get peer count
1457    pub async fn peer_count(&self) -> usize {
1458        self.signaling.peer_count().await
1459    }
1460
1461    /// Check if we need more peers
1462    pub async fn needs_peers(&self) -> bool {
1463        self.signaling.needs_peers().await
1464    }
1465
1466    /// Re-broadcast hello to refresh discovery as topology changes.
1467    pub async fn send_hello(&self) -> Result<(), TransportError> {
1468        self.signaling.send_hello(vec![]).await
1469    }
1470
1471    /// Drain all currently available peer-link messages and handle them.
1472    ///
1473    /// This keeps the message pump logic shared between simulation and the
1474    /// default production wrapper instead of duplicating per-channel loops.
1475    pub async fn drain_available_data_messages(self: &Arc<Self>) -> DataPumpStats {
1476        let mut stats = DataPumpStats::default();
1477        let peer_ids = self.signaling.peer_ids().await;
1478        for peer_id in peer_ids {
1479            let Some(channel) = self.signaling.get_channel(&peer_id).await else {
1480                continue;
1481            };
1482
1483            while let Some(data) = channel.try_recv() {
1484                stats.processed += 1;
1485                stats.processed_bytes += data.len() as u64;
1486                if let Some(msg) = parse_message(&data) {
1487                    match msg {
1488                        DataMessage::Request(_) => stats.request_messages += 1,
1489                        DataMessage::Response(_) => stats.response_messages += 1,
1490                        DataMessage::QuoteRequest(_) => stats.quote_request_messages += 1,
1491                        DataMessage::QuoteResponse(_) => stats.quote_response_messages += 1,
1492                        DataMessage::Payment(_)
1493                        | DataMessage::PaymentAck(_)
1494                        | DataMessage::Chunk(_) => {}
1495                    }
1496                }
1497                self.handle_data_message(&peer_id, &data).await;
1498            }
1499        }
1500        stats
1501    }
1502
1503    /// Apply an out-of-band payment credit to a peer's routing priority.
1504    pub async fn record_cashu_payment_for_peer(&self, peer_id: &str, amount_sat: u64) {
1505        self.peer_selector
1506            .write()
1507            .await
1508            .record_cashu_payment(peer_id, amount_sat);
1509    }
1510
1511    /// Record a post-delivery payment we received from a peer.
1512    pub async fn record_cashu_receipt_from_peer(&self, peer_id: &str, amount_sat: u64) {
1513        self.peer_selector
1514            .write()
1515            .await
1516            .record_cashu_receipt(peer_id, amount_sat);
1517    }
1518
1519    /// Record that a peer failed to pay after we delivered successfully.
1520    pub async fn record_cashu_payment_default_from_peer(&self, peer_id: &str) {
1521        self.peer_selector
1522            .write()
1523            .await
1524            .record_cashu_payment_default(peer_id);
1525    }
1526
1527    /// Snapshot routing/selection summary for inspection/debugging.
1528    pub async fn selector_summary(&self) -> crate::peer_selector::SelectorSummary {
1529        self.peer_selector.read().await.summary()
1530    }
1531
1532    fn should_refuse_requests_from_peer(&self, selector: &PeerSelector, peer_id: &str) -> bool {
1533        selector.is_peer_blocked_for_payment_defaults(
1534            peer_id,
1535            self.routing.cashu_payment_default_block_threshold,
1536        )
1537    }
1538
1539    /// Export live peer metadata for inspection/debugging.
1540    pub async fn peer_metadata_snapshot(&self) -> PeerMetadataSnapshot {
1541        self.peer_selector
1542            .read()
1543            .await
1544            .export_peer_metadata_snapshot()
1545    }
1546
1547    /// Snapshot current peer metadata and persist it into `local_store`.
1548    ///
1549    /// Uses content-addressed storage for the snapshot body and a reserved
1550    /// mutable pointer slot for the "latest snapshot hash".
1551    pub async fn persist_peer_metadata(&self) -> Result<Hash, StoreError> {
1552        let snapshot = self
1553            .peer_selector
1554            .read()
1555            .await
1556            .export_peer_metadata_snapshot();
1557        let bytes = serde_json::to_vec(&snapshot).map_err(|e| {
1558            StoreError::Other(format!("Failed to encode peer metadata snapshot: {e}"))
1559        })?;
1560        let snapshot_hash = hashtree_core::sha256(&bytes);
1561        let _ = self.local_store.put(snapshot_hash, bytes).await?;
1562
1563        let pointer_slot = Self::peer_metadata_pointer_slot_hash();
1564        let pointer_bytes = hex::encode(snapshot_hash).into_bytes();
1565        let _ = self.local_store.delete(&pointer_slot).await?;
1566        let _ = self.local_store.put(pointer_slot, pointer_bytes).await?;
1567
1568        Ok(snapshot_hash)
1569    }
1570
1571    /// Load persisted peer metadata from `local_store` if available.
1572    pub async fn load_peer_metadata(&self) -> Result<bool, StoreError> {
1573        let pointer_slot = Self::peer_metadata_pointer_slot_hash();
1574        let Some(pointer_bytes) = self.local_store.get(&pointer_slot).await? else {
1575            return Ok(false);
1576        };
1577        let pointer_hex = std::str::from_utf8(&pointer_bytes).map_err(|e| {
1578            StoreError::Other(format!("Peer metadata pointer is not valid UTF-8: {e}"))
1579        })?;
1580        let snapshot_hash = Self::decode_hash_hex(pointer_hex.trim())?;
1581
1582        let Some(snapshot_bytes) = self.local_store.get(&snapshot_hash).await? else {
1583            return Ok(false);
1584        };
1585        let snapshot: PeerMetadataSnapshot =
1586            serde_json::from_slice(&snapshot_bytes).map_err(|e| {
1587                StoreError::Other(format!("Failed to decode peer metadata snapshot: {e}"))
1588            })?;
1589        self.peer_selector
1590            .write()
1591            .await
1592            .import_peer_metadata_snapshot(&snapshot);
1593        Ok(true)
1594    }
1595
1596    /// Request data from peers after negotiating a paid quote.
1597    ///
1598    /// If quote negotiation fails or the quoted peer does not deliver, the store
1599    /// falls back to the normal unpaid retrieval path to preserve liveness.
1600    pub async fn get_with_quote(
1601        &self,
1602        hash: &Hash,
1603        payment_sat: u64,
1604        quote_ttl: Duration,
1605    ) -> Result<Option<Vec<u8>>, StoreError> {
1606        if let Some(data) = self.local_store.get(hash).await? {
1607            return Ok(Some(data));
1608        }
1609        Ok(self
1610            .request_from_peers_with_quote(hash, payment_sat, quote_ttl)
1611            .await)
1612    }
1613
1614    async fn request_from_peers_with_quote(
1615        &self,
1616        hash: &Hash,
1617        payment_sat: u64,
1618        quote_ttl: Duration,
1619    ) -> Option<Vec<u8>> {
1620        let ordered_peer_ids = self.ordered_connected_peers(None).await;
1621        if ordered_peer_ids.is_empty() {
1622            return None;
1623        }
1624
1625        if let Some(quote) = self
1626            .request_quote_from_peers(hash, payment_sat, quote_ttl, &ordered_peer_ids)
1627            .await
1628        {
1629            if let Some(data) = self
1630                .request_from_single_peer(hash, &quote.peer_id, MAX_HTL, Some(quote.quote_id))
1631                .await
1632            {
1633                return Some(data);
1634            }
1635        }
1636
1637        self.request_from_mesh(hash).await
1638    }
1639
1640    async fn request_quote_from_peers(
1641        &self,
1642        hash: &Hash,
1643        payment_sat: u64,
1644        quote_ttl: Duration,
1645        ordered_peer_ids: &[String],
1646    ) -> Option<NegotiatedQuote> {
1647        if ordered_peer_ids.is_empty() {
1648            return None;
1649        }
1650        let ttl_ms = quote_ttl.as_millis().min(u32::MAX as u128) as u32;
1651        if ttl_ms == 0 {
1652            return None;
1653        }
1654        let requested_mint = self.requested_quote_mint().map(str::to_string);
1655
1656        let hash_key = hash_to_key(hash);
1657        let (tx, rx) = oneshot::channel();
1658        self.pending_quotes.write().await.insert(
1659            hash_key.clone(),
1660            PendingQuoteRequest {
1661                response_tx: tx,
1662                preferred_mint_url: requested_mint.clone(),
1663                offered_payment_sat: payment_sat,
1664            },
1665        );
1666
1667        let rx = Arc::new(Mutex::new(rx));
1668        let result = run_hedged_waves(
1669            ordered_peer_ids.len(),
1670            self.routing.dispatch,
1671            self.request_timeout,
1672            |range| {
1673                let wave_peer_ids = ordered_peer_ids[range].to_vec();
1674                let requested_mint = requested_mint.clone();
1675                let hash = *hash;
1676                async move {
1677                    let mut sent = 0usize;
1678                    for peer_id in wave_peer_ids {
1679                        if self
1680                            .send_quote_request_to_peer(
1681                                &peer_id,
1682                                &hash,
1683                                payment_sat,
1684                                ttl_ms,
1685                                requested_mint.as_deref(),
1686                            )
1687                            .await
1688                        {
1689                            sent += 1;
1690                        }
1691                    }
1692                    sent
1693                }
1694            },
1695            |wait| {
1696                let rx = rx.clone();
1697                async move {
1698                    let mut rx = rx.lock().await;
1699                    match tokio::time::timeout(wait, &mut *rx).await {
1700                        Ok(Ok(Some(quote))) => HedgedWaveAction::Success(quote),
1701                        Ok(Ok(None)) | Ok(Err(_)) => HedgedWaveAction::Abort,
1702                        Err(_) => HedgedWaveAction::Continue,
1703                    }
1704                }
1705            },
1706        )
1707        .await;
1708        let _ = self.pending_quotes.write().await.remove(&hash_key);
1709        result
1710    }
1711
1712    async fn request_from_single_peer(
1713        &self,
1714        hash: &Hash,
1715        peer_id: &str,
1716        request_htl: u8,
1717        quote_id: Option<u64>,
1718    ) -> Option<Vec<u8>> {
1719        let hash_key = hash_to_key(hash);
1720        let (tx, rx) = oneshot::channel();
1721        self.pending_requests.write().await.insert(
1722            hash_key.clone(),
1723            PendingRequest {
1724                response_tx: tx,
1725                started_at: Instant::now(),
1726                queried_peers: vec![peer_id.to_string()],
1727            },
1728        );
1729
1730        let mut rx = rx;
1731        if !self
1732            .send_request_to_peer(peer_id, hash, request_htl, quote_id)
1733            .await
1734        {
1735            let _ = self.pending_requests.write().await.remove(&hash_key);
1736            return None;
1737        }
1738        self.reserve_peer_request(peer_id).await;
1739
1740        if let Ok(Ok(Some(data))) = tokio::time::timeout(self.request_timeout, &mut rx).await {
1741            if hashtree_core::sha256(&data) == *hash {
1742                let _ = self.local_store.put(*hash, data.clone()).await;
1743                return Some(data);
1744            }
1745        }
1746
1747        if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
1748            self.release_queried_peer_requests(&pending.queried_peers)
1749                .await;
1750            for peer_id in pending.queried_peers {
1751                self.peer_selector.write().await.record_timeout(&peer_id);
1752            }
1753        }
1754        let _ = self.take_forward_requesters(&hash_key).await;
1755        None
1756    }
1757
1758    async fn request_from_ordered_peers(
1759        &self,
1760        hash: &Hash,
1761        ordered_peer_ids: &[String],
1762        request_htl: u8,
1763    ) -> RouteFetchOutcome {
1764        let hash_key = hash_to_key(hash);
1765        let (tx, rx) = oneshot::channel();
1766        self.pending_requests.write().await.insert(
1767            hash_key.clone(),
1768            PendingRequest {
1769                response_tx: tx,
1770                started_at: Instant::now(),
1771                queried_peers: Vec::new(),
1772            },
1773        );
1774
1775        let rx = Arc::new(Mutex::new(rx));
1776        let result = run_hedged_waves(
1777            ordered_peer_ids.len(),
1778            self.routing.dispatch,
1779            self.request_timeout,
1780            |range| {
1781                let wave_peer_ids = ordered_peer_ids[range].to_vec();
1782                let hash = *hash;
1783                let hash_key = hash_key.clone();
1784                async move {
1785                    let mut sent = 0usize;
1786                    for peer_id in wave_peer_ids {
1787                        if self
1788                            .send_request_to_peer(&peer_id, &hash, request_htl, None)
1789                            .await
1790                        {
1791                            sent += 1;
1792                            self.reserve_peer_request(&peer_id).await;
1793                            if let Some(pending) =
1794                                self.pending_requests.write().await.get_mut(&hash_key)
1795                            {
1796                                pending.queried_peers.push(peer_id);
1797                            }
1798                        }
1799                    }
1800                    sent
1801                }
1802            },
1803            |wait| {
1804                let rx = rx.clone();
1805                async move {
1806                    let mut rx = rx.lock().await;
1807                    match tokio::time::timeout(wait, &mut *rx).await {
1808                        Ok(Ok(Some(data))) if hashtree_core::sha256(&data) == *hash => {
1809                            HedgedWaveAction::Success(data)
1810                        }
1811                        Ok(Ok(Some(_))) => HedgedWaveAction::Continue,
1812                        Ok(Ok(None)) | Ok(Err(_)) => HedgedWaveAction::Abort,
1813                        Err(_) => HedgedWaveAction::Continue,
1814                    }
1815                }
1816            },
1817        )
1818        .await;
1819
1820        let Some(data) = result else {
1821            if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
1822                self.release_queried_peer_requests(&pending.queried_peers)
1823                    .await;
1824                for peer_id in pending.queried_peers {
1825                    self.peer_selector.write().await.record_timeout(&peer_id);
1826                }
1827            }
1828            let _ = self.take_forward_requesters(&hash_key).await;
1829            return RouteFetchOutcome::Timeout;
1830        };
1831
1832        let _ = self.local_store.put(*hash, data.clone()).await;
1833        RouteFetchOutcome::Hit(data)
1834    }
1835
1836    async fn request_from_read_sources_inner(&self, hash: &Hash) -> RouteFetchOutcome {
1837        let ordered_sources = self.ordered_read_sources().await;
1838        if ordered_sources.is_empty() {
1839            return RouteFetchOutcome::Miss;
1840        }
1841
1842        let dispatch = normalize_dispatch_config(
1843            self.source_dispatch_for(ordered_sources.len()).await,
1844            ordered_sources.len(),
1845        );
1846        let wave_plan = build_hedged_wave_plan(ordered_sources.len(), dispatch);
1847        if wave_plan.is_empty() {
1848            return RouteFetchOutcome::Miss;
1849        }
1850
1851        let deadline = Instant::now() + self.request_timeout;
1852        let mut pending = FuturesUnordered::new();
1853        let mut pending_source_ids = HashSet::new();
1854        let mut saw_timeout = false;
1855        let mut next_source_idx = 0usize;
1856
1857        for (wave_idx, wave_size) in wave_plan.iter().copied().enumerate() {
1858            let from = next_source_idx;
1859            let to = (next_source_idx + wave_size).min(ordered_sources.len());
1860            next_source_idx = to;
1861
1862            for source in ordered_sources[from..to].iter().cloned() {
1863                let source_id = source.id().to_string();
1864                self.record_read_source_request(&source_id).await;
1865                pending_source_ids.insert(source_id.clone());
1866                let hash = *hash;
1867                pending.push(tokio::spawn(async move {
1868                    let started_at = Instant::now();
1869                    let result = std::panic::AssertUnwindSafe(source.get(&hash))
1870                        .catch_unwind()
1871                        .await;
1872                    match result {
1873                        Ok(Some(data)) => SourceFetchOutcome::Hit {
1874                            source_id,
1875                            data,
1876                            elapsed_ms: started_at.elapsed().as_millis().max(1) as u64,
1877                        },
1878                        Ok(None) => SourceFetchOutcome::Miss { source_id },
1879                        Err(_) => SourceFetchOutcome::Failure { source_id },
1880                    }
1881                }));
1882            }
1883
1884            let is_last_wave =
1885                wave_idx + 1 == wave_plan.len() || next_source_idx >= ordered_sources.len();
1886            let window_end = if is_last_wave {
1887                deadline
1888            } else {
1889                (Instant::now() + Duration::from_millis(dispatch.hedge_interval_ms)).min(deadline)
1890            };
1891
1892            while Instant::now() < window_end {
1893                let remaining = window_end.saturating_duration_since(Instant::now());
1894                let Some(result) = tokio::time::timeout(remaining, pending.next())
1895                    .await
1896                    .ok()
1897                    .flatten()
1898                else {
1899                    break;
1900                };
1901                let Ok(outcome) = result else {
1902                    continue;
1903                };
1904                match outcome {
1905                    SourceFetchOutcome::Hit {
1906                        source_id,
1907                        data,
1908                        elapsed_ms,
1909                    } => {
1910                        pending_source_ids.remove(&source_id);
1911                        self.record_read_source_success(&source_id, elapsed_ms)
1912                            .await;
1913                        return RouteFetchOutcome::Hit(data);
1914                    }
1915                    SourceFetchOutcome::Miss { source_id } => {
1916                        pending_source_ids.remove(&source_id);
1917                        self.record_read_source_miss(&source_id).await;
1918                    }
1919                    SourceFetchOutcome::Failure { source_id } => {
1920                        pending_source_ids.remove(&source_id);
1921                        self.record_read_source_failure(&source_id).await;
1922                    }
1923                }
1924            }
1925
1926            if Instant::now() >= deadline {
1927                break;
1928            }
1929        }
1930
1931        for source_id in pending_source_ids {
1932            saw_timeout = true;
1933            self.record_read_source_timeout(&source_id).await;
1934        }
1935        if saw_timeout {
1936            RouteFetchOutcome::Timeout
1937        } else {
1938            RouteFetchOutcome::Miss
1939        }
1940    }
1941
1942    async fn request_from_read_sources(&self, hash: &Hash) -> RouteFetchOutcome {
1943        let hash_key = hash_to_key(hash);
1944        let existing_wait = {
1945            let mut inflight = self.inflight_source_fetches.lock().await;
1946            if let Some(existing) = inflight.get_mut(&hash_key) {
1947                let (tx, rx) = oneshot::channel();
1948                existing.waiters.push(tx);
1949                Some(rx)
1950            } else {
1951                inflight.insert(
1952                    hash_key.clone(),
1953                    InflightSourceFetch {
1954                        waiters: Vec::new(),
1955                    },
1956                );
1957                None
1958            }
1959        };
1960
1961        if let Some(wait) = existing_wait {
1962            return wait.await.unwrap_or(RouteFetchOutcome::Timeout);
1963        }
1964
1965        let result = self.request_from_read_sources_inner(hash).await;
1966        if let RouteFetchOutcome::Hit(hit) = &result {
1967            let _ = self.local_store.put(*hash, hit.clone()).await;
1968        }
1969        self.complete_inflight_source_fetch(&hash_key, result.clone())
1970            .await;
1971
1972        result
1973    }
1974
1975    async fn complete_inflight_source_fetch(&self, hash_key: &str, result: RouteFetchOutcome) {
1976        let waiters = self
1977            .inflight_source_fetches
1978            .lock()
1979            .await
1980            .remove(hash_key)
1981            .map(|inflight| inflight.waiters)
1982            .unwrap_or_default();
1983        for waiter in waiters {
1984            let _ = waiter.send(result.clone());
1985        }
1986    }
1987
1988    async fn cancel_pending_peer_route(&self, hash: &Hash) {
1989        let hash_key = hash_to_key(hash);
1990        if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
1991            self.release_queried_peer_requests(&pending.queried_peers)
1992                .await;
1993        }
1994    }
1995
1996    async fn cancel_losing_route(&self, hash: &Hash, route: &ReadRoute, winner_data: &[u8]) {
1997        match route {
1998            ReadRoute::Peers(_) => self.cancel_pending_peer_route(hash).await,
1999            ReadRoute::Sources => {
2000                let hash_key = hash_to_key(hash);
2001                self.complete_inflight_source_fetch(
2002                    &hash_key,
2003                    RouteFetchOutcome::Hit(winner_data.to_vec()),
2004                )
2005                .await;
2006            }
2007        }
2008    }
2009
2010    async fn ranked_read_routes(&self, context: &MeshReadContext) -> Vec<RankedReadRoute> {
2011        let mut routes = Vec::new();
2012        let ordered_peers = if should_forward_htl(context.request_htl) {
2013            self.ordered_connected_peers(context.exclude_peer_id.as_deref())
2014                .await
2015        } else {
2016            Vec::new()
2017        };
2018        if !ordered_peers.is_empty() {
2019            let best_peer_id = ordered_peers[0].clone();
2020            let selector = self.peer_selector.read().await;
2021            let best_peer = selector.get_stats(&best_peer_id).cloned();
2022            let now = Instant::now();
2023            let (score, has_history) = match best_peer.as_ref() {
2024                Some(stats) => (
2025                    peer_endpoint_score(stats, now),
2026                    peer_endpoint_has_history(stats),
2027                ),
2028                None => (0.0, false),
2029            };
2030            routes.push(RankedReadRoute {
2031                route: ReadRoute::Peers(ordered_peers),
2032                best_endpoint_id: format!("peer:{best_peer_id}"),
2033                score,
2034                has_history,
2035            });
2036        }
2037        let ordered_sources = self.ordered_read_sources().await;
2038        if let Some(best_source) = ordered_sources.first() {
2039            let stats = self.read_source_stats.read().await;
2040            let best_source_stats = stats.get(best_source.id()).cloned().unwrap_or_default();
2041            let now = Instant::now();
2042            routes.push(RankedReadRoute {
2043                route: ReadRoute::Sources,
2044                best_endpoint_id: format!("source:{}", best_source.id()),
2045                score: adaptive_source_score(&best_source_stats, now),
2046                has_history: source_has_history(&best_source_stats),
2047            });
2048        }
2049        if routes.len() <= 1 {
2050            return routes;
2051        }
2052
2053        routes.sort_by(|left, right| {
2054            right
2055                .score
2056                .partial_cmp(&left.score)
2057                .unwrap_or(std::cmp::Ordering::Equal)
2058                .then_with(|| ranked_route_kind(&left.route).cmp(&ranked_route_kind(&right.route)))
2059                .then_with(|| left.best_endpoint_id.cmp(&right.best_endpoint_id))
2060                .then_with(|| left.route.id().cmp(right.route.id()))
2061        });
2062        routes
2063    }
2064
2065    fn should_probe_multiple_routes(&self, routes: &[RankedReadRoute]) -> bool {
2066        if routes.len() <= 1 {
2067            return false;
2068        }
2069        if !routes[0].has_history || !routes[1].has_history {
2070            return false;
2071        }
2072        (routes[0].score - routes[1].score) < SOURCE_SCORE_TIE_DELTA
2073    }
2074
2075    async fn run_read_route(
2076        &self,
2077        hash: &Hash,
2078        route: &ReadRoute,
2079        context: &MeshReadContext,
2080    ) -> RouteFetchOutcome {
2081        match route {
2082            ReadRoute::Peers(peer_ids) => {
2083                self.request_from_ordered_peers(hash, peer_ids, context.request_htl)
2084                    .await
2085            }
2086            ReadRoute::Sources => self.request_from_read_sources(hash).await,
2087        }
2088    }
2089
2090    async fn request_from_mesh_with_context(
2091        &self,
2092        hash: &Hash,
2093        context: &MeshReadContext,
2094    ) -> Option<Vec<u8>> {
2095        let routes = self.ranked_read_routes(context).await;
2096        match routes.as_slice() {
2097            [] => None,
2098            [ranked] => match self.run_read_route(hash, &ranked.route, context).await {
2099                RouteFetchOutcome::Hit(data) => Some(data),
2100                RouteFetchOutcome::Miss | RouteFetchOutcome::Timeout => None,
2101            },
2102            [first, second, ..] => {
2103                if self.should_probe_multiple_routes(&routes) {
2104                    let first_fut = self.run_read_route(hash, &first.route, context);
2105                    let second_fut = self.run_read_route(hash, &second.route, context);
2106                    tokio::pin!(first_fut);
2107                    tokio::pin!(second_fut);
2108                    let mut first_done = false;
2109                    let mut second_done = false;
2110                    loop {
2111                        tokio::select! {
2112                            result = &mut first_fut, if !first_done => {
2113                                first_done = true;
2114                                if let RouteFetchOutcome::Hit(data) = result {
2115                                    if !second_done {
2116                                        self.cancel_losing_route(hash, &second.route, &data).await;
2117                                    }
2118                                    return Some(data);
2119                                }
2120                            }
2121                            result = &mut second_fut, if !second_done => {
2122                                second_done = true;
2123                                if let RouteFetchOutcome::Hit(data) = result {
2124                                    if !first_done {
2125                                        self.cancel_losing_route(hash, &first.route, &data).await;
2126                                    }
2127                                    return Some(data);
2128                                }
2129                            }
2130                            else => break,
2131                        }
2132                        if first_done && second_done {
2133                            break;
2134                        }
2135                    }
2136                    None
2137                } else {
2138                    match self.run_read_route(hash, &first.route, context).await {
2139                        RouteFetchOutcome::Hit(data) => return Some(data),
2140                        RouteFetchOutcome::Miss | RouteFetchOutcome::Timeout => {}
2141                    }
2142                    for ranked in routes.iter().skip(1) {
2143                        match self.run_read_route(hash, &ranked.route, context).await {
2144                            RouteFetchOutcome::Hit(data) => return Some(data),
2145                            RouteFetchOutcome::Miss | RouteFetchOutcome::Timeout => {}
2146                        }
2147                    }
2148                    None
2149                }
2150            }
2151        }
2152    }
2153
2154    async fn request_from_mesh(&self, hash: &Hash) -> Option<Vec<u8>> {
2155        self.request_from_mesh_with_context(hash, &MeshReadContext::default())
2156            .await
2157    }
2158
2159    async fn begin_forward_request(&self, hash_key: &str, requester_id: &str) -> bool {
2160        let mut pending = self.pending_forward_requests.write().await;
2161        if let Some(existing) = pending.get_mut(hash_key) {
2162            existing.requester_ids.insert(requester_id.to_string());
2163            return false;
2164        }
2165
2166        let mut requester_ids = HashSet::new();
2167        requester_ids.insert(requester_id.to_string());
2168        pending.insert(
2169            hash_key.to_string(),
2170            PendingForwardRequest { requester_ids },
2171        );
2172        true
2173    }
2174
2175    async fn was_recent_forward_miss(&self, hash_key: &str) -> bool {
2176        self.recent_forward_misses.lock().await.contains(hash_key)
2177    }
2178
2179    async fn mark_recent_forward_miss(&self, hash_key: &str) {
2180        let _ = self
2181            .recent_forward_misses
2182            .lock()
2183            .await
2184            .insert_if_new(hash_key.to_string());
2185    }
2186
2187    async fn take_forward_requesters(&self, hash_key: &str) -> Vec<String> {
2188        self.pending_forward_requests
2189            .write()
2190            .await
2191            .remove(hash_key)
2192            .map(|pending| pending.requester_ids.into_iter().collect())
2193            .unwrap_or_default()
2194    }
2195
2196    async fn complete_pending_response(
2197        self: &Arc<Self>,
2198        from_peer: &str,
2199        hash: &Hash,
2200        hash_key: String,
2201        payload: Vec<u8>,
2202    ) {
2203        if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
2204            self.release_queried_peer_requests(&pending.queried_peers)
2205                .await;
2206            let rtt_ms = pending.started_at.elapsed().as_millis() as u64;
2207            self.peer_selector.write().await.record_success(
2208                from_peer,
2209                rtt_ms,
2210                payload.len() as u64,
2211            );
2212            let forward_requesters = self.take_forward_requesters(&hash_key).await;
2213            let response_bytes = if forward_requesters.is_empty() {
2214                None
2215            } else {
2216                Some(encode_response(&create_response(hash, payload.clone())))
2217            };
2218            let _ = pending.response_tx.send(Some(payload));
2219            if let Some(response_bytes) = response_bytes {
2220                for requester_id in forward_requesters {
2221                    Arc::clone(&self)
2222                        .enqueue_response_send(requester_id, response_bytes.clone(), Instant::now())
2223                        .await;
2224                }
2225            }
2226        }
2227    }
2228
2229    async fn handle_quote_response_message(&self, from_peer: &str, res: DataQuoteResponse) {
2230        if !res.a {
2231            return;
2232        }
2233
2234        let Some(quote_id) = res.q else {
2235            return;
2236        };
2237
2238        let hash_key = hash_to_key(&res.h);
2239        let (preferred_mint_url, offered_payment_sat) = {
2240            let pending_quotes = self.pending_quotes.read().await;
2241            let Some(pending) = pending_quotes.get(&hash_key) else {
2242                return;
2243            };
2244            (
2245                pending.preferred_mint_url.clone(),
2246                pending.offered_payment_sat,
2247            )
2248        };
2249        if !self
2250            .should_accept_quote_response(
2251                from_peer,
2252                preferred_mint_url.as_deref(),
2253                offered_payment_sat,
2254                &res,
2255            )
2256            .await
2257        {
2258            return;
2259        }
2260        let mut pending_quotes = self.pending_quotes.write().await;
2261        if let Some(pending) = pending_quotes.remove(&hash_key) {
2262            let _ = pending.response_tx.send(Some(NegotiatedQuote {
2263                peer_id: from_peer.to_string(),
2264                quote_id,
2265                mint_url: res.m,
2266            }));
2267        }
2268    }
2269
2270    async fn handle_response_message(
2271        self: &Arc<Self>,
2272        from_peer: &str,
2273        res: crate::protocol::DataResponse,
2274    ) {
2275        let hash_key = hash_to_key(&res.h);
2276        let hash = match crate::protocol::bytes_to_hash(&res.h) {
2277            Some(h) => h,
2278            None => return,
2279        };
2280
2281        // Ignore malformed/corrupt payload and keep waiting for a valid response.
2282        if hashtree_core::sha256(&res.d) != hash {
2283            self.peer_selector.write().await.record_failure(from_peer);
2284            if self.debug {
2285                println!("[MeshStoreCore] Ignoring invalid response payload for {hash_key}");
2286            }
2287            return;
2288        }
2289
2290        self.complete_pending_response(from_peer, &hash, hash_key, res.d)
2291            .await;
2292    }
2293
2294    async fn handle_quote_request_message(&self, from_peer: &str, req: DataQuoteRequest) {
2295        let hash = match crate::protocol::bytes_to_hash(&req.h) {
2296            Some(h) => h,
2297            None => return,
2298        };
2299        let hash_key = hash_to_key(&hash);
2300
2301        {
2302            let selector = self.peer_selector.read().await;
2303            if self.should_refuse_requests_from_peer(&selector, from_peer) {
2304                if self.debug {
2305                    println!(
2306                        "[MeshStoreCore] Refusing quote request from delinquent peer {}",
2307                        from_peer
2308                    );
2309                }
2310                return;
2311            }
2312        }
2313
2314        let chosen_mint = self.choose_quote_mint(req.m.as_deref());
2315        let can_serve = self.local_store.has(&hash).await.ok().unwrap_or(false)
2316            && !self.should_drop_response(&hash)
2317            && !self.should_corrupt_response(&hash);
2318
2319        let res = if can_serve {
2320            let quote_id = self
2321                .issue_quote(from_peer, &hash_key, req.p, req.t, chosen_mint.as_deref())
2322                .await;
2323            create_quote_response_available(&hash, quote_id, req.p, req.t, chosen_mint.as_deref())
2324        } else {
2325            create_quote_response_unavailable(&hash)
2326        };
2327        let response_bytes = encode_quote_response(&res);
2328        if let Some(channel) = self.signaling.get_channel(from_peer).await {
2329            if channel.send(response_bytes.clone()).await.is_ok() {
2330                self.record_peer_wire_sent(from_peer, response_bytes.len() as u64)
2331                    .await;
2332            }
2333        }
2334    }
2335
2336    async fn handle_request_message(
2337        self: &Arc<Self>,
2338        from_peer: &str,
2339        req: crate::protocol::DataRequest,
2340    ) {
2341        let hash = match crate::protocol::bytes_to_hash(&req.h) {
2342            Some(h) => h,
2343            None => return,
2344        };
2345        let hash_key = hash_to_key(&hash);
2346
2347        if let Some(quote_id) = req.q {
2348            if !self.take_valid_quote(from_peer, &hash_key, quote_id).await {
2349                if self.debug {
2350                    println!(
2351                        "[MeshStoreCore] Refusing request with invalid or expired quote {} from {}",
2352                        quote_id, from_peer
2353                    );
2354                }
2355                return;
2356            }
2357        }
2358
2359        let allow_peer_forwarding = {
2360            let selector = self.peer_selector.read().await;
2361            !self.should_refuse_requests_from_peer(&selector, from_peer)
2362        };
2363
2364        // Check local store
2365        if let Ok(Some(mut data)) = self.local_store.get(&hash).await {
2366            if self.should_drop_response(&hash) {
2367                if self.debug {
2368                    println!(
2369                        "[MeshStoreCore] Dropping response for {} due to actor profile",
2370                        hash_to_key(&hash)
2371                    );
2372                }
2373                return;
2374            }
2375
2376            let response_delay = self.response_send_delay(&hash, data.len());
2377            if self.should_corrupt_response(&hash) {
2378                if data.is_empty() {
2379                    data.push(0x80);
2380                } else {
2381                    data[0] ^= 0x80;
2382                }
2383            }
2384
2385            // Send response
2386            let res = create_response(&hash, data);
2387            let response_bytes = encode_response(&res);
2388            let ready_at = Instant::now() + response_delay;
2389            Arc::clone(self)
2390                .enqueue_response_send(from_peer.to_string(), response_bytes, ready_at)
2391                .await;
2392            return;
2393        }
2394
2395        if self.pending_requests.read().await.contains_key(&hash_key) {
2396            let _ = self.begin_forward_request(&hash_key, from_peer).await;
2397            return;
2398        }
2399
2400        if self.was_recent_forward_miss(&hash_key).await {
2401            if self.debug {
2402                println!(
2403                    "[MeshStoreCore] Suppressing recently missed forwarded request for {}",
2404                    hash_key
2405                );
2406            }
2407            return;
2408        }
2409
2410        if !self.begin_forward_request(&hash_key, from_peer).await {
2411            return;
2412        }
2413
2414        let from_peer = from_peer.to_string();
2415        let this = Arc::clone(self);
2416        let request_htl = req.htl;
2417        tokio::spawn(async move {
2418            let result = if allow_peer_forwarding {
2419                let context = MeshReadContext {
2420                    exclude_peer_id: Some(from_peer.clone()),
2421                    request_htl,
2422                };
2423                this.request_from_mesh_with_context(&hash, &context).await
2424            } else {
2425                if this.debug {
2426                    println!(
2427                        "[MeshStoreCore] Serving request from delinquent peer {} via read sources only",
2428                        from_peer
2429                    );
2430                }
2431                match this.request_from_read_sources(&hash).await {
2432                    RouteFetchOutcome::Hit(data) => Some(data),
2433                    RouteFetchOutcome::Miss | RouteFetchOutcome::Timeout => None,
2434                }
2435            };
2436            let requester_ids = this.take_forward_requesters(&hash_key).await;
2437            if let Some(data) = result {
2438                let ready_at = Instant::now() + this.response_send_delay(&hash, data.len());
2439                let res = create_response(&hash, data);
2440                let response_bytes = encode_response(&res);
2441                for requester_id in requester_ids {
2442                    Arc::clone(&this)
2443                        .enqueue_response_send(requester_id, response_bytes.clone(), ready_at)
2444                        .await;
2445                }
2446            } else {
2447                this.mark_recent_forward_miss(&hash_key).await;
2448            }
2449        });
2450    }
2451
2452    /// Handle incoming data message
2453    pub async fn handle_data_message(self: &Arc<Self>, from_peer: &str, data: &[u8]) {
2454        self.record_peer_wire_received(from_peer, data.len() as u64)
2455            .await;
2456        let parsed = match parse_message(data) {
2457            Some(m) => m,
2458            None => return,
2459        };
2460
2461        match parsed {
2462            DataMessage::Request(req) => {
2463                self.handle_request_message(from_peer, req).await;
2464            }
2465            DataMessage::Response(res) => {
2466                self.handle_response_message(from_peer, res).await;
2467            }
2468            DataMessage::QuoteRequest(req) => {
2469                self.handle_quote_request_message(from_peer, req).await;
2470            }
2471            DataMessage::QuoteResponse(res) => {
2472                self.handle_quote_response_message(from_peer, res).await;
2473            }
2474            DataMessage::Payment(_) | DataMessage::PaymentAck(_) | DataMessage::Chunk(_) => {}
2475        }
2476    }
2477}
2478
2479#[async_trait]
2480impl<S, R, F> Store for MeshStoreCore<S, R, F>
2481where
2482    S: Store + Send + Sync + 'static,
2483    R: SignalingTransport + Send + Sync + 'static,
2484    F: PeerLinkFactory + Send + Sync + 'static,
2485{
2486    async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
2487        self.local_store.put(hash, data).await
2488    }
2489
2490    async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
2491        // Try local first
2492        if let Some(data) = self.local_store.get(hash).await? {
2493            return Ok(Some(data));
2494        }
2495
2496        // Try peers
2497        Ok(self.request_from_mesh(hash).await)
2498    }
2499
2500    async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
2501        self.local_store.has(hash).await
2502    }
2503
2504    async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
2505        self.local_store.delete(hash).await
2506    }
2507}
2508
2509#[cfg(test)]
2510mod tests;
2511
2512/// Type alias for simulation store.
2513pub type SimMeshStore<S> =
2514    MeshStoreCore<S, crate::mock::MockRelayTransport, crate::mock::MockConnectionFactory>;
2515
2516/// Type alias for the default production core (Nostr signaling + WebRTC links).
2517pub type ProductionMeshStore<S> =
2518    MeshStoreCore<S, crate::nostr::NostrRelayTransport, crate::real_factory::WebRtcPeerLinkFactory>;