// hashtree_network/mesh_store_core.rs

//! Shared routed mesh store core.
//!
//! This module provides a concrete store wrapper that works with any local storage
//! backend plus any signaling transport and peer-link factory. Both production
//! (Nostr websockets + WebRTC) and simulation (mocks) use this same code.
6
7use async_trait::async_trait;
8use futures::{stream::FuturesUnordered, FutureExt, StreamExt};
9use std::collections::hash_map::DefaultHasher;
10use std::collections::{HashMap, HashSet};
11use std::future::Future;
12use std::hash::{Hash as _, Hasher};
13use std::ops::Range;
14use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
15use std::sync::Arc;
16use std::time::Duration;
17use tokio::sync::{oneshot, Mutex, RwLock};
18use tokio::time::Instant;
19
20use hashtree_core::{Hash, Store, StoreError};
21
22use crate::peer_selector::{PeerMetadataSnapshot, PeerSelector, SelectionStrategy};
23use crate::protocol::{
24    create_quote_request, create_quote_response_available, create_quote_response_unavailable,
25    create_request, create_request_with_quote, create_response, encode_quote_request,
26    encode_quote_response, encode_request, encode_response, hash_to_key, parse_message,
27    DataMessage, DataQuoteRequest, DataQuoteResponse,
28};
29use crate::signaling::MeshRouter;
30use crate::transport::{PeerLinkFactory, SignalingTransport, TransportError};
31use crate::types::{should_forward_htl, PeerHTLConfig, SignalingMessage, TimedSeenSet, MAX_HTL};
32
33// Keep the on-disk namespace stable across the crate rename so existing peer
34// metadata does not disappear for users upgrading from the old package name.
35const PEER_METADATA_POINTER_SLOT_KEY: &[u8] = b"hashtree-webrtc/peer-metadata/latest/v1";
36const RECENT_FORWARD_MISS_CAPACITY: usize = 4096;
37const MIN_RECENT_FORWARD_MISS_TTL_MS: u64 = 250;
38
39/// Pending request awaiting response
40struct PendingRequest {
41    response_tx: oneshot::Sender<Option<Vec<u8>>>,
42    started_at: Instant,
43    queried_peers: Vec<String>,
44}
45
46struct PendingQuoteRequest {
47    response_tx: oneshot::Sender<Option<NegotiatedQuote>>,
48    preferred_mint_url: Option<String>,
49    offered_payment_sat: u64,
50}
51
/// A forwarded request that is still being resolved.
struct PendingForwardRequest {
    /// IDs of the requesters recorded for this forwarded request; a set, so
    /// each ID appears at most once.
    requester_ids: HashSet<String>,
}
55
/// Per-peer wire traffic counters plus the upload scheduler's debt value.
#[derive(Clone, Debug, Default)]
struct PeerWireStats {
    /// Total bytes sent to the peer (saturating counter).
    bytes_sent: u64,
    /// Total bytes received from the peer (saturating counter).
    bytes_received: u64,
    /// Scheduling debt carried by the peer; the response scheduler adds a
    /// job's weighted size to it and serves the lowest resulting value first.
    bandwidth_debt: f64,
}
62
63struct PendingResponseSend {
64    job_id: u64,
65    peer_id: String,
66    bytes: Vec<u8>,
67    ready_at: Instant,
68    queue_sequence: u64,
69}
70
71#[async_trait]
72pub trait MeshReadSource: Send + Sync {
73    fn id(&self) -> &str;
74
75    fn is_available(&self) -> bool {
76        true
77    }
78
79    async fn get(&self, hash: &Hash) -> Option<Vec<u8>>;
80}
81
/// Result of a successful quote negotiation with a peer.
#[derive(Clone, Debug)]
struct NegotiatedQuote {
    /// Peer the quote belongs to.
    peer_id: String,
    /// Identifier of the quote.
    quote_id: u64,
    /// Mint URL attached to the quote, if any; stored but not read by the visible code.
    #[allow(dead_code)]
    mint_url: Option<String>,
}
89
90struct IssuedQuote {
91    expires_at: Instant,
92    #[allow(dead_code)]
93    payment_sat: u64,
94    #[allow(dead_code)]
95    mint_url: Option<String>,
96}
97
98#[derive(Debug, Clone, Default)]
99struct AdaptiveSourceStats {
100    requests: u64,
101    successes: u64,
102    misses: u64,
103    failures: u64,
104    timeouts: u64,
105    srtt_ms: f64,
106    rttvar_ms: f64,
107    backoff_level: u32,
108    backed_off_until: Option<Instant>,
109    last_success_at: Option<Instant>,
110    last_failure_at: Option<Instant>,
111}
112
/// Outcome of fetching through one route.
#[derive(Clone, Debug)]
enum RouteFetchOutcome {
    /// Data was returned; carries the bytes.
    Hit(Vec<u8>),
    /// Outcome recorded as a miss.
    Miss,
    /// Outcome recorded as a timeout.
    Timeout,
}
119
120struct InflightSourceFetch {
121    waiters: Vec<oneshot::Sender<RouteFetchOutcome>>,
122}
123
/// Outcome of querying a single read source; every variant names the source.
enum SourceFetchOutcome {
    /// Data was returned by the source.
    Hit {
        /// Source the outcome belongs to.
        source_id: String,
        /// Bytes returned.
        data: Vec<u8>,
        /// Elapsed time of the fetch, in milliseconds.
        elapsed_ms: u64,
    },
    /// Outcome recorded as a miss.
    Miss {
        /// Source the outcome belongs to.
        source_id: String,
    },
    /// Outcome recorded as a failure.
    Failure {
        /// Source the outcome belongs to.
        source_id: String,
    },
}
137
/// Initial backoff for a read source, in milliseconds.
const INITIAL_SOURCE_BACKOFF_MS: u64 = 250;
/// Upper bound for a read source's backoff, in milliseconds.
const MAX_SOURCE_BACKOFF_MS: u64 = 10_000;
/// Score difference treated as a tie (presumably when ranking routes — usage
/// is outside this section, confirm there).
const SOURCE_SCORE_TIE_DELTA: f64 = 0.15;
/// A success younger than this window earns the recency bonus in the scoring
/// helpers.
const RECENT_SOURCE_SUCCESS_WINDOW: Duration = Duration::from_secs(60);
/// Rank penalty associated with active peer requests (usage is outside this
/// section — confirm the exact semantics there).
const ACTIVE_PEER_REQUEST_RANK_PENALTY: usize = 3;
143
144fn source_reliability_score(stats: &AdaptiveSourceStats) -> f64 {
145    (stats.successes as f64 + 1.0) / (stats.requests as f64 + 2.0)
146}
147
148fn source_latency_score(stats: &AdaptiveSourceStats) -> f64 {
149    if stats.srtt_ms <= 0.0 {
150        return 0.5;
151    }
152    (500.0 / (stats.srtt_ms + 50.0)).min(1.0)
153}
154
155fn source_has_history(stats: &AdaptiveSourceStats) -> bool {
156    stats.requests > 0
157        || stats.successes > 0
158        || stats.misses > 0
159        || stats.failures > 0
160        || stats.timeouts > 0
161}
162
163fn adaptive_source_score(stats: &AdaptiveSourceStats, now: Instant) -> f64 {
164    if let Some(backed_off_until) = stats.backed_off_until {
165        if backed_off_until > now {
166            return f64::NEG_INFINITY;
167        }
168    }
169
170    let miss_penalty = if stats.requests > 0 {
171        (stats.misses as f64 / stats.requests as f64) * 0.15
172    } else {
173        0.0
174    };
175    let failure_penalty = if stats.requests > 0 {
176        ((stats.failures + stats.timeouts) as f64 / stats.requests as f64) * 0.3
177    } else {
178        0.0
179    };
180    let recency_bonus = if stats
181        .last_success_at
182        .is_some_and(|last| now.duration_since(last) < RECENT_SOURCE_SUCCESS_WINDOW)
183    {
184        0.1
185    } else {
186        0.0
187    };
188
189    0.6 * source_reliability_score(stats) + 0.3 * source_latency_score(stats) + recency_bonus
190        - miss_penalty
191        - failure_penalty
192}
193
194fn peer_endpoint_has_history(stats: &crate::peer_selector::PeerStats) -> bool {
195    stats.requests_sent > 0 || stats.successes > 0 || stats.failures > 0 || stats.timeouts > 0
196}
197
198fn peer_endpoint_score(stats: &crate::peer_selector::PeerStats, now: Instant) -> f64 {
199    if stats.backed_off_until.is_some_and(|until| until > now) {
200        return f64::NEG_INFINITY;
201    }
202
203    let miss_penalty = 0.0;
204    let failure_penalty = if stats.requests_sent > 0 {
205        ((stats.failures + stats.timeouts) as f64 / stats.requests_sent as f64) * 0.3
206    } else {
207        0.0
208    };
209    let recency_bonus = if stats
210        .last_success
211        .is_some_and(|last| now.duration_since(last) < RECENT_SOURCE_SUCCESS_WINDOW)
212    {
213        0.1
214    } else {
215        0.0
216    };
217
218    0.6 * stats.success_rate()
219        + 0.3
220            * source_latency_score(&AdaptiveSourceStats {
221                srtt_ms: stats.srtt_ms,
222                ..AdaptiveSourceStats::default()
223            })
224        + recency_bonus
225        - miss_penalty
226        - failure_penalty
227}
228
/// A way of reading data: through a list of peers or through the read sources.
#[derive(Clone)]
enum ReadRoute {
    /// Read via the listed peer IDs.
    Peers(Vec<String>),
    /// Read via the registered read sources.
    Sources,
}

impl ReadRoute {
    /// Static label for the route kind: `"sources"` or `"peers"`.
    fn id(&self) -> &'static str {
        match self {
            Self::Sources => "sources",
            Self::Peers(_) => "peers",
        }
    }
}
243
244struct RankedReadRoute {
245    route: ReadRoute,
246    best_endpoint_id: String,
247    score: f64,
248    has_history: bool,
249}
250
251fn ranked_route_kind(route: &ReadRoute) -> u8 {
252    match route {
253        ReadRoute::Sources => 0,
254        ReadRoute::Peers(_) => 1,
255    }
256}
257
258#[derive(Debug, Clone)]
259struct MeshReadContext {
260    exclude_peer_id: Option<String>,
261    request_htl: u8,
262}
263
264impl Default for MeshReadContext {
265    fn default() -> Self {
266        Self {
267            exclude_peer_id: None,
268            request_htl: MAX_HTL,
269        }
270    }
271}
272
/// Counters produced by one drain of the currently available peer-link messages.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct DataPumpStats {
    /// Messages processed in total.
    pub processed: usize,
    /// Request messages seen.
    pub request_messages: usize,
    /// Response messages seen.
    pub response_messages: usize,
    /// Quote-request messages seen.
    pub quote_request_messages: u64,
    /// Quote-response messages seen.
    pub quote_response_messages: u64,
    /// Bytes processed in total.
    pub processed_bytes: u64,
}
283
/// How a request is fanned out across peers.
///
/// Two modes are practical with `MeshStoreCore`:
/// - Flood: every fanout set to `usize::MAX`, trading bandwidth for
///   success rate and latency.
/// - Staged hedging: query a first subset, then widen in waves.
#[derive(Clone, Copy, Debug)]
pub struct RequestDispatchConfig {
    /// Peers queried in the first wave.
    pub initial_fanout: usize,
    /// Extra peers queried by each later hedge wave.
    pub hedge_fanout: usize,
    /// Upper bound on the peers queried for one request.
    pub max_fanout: usize,
    /// Pause between waves in milliseconds; `0` sends every wave back to back.
    pub hedge_interval_ms: u64,
}

impl Default for RequestDispatchConfig {
    /// Flood mode: unlimited fanouts and no pause between waves.
    fn default() -> Self {
        let unlimited = usize::MAX;
        Self {
            initial_fanout: unlimited,
            hedge_fanout: unlimited,
            max_fanout: unlimited,
            hedge_interval_ms: 0,
        }
    }
}
311
312/// Normalize fanout config against current peer availability.
313pub fn normalize_dispatch_config(
314    dispatch: RequestDispatchConfig,
315    available_peers: usize,
316) -> RequestDispatchConfig {
317    let mut cfg = dispatch;
318    let cap = if cfg.max_fanout == 0 {
319        available_peers
320    } else {
321        cfg.max_fanout.min(available_peers)
322    };
323    cfg.max_fanout = cap;
324    cfg.initial_fanout = if cfg.initial_fanout == 0 {
325        1
326    } else {
327        cfg.initial_fanout.min(cap.max(1))
328    };
329    cfg.hedge_fanout = if cfg.hedge_fanout == 0 {
330        1
331    } else {
332        cfg.hedge_fanout.min(cap.max(1))
333    };
334    cfg
335}
336
337/// Build wave sizes for staged hedged dispatch.
338pub fn build_hedged_wave_plan(peer_count: usize, dispatch: RequestDispatchConfig) -> Vec<usize> {
339    if peer_count == 0 {
340        return Vec::new();
341    }
342    let cap = dispatch.max_fanout.min(peer_count);
343    if cap == 0 {
344        return Vec::new();
345    }
346
347    let mut plan = Vec::new();
348    let mut sent = 0usize;
349    let first = dispatch.initial_fanout.min(cap).max(1);
350    plan.push(first);
351    sent += first;
352
353    while sent < cap {
354        let next = dispatch.hedge_fanout.min(cap - sent).max(1);
355        plan.push(next);
356        sent += next;
357    }
358    plan
359}
360
/// What the wave scheduler should do after waiting on a dispatched wave.
#[derive(Debug)]
pub enum HedgedWaveAction<T> {
    /// Nothing decisive yet; move on to the next wave.
    Continue,
    /// A result is available; stop and return it.
    Success(T),
    /// Give up on the remaining waves.
    Abort,
}
368
369/// Run a staged hedged dispatch over peer index ranges.
370///
371/// This scheduler is shared by the reusable `MeshStoreCore` and the native
372/// `hashtree-cli` mesh path so tests and production use the same wave timing.
373pub async fn run_hedged_waves<T, SendWave, SendWaveFut, WaitWave, WaitWaveFut>(
374    peer_count: usize,
375    dispatch: RequestDispatchConfig,
376    request_timeout: Duration,
377    mut send_wave: SendWave,
378    mut wait_wave: WaitWave,
379) -> Option<T>
380where
381    SendWave: FnMut(Range<usize>) -> SendWaveFut,
382    SendWaveFut: Future<Output = usize>,
383    WaitWave: FnMut(Duration) -> WaitWaveFut,
384    WaitWaveFut: Future<Output = HedgedWaveAction<T>>,
385{
386    let dispatch = normalize_dispatch_config(dispatch, peer_count);
387    let wave_plan = build_hedged_wave_plan(peer_count, dispatch);
388    if wave_plan.is_empty() {
389        return None;
390    }
391
392    let deadline = Instant::now() + request_timeout;
393    let mut sent_total = 0usize;
394    let mut next_peer_idx = 0usize;
395
396    for (wave_idx, wave_size) in wave_plan.iter().copied().enumerate() {
397        let from = next_peer_idx;
398        let to = (next_peer_idx + wave_size).min(peer_count);
399        next_peer_idx = to;
400
401        if from == to {
402            continue;
403        }
404
405        sent_total += send_wave(from..to).await;
406        if sent_total == 0 {
407            if next_peer_idx >= peer_count {
408                break;
409            }
410            continue;
411        }
412
413        let now = Instant::now();
414        if now >= deadline {
415            break;
416        }
417        let remaining = deadline.saturating_duration_since(now);
418        let is_last_wave = wave_idx + 1 == wave_plan.len() || next_peer_idx >= peer_count;
419        let wait = if is_last_wave {
420            remaining
421        } else if dispatch.hedge_interval_ms == 0 {
422            Duration::ZERO
423        } else {
424            Duration::from_millis(dispatch.hedge_interval_ms).min(remaining)
425        };
426
427        if wait.is_zero() {
428            continue;
429        }
430
431        match wait_wave(wait).await {
432            HedgedWaveAction::Continue => {}
433            HedgedWaveAction::Success(value) => return Some(value),
434            HedgedWaveAction::Abort => break,
435        }
436    }
437
438    None
439}
440
441/// Keep selector membership aligned with currently connected peer IDs.
442pub async fn sync_selector_peers(selector: &RwLock<PeerSelector>, current_peer_ids: &[String]) {
443    let mut selector = selector.write().await;
444    let current: HashSet<&str> = current_peer_ids.iter().map(String::as_str).collect();
445    let known: Vec<String> = selector.all_stats().map(|s| s.peer_id.clone()).collect();
446    for peer_id in known {
447        if !current.contains(peer_id.as_str()) {
448            selector.remove_peer(&peer_id);
449        }
450    }
451    for peer_id in current_peer_ids {
452        selector.add_peer(peer_id.clone());
453    }
454}
455
/// Response behavior profile for simulation/game-theory actors.
///
/// Defaults to honest behavior (always respond correctly, no extra delay).
#[derive(Debug, Clone, Copy)]
pub struct ResponseBehaviorConfig {
    /// Probability that a node drops a response even when it has data.
    pub drop_response_prob: f64,
    /// Probability that a node responds with corrupted payload.
    pub corrupt_response_prob: f64,
    /// Baseline response delay before a peer starts sending any data.
    pub extra_delay_ms: u64,
    /// Additional delay before the first response byte becomes available.
    pub first_byte_delay_ms: u64,
    /// Sustained throughput for delivering large payloads. `0` disables size-based slowdown.
    pub bytes_per_second: u64,
    /// Probability that an otherwise honest response experiences an extra stall.
    pub stall_response_prob: f64,
    /// Extra delay injected when a stall event happens.
    pub stall_delay_ms: u64,
}

impl Default for ResponseBehaviorConfig {
    /// Honest profile: all probabilities `0.0`, all delays and the throughput
    /// limit `0`.
    fn default() -> Self {
        Self {
            drop_response_prob: 0.0,
            corrupt_response_prob: 0.0,
            extra_delay_ms: 0,
            first_byte_delay_ms: 0,
            bytes_per_second: 0,
            stall_response_prob: 0.0,
            stall_delay_ms: 0,
        }
    }
}

impl ResponseBehaviorConfig {
    /// Returns a copy whose probabilities are valid values in `[0.0, 1.0]`.
    ///
    /// Out-of-range probabilities are clamped. A NaN probability is mapped to
    /// `0.0`: `f64::clamp` would pass NaN through unchanged, leaving an
    /// invalid probability in a config that claims to be normalized. Delays
    /// and the throughput limit are copied as they are.
    fn normalized(self) -> Self {
        fn probability(raw: f64) -> f64 {
            if raw.is_nan() {
                0.0
            } else {
                raw.clamp(0.0, 1.0)
            }
        }

        Self {
            drop_response_prob: probability(self.drop_response_prob),
            corrupt_response_prob: probability(self.corrupt_response_prob),
            stall_response_prob: probability(self.stall_response_prob),
            ..self
        }
    }
}
504
505/// Routing policy for request ordering + dispatch fanout.
506#[derive(Debug, Clone)]
507pub struct MeshRoutingConfig {
508    pub selection_strategy: SelectionStrategy,
509    pub fairness_enabled: bool,
510    /// Blend weight for payment-priority ranking in selector (`0.0` disables).
511    pub cashu_payment_weight: f64,
512    /// Refuse serving peers that have reached this many unpaid post-delivery settlements.
513    /// `0` disables refusal and only keeps metadata/downranking.
514    pub cashu_payment_default_block_threshold: u64,
515    /// Cashu mint URLs this node is willing to use for settlement.
516    pub cashu_accepted_mints: Vec<String>,
517    /// Preferred Cashu mint URL when initiating paid retrieval.
518    pub cashu_default_mint: Option<String>,
519    /// Baseline cap for accepting a peer-suggested mint outside the trusted set.
520    pub cashu_peer_suggested_mint_base_cap_sat: u64,
521    /// Additional sats allowed per successful delivery from that peer.
522    pub cashu_peer_suggested_mint_success_step_sat: u64,
523    /// Additional sats allowed per successful post-delivery payment received from that peer.
524    pub cashu_peer_suggested_mint_receipt_step_sat: u64,
525    /// Hard upper bound for any single peer-suggested mint quote we accept.
526    pub cashu_peer_suggested_mint_max_cap_sat: u64,
527    pub dispatch: RequestDispatchConfig,
528    pub response_behavior: ResponseBehaviorConfig,
529}
530
531impl Default for MeshRoutingConfig {
532    fn default() -> Self {
533        Self {
534            selection_strategy: SelectionStrategy::Weighted,
535            fairness_enabled: true,
536            cashu_payment_weight: 0.0,
537            cashu_payment_default_block_threshold: 0,
538            cashu_accepted_mints: Vec::new(),
539            cashu_default_mint: None,
540            cashu_peer_suggested_mint_base_cap_sat: 0,
541            cashu_peer_suggested_mint_success_step_sat: 0,
542            cashu_peer_suggested_mint_receipt_step_sat: 0,
543            cashu_peer_suggested_mint_max_cap_sat: 0,
544            dispatch: RequestDispatchConfig::default(),
545            response_behavior: ResponseBehaviorConfig::default(),
546        }
547    }
548}
549
550/// Routed mesh store core that works with any storage backend and transport
551/// implementation.
552///
553/// This is the shared code between production and simulation.
554/// - Production: `MeshStoreCore<LmdbStore, NostrRelayTransport, WebRtcPeerLinkFactory>`
555/// - Simulation: `MeshStoreCore<MemoryStore, MockRelayTransport, MockConnectionFactory>`
556pub struct MeshStoreCore<S, R, F>
557where
558    S: Store + Send + Sync + 'static,
559    R: SignalingTransport + Send + Sync + 'static,
560    F: PeerLinkFactory + Send + Sync + 'static,
561{
562    /// Local backing store
563    local_store: Arc<S>,
564    /// Mesh router (handles peer discovery and connection)
565    signaling: Arc<MeshRouter<R, F>>,
566    /// Per-peer HTL config
567    htl_configs: RwLock<HashMap<String, PeerHTLConfig>>,
568    /// Pending requests we sent
569    pending_requests: RwLock<HashMap<String, PendingRequest>>,
570    /// Pending quote negotiations keyed by requested hash.
571    pending_quotes: RwLock<HashMap<String, PendingQuoteRequest>>,
572    /// Forwarded peer requests currently being resolved through the mesh/upstream.
573    pending_forward_requests: RwLock<HashMap<String, PendingForwardRequest>>,
574    /// Bounded negative cache for recently forwarded misses/timeouts.
575    recent_forward_misses: Mutex<TimedSeenSet>,
576    /// Quotes we issued to peers and will accept exactly once until expiry.
577    issued_quotes: RwLock<HashMap<(String, String, u64), IssuedQuote>>,
578    /// Monotonic quote identifier generator.
579    next_quote_id: RwLock<u64>,
580    /// Non-peer read sources such as upstream Blossom servers.
581    read_sources: RwLock<HashMap<String, Arc<dyn MeshReadSource>>>,
582    /// Adaptive health stats for non-peer read sources.
583    read_source_stats: RwLock<HashMap<String, AdaptiveSourceStats>>,
584    /// Shared in-flight upstream reads keyed by hash.
585    inflight_source_fetches: Mutex<HashMap<String, InflightSourceFetch>>,
586    /// Adaptive selector for peer ordering.
587    peer_selector: RwLock<PeerSelector>,
588    /// Active per-peer in-flight reads so concurrent block fetches spread across peers.
589    peer_active_requests: RwLock<HashMap<String, usize>>,
590    /// Actual wire traffic stats used for upload-side reciprocity scheduling.
591    peer_wire_stats: RwLock<HashMap<String, PeerWireStats>>,
592    /// Pending content responses waiting for upload arbitration.
593    pending_response_sends: Mutex<Vec<PendingResponseSend>>,
594    /// Upload response scheduler state.
595    response_scheduler_running: AtomicBool,
596    /// Monotonic id for queued response sends.
597    next_response_job_id: AtomicU64,
598    /// Routing/dispatch configuration.
599    routing: MeshRoutingConfig,
600    /// Request timeout
601    request_timeout: Duration,
602    /// Debug mode
603    debug: bool,
604    /// Running flag
605    running: RwLock<bool>,
606}
607
608impl<S, R, F> MeshStoreCore<S, R, F>
609where
610    S: Store + Send + Sync + 'static,
611    R: SignalingTransport + Send + Sync + 'static,
612    F: PeerLinkFactory + Send + Sync + 'static,
613{
614    /// Create a new routed mesh store core.
615    pub fn new(
616        local_store: Arc<S>,
617        signaling: Arc<MeshRouter<R, F>>,
618        request_timeout: Duration,
619        debug: bool,
620    ) -> Self {
621        Self::new_with_routing(
622            local_store,
623            signaling,
624            request_timeout,
625            debug,
626            Default::default(),
627        )
628    }
629
630    /// Create a new routed mesh store core with explicit routing configuration.
631    pub fn new_with_routing(
632        local_store: Arc<S>,
633        signaling: Arc<MeshRouter<R, F>>,
634        request_timeout: Duration,
635        debug: bool,
636        routing: MeshRoutingConfig,
637    ) -> Self {
638        let mut selector = PeerSelector::with_strategy(routing.selection_strategy);
639        selector.set_fairness(routing.fairness_enabled);
640        selector.set_cashu_payment_weight(routing.cashu_payment_weight);
641        Self {
642            local_store,
643            signaling,
644            htl_configs: RwLock::new(HashMap::new()),
645            pending_requests: RwLock::new(HashMap::new()),
646            pending_quotes: RwLock::new(HashMap::new()),
647            pending_forward_requests: RwLock::new(HashMap::new()),
648            recent_forward_misses: Mutex::new(TimedSeenSet::new(
649                RECENT_FORWARD_MISS_CAPACITY,
650                Self::recent_forward_miss_ttl(request_timeout),
651            )),
652            issued_quotes: RwLock::new(HashMap::new()),
653            next_quote_id: RwLock::new(1),
654            read_sources: RwLock::new(HashMap::new()),
655            read_source_stats: RwLock::new(HashMap::new()),
656            inflight_source_fetches: Mutex::new(HashMap::new()),
657            peer_selector: RwLock::new(selector),
658            peer_active_requests: RwLock::new(HashMap::new()),
659            peer_wire_stats: RwLock::new(HashMap::new()),
660            pending_response_sends: Mutex::new(Vec::new()),
661            response_scheduler_running: AtomicBool::new(false),
662            next_response_job_id: AtomicU64::new(1),
663            routing,
664            request_timeout,
665            debug,
666            running: RwLock::new(false),
667        }
668    }
669
670    fn recent_forward_miss_ttl(request_timeout: Duration) -> Duration {
671        let ttl_ms = request_timeout
672            .as_millis()
673            .saturating_mul(2)
674            .max(MIN_RECENT_FORWARD_MISS_TTL_MS as u128)
675            .min(u64::MAX as u128) as u64;
676        Duration::from_millis(ttl_ms)
677    }
678
679    /// Start the store (begin listening for messages)
680    pub async fn start(&self) -> Result<(), TransportError> {
681        *self.running.write().await = true;
682
683        // Send initial hello
684        self.signaling.send_hello(vec![]).await?;
685
686        Ok(())
687    }
688
689    /// Stop the store
690    pub async fn stop(&self) {
691        *self.running.write().await = false;
692    }
693
694    /// Process incoming signaling message
695    pub async fn process_signaling(&self, msg: SignalingMessage) -> Result<(), TransportError> {
696        // When a new peer connects, initialize their HTL config
697        let peer_id = msg.peer_id().to_string();
698        {
699            let mut configs = self.htl_configs.write().await;
700            if !configs.contains_key(&peer_id) {
701                configs.insert(peer_id.clone(), PeerHTLConfig::random());
702            }
703        }
704        self.peer_selector.write().await.add_peer(peer_id);
705
706        self.signaling.handle_message(msg).await
707    }
708
709    /// Get signaling manager reference
710    pub fn signaling(&self) -> &Arc<MeshRouter<R, F>> {
711        &self.signaling
712    }
713
714    fn response_behavior(&self) -> ResponseBehaviorConfig {
715        self.routing.response_behavior.normalized()
716    }
717
718    async fn record_peer_wire_sent(&self, peer_id: &str, bytes: u64) {
719        if bytes == 0 {
720            return;
721        }
722        let mut stats = self.peer_wire_stats.write().await;
723        let entry = stats.entry(peer_id.to_string()).or_default();
724        entry.bytes_sent = entry.bytes_sent.saturating_add(bytes);
725    }
726
727    async fn record_peer_wire_received(&self, peer_id: &str, bytes: u64) {
728        if bytes == 0 {
729            return;
730        }
731        let mut stats = self.peer_wire_stats.write().await;
732        let entry = stats.entry(peer_id.to_string()).or_default();
733        entry.bytes_received = entry.bytes_received.saturating_add(bytes);
734    }
735
736    fn peer_upload_weight(stats: &PeerWireStats) -> f64 {
737        let raw_ratio = (stats.bytes_received.saturating_add(1024) as f64)
738            / (stats.bytes_sent.saturating_add(1024) as f64);
739        let bounded_ratio = raw_ratio / (1.0 + raw_ratio);
740        0.5 + 1.5 * bounded_ratio
741    }
742
743    fn choose_ready_response_job(
744        ready_jobs: &[(u64, String, usize, Instant, u64)],
745        stats: &HashMap<String, PeerWireStats>,
746    ) -> Option<(u64, f64)> {
747        ready_jobs
748            .iter()
749            .map(|job| {
750                let peer_stats = stats.get(&job.1).cloned().unwrap_or_default();
751                let finish = peer_stats.bandwidth_debt
752                    + (job.2 as f64 / Self::peer_upload_weight(&peer_stats));
753                (job.0, job.1.as_str(), job.4, finish)
754            })
755            .min_by(|left, right| {
756                left.3
757                    .partial_cmp(&right.3)
758                    .unwrap_or(std::cmp::Ordering::Equal)
759                    .then_with(|| left.2.cmp(&right.2))
760                    .then_with(|| left.1.cmp(right.1))
761            })
762            .map(|choice| (choice.0, choice.3))
763    }
764
765    async fn enqueue_response_send(
766        self: &Arc<Self>,
767        peer_id: String,
768        bytes: Vec<u8>,
769        ready_at: Instant,
770    ) {
771        let job_id = self.next_response_job_id.fetch_add(1, Ordering::Relaxed);
772        {
773            let mut queue = self.pending_response_sends.lock().await;
774            queue.push(PendingResponseSend {
775                job_id,
776                peer_id,
777                bytes,
778                ready_at,
779                queue_sequence: job_id,
780            });
781        }
782
783        if self
784            .response_scheduler_running
785            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
786            .is_ok()
787        {
788            let this = Arc::clone(self);
789            tokio::spawn(async move {
790                this.run_response_scheduler().await;
791            });
792        }
793    }
794
795    async fn run_response_scheduler(self: Arc<Self>) {
796        loop {
797            let snapshot = {
798                let queue = self.pending_response_sends.lock().await;
799                if queue.is_empty() {
800                    self.response_scheduler_running
801                        .store(false, Ordering::Release);
802                    return;
803                }
804                queue
805                    .iter()
806                    .map(|job| {
807                        (
808                            job.job_id,
809                            job.peer_id.clone(),
810                            job.bytes.len(),
811                            job.ready_at,
812                            job.queue_sequence,
813                        )
814                    })
815                    .collect::<Vec<_>>()
816            };
817
818            let now = Instant::now();
819            let mut earliest_ready_at: Option<Instant> = None;
820            let mut ready_jobs = Vec::new();
821            for job in &snapshot {
822                if job.3 <= now {
823                    ready_jobs.push(job.clone());
824                } else {
825                    earliest_ready_at = Some(match earliest_ready_at {
826                        Some(current) => current.min(job.3),
827                        None => job.3,
828                    });
829                }
830            }
831
832            if ready_jobs.is_empty() {
833                if let Some(ready_at) = earliest_ready_at {
834                    tokio::time::sleep(ready_at.saturating_duration_since(Instant::now())).await;
835                    continue;
836                }
837                self.response_scheduler_running
838                    .store(false, Ordering::Release);
839                return;
840            }
841
842            let (selected_job_id, selected_finish) = {
843                let stats = self.peer_wire_stats.read().await;
844                Self::choose_ready_response_job(&ready_jobs, &stats).expect("ready response job")
845            };
846
847            let selected = {
848                let mut queue = self.pending_response_sends.lock().await;
849                let Some(index) = queue.iter().position(|job| job.job_id == selected_job_id) else {
850                    continue;
851                };
852                queue.swap_remove(index)
853            };
854
855            let sent = if let Some(channel) = self.signaling.get_channel(&selected.peer_id).await {
856                channel.send(selected.bytes.clone()).await.is_ok()
857            } else {
858                false
859            };
860
861            let queued_peers = {
862                let queue = self.pending_response_sends.lock().await;
863                queue
864                    .iter()
865                    .map(|job| job.peer_id.clone())
866                    .collect::<HashSet<_>>()
867            };
868            let mut stats = self.peer_wire_stats.write().await;
869            let entry = stats.entry(selected.peer_id.clone()).or_default();
870            if sent {
871                entry.bytes_sent = entry.bytes_sent.saturating_add(selected.bytes.len() as u64);
872                entry.bandwidth_debt = selected_finish;
873            }
874            if queued_peers.is_empty() {
875                for peer_stats in stats.values_mut() {
876                    peer_stats.bandwidth_debt = 0.0;
877                }
878            } else {
879                let floor = queued_peers
880                    .iter()
881                    .filter_map(|peer_id| stats.get(peer_id).map(|peer| peer.bandwidth_debt))
882                    .fold(f64::INFINITY, f64::min);
883                if floor.is_finite() && floor > 0.0 {
884                    for peer_id in queued_peers {
885                        if let Some(peer_stats) = stats.get_mut(&peer_id) {
886                            peer_stats.bandwidth_debt =
887                                (peer_stats.bandwidth_debt - floor).max(0.0);
888                        }
889                    }
890                }
891            }
892        }
893    }
894
895    fn deterministic_actor_draw_for(peer_id: &str, hash: &Hash, salt: u64) -> f64 {
896        let mut hasher = DefaultHasher::new();
897        peer_id.hash(&mut hasher);
898        hash.hash(&mut hasher);
899        salt.hash(&mut hasher);
900        let v = hasher.finish();
901        (v as f64) / (u64::MAX as f64)
902    }
903
904    fn deterministic_actor_draw(&self, hash: &Hash, salt: u64) -> f64 {
905        Self::deterministic_actor_draw_for(self.signaling.peer_id(), hash, salt)
906    }
907
908    fn peer_metadata_pointer_slot_hash() -> Hash {
909        hashtree_core::sha256(PEER_METADATA_POINTER_SLOT_KEY)
910    }
911
912    fn decode_hash_hex(hash_hex: &str) -> Result<Hash, StoreError> {
913        let bytes = hex::decode(hash_hex)
914            .map_err(|e| StoreError::Other(format!("Invalid hash hex: {e}")))?;
915        if bytes.len() != 32 {
916            return Err(StoreError::Other(format!(
917                "Invalid hash length {}, expected 32 bytes",
918                bytes.len()
919            )));
920        }
921        let mut hash = [0u8; 32];
922        hash.copy_from_slice(&bytes);
923        Ok(hash)
924    }
925
926    fn should_drop_response(&self, hash: &Hash) -> bool {
927        let p = self.response_behavior().drop_response_prob;
928        if p <= 0.0 {
929            return false;
930        }
931        self.deterministic_actor_draw(hash, 0xD0_D0_D0_D0_D0_D0_D0_D0) < p
932    }
933
934    fn should_corrupt_response(&self, hash: &Hash) -> bool {
935        let p = self.response_behavior().corrupt_response_prob;
936        if p <= 0.0 {
937            return false;
938        }
939        self.deterministic_actor_draw(hash, 0xC0_C0_C0_C0_C0_C0_C0_C0) < p
940    }
941
942    fn should_stall_response(&self, hash: &Hash) -> bool {
943        let p = self.response_behavior().stall_response_prob;
944        if p <= 0.0 {
945            return false;
946        }
947        self.deterministic_actor_draw(hash, 0x5A_11_5A_11_5A_11_5A_11) < p
948    }
949
950    fn response_send_delay(&self, hash: &Hash, payload_len: usize) -> Duration {
951        let behavior = self.response_behavior();
952        let mut total_ms = behavior
953            .extra_delay_ms
954            .saturating_add(behavior.first_byte_delay_ms);
955
956        if behavior.bytes_per_second > 0 && payload_len > 0 {
957            let throughput_ms = ((payload_len as u128) * 1000)
958                .div_ceil(behavior.bytes_per_second as u128)
959                .min(u64::MAX as u128) as u64;
960            total_ms = total_ms.saturating_add(throughput_ms);
961        }
962
963        if behavior.stall_delay_ms > 0 && self.should_stall_response(hash) {
964            total_ms = total_ms.saturating_add(behavior.stall_delay_ms);
965        }
966
967        Duration::from_millis(total_ms)
968    }
969
970    async fn ordered_connected_peers(&self, exclude_peer_id: Option<&str>) -> Vec<String> {
971        let current_peer_ids = self.signaling.peer_ids().await;
972        if current_peer_ids.is_empty() {
973            return Vec::new();
974        }
975
976        sync_selector_peers(&self.peer_selector, &current_peer_ids).await;
977        let mut candidate_peer_ids: Vec<String> = current_peer_ids
978            .into_iter()
979            .filter(|peer_id| exclude_peer_id.is_none_or(|exclude| peer_id != exclude))
980            .collect();
981        if candidate_peer_ids.is_empty() {
982            return Vec::new();
983        }
984
985        let current_set: HashSet<&str> = candidate_peer_ids.iter().map(String::as_str).collect();
986        let mut selector = self.peer_selector.write().await;
987        let mut selector_order = selector.select_peers();
988        selector_order.retain(|peer_id| current_set.contains(peer_id.as_str()));
989        if selector_order.is_empty() {
990            let mut fallback = candidate_peer_ids;
991            fallback.sort();
992            return fallback;
993        }
994        let backed_off: HashMap<String, bool> = candidate_peer_ids
995            .iter()
996            .map(|peer_id| (peer_id.clone(), selector.is_peer_backed_off(peer_id)))
997            .collect();
998        drop(selector);
999
1000        let rank: HashMap<&str, usize> = selector_order
1001            .iter()
1002            .enumerate()
1003            .map(|(idx, peer_id)| (peer_id.as_str(), idx))
1004            .collect();
1005        let active = self.peer_active_requests.read().await;
1006        candidate_peer_ids.sort_by(|left, right| {
1007            let left_backed_off = backed_off.get(left).copied().unwrap_or(false);
1008            let right_backed_off = backed_off.get(right).copied().unwrap_or(false);
1009            if left_backed_off != right_backed_off {
1010                return if left_backed_off {
1011                    std::cmp::Ordering::Greater
1012                } else {
1013                    std::cmp::Ordering::Less
1014                };
1015            }
1016            let left_rank = rank.get(left.as_str()).copied().unwrap_or(usize::MAX / 2);
1017            let right_rank = rank.get(right.as_str()).copied().unwrap_or(usize::MAX / 2);
1018            let left_load = active.get(left).copied().unwrap_or(0);
1019            let right_load = active.get(right).copied().unwrap_or(0);
1020            (left_rank + left_load.saturating_mul(ACTIVE_PEER_REQUEST_RANK_PENALTY))
1021                .cmp(&(right_rank + right_load.saturating_mul(ACTIVE_PEER_REQUEST_RANK_PENALTY)))
1022                .then_with(|| left.cmp(right))
1023        });
1024        candidate_peer_ids
1025    }
1026
1027    async fn reserve_peer_request(&self, peer_id: &str) {
1028        let mut active = self.peer_active_requests.write().await;
1029        *active.entry(peer_id.to_string()).or_insert(0) += 1;
1030    }
1031
1032    async fn release_peer_request(&self, peer_id: &str) {
1033        let mut active = self.peer_active_requests.write().await;
1034        let Some(count) = active.get_mut(peer_id) else {
1035            return;
1036        };
1037        if *count <= 1 {
1038            active.remove(peer_id);
1039        } else {
1040            *count -= 1;
1041        }
1042    }
1043
1044    async fn release_queried_peer_requests(&self, peer_ids: &[String]) {
1045        for peer_id in peer_ids {
1046            self.release_peer_request(peer_id).await;
1047        }
1048    }
1049
1050    fn requested_quote_mint(&self) -> Option<&str> {
1051        if let Some(default_mint) = self.routing.cashu_default_mint.as_deref() {
1052            if self.routing.cashu_accepted_mints.is_empty()
1053                || self
1054                    .routing
1055                    .cashu_accepted_mints
1056                    .iter()
1057                    .any(|mint| mint == default_mint)
1058            {
1059                return Some(default_mint);
1060            }
1061        }
1062
1063        self.routing
1064            .cashu_accepted_mints
1065            .first()
1066            .map(String::as_str)
1067    }
1068
1069    fn choose_quote_mint(&self, requested_mint: Option<&str>) -> Option<String> {
1070        if let Some(requested_mint) = requested_mint {
1071            if self.accepts_quote_mint(Some(requested_mint)) {
1072                return Some(requested_mint.to_string());
1073            }
1074        }
1075        if let Some(default_mint) = self.routing.cashu_default_mint.as_ref() {
1076            return Some(default_mint.clone());
1077        }
1078        if let Some(first_mint) = self.routing.cashu_accepted_mints.first() {
1079            return Some(first_mint.clone());
1080        }
1081        requested_mint.map(str::to_string)
1082    }
1083
1084    fn accepts_quote_mint(&self, mint_url: Option<&str>) -> bool {
1085        if self.routing.cashu_accepted_mints.is_empty() {
1086            return true;
1087        }
1088
1089        let Some(mint_url) = mint_url else {
1090            return false;
1091        };
1092        self.routing
1093            .cashu_accepted_mints
1094            .iter()
1095            .any(|mint| mint == mint_url)
1096    }
1097
1098    fn trusts_quote_mint(&self, mint_url: Option<&str>) -> bool {
1099        let Some(mint_url) = mint_url else {
1100            return self.routing.cashu_default_mint.is_none()
1101                && self.routing.cashu_accepted_mints.is_empty();
1102        };
1103        self.routing.cashu_default_mint.as_deref() == Some(mint_url)
1104            || self
1105                .routing
1106                .cashu_accepted_mints
1107                .iter()
1108                .any(|mint| mint == mint_url)
1109    }
1110
1111    async fn peer_suggested_mint_cap_sat(&self, peer_id: &str) -> u64 {
1112        let base = self.routing.cashu_peer_suggested_mint_base_cap_sat;
1113        if base == 0 {
1114            return 0;
1115        }
1116
1117        let selector = self.peer_selector.read().await;
1118        let Some(stats) = selector.get_stats(peer_id) else {
1119            let max_cap = self.routing.cashu_peer_suggested_mint_max_cap_sat;
1120            return if max_cap > 0 { base.min(max_cap) } else { base };
1121        };
1122
1123        if stats.cashu_payment_defaults > 0
1124            && stats.cashu_payment_defaults >= stats.cashu_payment_receipts
1125        {
1126            return 0;
1127        }
1128
1129        let success_bonus = stats
1130            .successes
1131            .saturating_mul(self.routing.cashu_peer_suggested_mint_success_step_sat);
1132        let receipt_bonus = stats
1133            .cashu_payment_receipts
1134            .saturating_mul(self.routing.cashu_peer_suggested_mint_receipt_step_sat);
1135        let mut cap = base
1136            .saturating_add(success_bonus)
1137            .saturating_add(receipt_bonus);
1138        let max_cap = self.routing.cashu_peer_suggested_mint_max_cap_sat;
1139        if max_cap > 0 {
1140            cap = cap.min(max_cap);
1141        }
1142        cap
1143    }
1144
1145    async fn should_accept_quote_response(
1146        &self,
1147        from_peer: &str,
1148        preferred_mint_url: Option<&str>,
1149        offered_payment_sat: u64,
1150        res: &DataQuoteResponse,
1151    ) -> bool {
1152        let Some(payment_sat) = res.p else {
1153            return false;
1154        };
1155        if payment_sat > offered_payment_sat {
1156            return false;
1157        }
1158
1159        let response_mint = res.m.as_deref();
1160        if response_mint == preferred_mint_url {
1161            return true;
1162        }
1163        if self.trusts_quote_mint(response_mint) {
1164            return true;
1165        }
1166        if response_mint.is_none() {
1167            return false;
1168        }
1169
1170        payment_sat <= self.peer_suggested_mint_cap_sat(from_peer).await
1171    }
1172
1173    async fn issue_quote(
1174        &self,
1175        peer_id: &str,
1176        hash_key: &str,
1177        payment_sat: u64,
1178        ttl_ms: u32,
1179        mint_url: Option<&str>,
1180    ) -> u64 {
1181        let quote_id = {
1182            let mut next = self.next_quote_id.write().await;
1183            let quote_id = *next;
1184            *next = next.saturating_add(1);
1185            quote_id
1186        };
1187
1188        let expires_at = Instant::now() + Duration::from_millis(ttl_ms as u64);
1189        self.issued_quotes.write().await.insert(
1190            (peer_id.to_string(), hash_key.to_string(), quote_id),
1191            IssuedQuote {
1192                expires_at,
1193                payment_sat,
1194                mint_url: mint_url.map(str::to_string),
1195            },
1196        );
1197        quote_id
1198    }
1199
1200    async fn take_valid_quote(&self, peer_id: &str, hash_key: &str, quote_id: u64) -> bool {
1201        let key = (peer_id.to_string(), hash_key.to_string(), quote_id);
1202        let Some(quote) = self.issued_quotes.write().await.remove(&key) else {
1203            return false;
1204        };
1205        quote.expires_at > Instant::now()
1206    }
1207
1208    async fn send_request_to_peer(
1209        &self,
1210        peer_id: &str,
1211        hash: &Hash,
1212        request_htl: u8,
1213        quote_id: Option<u64>,
1214    ) -> bool {
1215        if !should_forward_htl(request_htl) {
1216            return false;
1217        }
1218
1219        let channel = match self.signaling.get_channel(peer_id).await {
1220            Some(c) => c,
1221            None => return false,
1222        };
1223
1224        let htl_config = {
1225            let configs = self.htl_configs.read().await;
1226            configs
1227                .get(peer_id)
1228                .cloned()
1229                .unwrap_or_else(PeerHTLConfig::random)
1230        };
1231
1232        let send_htl = htl_config.decrement(request_htl);
1233        let req = match quote_id {
1234            Some(quote_id) => create_request_with_quote(hash, send_htl, quote_id),
1235            None => create_request(hash, send_htl),
1236        };
1237        let request_bytes = encode_request(&req);
1238        let request_len = request_bytes.len() as u64;
1239
1240        {
1241            let mut selector = self.peer_selector.write().await;
1242            selector.record_request(peer_id, request_len);
1243        }
1244
1245        match channel.send(request_bytes).await {
1246            Ok(()) => {
1247                self.record_peer_wire_sent(peer_id, request_len).await;
1248                true
1249            }
1250            Err(_) => {
1251                self.peer_selector.write().await.record_failure(peer_id);
1252                false
1253            }
1254        }
1255    }
1256
1257    async fn send_quote_request_to_peer(
1258        &self,
1259        peer_id: &str,
1260        hash: &Hash,
1261        payment_sat: u64,
1262        ttl_ms: u32,
1263        mint_url: Option<&str>,
1264    ) -> bool {
1265        let channel = match self.signaling.get_channel(peer_id).await {
1266            Some(c) => c,
1267            None => return false,
1268        };
1269
1270        let req = create_quote_request(hash, ttl_ms, payment_sat, mint_url);
1271        let request_bytes = encode_quote_request(&req);
1272        let request_len = request_bytes.len() as u64;
1273
1274        match channel.send(request_bytes).await {
1275            Ok(()) => {
1276                self.record_peer_wire_sent(peer_id, request_len).await;
1277                true
1278            }
1279            Err(_) => false,
1280        }
1281    }
1282
1283    pub async fn set_read_sources(&self, sources: Vec<Arc<dyn MeshReadSource>>) {
1284        let mut by_id = HashMap::new();
1285        let mut stats = self.read_source_stats.write().await;
1286        for source in sources {
1287            let source_id = source.id().to_string();
1288            by_id.insert(source_id.clone(), source);
1289            stats
1290                .entry(source_id)
1291                .or_insert_with(AdaptiveSourceStats::default);
1292        }
1293        *self.read_sources.write().await = by_id;
1294    }
1295
1296    async fn record_read_source_request(&self, source_id: &str) {
1297        let mut stats = self.read_source_stats.write().await;
1298        stats
1299            .entry(source_id.to_string())
1300            .or_insert_with(AdaptiveSourceStats::default)
1301            .requests += 1;
1302    }
1303
1304    async fn record_read_source_miss(&self, source_id: &str) {
1305        let mut stats = self.read_source_stats.write().await;
1306        stats
1307            .entry(source_id.to_string())
1308            .or_insert_with(AdaptiveSourceStats::default)
1309            .misses += 1;
1310    }
1311
1312    async fn record_read_source_success(&self, source_id: &str, elapsed_ms: u64) {
1313        let now = Instant::now();
1314        let mut stats = self.read_source_stats.write().await;
1315        let stats = stats
1316            .entry(source_id.to_string())
1317            .or_insert_with(AdaptiveSourceStats::default);
1318        stats.successes += 1;
1319        stats.last_success_at = Some(now);
1320        stats.backoff_level = 0;
1321        stats.backed_off_until = None;
1322        if stats.srtt_ms <= 0.0 {
1323            stats.srtt_ms = elapsed_ms as f64;
1324            stats.rttvar_ms = elapsed_ms as f64 / 2.0;
1325            return;
1326        }
1327        let elapsed = elapsed_ms as f64;
1328        stats.rttvar_ms = 0.75 * stats.rttvar_ms + 0.25 * (stats.srtt_ms - elapsed).abs();
1329        stats.srtt_ms = 0.875 * stats.srtt_ms + 0.125 * elapsed;
1330    }
1331
1332    async fn record_read_source_failure(&self, source_id: &str) {
1333        let now = Instant::now();
1334        let mut stats = self.read_source_stats.write().await;
1335        let stats = stats
1336            .entry(source_id.to_string())
1337            .or_insert_with(AdaptiveSourceStats::default);
1338        stats.failures += 1;
1339        stats.last_failure_at = Some(now);
1340        Self::apply_source_backoff(stats, now);
1341    }
1342
1343    async fn record_read_source_timeout(&self, source_id: &str) {
1344        let now = Instant::now();
1345        let mut stats = self.read_source_stats.write().await;
1346        let stats = stats
1347            .entry(source_id.to_string())
1348            .or_insert_with(AdaptiveSourceStats::default);
1349        stats.timeouts += 1;
1350        stats.last_failure_at = Some(now);
1351        Self::apply_source_backoff(stats, now);
1352    }
1353
1354    fn apply_source_backoff(stats: &mut AdaptiveSourceStats, now: Instant) {
1355        stats.backoff_level = stats.backoff_level.saturating_add(1);
1356        let backoff_ms = (INITIAL_SOURCE_BACKOFF_MS
1357            .saturating_mul(2u64.saturating_pow(stats.backoff_level.saturating_sub(1))))
1358        .min(MAX_SOURCE_BACKOFF_MS);
1359        stats.backed_off_until = Some(now + Duration::from_millis(backoff_ms));
1360    }
1361
1362    async fn ordered_read_sources(&self) -> Vec<Arc<dyn MeshReadSource>> {
1363        let sources = self.read_sources.read().await;
1364        if sources.is_empty() {
1365            return Vec::new();
1366        }
1367
1368        let mut available: Vec<Arc<dyn MeshReadSource>> = sources
1369            .values()
1370            .filter(|source| source.is_available())
1371            .cloned()
1372            .collect();
1373        if available.is_empty() {
1374            return Vec::new();
1375        }
1376
1377        let now = Instant::now();
1378        let stats = self.read_source_stats.read().await;
1379        let mut healthy: Vec<Arc<dyn MeshReadSource>> = available
1380            .iter()
1381            .filter(|source| {
1382                stats
1383                    .get(source.id())
1384                    .and_then(|s| s.backed_off_until)
1385                    .is_none_or(|until| until <= now)
1386            })
1387            .cloned()
1388            .collect();
1389        if !healthy.is_empty() {
1390            available = std::mem::take(&mut healthy);
1391        }
1392
1393        available.sort_by(|left, right| {
1394            let left_stats = stats.get(left.id()).cloned().unwrap_or_default();
1395            let right_stats = stats.get(right.id()).cloned().unwrap_or_default();
1396            adaptive_source_score(&right_stats, now)
1397                .partial_cmp(&adaptive_source_score(&left_stats, now))
1398                .unwrap_or(std::cmp::Ordering::Equal)
1399                .then_with(|| left.id().cmp(right.id()))
1400        });
1401        available
1402    }
1403
1404    async fn should_probe_multiple_read_sources(
1405        &self,
1406        ordered_sources: &[Arc<dyn MeshReadSource>],
1407    ) -> bool {
1408        if ordered_sources.len() <= 1 {
1409            return false;
1410        }
1411        let stats = self.read_source_stats.read().await;
1412        let best = stats
1413            .get(ordered_sources[0].id())
1414            .cloned()
1415            .unwrap_or_default();
1416        let second = stats
1417            .get(ordered_sources[1].id())
1418            .cloned()
1419            .unwrap_or_default();
1420        if !source_has_history(&best) || !source_has_history(&second) {
1421            return false;
1422        }
1423        let now = Instant::now();
1424        adaptive_source_score(&best, now) - adaptive_source_score(&second, now)
1425            < SOURCE_SCORE_TIE_DELTA
1426    }
1427
1428    async fn source_dispatch_for(&self, source_count: usize) -> RequestDispatchConfig {
1429        if source_count == 0 {
1430            return self.routing.dispatch;
1431        }
1432        let ordered_sources = self.ordered_read_sources().await;
1433        let probe_multiple = self
1434            .should_probe_multiple_read_sources(&ordered_sources)
1435            .await;
1436        let initial_fanout = if probe_multiple {
1437            source_count.min(2)
1438        } else {
1439            1
1440        };
1441        RequestDispatchConfig {
1442            initial_fanout,
1443            hedge_fanout: self.routing.dispatch.hedge_fanout,
1444            max_fanout: self.routing.dispatch.max_fanout.min(source_count),
1445            hedge_interval_ms: self.routing.dispatch.hedge_interval_ms,
1446        }
1447    }
1448
1449    /// Get peer count
1450    pub async fn peer_count(&self) -> usize {
1451        self.signaling.peer_count().await
1452    }
1453
1454    /// Check if we need more peers
1455    pub async fn needs_peers(&self) -> bool {
1456        self.signaling.needs_peers().await
1457    }
1458
1459    /// Re-broadcast hello to refresh discovery as topology changes.
1460    pub async fn send_hello(&self) -> Result<(), TransportError> {
1461        self.signaling.send_hello(vec![]).await
1462    }
1463
1464    /// Drain all currently available peer-link messages and handle them.
1465    ///
1466    /// This keeps the message pump logic shared between simulation and the
1467    /// default production wrapper instead of duplicating per-channel loops.
1468    pub async fn drain_available_data_messages(self: &Arc<Self>) -> DataPumpStats {
1469        let mut stats = DataPumpStats::default();
1470        let peer_ids = self.signaling.peer_ids().await;
1471        for peer_id in peer_ids {
1472            let Some(channel) = self.signaling.get_channel(&peer_id).await else {
1473                continue;
1474            };
1475
1476            while let Some(data) = channel.try_recv() {
1477                stats.processed += 1;
1478                stats.processed_bytes += data.len() as u64;
1479                if let Some(msg) = parse_message(&data) {
1480                    match msg {
1481                        DataMessage::Request(_) => stats.request_messages += 1,
1482                        DataMessage::Response(_) => stats.response_messages += 1,
1483                        DataMessage::QuoteRequest(_) => stats.quote_request_messages += 1,
1484                        DataMessage::QuoteResponse(_) => stats.quote_response_messages += 1,
1485                        DataMessage::Payment(_)
1486                        | DataMessage::PaymentAck(_)
1487                        | DataMessage::Chunk(_) => {}
1488                    }
1489                }
1490                self.handle_data_message(&peer_id, &data).await;
1491            }
1492        }
1493        stats
1494    }
1495
1496    /// Apply an out-of-band payment credit to a peer's routing priority.
1497    pub async fn record_cashu_payment_for_peer(&self, peer_id: &str, amount_sat: u64) {
1498        self.peer_selector
1499            .write()
1500            .await
1501            .record_cashu_payment(peer_id, amount_sat);
1502    }
1503
1504    /// Record a post-delivery payment we received from a peer.
1505    pub async fn record_cashu_receipt_from_peer(&self, peer_id: &str, amount_sat: u64) {
1506        self.peer_selector
1507            .write()
1508            .await
1509            .record_cashu_receipt(peer_id, amount_sat);
1510    }
1511
1512    /// Record that a peer failed to pay after we delivered successfully.
1513    pub async fn record_cashu_payment_default_from_peer(&self, peer_id: &str) {
1514        self.peer_selector
1515            .write()
1516            .await
1517            .record_cashu_payment_default(peer_id);
1518    }
1519
1520    /// Snapshot routing/selection summary for inspection/debugging.
1521    pub async fn selector_summary(&self) -> crate::peer_selector::SelectorSummary {
1522        self.peer_selector.read().await.summary()
1523    }
1524
1525    fn should_refuse_requests_from_peer(&self, selector: &PeerSelector, peer_id: &str) -> bool {
1526        selector.is_peer_blocked_for_payment_defaults(
1527            peer_id,
1528            self.routing.cashu_payment_default_block_threshold,
1529        )
1530    }
1531
1532    /// Export live peer metadata for inspection/debugging.
1533    pub async fn peer_metadata_snapshot(&self) -> PeerMetadataSnapshot {
1534        self.peer_selector
1535            .read()
1536            .await
1537            .export_peer_metadata_snapshot()
1538    }
1539
1540    /// Snapshot current peer metadata and persist it into `local_store`.
1541    ///
1542    /// Uses content-addressed storage for the snapshot body and a reserved
1543    /// mutable pointer slot for the "latest snapshot hash".
1544    pub async fn persist_peer_metadata(&self) -> Result<Hash, StoreError> {
1545        let snapshot = self
1546            .peer_selector
1547            .read()
1548            .await
1549            .export_peer_metadata_snapshot();
1550        let bytes = serde_json::to_vec(&snapshot).map_err(|e| {
1551            StoreError::Other(format!("Failed to encode peer metadata snapshot: {e}"))
1552        })?;
1553        let snapshot_hash = hashtree_core::sha256(&bytes);
1554        let _ = self.local_store.put(snapshot_hash, bytes).await?;
1555
1556        let pointer_slot = Self::peer_metadata_pointer_slot_hash();
1557        let pointer_bytes = hex::encode(snapshot_hash).into_bytes();
1558        let _ = self.local_store.delete(&pointer_slot).await?;
1559        let _ = self.local_store.put(pointer_slot, pointer_bytes).await?;
1560
1561        Ok(snapshot_hash)
1562    }
1563
1564    /// Load persisted peer metadata from `local_store` if available.
1565    pub async fn load_peer_metadata(&self) -> Result<bool, StoreError> {
1566        let pointer_slot = Self::peer_metadata_pointer_slot_hash();
1567        let Some(pointer_bytes) = self.local_store.get(&pointer_slot).await? else {
1568            return Ok(false);
1569        };
1570        let pointer_hex = std::str::from_utf8(&pointer_bytes).map_err(|e| {
1571            StoreError::Other(format!("Peer metadata pointer is not valid UTF-8: {e}"))
1572        })?;
1573        let snapshot_hash = Self::decode_hash_hex(pointer_hex.trim())?;
1574
1575        let Some(snapshot_bytes) = self.local_store.get(&snapshot_hash).await? else {
1576            return Ok(false);
1577        };
1578        let snapshot: PeerMetadataSnapshot =
1579            serde_json::from_slice(&snapshot_bytes).map_err(|e| {
1580                StoreError::Other(format!("Failed to decode peer metadata snapshot: {e}"))
1581            })?;
1582        self.peer_selector
1583            .write()
1584            .await
1585            .import_peer_metadata_snapshot(&snapshot);
1586        Ok(true)
1587    }
1588
1589    /// Request data from peers after negotiating a paid quote.
1590    ///
1591    /// If quote negotiation fails or the quoted peer does not deliver, the store
1592    /// falls back to the normal unpaid retrieval path to preserve liveness.
1593    pub async fn get_with_quote(
1594        &self,
1595        hash: &Hash,
1596        payment_sat: u64,
1597        quote_ttl: Duration,
1598    ) -> Result<Option<Vec<u8>>, StoreError> {
1599        if let Some(data) = self.local_store.get(hash).await? {
1600            return Ok(Some(data));
1601        }
1602        Ok(self
1603            .request_from_peers_with_quote(hash, payment_sat, quote_ttl)
1604            .await)
1605    }
1606
1607    async fn request_from_peers_with_quote(
1608        &self,
1609        hash: &Hash,
1610        payment_sat: u64,
1611        quote_ttl: Duration,
1612    ) -> Option<Vec<u8>> {
1613        let ordered_peer_ids = self.ordered_connected_peers(None).await;
1614        if ordered_peer_ids.is_empty() {
1615            return None;
1616        }
1617
1618        if let Some(quote) = self
1619            .request_quote_from_peers(hash, payment_sat, quote_ttl, &ordered_peer_ids)
1620            .await
1621        {
1622            if let Some(data) = self
1623                .request_from_single_peer(hash, &quote.peer_id, MAX_HTL, Some(quote.quote_id))
1624                .await
1625            {
1626                return Some(data);
1627            }
1628        }
1629
1630        self.request_from_mesh(hash).await
1631    }
1632
1633    async fn request_quote_from_peers(
1634        &self,
1635        hash: &Hash,
1636        payment_sat: u64,
1637        quote_ttl: Duration,
1638        ordered_peer_ids: &[String],
1639    ) -> Option<NegotiatedQuote> {
1640        if ordered_peer_ids.is_empty() {
1641            return None;
1642        }
1643        let ttl_ms = quote_ttl.as_millis().min(u32::MAX as u128) as u32;
1644        if ttl_ms == 0 {
1645            return None;
1646        }
1647        let requested_mint = self.requested_quote_mint().map(str::to_string);
1648
1649        let hash_key = hash_to_key(hash);
1650        let (tx, rx) = oneshot::channel();
1651        self.pending_quotes.write().await.insert(
1652            hash_key.clone(),
1653            PendingQuoteRequest {
1654                response_tx: tx,
1655                preferred_mint_url: requested_mint.clone(),
1656                offered_payment_sat: payment_sat,
1657            },
1658        );
1659
1660        let rx = Arc::new(Mutex::new(rx));
1661        let result = run_hedged_waves(
1662            ordered_peer_ids.len(),
1663            self.routing.dispatch,
1664            self.request_timeout,
1665            |range| {
1666                let wave_peer_ids = ordered_peer_ids[range].to_vec();
1667                let requested_mint = requested_mint.clone();
1668                let hash = *hash;
1669                async move {
1670                    let mut sent = 0usize;
1671                    for peer_id in wave_peer_ids {
1672                        if self
1673                            .send_quote_request_to_peer(
1674                                &peer_id,
1675                                &hash,
1676                                payment_sat,
1677                                ttl_ms,
1678                                requested_mint.as_deref(),
1679                            )
1680                            .await
1681                        {
1682                            sent += 1;
1683                        }
1684                    }
1685                    sent
1686                }
1687            },
1688            |wait| {
1689                let rx = rx.clone();
1690                async move {
1691                    let mut rx = rx.lock().await;
1692                    match tokio::time::timeout(wait, &mut *rx).await {
1693                        Ok(Ok(Some(quote))) => HedgedWaveAction::Success(quote),
1694                        Ok(Ok(None)) | Ok(Err(_)) => HedgedWaveAction::Abort,
1695                        Err(_) => HedgedWaveAction::Continue,
1696                    }
1697                }
1698            },
1699        )
1700        .await;
1701        let _ = self.pending_quotes.write().await.remove(&hash_key);
1702        result
1703    }
1704
1705    async fn request_from_single_peer(
1706        &self,
1707        hash: &Hash,
1708        peer_id: &str,
1709        request_htl: u8,
1710        quote_id: Option<u64>,
1711    ) -> Option<Vec<u8>> {
1712        let hash_key = hash_to_key(hash);
1713        let (tx, rx) = oneshot::channel();
1714        self.pending_requests.write().await.insert(
1715            hash_key.clone(),
1716            PendingRequest {
1717                response_tx: tx,
1718                started_at: Instant::now(),
1719                queried_peers: vec![peer_id.to_string()],
1720            },
1721        );
1722
1723        let mut rx = rx;
1724        if !self
1725            .send_request_to_peer(peer_id, hash, request_htl, quote_id)
1726            .await
1727        {
1728            let _ = self.pending_requests.write().await.remove(&hash_key);
1729            return None;
1730        }
1731        self.reserve_peer_request(peer_id).await;
1732
1733        if let Ok(Ok(Some(data))) = tokio::time::timeout(self.request_timeout, &mut rx).await {
1734            if hashtree_core::sha256(&data) == *hash {
1735                let _ = self.local_store.put(*hash, data.clone()).await;
1736                return Some(data);
1737            }
1738        }
1739
1740        if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
1741            self.release_queried_peer_requests(&pending.queried_peers)
1742                .await;
1743            for peer_id in pending.queried_peers {
1744                self.peer_selector.write().await.record_timeout(&peer_id);
1745            }
1746        }
1747        let _ = self.take_forward_requesters(&hash_key).await;
1748        None
1749    }
1750
1751    async fn request_from_ordered_peers(
1752        &self,
1753        hash: &Hash,
1754        ordered_peer_ids: &[String],
1755        request_htl: u8,
1756    ) -> RouteFetchOutcome {
1757        let hash_key = hash_to_key(hash);
1758        let (tx, rx) = oneshot::channel();
1759        self.pending_requests.write().await.insert(
1760            hash_key.clone(),
1761            PendingRequest {
1762                response_tx: tx,
1763                started_at: Instant::now(),
1764                queried_peers: Vec::new(),
1765            },
1766        );
1767
1768        let rx = Arc::new(Mutex::new(rx));
1769        let result = run_hedged_waves(
1770            ordered_peer_ids.len(),
1771            self.routing.dispatch,
1772            self.request_timeout,
1773            |range| {
1774                let wave_peer_ids = ordered_peer_ids[range].to_vec();
1775                let hash = *hash;
1776                let hash_key = hash_key.clone();
1777                async move {
1778                    let mut sent = 0usize;
1779                    for peer_id in wave_peer_ids {
1780                        if self
1781                            .send_request_to_peer(&peer_id, &hash, request_htl, None)
1782                            .await
1783                        {
1784                            sent += 1;
1785                            self.reserve_peer_request(&peer_id).await;
1786                            if let Some(pending) =
1787                                self.pending_requests.write().await.get_mut(&hash_key)
1788                            {
1789                                pending.queried_peers.push(peer_id);
1790                            }
1791                        }
1792                    }
1793                    sent
1794                }
1795            },
1796            |wait| {
1797                let rx = rx.clone();
1798                async move {
1799                    let mut rx = rx.lock().await;
1800                    match tokio::time::timeout(wait, &mut *rx).await {
1801                        Ok(Ok(Some(data))) if hashtree_core::sha256(&data) == *hash => {
1802                            HedgedWaveAction::Success(data)
1803                        }
1804                        Ok(Ok(Some(_))) => HedgedWaveAction::Continue,
1805                        Ok(Ok(None)) | Ok(Err(_)) => HedgedWaveAction::Abort,
1806                        Err(_) => HedgedWaveAction::Continue,
1807                    }
1808                }
1809            },
1810        )
1811        .await;
1812
1813        let Some(data) = result else {
1814            if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
1815                self.release_queried_peer_requests(&pending.queried_peers)
1816                    .await;
1817                for peer_id in pending.queried_peers {
1818                    self.peer_selector.write().await.record_timeout(&peer_id);
1819                }
1820            }
1821            let _ = self.take_forward_requesters(&hash_key).await;
1822            return RouteFetchOutcome::Timeout;
1823        };
1824
1825        let _ = self.local_store.put(*hash, data.clone()).await;
1826        RouteFetchOutcome::Hit(data)
1827    }
1828
1829    async fn request_from_read_sources_inner(&self, hash: &Hash) -> RouteFetchOutcome {
1830        let ordered_sources = self.ordered_read_sources().await;
1831        if ordered_sources.is_empty() {
1832            return RouteFetchOutcome::Miss;
1833        }
1834
1835        let dispatch = normalize_dispatch_config(
1836            self.source_dispatch_for(ordered_sources.len()).await,
1837            ordered_sources.len(),
1838        );
1839        let wave_plan = build_hedged_wave_plan(ordered_sources.len(), dispatch);
1840        if wave_plan.is_empty() {
1841            return RouteFetchOutcome::Miss;
1842        }
1843
1844        let deadline = Instant::now() + self.request_timeout;
1845        let mut pending = FuturesUnordered::new();
1846        let mut pending_source_ids = HashSet::new();
1847        let mut saw_timeout = false;
1848        let mut next_source_idx = 0usize;
1849
1850        for (wave_idx, wave_size) in wave_plan.iter().copied().enumerate() {
1851            let from = next_source_idx;
1852            let to = (next_source_idx + wave_size).min(ordered_sources.len());
1853            next_source_idx = to;
1854
1855            for source in ordered_sources[from..to].iter().cloned() {
1856                let source_id = source.id().to_string();
1857                self.record_read_source_request(&source_id).await;
1858                pending_source_ids.insert(source_id.clone());
1859                let hash = *hash;
1860                pending.push(tokio::spawn(async move {
1861                    let started_at = Instant::now();
1862                    let result = std::panic::AssertUnwindSafe(source.get(&hash))
1863                        .catch_unwind()
1864                        .await;
1865                    match result {
1866                        Ok(Some(data)) => SourceFetchOutcome::Hit {
1867                            source_id,
1868                            data,
1869                            elapsed_ms: started_at.elapsed().as_millis().max(1) as u64,
1870                        },
1871                        Ok(None) => SourceFetchOutcome::Miss { source_id },
1872                        Err(_) => SourceFetchOutcome::Failure { source_id },
1873                    }
1874                }));
1875            }
1876
1877            let is_last_wave =
1878                wave_idx + 1 == wave_plan.len() || next_source_idx >= ordered_sources.len();
1879            let window_end = if is_last_wave {
1880                deadline
1881            } else {
1882                (Instant::now() + Duration::from_millis(dispatch.hedge_interval_ms)).min(deadline)
1883            };
1884
1885            while Instant::now() < window_end {
1886                let remaining = window_end.saturating_duration_since(Instant::now());
1887                let Some(result) = tokio::time::timeout(remaining, pending.next())
1888                    .await
1889                    .ok()
1890                    .flatten()
1891                else {
1892                    break;
1893                };
1894                let Ok(outcome) = result else {
1895                    continue;
1896                };
1897                match outcome {
1898                    SourceFetchOutcome::Hit {
1899                        source_id,
1900                        data,
1901                        elapsed_ms,
1902                    } => {
1903                        pending_source_ids.remove(&source_id);
1904                        self.record_read_source_success(&source_id, elapsed_ms)
1905                            .await;
1906                        return RouteFetchOutcome::Hit(data);
1907                    }
1908                    SourceFetchOutcome::Miss { source_id } => {
1909                        pending_source_ids.remove(&source_id);
1910                        self.record_read_source_miss(&source_id).await;
1911                    }
1912                    SourceFetchOutcome::Failure { source_id } => {
1913                        pending_source_ids.remove(&source_id);
1914                        self.record_read_source_failure(&source_id).await;
1915                    }
1916                }
1917            }
1918
1919            if Instant::now() >= deadline {
1920                break;
1921            }
1922        }
1923
1924        for source_id in pending_source_ids {
1925            saw_timeout = true;
1926            self.record_read_source_timeout(&source_id).await;
1927        }
1928        if saw_timeout {
1929            RouteFetchOutcome::Timeout
1930        } else {
1931            RouteFetchOutcome::Miss
1932        }
1933    }
1934
1935    async fn request_from_read_sources(&self, hash: &Hash) -> RouteFetchOutcome {
1936        let hash_key = hash_to_key(hash);
1937        let existing_wait = {
1938            let mut inflight = self.inflight_source_fetches.lock().await;
1939            if let Some(existing) = inflight.get_mut(&hash_key) {
1940                let (tx, rx) = oneshot::channel();
1941                existing.waiters.push(tx);
1942                Some(rx)
1943            } else {
1944                inflight.insert(
1945                    hash_key.clone(),
1946                    InflightSourceFetch {
1947                        waiters: Vec::new(),
1948                    },
1949                );
1950                None
1951            }
1952        };
1953
1954        if let Some(wait) = existing_wait {
1955            return wait.await.unwrap_or(RouteFetchOutcome::Timeout);
1956        }
1957
1958        let result = self.request_from_read_sources_inner(hash).await;
1959        if let RouteFetchOutcome::Hit(hit) = &result {
1960            let _ = self.local_store.put(*hash, hit.clone()).await;
1961        }
1962
1963        let waiters = self
1964            .inflight_source_fetches
1965            .lock()
1966            .await
1967            .remove(&hash_key)
1968            .map(|inflight| inflight.waiters)
1969            .unwrap_or_default();
1970        for waiter in waiters {
1971            let _ = waiter.send(result.clone());
1972        }
1973
1974        result
1975    }
1976
1977    async fn ranked_read_routes(&self, context: &MeshReadContext) -> Vec<RankedReadRoute> {
1978        let mut routes = Vec::new();
1979        let ordered_peers = if should_forward_htl(context.request_htl) {
1980            self.ordered_connected_peers(context.exclude_peer_id.as_deref())
1981                .await
1982        } else {
1983            Vec::new()
1984        };
1985        if !ordered_peers.is_empty() {
1986            let best_peer_id = ordered_peers[0].clone();
1987            let selector = self.peer_selector.read().await;
1988            let best_peer = selector.get_stats(&best_peer_id).cloned();
1989            let now = Instant::now();
1990            let (score, has_history) = match best_peer.as_ref() {
1991                Some(stats) => (
1992                    peer_endpoint_score(stats, now),
1993                    peer_endpoint_has_history(stats),
1994                ),
1995                None => (0.0, false),
1996            };
1997            routes.push(RankedReadRoute {
1998                route: ReadRoute::Peers(ordered_peers),
1999                best_endpoint_id: format!("peer:{best_peer_id}"),
2000                score,
2001                has_history,
2002            });
2003        }
2004        let ordered_sources = self.ordered_read_sources().await;
2005        if let Some(best_source) = ordered_sources.first() {
2006            let stats = self.read_source_stats.read().await;
2007            let best_source_stats = stats.get(best_source.id()).cloned().unwrap_or_default();
2008            let now = Instant::now();
2009            routes.push(RankedReadRoute {
2010                route: ReadRoute::Sources,
2011                best_endpoint_id: format!("source:{}", best_source.id()),
2012                score: adaptive_source_score(&best_source_stats, now),
2013                has_history: source_has_history(&best_source_stats),
2014            });
2015        }
2016        if routes.len() <= 1 {
2017            return routes;
2018        }
2019
2020        routes.sort_by(|left, right| {
2021            right
2022                .score
2023                .partial_cmp(&left.score)
2024                .unwrap_or(std::cmp::Ordering::Equal)
2025                .then_with(|| ranked_route_kind(&left.route).cmp(&ranked_route_kind(&right.route)))
2026                .then_with(|| left.best_endpoint_id.cmp(&right.best_endpoint_id))
2027                .then_with(|| left.route.id().cmp(right.route.id()))
2028        });
2029        routes
2030    }
2031
2032    fn should_probe_multiple_routes(&self, routes: &[RankedReadRoute]) -> bool {
2033        if routes.len() <= 1 {
2034            return false;
2035        }
2036        if !routes[0].has_history || !routes[1].has_history {
2037            return false;
2038        }
2039        (routes[0].score - routes[1].score) < SOURCE_SCORE_TIE_DELTA
2040    }
2041
2042    async fn run_read_route(
2043        &self,
2044        hash: &Hash,
2045        route: &ReadRoute,
2046        context: &MeshReadContext,
2047    ) -> RouteFetchOutcome {
2048        match route {
2049            ReadRoute::Peers(peer_ids) => {
2050                self.request_from_ordered_peers(hash, peer_ids, context.request_htl)
2051                    .await
2052            }
2053            ReadRoute::Sources => self.request_from_read_sources(hash).await,
2054        }
2055    }
2056
2057    async fn request_from_mesh_with_context(
2058        &self,
2059        hash: &Hash,
2060        context: &MeshReadContext,
2061    ) -> Option<Vec<u8>> {
2062        let routes = self.ranked_read_routes(context).await;
2063        match routes.as_slice() {
2064            [] => None,
2065            [ranked] => match self.run_read_route(hash, &ranked.route, context).await {
2066                RouteFetchOutcome::Hit(data) => Some(data),
2067                RouteFetchOutcome::Miss | RouteFetchOutcome::Timeout => None,
2068            },
2069            [first, second, ..] => {
2070                if self.should_probe_multiple_routes(&routes) {
2071                    let first_fut = self.run_read_route(hash, &first.route, context);
2072                    let second_fut = self.run_read_route(hash, &second.route, context);
2073                    tokio::pin!(first_fut);
2074                    tokio::pin!(second_fut);
2075                    let mut first_done = false;
2076                    let mut second_done = false;
2077                    loop {
2078                        tokio::select! {
2079                            result = &mut first_fut, if !first_done => {
2080                                first_done = true;
2081                                if let RouteFetchOutcome::Hit(data) = result {
2082                                    return Some(data);
2083                                }
2084                            }
2085                            result = &mut second_fut, if !second_done => {
2086                                second_done = true;
2087                                if let RouteFetchOutcome::Hit(data) = result {
2088                                    return Some(data);
2089                                }
2090                            }
2091                            else => break,
2092                        }
2093                        if first_done && second_done {
2094                            break;
2095                        }
2096                    }
2097                    None
2098                } else {
2099                    match self.run_read_route(hash, &first.route, context).await {
2100                        RouteFetchOutcome::Hit(data) => return Some(data),
2101                        RouteFetchOutcome::Miss | RouteFetchOutcome::Timeout => {}
2102                    }
2103                    for ranked in routes.iter().skip(1) {
2104                        match self.run_read_route(hash, &ranked.route, context).await {
2105                            RouteFetchOutcome::Hit(data) => return Some(data),
2106                            RouteFetchOutcome::Miss | RouteFetchOutcome::Timeout => {}
2107                        }
2108                    }
2109                    None
2110                }
2111            }
2112        }
2113    }
2114
2115    async fn request_from_mesh(&self, hash: &Hash) -> Option<Vec<u8>> {
2116        self.request_from_mesh_with_context(hash, &MeshReadContext::default())
2117            .await
2118    }
2119
2120    async fn begin_forward_request(&self, hash_key: &str, requester_id: &str) -> bool {
2121        let mut pending = self.pending_forward_requests.write().await;
2122        if let Some(existing) = pending.get_mut(hash_key) {
2123            existing.requester_ids.insert(requester_id.to_string());
2124            return false;
2125        }
2126
2127        let mut requester_ids = HashSet::new();
2128        requester_ids.insert(requester_id.to_string());
2129        pending.insert(
2130            hash_key.to_string(),
2131            PendingForwardRequest { requester_ids },
2132        );
2133        true
2134    }
2135
2136    async fn was_recent_forward_miss(&self, hash_key: &str) -> bool {
2137        self.recent_forward_misses.lock().await.contains(hash_key)
2138    }
2139
2140    async fn mark_recent_forward_miss(&self, hash_key: &str) {
2141        let _ = self
2142            .recent_forward_misses
2143            .lock()
2144            .await
2145            .insert_if_new(hash_key.to_string());
2146    }
2147
2148    async fn take_forward_requesters(&self, hash_key: &str) -> Vec<String> {
2149        self.pending_forward_requests
2150            .write()
2151            .await
2152            .remove(hash_key)
2153            .map(|pending| pending.requester_ids.into_iter().collect())
2154            .unwrap_or_default()
2155    }
2156
2157    async fn complete_pending_response(
2158        self: &Arc<Self>,
2159        from_peer: &str,
2160        hash: &Hash,
2161        hash_key: String,
2162        payload: Vec<u8>,
2163    ) {
2164        if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
2165            self.release_queried_peer_requests(&pending.queried_peers)
2166                .await;
2167            let rtt_ms = pending.started_at.elapsed().as_millis() as u64;
2168            self.peer_selector.write().await.record_success(
2169                from_peer,
2170                rtt_ms,
2171                payload.len() as u64,
2172            );
2173            let forward_requesters = self.take_forward_requesters(&hash_key).await;
2174            let response_bytes = if forward_requesters.is_empty() {
2175                None
2176            } else {
2177                Some(encode_response(&create_response(hash, payload.clone())))
2178            };
2179            let _ = pending.response_tx.send(Some(payload));
2180            if let Some(response_bytes) = response_bytes {
2181                for requester_id in forward_requesters {
2182                    Arc::clone(&self)
2183                        .enqueue_response_send(requester_id, response_bytes.clone(), Instant::now())
2184                        .await;
2185                }
2186            }
2187        }
2188    }
2189
2190    async fn handle_quote_response_message(&self, from_peer: &str, res: DataQuoteResponse) {
2191        if !res.a {
2192            return;
2193        }
2194
2195        let Some(quote_id) = res.q else {
2196            return;
2197        };
2198
2199        let hash_key = hash_to_key(&res.h);
2200        let (preferred_mint_url, offered_payment_sat) = {
2201            let pending_quotes = self.pending_quotes.read().await;
2202            let Some(pending) = pending_quotes.get(&hash_key) else {
2203                return;
2204            };
2205            (
2206                pending.preferred_mint_url.clone(),
2207                pending.offered_payment_sat,
2208            )
2209        };
2210        if !self
2211            .should_accept_quote_response(
2212                from_peer,
2213                preferred_mint_url.as_deref(),
2214                offered_payment_sat,
2215                &res,
2216            )
2217            .await
2218        {
2219            return;
2220        }
2221        let mut pending_quotes = self.pending_quotes.write().await;
2222        if let Some(pending) = pending_quotes.remove(&hash_key) {
2223            let _ = pending.response_tx.send(Some(NegotiatedQuote {
2224                peer_id: from_peer.to_string(),
2225                quote_id,
2226                mint_url: res.m,
2227            }));
2228        }
2229    }
2230
2231    async fn handle_response_message(
2232        self: &Arc<Self>,
2233        from_peer: &str,
2234        res: crate::protocol::DataResponse,
2235    ) {
2236        let hash_key = hash_to_key(&res.h);
2237        let hash = match crate::protocol::bytes_to_hash(&res.h) {
2238            Some(h) => h,
2239            None => return,
2240        };
2241
2242        // Ignore malformed/corrupt payload and keep waiting for a valid response.
2243        if hashtree_core::sha256(&res.d) != hash {
2244            self.peer_selector.write().await.record_failure(from_peer);
2245            if self.debug {
2246                println!("[MeshStoreCore] Ignoring invalid response payload for {hash_key}");
2247            }
2248            return;
2249        }
2250
2251        self.complete_pending_response(from_peer, &hash, hash_key, res.d)
2252            .await;
2253    }
2254
2255    async fn handle_quote_request_message(&self, from_peer: &str, req: DataQuoteRequest) {
2256        let hash = match crate::protocol::bytes_to_hash(&req.h) {
2257            Some(h) => h,
2258            None => return,
2259        };
2260        let hash_key = hash_to_key(&hash);
2261
2262        {
2263            let selector = self.peer_selector.read().await;
2264            if self.should_refuse_requests_from_peer(&selector, from_peer) {
2265                if self.debug {
2266                    println!(
2267                        "[MeshStoreCore] Refusing quote request from delinquent peer {}",
2268                        from_peer
2269                    );
2270                }
2271                return;
2272            }
2273        }
2274
2275        let chosen_mint = self.choose_quote_mint(req.m.as_deref());
2276        let can_serve = self.local_store.has(&hash).await.ok().unwrap_or(false)
2277            && !self.should_drop_response(&hash)
2278            && !self.should_corrupt_response(&hash);
2279
2280        let res = if can_serve {
2281            let quote_id = self
2282                .issue_quote(from_peer, &hash_key, req.p, req.t, chosen_mint.as_deref())
2283                .await;
2284            create_quote_response_available(&hash, quote_id, req.p, req.t, chosen_mint.as_deref())
2285        } else {
2286            create_quote_response_unavailable(&hash)
2287        };
2288        let response_bytes = encode_quote_response(&res);
2289        if let Some(channel) = self.signaling.get_channel(from_peer).await {
2290            if channel.send(response_bytes.clone()).await.is_ok() {
2291                self.record_peer_wire_sent(from_peer, response_bytes.len() as u64)
2292                    .await;
2293            }
2294        }
2295    }
2296
2297    async fn handle_request_message(
2298        self: &Arc<Self>,
2299        from_peer: &str,
2300        req: crate::protocol::DataRequest,
2301    ) {
2302        let hash = match crate::protocol::bytes_to_hash(&req.h) {
2303            Some(h) => h,
2304            None => return,
2305        };
2306        let hash_key = hash_to_key(&hash);
2307
2308        if let Some(quote_id) = req.q {
2309            if !self.take_valid_quote(from_peer, &hash_key, quote_id).await {
2310                if self.debug {
2311                    println!(
2312                        "[MeshStoreCore] Refusing request with invalid or expired quote {} from {}",
2313                        quote_id, from_peer
2314                    );
2315                }
2316                return;
2317            }
2318        }
2319
2320        let allow_peer_forwarding = {
2321            let selector = self.peer_selector.read().await;
2322            !self.should_refuse_requests_from_peer(&selector, from_peer)
2323        };
2324
2325        // Check local store
2326        if let Ok(Some(mut data)) = self.local_store.get(&hash).await {
2327            if self.should_drop_response(&hash) {
2328                if self.debug {
2329                    println!(
2330                        "[MeshStoreCore] Dropping response for {} due to actor profile",
2331                        hash_to_key(&hash)
2332                    );
2333                }
2334                return;
2335            }
2336
2337            let response_delay = self.response_send_delay(&hash, data.len());
2338            if self.should_corrupt_response(&hash) {
2339                if data.is_empty() {
2340                    data.push(0x80);
2341                } else {
2342                    data[0] ^= 0x80;
2343                }
2344            }
2345
2346            // Send response
2347            let res = create_response(&hash, data);
2348            let response_bytes = encode_response(&res);
2349            let ready_at = Instant::now() + response_delay;
2350            Arc::clone(self)
2351                .enqueue_response_send(from_peer.to_string(), response_bytes, ready_at)
2352                .await;
2353            return;
2354        }
2355
2356        if self.pending_requests.read().await.contains_key(&hash_key) {
2357            let _ = self.begin_forward_request(&hash_key, from_peer).await;
2358            return;
2359        }
2360
2361        if self.was_recent_forward_miss(&hash_key).await {
2362            if self.debug {
2363                println!(
2364                    "[MeshStoreCore] Suppressing recently missed forwarded request for {}",
2365                    hash_key
2366                );
2367            }
2368            return;
2369        }
2370
2371        if !self.begin_forward_request(&hash_key, from_peer).await {
2372            return;
2373        }
2374
2375        let from_peer = from_peer.to_string();
2376        let this = Arc::clone(self);
2377        let request_htl = req.htl;
2378        tokio::spawn(async move {
2379            let result = if allow_peer_forwarding {
2380                let context = MeshReadContext {
2381                    exclude_peer_id: Some(from_peer.clone()),
2382                    request_htl,
2383                };
2384                this.request_from_mesh_with_context(&hash, &context).await
2385            } else {
2386                if this.debug {
2387                    println!(
2388                        "[MeshStoreCore] Serving request from delinquent peer {} via read sources only",
2389                        from_peer
2390                    );
2391                }
2392                match this.request_from_read_sources(&hash).await {
2393                    RouteFetchOutcome::Hit(data) => Some(data),
2394                    RouteFetchOutcome::Miss | RouteFetchOutcome::Timeout => None,
2395                }
2396            };
2397            let requester_ids = this.take_forward_requesters(&hash_key).await;
2398            if let Some(data) = result {
2399                let ready_at = Instant::now() + this.response_send_delay(&hash, data.len());
2400                let res = create_response(&hash, data);
2401                let response_bytes = encode_response(&res);
2402                for requester_id in requester_ids {
2403                    Arc::clone(&this)
2404                        .enqueue_response_send(requester_id, response_bytes.clone(), ready_at)
2405                        .await;
2406                }
2407            } else {
2408                this.mark_recent_forward_miss(&hash_key).await;
2409            }
2410        });
2411    }
2412
2413    /// Handle incoming data message
2414    pub async fn handle_data_message(self: &Arc<Self>, from_peer: &str, data: &[u8]) {
2415        self.record_peer_wire_received(from_peer, data.len() as u64)
2416            .await;
2417        let parsed = match parse_message(data) {
2418            Some(m) => m,
2419            None => return,
2420        };
2421
2422        match parsed {
2423            DataMessage::Request(req) => {
2424                self.handle_request_message(from_peer, req).await;
2425            }
2426            DataMessage::Response(res) => {
2427                self.handle_response_message(from_peer, res).await;
2428            }
2429            DataMessage::QuoteRequest(req) => {
2430                self.handle_quote_request_message(from_peer, req).await;
2431            }
2432            DataMessage::QuoteResponse(res) => {
2433                self.handle_quote_response_message(from_peer, res).await;
2434            }
2435            DataMessage::Payment(_) | DataMessage::PaymentAck(_) | DataMessage::Chunk(_) => {}
2436        }
2437    }
2438}
2439
2440#[async_trait]
2441impl<S, R, F> Store for MeshStoreCore<S, R, F>
2442where
2443    S: Store + Send + Sync + 'static,
2444    R: SignalingTransport + Send + Sync + 'static,
2445    F: PeerLinkFactory + Send + Sync + 'static,
2446{
2447    async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
2448        self.local_store.put(hash, data).await
2449    }
2450
2451    async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
2452        // Try local first
2453        if let Some(data) = self.local_store.get(hash).await? {
2454            return Ok(Some(data));
2455        }
2456
2457        // Try peers
2458        Ok(self.request_from_mesh(hash).await)
2459    }
2460
2461    async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
2462        self.local_store.has(hash).await
2463    }
2464
2465    async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
2466        self.local_store.delete(hash).await
2467    }
2468}
2469
2470#[cfg(test)]
2471mod tests;
2472
2473/// Type alias for simulation store.
2474pub type SimMeshStore<S> =
2475    MeshStoreCore<S, crate::mock::MockRelayTransport, crate::mock::MockConnectionFactory>;
2476
2477/// Type alias for the default production core (Nostr signaling + WebRTC links).
2478pub type ProductionMeshStore<S> =
2479    MeshStoreCore<S, crate::nostr::NostrRelayTransport, crate::real_factory::WebRtcPeerLinkFactory>;