Skip to main content

hashtree_network/
mesh_store_core.rs

1//! Shared routed mesh store core.
2//!
3//! This module provides a concrete store wrapper that works with any local storage
4//! backend plus any signaling transport and peer-link factory. Both production
5//! (Nostr websockets + WebRTC) and simulation (mocks) use this same code.
6
7use async_trait::async_trait;
8use futures::{stream::FuturesUnordered, FutureExt, StreamExt};
9use std::collections::hash_map::DefaultHasher;
10use std::collections::{HashMap, HashSet};
11use std::future::Future;
12use std::hash::{Hash as _, Hasher};
13use std::ops::Range;
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16use tokio::sync::{oneshot, Mutex, RwLock};
17
18use hashtree_core::{Hash, Store, StoreError};
19
20use crate::peer_selector::{PeerMetadataSnapshot, PeerSelector, SelectionStrategy};
21use crate::protocol::{
22    create_quote_request, create_quote_response_available, create_quote_response_unavailable,
23    create_request, create_request_with_quote, create_response, encode_quote_request,
24    encode_quote_response, encode_request, encode_response, hash_to_key, parse_message,
25    DataMessage, DataQuoteRequest, DataQuoteResponse,
26};
27use crate::signaling::MeshRouter;
28use crate::transport::{PeerLinkFactory, SignalingTransport, TransportError};
29use crate::types::{PeerHTLConfig, SignalingMessage, MAX_HTL};
30
31// Keep the on-disk namespace stable across the crate rename so existing peer
32// metadata does not disappear for users upgrading from the old package name.
33const PEER_METADATA_POINTER_SLOT_KEY: &[u8] = b"hashtree-webrtc/peer-metadata/latest/v1";
34
35/// Pending request awaiting response
36struct PendingRequest {
37    response_tx: oneshot::Sender<Option<Vec<u8>>>,
38    started_at: Instant,
39    queried_peers: Vec<String>,
40}
41
42struct PendingQuoteRequest {
43    response_tx: oneshot::Sender<Option<NegotiatedQuote>>,
44    preferred_mint_url: Option<String>,
45    offered_payment_sat: u64,
46}
47
48struct PendingForwardRequest {
49    requester_ids: HashSet<String>,
50}
51
52#[async_trait]
53pub trait MeshReadSource: Send + Sync {
54    fn id(&self) -> &str;
55
56    fn is_available(&self) -> bool {
57        true
58    }
59
60    async fn get(&self, hash: &Hash) -> Option<Vec<u8>>;
61}
62
63#[derive(Debug, Clone)]
64struct NegotiatedQuote {
65    peer_id: String,
66    quote_id: u64,
67    #[allow(dead_code)]
68    mint_url: Option<String>,
69}
70
71struct IssuedQuote {
72    expires_at: Instant,
73    #[allow(dead_code)]
74    payment_sat: u64,
75    #[allow(dead_code)]
76    mint_url: Option<String>,
77}
78
79#[derive(Debug, Clone, Default)]
80struct AdaptiveSourceStats {
81    requests: u64,
82    successes: u64,
83    misses: u64,
84    failures: u64,
85    timeouts: u64,
86    srtt_ms: f64,
87    rttvar_ms: f64,
88    backoff_level: u32,
89    backed_off_until: Option<Instant>,
90    last_success_at: Option<Instant>,
91    last_failure_at: Option<Instant>,
92}
93
94#[derive(Debug, Clone)]
95enum RouteFetchOutcome {
96    Hit(Vec<u8>),
97    Miss,
98    Timeout,
99}
100
101struct InflightSourceFetch {
102    waiters: Vec<oneshot::Sender<RouteFetchOutcome>>,
103}
104
105enum SourceFetchOutcome {
106    Hit {
107        source_id: String,
108        data: Vec<u8>,
109        elapsed_ms: u64,
110    },
111    Miss {
112        source_id: String,
113    },
114    Failure {
115        source_id: String,
116    },
117}
118
119const INITIAL_SOURCE_BACKOFF_MS: u64 = 250;
120const MAX_SOURCE_BACKOFF_MS: u64 = 10_000;
121const SOURCE_SCORE_TIE_DELTA: f64 = 0.15;
122const RECENT_SOURCE_SUCCESS_WINDOW: Duration = Duration::from_secs(60);
123const ACTIVE_PEER_REQUEST_RANK_PENALTY: usize = 3;
124
125fn source_reliability_score(stats: &AdaptiveSourceStats) -> f64 {
126    (stats.successes as f64 + 1.0) / (stats.requests as f64 + 2.0)
127}
128
129fn source_latency_score(stats: &AdaptiveSourceStats) -> f64 {
130    if stats.srtt_ms <= 0.0 {
131        return 0.5;
132    }
133    (500.0 / (stats.srtt_ms + 50.0)).min(1.0)
134}
135
136fn source_has_history(stats: &AdaptiveSourceStats) -> bool {
137    stats.requests > 0
138        || stats.successes > 0
139        || stats.misses > 0
140        || stats.failures > 0
141        || stats.timeouts > 0
142}
143
144fn adaptive_source_score(stats: &AdaptiveSourceStats, now: Instant) -> f64 {
145    if let Some(backed_off_until) = stats.backed_off_until {
146        if backed_off_until > now {
147            return f64::NEG_INFINITY;
148        }
149    }
150
151    let miss_penalty = if stats.requests > 0 {
152        (stats.misses as f64 / stats.requests as f64) * 0.15
153    } else {
154        0.0
155    };
156    let failure_penalty = if stats.requests > 0 {
157        ((stats.failures + stats.timeouts) as f64 / stats.requests as f64) * 0.3
158    } else {
159        0.0
160    };
161    let recency_bonus = if stats
162        .last_success_at
163        .is_some_and(|last| now.duration_since(last) < RECENT_SOURCE_SUCCESS_WINDOW)
164    {
165        0.1
166    } else {
167        0.0
168    };
169
170    0.6 * source_reliability_score(stats) + 0.3 * source_latency_score(stats) + recency_bonus
171        - miss_penalty
172        - failure_penalty
173}
174
175#[derive(Clone)]
176enum ReadRoute {
177    Peers(Vec<String>),
178    Sources,
179}
180
181impl ReadRoute {
182    fn id(&self) -> &'static str {
183        match self {
184            Self::Peers(_) => "peers",
185            Self::Sources => "sources",
186        }
187    }
188}
189
190#[derive(Debug, Clone)]
191struct MeshReadContext {
192    exclude_peer_id: Option<String>,
193    request_htl: u8,
194}
195
196impl Default for MeshReadContext {
197    fn default() -> Self {
198        Self {
199            exclude_peer_id: None,
200            request_htl: MAX_HTL,
201        }
202    }
203}
204
205/// Aggregate stats from draining currently available peer-link messages.
206#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
207pub struct DataPumpStats {
208    pub processed: usize,
209    pub request_messages: usize,
210    pub response_messages: usize,
211    pub quote_request_messages: u64,
212    pub quote_response_messages: u64,
213    pub processed_bytes: u64,
214}
215
216/// Request dispatch strategy for peer queries.
217///
218/// `MeshStoreCore` supports two practical retrieval modes:
219/// - Flood (`usize::MAX` fanout): maximize success/latency at bandwidth cost.
220/// - Staged hedging: probe a subset first, then expand.
221#[derive(Debug, Clone, Copy)]
222pub struct RequestDispatchConfig {
223    /// Number of peers queried immediately.
224    pub initial_fanout: usize,
225    /// Number of additional peers to query on each hedge step.
226    pub hedge_fanout: usize,
227    /// Total peers allowed for this request.
228    pub max_fanout: usize,
229    /// Delay between hedge waves (ms). `0` means send all waves immediately.
230    pub hedge_interval_ms: u64,
231}
232
233impl Default for RequestDispatchConfig {
234    fn default() -> Self {
235        Self {
236            initial_fanout: usize::MAX,
237            hedge_fanout: usize::MAX,
238            max_fanout: usize::MAX,
239            hedge_interval_ms: 0,
240        }
241    }
242}
243
244/// Normalize fanout config against current peer availability.
245pub fn normalize_dispatch_config(
246    dispatch: RequestDispatchConfig,
247    available_peers: usize,
248) -> RequestDispatchConfig {
249    let mut cfg = dispatch;
250    let cap = if cfg.max_fanout == 0 {
251        available_peers
252    } else {
253        cfg.max_fanout.min(available_peers)
254    };
255    cfg.max_fanout = cap;
256    cfg.initial_fanout = if cfg.initial_fanout == 0 {
257        1
258    } else {
259        cfg.initial_fanout.min(cap.max(1))
260    };
261    cfg.hedge_fanout = if cfg.hedge_fanout == 0 {
262        1
263    } else {
264        cfg.hedge_fanout.min(cap.max(1))
265    };
266    cfg
267}
268
269/// Build wave sizes for staged hedged dispatch.
270pub fn build_hedged_wave_plan(peer_count: usize, dispatch: RequestDispatchConfig) -> Vec<usize> {
271    if peer_count == 0 {
272        return Vec::new();
273    }
274    let cap = dispatch.max_fanout.min(peer_count);
275    if cap == 0 {
276        return Vec::new();
277    }
278
279    let mut plan = Vec::new();
280    let mut sent = 0usize;
281    let first = dispatch.initial_fanout.min(cap).max(1);
282    plan.push(first);
283    sent += first;
284
285    while sent < cap {
286        let next = dispatch.hedge_fanout.min(cap - sent).max(1);
287        plan.push(next);
288        sent += next;
289    }
290    plan
291}
292
293/// Outcome returned after waiting on a hedged dispatch wave.
294#[derive(Debug)]
295pub enum HedgedWaveAction<T> {
296    Continue,
297    Success(T),
298    Abort,
299}
300
301/// Run a staged hedged dispatch over peer index ranges.
302///
303/// This scheduler is shared by the reusable `MeshStoreCore` and the native
304/// `hashtree-cli` mesh path so tests and production use the same wave timing.
305pub async fn run_hedged_waves<T, SendWave, SendWaveFut, WaitWave, WaitWaveFut>(
306    peer_count: usize,
307    dispatch: RequestDispatchConfig,
308    request_timeout: Duration,
309    mut send_wave: SendWave,
310    mut wait_wave: WaitWave,
311) -> Option<T>
312where
313    SendWave: FnMut(Range<usize>) -> SendWaveFut,
314    SendWaveFut: Future<Output = usize>,
315    WaitWave: FnMut(Duration) -> WaitWaveFut,
316    WaitWaveFut: Future<Output = HedgedWaveAction<T>>,
317{
318    let dispatch = normalize_dispatch_config(dispatch, peer_count);
319    let wave_plan = build_hedged_wave_plan(peer_count, dispatch);
320    if wave_plan.is_empty() {
321        return None;
322    }
323
324    let deadline = Instant::now() + request_timeout;
325    let mut sent_total = 0usize;
326    let mut next_peer_idx = 0usize;
327
328    for (wave_idx, wave_size) in wave_plan.iter().copied().enumerate() {
329        let from = next_peer_idx;
330        let to = (next_peer_idx + wave_size).min(peer_count);
331        next_peer_idx = to;
332
333        if from == to {
334            continue;
335        }
336
337        sent_total += send_wave(from..to).await;
338        if sent_total == 0 {
339            if next_peer_idx >= peer_count {
340                break;
341            }
342            continue;
343        }
344
345        let now = Instant::now();
346        if now >= deadline {
347            break;
348        }
349        let remaining = deadline.saturating_duration_since(now);
350        let is_last_wave = wave_idx + 1 == wave_plan.len() || next_peer_idx >= peer_count;
351        let wait = if is_last_wave {
352            remaining
353        } else if dispatch.hedge_interval_ms == 0 {
354            Duration::ZERO
355        } else {
356            Duration::from_millis(dispatch.hedge_interval_ms).min(remaining)
357        };
358
359        if wait.is_zero() {
360            continue;
361        }
362
363        match wait_wave(wait).await {
364            HedgedWaveAction::Continue => {}
365            HedgedWaveAction::Success(value) => return Some(value),
366            HedgedWaveAction::Abort => break,
367        }
368    }
369
370    None
371}
372
373/// Keep selector membership aligned with currently connected peer IDs.
374pub async fn sync_selector_peers(selector: &RwLock<PeerSelector>, current_peer_ids: &[String]) {
375    let mut selector = selector.write().await;
376    let current: HashSet<&str> = current_peer_ids.iter().map(String::as_str).collect();
377    let known: Vec<String> = selector.all_stats().map(|s| s.peer_id.clone()).collect();
378    for peer_id in known {
379        if !current.contains(peer_id.as_str()) {
380            selector.remove_peer(&peer_id);
381        }
382    }
383    for peer_id in current_peer_ids {
384        selector.add_peer(peer_id.clone());
385    }
386}
387
388/// Response behavior profile for simulation/game-theory actors.
389///
390/// Defaults to honest behavior (always respond correctly, no extra delay).
391#[derive(Debug, Clone, Copy)]
392pub struct ResponseBehaviorConfig {
393    /// Probability that a node drops a response even when it has data.
394    pub drop_response_prob: f64,
395    /// Probability that a node responds with corrupted payload.
396    pub corrupt_response_prob: f64,
397    /// Baseline response delay before a peer starts sending any data.
398    pub extra_delay_ms: u64,
399    /// Additional delay before the first response byte becomes available.
400    pub first_byte_delay_ms: u64,
401    /// Sustained throughput for delivering large payloads. `0` disables size-based slowdown.
402    pub bytes_per_second: u64,
403    /// Probability that an otherwise honest response experiences an extra stall.
404    pub stall_response_prob: f64,
405    /// Extra delay injected when a stall event happens.
406    pub stall_delay_ms: u64,
407}
408
409impl Default for ResponseBehaviorConfig {
410    fn default() -> Self {
411        Self {
412            drop_response_prob: 0.0,
413            corrupt_response_prob: 0.0,
414            extra_delay_ms: 0,
415            first_byte_delay_ms: 0,
416            bytes_per_second: 0,
417            stall_response_prob: 0.0,
418            stall_delay_ms: 0,
419        }
420    }
421}
422
423impl ResponseBehaviorConfig {
424    fn normalized(self) -> Self {
425        Self {
426            drop_response_prob: self.drop_response_prob.clamp(0.0, 1.0),
427            corrupt_response_prob: self.corrupt_response_prob.clamp(0.0, 1.0),
428            extra_delay_ms: self.extra_delay_ms,
429            first_byte_delay_ms: self.first_byte_delay_ms,
430            bytes_per_second: self.bytes_per_second,
431            stall_response_prob: self.stall_response_prob.clamp(0.0, 1.0),
432            stall_delay_ms: self.stall_delay_ms,
433        }
434    }
435}
436
437/// Routing policy for request ordering + dispatch fanout.
438#[derive(Debug, Clone)]
439pub struct MeshRoutingConfig {
440    pub selection_strategy: SelectionStrategy,
441    pub fairness_enabled: bool,
442    /// Blend weight for payment-priority ranking in selector (`0.0` disables).
443    pub cashu_payment_weight: f64,
444    /// Refuse serving peers that have reached this many unpaid post-delivery settlements.
445    /// `0` disables refusal and only keeps metadata/downranking.
446    pub cashu_payment_default_block_threshold: u64,
447    /// Cashu mint URLs this node is willing to use for settlement.
448    pub cashu_accepted_mints: Vec<String>,
449    /// Preferred Cashu mint URL when initiating paid retrieval.
450    pub cashu_default_mint: Option<String>,
451    /// Baseline cap for accepting a peer-suggested mint outside the trusted set.
452    pub cashu_peer_suggested_mint_base_cap_sat: u64,
453    /// Additional sats allowed per successful delivery from that peer.
454    pub cashu_peer_suggested_mint_success_step_sat: u64,
455    /// Additional sats allowed per successful post-delivery payment received from that peer.
456    pub cashu_peer_suggested_mint_receipt_step_sat: u64,
457    /// Hard upper bound for any single peer-suggested mint quote we accept.
458    pub cashu_peer_suggested_mint_max_cap_sat: u64,
459    pub dispatch: RequestDispatchConfig,
460    pub response_behavior: ResponseBehaviorConfig,
461}
462
463impl Default for MeshRoutingConfig {
464    fn default() -> Self {
465        Self {
466            selection_strategy: SelectionStrategy::Weighted,
467            fairness_enabled: true,
468            cashu_payment_weight: 0.0,
469            cashu_payment_default_block_threshold: 0,
470            cashu_accepted_mints: Vec::new(),
471            cashu_default_mint: None,
472            cashu_peer_suggested_mint_base_cap_sat: 0,
473            cashu_peer_suggested_mint_success_step_sat: 0,
474            cashu_peer_suggested_mint_receipt_step_sat: 0,
475            cashu_peer_suggested_mint_max_cap_sat: 0,
476            dispatch: RequestDispatchConfig::default(),
477            response_behavior: ResponseBehaviorConfig::default(),
478        }
479    }
480}
481
482/// Routed mesh store core that works with any storage backend and transport
483/// implementation.
484///
485/// This is the shared code between production and simulation.
486/// - Production: `MeshStoreCore<LmdbStore, NostrRelayTransport, WebRtcPeerLinkFactory>`
487/// - Simulation: `MeshStoreCore<MemoryStore, MockRelayTransport, MockConnectionFactory>`
488pub struct MeshStoreCore<S, R, F>
489where
490    S: Store + Send + Sync + 'static,
491    R: SignalingTransport + Send + Sync + 'static,
492    F: PeerLinkFactory + Send + Sync + 'static,
493{
494    /// Local backing store
495    local_store: Arc<S>,
496    /// Mesh router (handles peer discovery and connection)
497    signaling: Arc<MeshRouter<R, F>>,
498    /// Per-peer HTL config
499    htl_configs: RwLock<HashMap<String, PeerHTLConfig>>,
500    /// Pending requests we sent
501    pending_requests: RwLock<HashMap<String, PendingRequest>>,
502    /// Pending quote negotiations keyed by requested hash.
503    pending_quotes: RwLock<HashMap<String, PendingQuoteRequest>>,
504    /// Forwarded peer requests currently being resolved through the mesh/upstream.
505    pending_forward_requests: RwLock<HashMap<String, PendingForwardRequest>>,
506    /// Quotes we issued to peers and will accept exactly once until expiry.
507    issued_quotes: RwLock<HashMap<(String, String, u64), IssuedQuote>>,
508    /// Monotonic quote identifier generator.
509    next_quote_id: RwLock<u64>,
510    /// Non-peer read sources such as upstream Blossom servers.
511    read_sources: RwLock<HashMap<String, Arc<dyn MeshReadSource>>>,
512    /// Adaptive health stats for non-peer read sources.
513    read_source_stats: RwLock<HashMap<String, AdaptiveSourceStats>>,
514    /// Shared in-flight upstream reads keyed by hash.
515    inflight_source_fetches: Mutex<HashMap<String, InflightSourceFetch>>,
516    /// Adaptive route stats for choosing peers vs upstream sources.
517    read_route_stats: RwLock<HashMap<String, AdaptiveSourceStats>>,
518    /// Adaptive selector for peer ordering.
519    peer_selector: RwLock<PeerSelector>,
520    /// Active per-peer in-flight reads so concurrent block fetches spread across peers.
521    peer_active_requests: RwLock<HashMap<String, usize>>,
522    /// Routing/dispatch configuration.
523    routing: MeshRoutingConfig,
524    /// Request timeout
525    request_timeout: Duration,
526    /// Debug mode
527    debug: bool,
528    /// Running flag
529    running: RwLock<bool>,
530}
531
532impl<S, R, F> MeshStoreCore<S, R, F>
533where
534    S: Store + Send + Sync + 'static,
535    R: SignalingTransport + Send + Sync + 'static,
536    F: PeerLinkFactory + Send + Sync + 'static,
537{
538    /// Create a new routed mesh store core.
539    pub fn new(
540        local_store: Arc<S>,
541        signaling: Arc<MeshRouter<R, F>>,
542        request_timeout: Duration,
543        debug: bool,
544    ) -> Self {
545        Self::new_with_routing(
546            local_store,
547            signaling,
548            request_timeout,
549            debug,
550            Default::default(),
551        )
552    }
553
554    /// Create a new routed mesh store core with explicit routing configuration.
555    pub fn new_with_routing(
556        local_store: Arc<S>,
557        signaling: Arc<MeshRouter<R, F>>,
558        request_timeout: Duration,
559        debug: bool,
560        routing: MeshRoutingConfig,
561    ) -> Self {
562        let mut selector = PeerSelector::with_strategy(routing.selection_strategy);
563        selector.set_fairness(routing.fairness_enabled);
564        selector.set_cashu_payment_weight(routing.cashu_payment_weight);
565        Self {
566            local_store,
567            signaling,
568            htl_configs: RwLock::new(HashMap::new()),
569            pending_requests: RwLock::new(HashMap::new()),
570            pending_quotes: RwLock::new(HashMap::new()),
571            pending_forward_requests: RwLock::new(HashMap::new()),
572            issued_quotes: RwLock::new(HashMap::new()),
573            next_quote_id: RwLock::new(1),
574            read_sources: RwLock::new(HashMap::new()),
575            read_source_stats: RwLock::new(HashMap::new()),
576            inflight_source_fetches: Mutex::new(HashMap::new()),
577            read_route_stats: RwLock::new(HashMap::new()),
578            peer_selector: RwLock::new(selector),
579            peer_active_requests: RwLock::new(HashMap::new()),
580            routing,
581            request_timeout,
582            debug,
583            running: RwLock::new(false),
584        }
585    }
586
587    /// Start the store (begin listening for messages)
588    pub async fn start(&self) -> Result<(), TransportError> {
589        *self.running.write().await = true;
590
591        // Send initial hello
592        self.signaling.send_hello(vec![]).await?;
593
594        Ok(())
595    }
596
597    /// Stop the store
598    pub async fn stop(&self) {
599        *self.running.write().await = false;
600    }
601
602    /// Process incoming signaling message
603    pub async fn process_signaling(&self, msg: SignalingMessage) -> Result<(), TransportError> {
604        // When a new peer connects, initialize their HTL config
605        let peer_id = msg.peer_id().to_string();
606        {
607            let mut configs = self.htl_configs.write().await;
608            if !configs.contains_key(&peer_id) {
609                configs.insert(peer_id.clone(), PeerHTLConfig::random());
610            }
611        }
612        self.peer_selector.write().await.add_peer(peer_id);
613
614        self.signaling.handle_message(msg).await
615    }
616
617    /// Get signaling manager reference
618    pub fn signaling(&self) -> &Arc<MeshRouter<R, F>> {
619        &self.signaling
620    }
621
622    fn response_behavior(&self) -> ResponseBehaviorConfig {
623        self.routing.response_behavior.normalized()
624    }
625
626    fn deterministic_actor_draw_for(peer_id: &str, hash: &Hash, salt: u64) -> f64 {
627        let mut hasher = DefaultHasher::new();
628        peer_id.hash(&mut hasher);
629        hash.hash(&mut hasher);
630        salt.hash(&mut hasher);
631        let v = hasher.finish();
632        (v as f64) / (u64::MAX as f64)
633    }
634
635    fn deterministic_actor_draw(&self, hash: &Hash, salt: u64) -> f64 {
636        Self::deterministic_actor_draw_for(self.signaling.peer_id(), hash, salt)
637    }
638
639    fn peer_metadata_pointer_slot_hash() -> Hash {
640        hashtree_core::sha256(PEER_METADATA_POINTER_SLOT_KEY)
641    }
642
643    fn decode_hash_hex(hash_hex: &str) -> Result<Hash, StoreError> {
644        let bytes = hex::decode(hash_hex)
645            .map_err(|e| StoreError::Other(format!("Invalid hash hex: {e}")))?;
646        if bytes.len() != 32 {
647            return Err(StoreError::Other(format!(
648                "Invalid hash length {}, expected 32 bytes",
649                bytes.len()
650            )));
651        }
652        let mut hash = [0u8; 32];
653        hash.copy_from_slice(&bytes);
654        Ok(hash)
655    }
656
657    fn should_drop_response(&self, hash: &Hash) -> bool {
658        let p = self.response_behavior().drop_response_prob;
659        if p <= 0.0 {
660            return false;
661        }
662        self.deterministic_actor_draw(hash, 0xD0_D0_D0_D0_D0_D0_D0_D0) < p
663    }
664
665    fn should_corrupt_response(&self, hash: &Hash) -> bool {
666        let p = self.response_behavior().corrupt_response_prob;
667        if p <= 0.0 {
668            return false;
669        }
670        self.deterministic_actor_draw(hash, 0xC0_C0_C0_C0_C0_C0_C0_C0) < p
671    }
672
673    fn should_stall_response(&self, hash: &Hash) -> bool {
674        let p = self.response_behavior().stall_response_prob;
675        if p <= 0.0 {
676            return false;
677        }
678        self.deterministic_actor_draw(hash, 0x5A_11_5A_11_5A_11_5A_11) < p
679    }
680
681    fn response_send_delay(&self, hash: &Hash, payload_len: usize) -> Duration {
682        let behavior = self.response_behavior();
683        let mut total_ms = behavior
684            .extra_delay_ms
685            .saturating_add(behavior.first_byte_delay_ms);
686
687        if behavior.bytes_per_second > 0 && payload_len > 0 {
688            let throughput_ms = ((payload_len as u128) * 1000)
689                .div_ceil(behavior.bytes_per_second as u128)
690                .min(u64::MAX as u128) as u64;
691            total_ms = total_ms.saturating_add(throughput_ms);
692        }
693
694        if behavior.stall_delay_ms > 0 && self.should_stall_response(hash) {
695            total_ms = total_ms.saturating_add(behavior.stall_delay_ms);
696        }
697
698        Duration::from_millis(total_ms)
699    }
700
701    async fn ordered_connected_peers(&self, exclude_peer_id: Option<&str>) -> Vec<String> {
702        let current_peer_ids = self.signaling.peer_ids().await;
703        if current_peer_ids.is_empty() {
704            return Vec::new();
705        }
706
707        sync_selector_peers(&self.peer_selector, &current_peer_ids).await;
708        let mut candidate_peer_ids: Vec<String> = current_peer_ids
709            .into_iter()
710            .filter(|peer_id| exclude_peer_id.is_none_or(|exclude| peer_id != exclude))
711            .collect();
712        if candidate_peer_ids.is_empty() {
713            return Vec::new();
714        }
715
716        let current_set: HashSet<&str> = candidate_peer_ids.iter().map(String::as_str).collect();
717        let mut selector = self.peer_selector.write().await;
718        let mut selector_order = selector.select_peers();
719        selector_order.retain(|peer_id| current_set.contains(peer_id.as_str()));
720        if selector_order.is_empty() {
721            let mut fallback = candidate_peer_ids;
722            fallback.sort();
723            return fallback;
724        }
725        let backed_off: HashMap<String, bool> = candidate_peer_ids
726            .iter()
727            .map(|peer_id| (peer_id.clone(), selector.is_peer_backed_off(peer_id)))
728            .collect();
729        drop(selector);
730
731        let rank: HashMap<&str, usize> = selector_order
732            .iter()
733            .enumerate()
734            .map(|(idx, peer_id)| (peer_id.as_str(), idx))
735            .collect();
736        let active = self.peer_active_requests.read().await;
737        candidate_peer_ids.sort_by(|left, right| {
738            let left_backed_off = backed_off.get(left).copied().unwrap_or(false);
739            let right_backed_off = backed_off.get(right).copied().unwrap_or(false);
740            if left_backed_off != right_backed_off {
741                return if left_backed_off {
742                    std::cmp::Ordering::Greater
743                } else {
744                    std::cmp::Ordering::Less
745                };
746            }
747            let left_rank = rank.get(left.as_str()).copied().unwrap_or(usize::MAX / 2);
748            let right_rank = rank.get(right.as_str()).copied().unwrap_or(usize::MAX / 2);
749            let left_load = active.get(left).copied().unwrap_or(0);
750            let right_load = active.get(right).copied().unwrap_or(0);
751            (left_rank + left_load.saturating_mul(ACTIVE_PEER_REQUEST_RANK_PENALTY))
752                .cmp(&(right_rank + right_load.saturating_mul(ACTIVE_PEER_REQUEST_RANK_PENALTY)))
753                .then_with(|| left.cmp(right))
754        });
755        candidate_peer_ids
756    }
757
758    async fn reserve_peer_request(&self, peer_id: &str) {
759        let mut active = self.peer_active_requests.write().await;
760        *active.entry(peer_id.to_string()).or_insert(0) += 1;
761    }
762
763    async fn release_peer_request(&self, peer_id: &str) {
764        let mut active = self.peer_active_requests.write().await;
765        let Some(count) = active.get_mut(peer_id) else {
766            return;
767        };
768        if *count <= 1 {
769            active.remove(peer_id);
770        } else {
771            *count -= 1;
772        }
773    }
774
775    async fn release_queried_peer_requests(&self, peer_ids: &[String]) {
776        for peer_id in peer_ids {
777            self.release_peer_request(peer_id).await;
778        }
779    }
780
781    fn requested_quote_mint(&self) -> Option<&str> {
782        if let Some(default_mint) = self.routing.cashu_default_mint.as_deref() {
783            if self.routing.cashu_accepted_mints.is_empty()
784                || self
785                    .routing
786                    .cashu_accepted_mints
787                    .iter()
788                    .any(|mint| mint == default_mint)
789            {
790                return Some(default_mint);
791            }
792        }
793
794        self.routing
795            .cashu_accepted_mints
796            .first()
797            .map(String::as_str)
798    }
799
800    fn choose_quote_mint(&self, requested_mint: Option<&str>) -> Option<String> {
801        if let Some(requested_mint) = requested_mint {
802            if self.accepts_quote_mint(Some(requested_mint)) {
803                return Some(requested_mint.to_string());
804            }
805        }
806        if let Some(default_mint) = self.routing.cashu_default_mint.as_ref() {
807            return Some(default_mint.clone());
808        }
809        if let Some(first_mint) = self.routing.cashu_accepted_mints.first() {
810            return Some(first_mint.clone());
811        }
812        requested_mint.map(str::to_string)
813    }
814
815    fn accepts_quote_mint(&self, mint_url: Option<&str>) -> bool {
816        if self.routing.cashu_accepted_mints.is_empty() {
817            return true;
818        }
819
820        let Some(mint_url) = mint_url else {
821            return false;
822        };
823        self.routing
824            .cashu_accepted_mints
825            .iter()
826            .any(|mint| mint == mint_url)
827    }
828
829    fn trusts_quote_mint(&self, mint_url: Option<&str>) -> bool {
830        let Some(mint_url) = mint_url else {
831            return self.routing.cashu_default_mint.is_none()
832                && self.routing.cashu_accepted_mints.is_empty();
833        };
834        self.routing.cashu_default_mint.as_deref() == Some(mint_url)
835            || self
836                .routing
837                .cashu_accepted_mints
838                .iter()
839                .any(|mint| mint == mint_url)
840    }
841
842    async fn peer_suggested_mint_cap_sat(&self, peer_id: &str) -> u64 {
843        let base = self.routing.cashu_peer_suggested_mint_base_cap_sat;
844        if base == 0 {
845            return 0;
846        }
847
848        let selector = self.peer_selector.read().await;
849        let Some(stats) = selector.get_stats(peer_id) else {
850            let max_cap = self.routing.cashu_peer_suggested_mint_max_cap_sat;
851            return if max_cap > 0 { base.min(max_cap) } else { base };
852        };
853
854        if stats.cashu_payment_defaults > 0
855            && stats.cashu_payment_defaults >= stats.cashu_payment_receipts
856        {
857            return 0;
858        }
859
860        let success_bonus = stats
861            .successes
862            .saturating_mul(self.routing.cashu_peer_suggested_mint_success_step_sat);
863        let receipt_bonus = stats
864            .cashu_payment_receipts
865            .saturating_mul(self.routing.cashu_peer_suggested_mint_receipt_step_sat);
866        let mut cap = base
867            .saturating_add(success_bonus)
868            .saturating_add(receipt_bonus);
869        let max_cap = self.routing.cashu_peer_suggested_mint_max_cap_sat;
870        if max_cap > 0 {
871            cap = cap.min(max_cap);
872        }
873        cap
874    }
875
876    async fn should_accept_quote_response(
877        &self,
878        from_peer: &str,
879        preferred_mint_url: Option<&str>,
880        offered_payment_sat: u64,
881        res: &DataQuoteResponse,
882    ) -> bool {
883        let Some(payment_sat) = res.p else {
884            return false;
885        };
886        if payment_sat > offered_payment_sat {
887            return false;
888        }
889
890        let response_mint = res.m.as_deref();
891        if response_mint == preferred_mint_url {
892            return true;
893        }
894        if self.trusts_quote_mint(response_mint) {
895            return true;
896        }
897        if response_mint.is_none() {
898            return false;
899        }
900
901        payment_sat <= self.peer_suggested_mint_cap_sat(from_peer).await
902    }
903
904    async fn issue_quote(
905        &self,
906        peer_id: &str,
907        hash_key: &str,
908        payment_sat: u64,
909        ttl_ms: u32,
910        mint_url: Option<&str>,
911    ) -> u64 {
912        let quote_id = {
913            let mut next = self.next_quote_id.write().await;
914            let quote_id = *next;
915            *next = next.saturating_add(1);
916            quote_id
917        };
918
919        let expires_at = Instant::now() + Duration::from_millis(ttl_ms as u64);
920        self.issued_quotes.write().await.insert(
921            (peer_id.to_string(), hash_key.to_string(), quote_id),
922            IssuedQuote {
923                expires_at,
924                payment_sat,
925                mint_url: mint_url.map(str::to_string),
926            },
927        );
928        quote_id
929    }
930
931    async fn take_valid_quote(&self, peer_id: &str, hash_key: &str, quote_id: u64) -> bool {
932        let key = (peer_id.to_string(), hash_key.to_string(), quote_id);
933        let Some(quote) = self.issued_quotes.write().await.remove(&key) else {
934            return false;
935        };
936        quote.expires_at > Instant::now()
937    }
938
939    async fn send_request_to_peer(
940        &self,
941        peer_id: &str,
942        hash: &Hash,
943        request_htl: u8,
944        quote_id: Option<u64>,
945    ) -> bool {
946        let channel = match self.signaling.get_channel(peer_id).await {
947            Some(c) => c,
948            None => return false,
949        };
950
951        let htl_config = {
952            let configs = self.htl_configs.read().await;
953            configs
954                .get(peer_id)
955                .cloned()
956                .unwrap_or_else(PeerHTLConfig::random)
957        };
958
959        let send_htl = htl_config.decrement(request_htl);
960        let req = match quote_id {
961            Some(quote_id) => create_request_with_quote(hash, send_htl, quote_id),
962            None => create_request(hash, send_htl),
963        };
964        let request_bytes = encode_request(&req);
965
966        {
967            let mut selector = self.peer_selector.write().await;
968            selector.record_request(peer_id, request_bytes.len() as u64);
969        }
970
971        match channel.send(request_bytes).await {
972            Ok(()) => true,
973            Err(_) => {
974                self.peer_selector.write().await.record_failure(peer_id);
975                false
976            }
977        }
978    }
979
980    async fn send_quote_request_to_peer(
981        &self,
982        peer_id: &str,
983        hash: &Hash,
984        payment_sat: u64,
985        ttl_ms: u32,
986        mint_url: Option<&str>,
987    ) -> bool {
988        let channel = match self.signaling.get_channel(peer_id).await {
989            Some(c) => c,
990            None => return false,
991        };
992
993        let req = create_quote_request(hash, ttl_ms, payment_sat, mint_url);
994        let request_bytes = encode_quote_request(&req);
995
996        match channel.send(request_bytes).await {
997            Ok(()) => true,
998            Err(_) => false,
999        }
1000    }
1001
1002    pub async fn set_read_sources(&self, sources: Vec<Arc<dyn MeshReadSource>>) {
1003        let mut by_id = HashMap::new();
1004        let mut stats = self.read_source_stats.write().await;
1005        for source in sources {
1006            let source_id = source.id().to_string();
1007            by_id.insert(source_id.clone(), source);
1008            stats
1009                .entry(source_id)
1010                .or_insert_with(AdaptiveSourceStats::default);
1011        }
1012        *self.read_sources.write().await = by_id;
1013    }
1014
1015    fn route_stats_for<'a>(
1016        stats: &'a mut HashMap<String, AdaptiveSourceStats>,
1017        route_id: &str,
1018    ) -> &'a mut AdaptiveSourceStats {
1019        stats
1020            .entry(route_id.to_string())
1021            .or_insert_with(AdaptiveSourceStats::default)
1022    }
1023
1024    async fn record_route_request(&self, route_id: &str) {
1025        let mut stats = self.read_route_stats.write().await;
1026        Self::route_stats_for(&mut stats, route_id).requests += 1;
1027    }
1028
1029    async fn record_route_success(&self, route_id: &str, elapsed_ms: u64) {
1030        let now = Instant::now();
1031        let mut stats = self.read_route_stats.write().await;
1032        let stats = Self::route_stats_for(&mut stats, route_id);
1033        stats.successes += 1;
1034        stats.last_success_at = Some(now);
1035        stats.backoff_level = 0;
1036        stats.backed_off_until = None;
1037        if stats.srtt_ms <= 0.0 {
1038            stats.srtt_ms = elapsed_ms as f64;
1039            stats.rttvar_ms = elapsed_ms as f64 / 2.0;
1040            return;
1041        }
1042        let elapsed = elapsed_ms as f64;
1043        stats.rttvar_ms = 0.75 * stats.rttvar_ms + 0.25 * (stats.srtt_ms - elapsed).abs();
1044        stats.srtt_ms = 0.875 * stats.srtt_ms + 0.125 * elapsed;
1045    }
1046
1047    async fn record_route_miss(&self, route_id: &str) {
1048        let mut stats = self.read_route_stats.write().await;
1049        Self::route_stats_for(&mut stats, route_id).misses += 1;
1050    }
1051
1052    async fn record_route_timeout(&self, route_id: &str) {
1053        let mut stats = self.read_route_stats.write().await;
1054        Self::route_stats_for(&mut stats, route_id).timeouts += 1;
1055    }
1056
1057    async fn record_read_source_request(&self, source_id: &str) {
1058        let mut stats = self.read_source_stats.write().await;
1059        stats
1060            .entry(source_id.to_string())
1061            .or_insert_with(AdaptiveSourceStats::default)
1062            .requests += 1;
1063    }
1064
1065    async fn record_read_source_miss(&self, source_id: &str) {
1066        let mut stats = self.read_source_stats.write().await;
1067        stats
1068            .entry(source_id.to_string())
1069            .or_insert_with(AdaptiveSourceStats::default)
1070            .misses += 1;
1071    }
1072
1073    async fn record_read_source_success(&self, source_id: &str, elapsed_ms: u64) {
1074        let now = Instant::now();
1075        let mut stats = self.read_source_stats.write().await;
1076        let stats = stats
1077            .entry(source_id.to_string())
1078            .or_insert_with(AdaptiveSourceStats::default);
1079        stats.successes += 1;
1080        stats.last_success_at = Some(now);
1081        stats.backoff_level = 0;
1082        stats.backed_off_until = None;
1083        if stats.srtt_ms <= 0.0 {
1084            stats.srtt_ms = elapsed_ms as f64;
1085            stats.rttvar_ms = elapsed_ms as f64 / 2.0;
1086            return;
1087        }
1088        let elapsed = elapsed_ms as f64;
1089        stats.rttvar_ms = 0.75 * stats.rttvar_ms + 0.25 * (stats.srtt_ms - elapsed).abs();
1090        stats.srtt_ms = 0.875 * stats.srtt_ms + 0.125 * elapsed;
1091    }
1092
1093    async fn record_read_source_failure(&self, source_id: &str) {
1094        let now = Instant::now();
1095        let mut stats = self.read_source_stats.write().await;
1096        let stats = stats
1097            .entry(source_id.to_string())
1098            .or_insert_with(AdaptiveSourceStats::default);
1099        stats.failures += 1;
1100        stats.last_failure_at = Some(now);
1101        Self::apply_source_backoff(stats, now);
1102    }
1103
1104    async fn record_read_source_timeout(&self, source_id: &str) {
1105        let now = Instant::now();
1106        let mut stats = self.read_source_stats.write().await;
1107        let stats = stats
1108            .entry(source_id.to_string())
1109            .or_insert_with(AdaptiveSourceStats::default);
1110        stats.timeouts += 1;
1111        stats.last_failure_at = Some(now);
1112        Self::apply_source_backoff(stats, now);
1113    }
1114
1115    fn apply_source_backoff(stats: &mut AdaptiveSourceStats, now: Instant) {
1116        stats.backoff_level = stats.backoff_level.saturating_add(1);
1117        let backoff_ms = (INITIAL_SOURCE_BACKOFF_MS
1118            .saturating_mul(2u64.saturating_pow(stats.backoff_level.saturating_sub(1))))
1119        .min(MAX_SOURCE_BACKOFF_MS);
1120        stats.backed_off_until = Some(now + Duration::from_millis(backoff_ms));
1121    }
1122
1123    async fn ordered_read_sources(&self) -> Vec<Arc<dyn MeshReadSource>> {
1124        let sources = self.read_sources.read().await;
1125        if sources.is_empty() {
1126            return Vec::new();
1127        }
1128
1129        let mut available: Vec<Arc<dyn MeshReadSource>> = sources
1130            .values()
1131            .filter(|source| source.is_available())
1132            .cloned()
1133            .collect();
1134        if available.is_empty() {
1135            return Vec::new();
1136        }
1137
1138        let now = Instant::now();
1139        let stats = self.read_source_stats.read().await;
1140        let mut healthy: Vec<Arc<dyn MeshReadSource>> = available
1141            .iter()
1142            .filter(|source| {
1143                stats
1144                    .get(source.id())
1145                    .and_then(|s| s.backed_off_until)
1146                    .is_none_or(|until| until <= now)
1147            })
1148            .cloned()
1149            .collect();
1150        if !healthy.is_empty() {
1151            available = std::mem::take(&mut healthy);
1152        }
1153
1154        available.sort_by(|left, right| {
1155            let left_stats = stats.get(left.id()).cloned().unwrap_or_default();
1156            let right_stats = stats.get(right.id()).cloned().unwrap_or_default();
1157            adaptive_source_score(&right_stats, now)
1158                .partial_cmp(&adaptive_source_score(&left_stats, now))
1159                .unwrap_or(std::cmp::Ordering::Equal)
1160                .then_with(|| left.id().cmp(right.id()))
1161        });
1162        available
1163    }
1164
1165    async fn should_probe_multiple_read_sources(
1166        &self,
1167        ordered_sources: &[Arc<dyn MeshReadSource>],
1168    ) -> bool {
1169        if ordered_sources.len() <= 1 {
1170            return false;
1171        }
1172        let stats = self.read_source_stats.read().await;
1173        let best = stats
1174            .get(ordered_sources[0].id())
1175            .cloned()
1176            .unwrap_or_default();
1177        let second = stats
1178            .get(ordered_sources[1].id())
1179            .cloned()
1180            .unwrap_or_default();
1181        if !source_has_history(&best) || !source_has_history(&second) {
1182            return true;
1183        }
1184        let now = Instant::now();
1185        adaptive_source_score(&best, now) - adaptive_source_score(&second, now)
1186            < SOURCE_SCORE_TIE_DELTA
1187    }
1188
1189    async fn source_dispatch_for(&self, source_count: usize) -> RequestDispatchConfig {
1190        if source_count == 0 {
1191            return self.routing.dispatch;
1192        }
1193        let ordered_sources = self.ordered_read_sources().await;
1194        let probe_multiple = self
1195            .should_probe_multiple_read_sources(&ordered_sources)
1196            .await;
1197        RequestDispatchConfig {
1198            initial_fanout: if probe_multiple {
1199                source_count.min(2)
1200            } else {
1201                1
1202            },
1203            hedge_fanout: self.routing.dispatch.hedge_fanout,
1204            max_fanout: self.routing.dispatch.max_fanout.min(source_count),
1205            hedge_interval_ms: self.routing.dispatch.hedge_interval_ms,
1206        }
1207    }
1208
1209    /// Get peer count
1210    pub async fn peer_count(&self) -> usize {
1211        self.signaling.peer_count().await
1212    }
1213
1214    /// Check if we need more peers
1215    pub async fn needs_peers(&self) -> bool {
1216        self.signaling.needs_peers().await
1217    }
1218
1219    /// Re-broadcast hello to refresh discovery as topology changes.
1220    pub async fn send_hello(&self) -> Result<(), TransportError> {
1221        self.signaling.send_hello(vec![]).await
1222    }
1223
1224    /// Drain all currently available peer-link messages and handle them.
1225    ///
1226    /// This keeps the message pump logic shared between simulation and the
1227    /// default production wrapper instead of duplicating per-channel loops.
1228    pub async fn drain_available_data_messages(self: &Arc<Self>) -> DataPumpStats {
1229        let mut stats = DataPumpStats::default();
1230        let peer_ids = self.signaling.peer_ids().await;
1231        for peer_id in peer_ids {
1232            let Some(channel) = self.signaling.get_channel(&peer_id).await else {
1233                continue;
1234            };
1235
1236            while let Some(data) = channel.try_recv() {
1237                stats.processed += 1;
1238                stats.processed_bytes += data.len() as u64;
1239                if let Some(msg) = parse_message(&data) {
1240                    match msg {
1241                        DataMessage::Request(_) => stats.request_messages += 1,
1242                        DataMessage::Response(_) => stats.response_messages += 1,
1243                        DataMessage::QuoteRequest(_) => stats.quote_request_messages += 1,
1244                        DataMessage::QuoteResponse(_) => stats.quote_response_messages += 1,
1245                        DataMessage::Payment(_)
1246                        | DataMessage::PaymentAck(_)
1247                        | DataMessage::Chunk(_) => {}
1248                    }
1249                }
1250                self.handle_data_message(&peer_id, &data).await;
1251            }
1252        }
1253        stats
1254    }
1255
1256    /// Apply an out-of-band payment credit to a peer's routing priority.
1257    pub async fn record_cashu_payment_for_peer(&self, peer_id: &str, amount_sat: u64) {
1258        self.peer_selector
1259            .write()
1260            .await
1261            .record_cashu_payment(peer_id, amount_sat);
1262    }
1263
1264    /// Record a post-delivery payment we received from a peer.
1265    pub async fn record_cashu_receipt_from_peer(&self, peer_id: &str, amount_sat: u64) {
1266        self.peer_selector
1267            .write()
1268            .await
1269            .record_cashu_receipt(peer_id, amount_sat);
1270    }
1271
1272    /// Record that a peer failed to pay after we delivered successfully.
1273    pub async fn record_cashu_payment_default_from_peer(&self, peer_id: &str) {
1274        self.peer_selector
1275            .write()
1276            .await
1277            .record_cashu_payment_default(peer_id);
1278    }
1279
1280    /// Snapshot routing/selection summary for inspection/debugging.
1281    pub async fn selector_summary(&self) -> crate::peer_selector::SelectorSummary {
1282        self.peer_selector.read().await.summary()
1283    }
1284
1285    fn should_refuse_requests_from_peer(&self, selector: &PeerSelector, peer_id: &str) -> bool {
1286        selector.is_peer_blocked_for_payment_defaults(
1287            peer_id,
1288            self.routing.cashu_payment_default_block_threshold,
1289        )
1290    }
1291
1292    /// Export live peer metadata for inspection/debugging.
1293    pub async fn peer_metadata_snapshot(&self) -> PeerMetadataSnapshot {
1294        self.peer_selector
1295            .read()
1296            .await
1297            .export_peer_metadata_snapshot()
1298    }
1299
1300    /// Snapshot current peer metadata and persist it into `local_store`.
1301    ///
1302    /// Uses content-addressed storage for the snapshot body and a reserved
1303    /// mutable pointer slot for the "latest snapshot hash".
1304    pub async fn persist_peer_metadata(&self) -> Result<Hash, StoreError> {
1305        let snapshot = self
1306            .peer_selector
1307            .read()
1308            .await
1309            .export_peer_metadata_snapshot();
1310        let bytes = serde_json::to_vec(&snapshot).map_err(|e| {
1311            StoreError::Other(format!("Failed to encode peer metadata snapshot: {e}"))
1312        })?;
1313        let snapshot_hash = hashtree_core::sha256(&bytes);
1314        let _ = self.local_store.put(snapshot_hash, bytes).await?;
1315
1316        let pointer_slot = Self::peer_metadata_pointer_slot_hash();
1317        let pointer_bytes = hex::encode(snapshot_hash).into_bytes();
1318        let _ = self.local_store.delete(&pointer_slot).await?;
1319        let _ = self.local_store.put(pointer_slot, pointer_bytes).await?;
1320
1321        Ok(snapshot_hash)
1322    }
1323
1324    /// Load persisted peer metadata from `local_store` if available.
1325    pub async fn load_peer_metadata(&self) -> Result<bool, StoreError> {
1326        let pointer_slot = Self::peer_metadata_pointer_slot_hash();
1327        let Some(pointer_bytes) = self.local_store.get(&pointer_slot).await? else {
1328            return Ok(false);
1329        };
1330        let pointer_hex = std::str::from_utf8(&pointer_bytes).map_err(|e| {
1331            StoreError::Other(format!("Peer metadata pointer is not valid UTF-8: {e}"))
1332        })?;
1333        let snapshot_hash = Self::decode_hash_hex(pointer_hex.trim())?;
1334
1335        let Some(snapshot_bytes) = self.local_store.get(&snapshot_hash).await? else {
1336            return Ok(false);
1337        };
1338        let snapshot: PeerMetadataSnapshot =
1339            serde_json::from_slice(&snapshot_bytes).map_err(|e| {
1340                StoreError::Other(format!("Failed to decode peer metadata snapshot: {e}"))
1341            })?;
1342        self.peer_selector
1343            .write()
1344            .await
1345            .import_peer_metadata_snapshot(&snapshot);
1346        Ok(true)
1347    }
1348
1349    /// Request data from peers after negotiating a paid quote.
1350    ///
1351    /// If quote negotiation fails or the quoted peer does not deliver, the store
1352    /// falls back to the normal unpaid retrieval path to preserve liveness.
1353    pub async fn get_with_quote(
1354        &self,
1355        hash: &Hash,
1356        payment_sat: u64,
1357        quote_ttl: Duration,
1358    ) -> Result<Option<Vec<u8>>, StoreError> {
1359        if let Some(data) = self.local_store.get(hash).await? {
1360            return Ok(Some(data));
1361        }
1362        Ok(self
1363            .request_from_peers_with_quote(hash, payment_sat, quote_ttl)
1364            .await)
1365    }
1366
1367    async fn request_from_peers_with_quote(
1368        &self,
1369        hash: &Hash,
1370        payment_sat: u64,
1371        quote_ttl: Duration,
1372    ) -> Option<Vec<u8>> {
1373        let ordered_peer_ids = self.ordered_connected_peers(None).await;
1374        if ordered_peer_ids.is_empty() {
1375            return None;
1376        }
1377
1378        if let Some(quote) = self
1379            .request_quote_from_peers(hash, payment_sat, quote_ttl, &ordered_peer_ids)
1380            .await
1381        {
1382            if let Some(data) = self
1383                .request_from_single_peer(hash, &quote.peer_id, MAX_HTL, Some(quote.quote_id))
1384                .await
1385            {
1386                return Some(data);
1387            }
1388        }
1389
1390        self.request_from_mesh(hash).await
1391    }
1392
1393    async fn request_quote_from_peers(
1394        &self,
1395        hash: &Hash,
1396        payment_sat: u64,
1397        quote_ttl: Duration,
1398        ordered_peer_ids: &[String],
1399    ) -> Option<NegotiatedQuote> {
1400        if ordered_peer_ids.is_empty() {
1401            return None;
1402        }
1403        let ttl_ms = quote_ttl.as_millis().min(u32::MAX as u128) as u32;
1404        if ttl_ms == 0 {
1405            return None;
1406        }
1407        let requested_mint = self.requested_quote_mint().map(str::to_string);
1408
1409        let hash_key = hash_to_key(hash);
1410        let (tx, rx) = oneshot::channel();
1411        self.pending_quotes.write().await.insert(
1412            hash_key.clone(),
1413            PendingQuoteRequest {
1414                response_tx: tx,
1415                preferred_mint_url: requested_mint.clone(),
1416                offered_payment_sat: payment_sat,
1417            },
1418        );
1419
1420        let rx = Arc::new(Mutex::new(rx));
1421        let result = run_hedged_waves(
1422            ordered_peer_ids.len(),
1423            self.routing.dispatch,
1424            self.request_timeout,
1425            |range| {
1426                let wave_peer_ids = ordered_peer_ids[range].to_vec();
1427                let requested_mint = requested_mint.clone();
1428                let hash = *hash;
1429                async move {
1430                    let mut sent = 0usize;
1431                    for peer_id in wave_peer_ids {
1432                        if self
1433                            .send_quote_request_to_peer(
1434                                &peer_id,
1435                                &hash,
1436                                payment_sat,
1437                                ttl_ms,
1438                                requested_mint.as_deref(),
1439                            )
1440                            .await
1441                        {
1442                            sent += 1;
1443                        }
1444                    }
1445                    sent
1446                }
1447            },
1448            |wait| {
1449                let rx = rx.clone();
1450                async move {
1451                    let mut rx = rx.lock().await;
1452                    match tokio::time::timeout(wait, &mut *rx).await {
1453                        Ok(Ok(Some(quote))) => HedgedWaveAction::Success(quote),
1454                        Ok(Ok(None)) | Ok(Err(_)) => HedgedWaveAction::Abort,
1455                        Err(_) => HedgedWaveAction::Continue,
1456                    }
1457                }
1458            },
1459        )
1460        .await;
1461        let _ = self.pending_quotes.write().await.remove(&hash_key);
1462        result
1463    }
1464
1465    async fn request_from_single_peer(
1466        &self,
1467        hash: &Hash,
1468        peer_id: &str,
1469        request_htl: u8,
1470        quote_id: Option<u64>,
1471    ) -> Option<Vec<u8>> {
1472        let hash_key = hash_to_key(hash);
1473        let (tx, rx) = oneshot::channel();
1474        self.pending_requests.write().await.insert(
1475            hash_key.clone(),
1476            PendingRequest {
1477                response_tx: tx,
1478                started_at: Instant::now(),
1479                queried_peers: vec![peer_id.to_string()],
1480            },
1481        );
1482
1483        let mut rx = rx;
1484        if !self
1485            .send_request_to_peer(peer_id, hash, request_htl, quote_id)
1486            .await
1487        {
1488            let _ = self.pending_requests.write().await.remove(&hash_key);
1489            return None;
1490        }
1491        self.reserve_peer_request(peer_id).await;
1492
1493        if let Ok(Ok(Some(data))) = tokio::time::timeout(self.request_timeout, &mut rx).await {
1494            if hashtree_core::sha256(&data) == *hash {
1495                let _ = self.local_store.put(*hash, data.clone()).await;
1496                return Some(data);
1497            }
1498        }
1499
1500        if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
1501            self.release_queried_peer_requests(&pending.queried_peers)
1502                .await;
1503            for peer_id in pending.queried_peers {
1504                self.peer_selector.write().await.record_timeout(&peer_id);
1505            }
1506        }
1507        let _ = self.take_forward_requesters(&hash_key).await;
1508        None
1509    }
1510
1511    async fn request_from_ordered_peers(
1512        &self,
1513        hash: &Hash,
1514        ordered_peer_ids: &[String],
1515        request_htl: u8,
1516    ) -> RouteFetchOutcome {
1517        let hash_key = hash_to_key(hash);
1518        let (tx, rx) = oneshot::channel();
1519        self.pending_requests.write().await.insert(
1520            hash_key.clone(),
1521            PendingRequest {
1522                response_tx: tx,
1523                started_at: Instant::now(),
1524                queried_peers: Vec::new(),
1525            },
1526        );
1527
1528        let rx = Arc::new(Mutex::new(rx));
1529        let result = run_hedged_waves(
1530            ordered_peer_ids.len(),
1531            self.routing.dispatch,
1532            self.request_timeout,
1533            |range| {
1534                let wave_peer_ids = ordered_peer_ids[range].to_vec();
1535                let hash = *hash;
1536                let hash_key = hash_key.clone();
1537                async move {
1538                    let mut sent = 0usize;
1539                    for peer_id in wave_peer_ids {
1540                        if self
1541                            .send_request_to_peer(&peer_id, &hash, request_htl, None)
1542                            .await
1543                        {
1544                            sent += 1;
1545                            self.reserve_peer_request(&peer_id).await;
1546                            if let Some(pending) =
1547                                self.pending_requests.write().await.get_mut(&hash_key)
1548                            {
1549                                pending.queried_peers.push(peer_id);
1550                            }
1551                        }
1552                    }
1553                    sent
1554                }
1555            },
1556            |wait| {
1557                let rx = rx.clone();
1558                async move {
1559                    let mut rx = rx.lock().await;
1560                    match tokio::time::timeout(wait, &mut *rx).await {
1561                        Ok(Ok(Some(data))) if hashtree_core::sha256(&data) == *hash => {
1562                            HedgedWaveAction::Success(data)
1563                        }
1564                        Ok(Ok(Some(_))) => HedgedWaveAction::Continue,
1565                        Ok(Ok(None)) | Ok(Err(_)) => HedgedWaveAction::Abort,
1566                        Err(_) => HedgedWaveAction::Continue,
1567                    }
1568                }
1569            },
1570        )
1571        .await;
1572
1573        let Some(data) = result else {
1574            if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
1575                self.release_queried_peer_requests(&pending.queried_peers)
1576                    .await;
1577                for peer_id in pending.queried_peers {
1578                    self.peer_selector.write().await.record_timeout(&peer_id);
1579                }
1580            }
1581            let _ = self.take_forward_requesters(&hash_key).await;
1582            return RouteFetchOutcome::Timeout;
1583        };
1584
1585        let _ = self.local_store.put(*hash, data.clone()).await;
1586        RouteFetchOutcome::Hit(data)
1587    }
1588
1589    async fn request_from_read_sources_inner(&self, hash: &Hash) -> RouteFetchOutcome {
1590        let ordered_sources = self.ordered_read_sources().await;
1591        if ordered_sources.is_empty() {
1592            return RouteFetchOutcome::Miss;
1593        }
1594
1595        let dispatch = normalize_dispatch_config(
1596            self.source_dispatch_for(ordered_sources.len()).await,
1597            ordered_sources.len(),
1598        );
1599        let wave_plan = build_hedged_wave_plan(ordered_sources.len(), dispatch);
1600        if wave_plan.is_empty() {
1601            return RouteFetchOutcome::Miss;
1602        }
1603
1604        let deadline = Instant::now() + self.request_timeout;
1605        let mut pending = FuturesUnordered::new();
1606        let mut pending_source_ids = HashSet::new();
1607        let mut saw_timeout = false;
1608        let mut next_source_idx = 0usize;
1609
1610        for (wave_idx, wave_size) in wave_plan.iter().copied().enumerate() {
1611            let from = next_source_idx;
1612            let to = (next_source_idx + wave_size).min(ordered_sources.len());
1613            next_source_idx = to;
1614
1615            for source in ordered_sources[from..to].iter().cloned() {
1616                let source_id = source.id().to_string();
1617                self.record_read_source_request(&source_id).await;
1618                pending_source_ids.insert(source_id.clone());
1619                let hash = *hash;
1620                pending.push(tokio::spawn(async move {
1621                    let started_at = Instant::now();
1622                    let result = std::panic::AssertUnwindSafe(source.get(&hash))
1623                        .catch_unwind()
1624                        .await;
1625                    match result {
1626                        Ok(Some(data)) => SourceFetchOutcome::Hit {
1627                            source_id,
1628                            data,
1629                            elapsed_ms: started_at.elapsed().as_millis().max(1) as u64,
1630                        },
1631                        Ok(None) => SourceFetchOutcome::Miss { source_id },
1632                        Err(_) => SourceFetchOutcome::Failure { source_id },
1633                    }
1634                }));
1635            }
1636
1637            let is_last_wave =
1638                wave_idx + 1 == wave_plan.len() || next_source_idx >= ordered_sources.len();
1639            let window_end = if is_last_wave {
1640                deadline
1641            } else {
1642                (Instant::now() + Duration::from_millis(dispatch.hedge_interval_ms)).min(deadline)
1643            };
1644
1645            while Instant::now() < window_end {
1646                let remaining = window_end.saturating_duration_since(Instant::now());
1647                let Some(result) = tokio::time::timeout(remaining, pending.next())
1648                    .await
1649                    .ok()
1650                    .flatten()
1651                else {
1652                    break;
1653                };
1654                let Ok(outcome) = result else {
1655                    continue;
1656                };
1657                match outcome {
1658                    SourceFetchOutcome::Hit {
1659                        source_id,
1660                        data,
1661                        elapsed_ms,
1662                    } => {
1663                        pending_source_ids.remove(&source_id);
1664                        self.record_read_source_success(&source_id, elapsed_ms)
1665                            .await;
1666                        return RouteFetchOutcome::Hit(data);
1667                    }
1668                    SourceFetchOutcome::Miss { source_id } => {
1669                        pending_source_ids.remove(&source_id);
1670                        self.record_read_source_miss(&source_id).await;
1671                    }
1672                    SourceFetchOutcome::Failure { source_id } => {
1673                        pending_source_ids.remove(&source_id);
1674                        self.record_read_source_failure(&source_id).await;
1675                    }
1676                }
1677            }
1678
1679            if Instant::now() >= deadline {
1680                break;
1681            }
1682        }
1683
1684        for source_id in pending_source_ids {
1685            saw_timeout = true;
1686            self.record_read_source_timeout(&source_id).await;
1687        }
1688        if saw_timeout {
1689            RouteFetchOutcome::Timeout
1690        } else {
1691            RouteFetchOutcome::Miss
1692        }
1693    }
1694
1695    async fn request_from_read_sources(&self, hash: &Hash) -> RouteFetchOutcome {
1696        let hash_key = hash_to_key(hash);
1697        let existing_wait = {
1698            let mut inflight = self.inflight_source_fetches.lock().await;
1699            if let Some(existing) = inflight.get_mut(&hash_key) {
1700                let (tx, rx) = oneshot::channel();
1701                existing.waiters.push(tx);
1702                Some(rx)
1703            } else {
1704                inflight.insert(
1705                    hash_key.clone(),
1706                    InflightSourceFetch {
1707                        waiters: Vec::new(),
1708                    },
1709                );
1710                None
1711            }
1712        };
1713
1714        if let Some(wait) = existing_wait {
1715            return wait.await.unwrap_or(RouteFetchOutcome::Timeout);
1716        }
1717
1718        let result = self.request_from_read_sources_inner(hash).await;
1719        if let RouteFetchOutcome::Hit(hit) = &result {
1720            let _ = self.local_store.put(*hash, hit.clone()).await;
1721        }
1722
1723        let waiters = self
1724            .inflight_source_fetches
1725            .lock()
1726            .await
1727            .remove(&hash_key)
1728            .map(|inflight| inflight.waiters)
1729            .unwrap_or_default();
1730        for waiter in waiters {
1731            let _ = waiter.send(result.clone());
1732        }
1733
1734        result
1735    }
1736
1737    async fn available_read_routes(&self, context: &MeshReadContext) -> Vec<ReadRoute> {
1738        let mut routes = Vec::new();
1739        let ordered_peers = self
1740            .ordered_connected_peers(context.exclude_peer_id.as_deref())
1741            .await;
1742        if !ordered_peers.is_empty() {
1743            routes.push(ReadRoute::Peers(ordered_peers));
1744        }
1745        if !self.ordered_read_sources().await.is_empty() {
1746            routes.push(ReadRoute::Sources);
1747        }
1748        if routes.len() <= 1 {
1749            return routes;
1750        }
1751
1752        let now = Instant::now();
1753        let stats = self.read_route_stats.read().await;
1754        routes.sort_by(|left, right| {
1755            let left_stats = stats.get(left.id()).cloned().unwrap_or_default();
1756            let right_stats = stats.get(right.id()).cloned().unwrap_or_default();
1757            adaptive_source_score(&right_stats, now)
1758                .partial_cmp(&adaptive_source_score(&left_stats, now))
1759                .unwrap_or(std::cmp::Ordering::Equal)
1760                .then_with(|| left.id().cmp(right.id()))
1761        });
1762        routes
1763    }
1764
1765    async fn should_probe_multiple_routes(&self, routes: &[ReadRoute]) -> bool {
1766        if routes.len() <= 1 {
1767            return false;
1768        }
1769        let stats = self.read_route_stats.read().await;
1770        let best = stats.get(routes[0].id()).cloned().unwrap_or_default();
1771        let second = stats.get(routes[1].id()).cloned().unwrap_or_default();
1772        if !source_has_history(&best) || !source_has_history(&second) {
1773            return true;
1774        }
1775        let now = Instant::now();
1776        adaptive_source_score(&best, now) - adaptive_source_score(&second, now)
1777            < SOURCE_SCORE_TIE_DELTA
1778    }
1779
1780    async fn run_read_route(
1781        &self,
1782        hash: &Hash,
1783        route: &ReadRoute,
1784        context: &MeshReadContext,
1785    ) -> RouteFetchOutcome {
1786        let route_id = route.id();
1787        self.record_route_request(route_id).await;
1788        let started_at = Instant::now();
1789        let result = match route {
1790            ReadRoute::Peers(peer_ids) => {
1791                self.request_from_ordered_peers(hash, peer_ids, context.request_htl)
1792                    .await
1793            }
1794            ReadRoute::Sources => self.request_from_read_sources(hash).await,
1795        };
1796        match &result {
1797            RouteFetchOutcome::Hit(_) => {
1798                self.record_route_success(route_id, started_at.elapsed().as_millis().max(1) as u64)
1799                    .await;
1800            }
1801            RouteFetchOutcome::Miss => {
1802                self.record_route_miss(route_id).await;
1803            }
1804            RouteFetchOutcome::Timeout => {
1805                self.record_route_timeout(route_id).await;
1806            }
1807        }
1808        result
1809    }
1810
1811    async fn request_from_mesh_with_context(
1812        &self,
1813        hash: &Hash,
1814        context: &MeshReadContext,
1815    ) -> Option<Vec<u8>> {
1816        let routes = self.available_read_routes(context).await;
1817        match routes.as_slice() {
1818            [] => None,
1819            [route] => match self.run_read_route(hash, route, context).await {
1820                RouteFetchOutcome::Hit(data) => Some(data),
1821                RouteFetchOutcome::Miss | RouteFetchOutcome::Timeout => None,
1822            },
1823            [first, second, ..] => {
1824                if self.should_probe_multiple_routes(&routes).await {
1825                    let first_fut = self.run_read_route(hash, first, context);
1826                    let second_fut = self.run_read_route(hash, second, context);
1827                    tokio::pin!(first_fut);
1828                    tokio::pin!(second_fut);
1829                    let mut first_done = false;
1830                    let mut second_done = false;
1831                    loop {
1832                        tokio::select! {
1833                            result = &mut first_fut, if !first_done => {
1834                                first_done = true;
1835                                if let RouteFetchOutcome::Hit(data) = result {
1836                                    return Some(data);
1837                                }
1838                            }
1839                            result = &mut second_fut, if !second_done => {
1840                                second_done = true;
1841                                if let RouteFetchOutcome::Hit(data) = result {
1842                                    return Some(data);
1843                                }
1844                            }
1845                            else => break,
1846                        }
1847                        if first_done && second_done {
1848                            break;
1849                        }
1850                    }
1851                    None
1852                } else {
1853                    match self.run_read_route(hash, first, context).await {
1854                        RouteFetchOutcome::Hit(data) => return Some(data),
1855                        RouteFetchOutcome::Miss | RouteFetchOutcome::Timeout => {}
1856                    }
1857                    match self.run_read_route(hash, second, context).await {
1858                        RouteFetchOutcome::Hit(data) => Some(data),
1859                        RouteFetchOutcome::Miss | RouteFetchOutcome::Timeout => None,
1860                    }
1861                }
1862            }
1863        }
1864    }
1865
1866    async fn request_from_mesh(&self, hash: &Hash) -> Option<Vec<u8>> {
1867        self.request_from_mesh_with_context(hash, &MeshReadContext::default())
1868            .await
1869    }
1870
1871    async fn begin_forward_request(&self, hash_key: &str, requester_id: &str) -> bool {
1872        let mut pending = self.pending_forward_requests.write().await;
1873        if let Some(existing) = pending.get_mut(hash_key) {
1874            existing.requester_ids.insert(requester_id.to_string());
1875            return false;
1876        }
1877
1878        let mut requester_ids = HashSet::new();
1879        requester_ids.insert(requester_id.to_string());
1880        pending.insert(
1881            hash_key.to_string(),
1882            PendingForwardRequest { requester_ids },
1883        );
1884        true
1885    }
1886
1887    async fn take_forward_requesters(&self, hash_key: &str) -> Vec<String> {
1888        self.pending_forward_requests
1889            .write()
1890            .await
1891            .remove(hash_key)
1892            .map(|pending| pending.requester_ids.into_iter().collect())
1893            .unwrap_or_default()
1894    }
1895
1896    async fn complete_pending_response(
1897        &self,
1898        from_peer: &str,
1899        hash: &Hash,
1900        hash_key: String,
1901        payload: Vec<u8>,
1902    ) {
1903        if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
1904            self.release_queried_peer_requests(&pending.queried_peers)
1905                .await;
1906            let rtt_ms = pending.started_at.elapsed().as_millis() as u64;
1907            self.peer_selector.write().await.record_success(
1908                from_peer,
1909                rtt_ms,
1910                payload.len() as u64,
1911            );
1912            let forward_requesters = self.take_forward_requesters(&hash_key).await;
1913            let response_bytes = if forward_requesters.is_empty() {
1914                None
1915            } else {
1916                Some(encode_response(&create_response(hash, payload.clone())))
1917            };
1918            let _ = pending.response_tx.send(Some(payload));
1919            if let Some(response_bytes) = response_bytes {
1920                for requester_id in forward_requesters {
1921                    if let Some(channel) = self.signaling.get_channel(&requester_id).await {
1922                        let _ = channel.send(response_bytes.clone()).await;
1923                    }
1924                }
1925            }
1926        }
1927    }
1928
1929    async fn handle_quote_response_message(&self, from_peer: &str, res: DataQuoteResponse) {
1930        if !res.a {
1931            return;
1932        }
1933
1934        let Some(quote_id) = res.q else {
1935            return;
1936        };
1937
1938        let hash_key = hash_to_key(&res.h);
1939        let (preferred_mint_url, offered_payment_sat) = {
1940            let pending_quotes = self.pending_quotes.read().await;
1941            let Some(pending) = pending_quotes.get(&hash_key) else {
1942                return;
1943            };
1944            (
1945                pending.preferred_mint_url.clone(),
1946                pending.offered_payment_sat,
1947            )
1948        };
1949        if !self
1950            .should_accept_quote_response(
1951                from_peer,
1952                preferred_mint_url.as_deref(),
1953                offered_payment_sat,
1954                &res,
1955            )
1956            .await
1957        {
1958            return;
1959        }
1960        let mut pending_quotes = self.pending_quotes.write().await;
1961        if let Some(pending) = pending_quotes.remove(&hash_key) {
1962            let _ = pending.response_tx.send(Some(NegotiatedQuote {
1963                peer_id: from_peer.to_string(),
1964                quote_id,
1965                mint_url: res.m,
1966            }));
1967        }
1968    }
1969
1970    async fn handle_response_message(&self, from_peer: &str, res: crate::protocol::DataResponse) {
1971        let hash_key = hash_to_key(&res.h);
1972        let hash = match crate::protocol::bytes_to_hash(&res.h) {
1973            Some(h) => h,
1974            None => return,
1975        };
1976
1977        // Ignore malformed/corrupt payload and keep waiting for a valid response.
1978        if hashtree_core::sha256(&res.d) != hash {
1979            self.peer_selector.write().await.record_failure(from_peer);
1980            if self.debug {
1981                println!("[MeshStoreCore] Ignoring invalid response payload for {hash_key}");
1982            }
1983            return;
1984        }
1985
1986        self.complete_pending_response(from_peer, &hash, hash_key, res.d)
1987            .await;
1988    }
1989
1990    async fn handle_quote_request_message(&self, from_peer: &str, req: DataQuoteRequest) {
1991        let hash = match crate::protocol::bytes_to_hash(&req.h) {
1992            Some(h) => h,
1993            None => return,
1994        };
1995        let hash_key = hash_to_key(&hash);
1996
1997        {
1998            let selector = self.peer_selector.read().await;
1999            if self.should_refuse_requests_from_peer(&selector, from_peer) {
2000                if self.debug {
2001                    println!(
2002                        "[MeshStoreCore] Refusing quote request from delinquent peer {}",
2003                        from_peer
2004                    );
2005                }
2006                return;
2007            }
2008        }
2009
2010        let chosen_mint = self.choose_quote_mint(req.m.as_deref());
2011        let can_serve = self.local_store.has(&hash).await.ok().unwrap_or(false)
2012            && !self.should_drop_response(&hash)
2013            && !self.should_corrupt_response(&hash);
2014
2015        let res = if can_serve {
2016            let quote_id = self
2017                .issue_quote(from_peer, &hash_key, req.p, req.t, chosen_mint.as_deref())
2018                .await;
2019            create_quote_response_available(&hash, quote_id, req.p, req.t, chosen_mint.as_deref())
2020        } else {
2021            create_quote_response_unavailable(&hash)
2022        };
2023        let response_bytes = encode_quote_response(&res);
2024        if let Some(channel) = self.signaling.get_channel(from_peer).await {
2025            let _ = channel.send(response_bytes).await;
2026        }
2027    }
2028
2029    async fn handle_request_message(
2030        self: &Arc<Self>,
2031        from_peer: &str,
2032        req: crate::protocol::DataRequest,
2033    ) {
2034        let hash = match crate::protocol::bytes_to_hash(&req.h) {
2035            Some(h) => h,
2036            None => return,
2037        };
2038        let hash_key = hash_to_key(&hash);
2039
2040        if let Some(quote_id) = req.q {
2041            if !self.take_valid_quote(from_peer, &hash_key, quote_id).await {
2042                if self.debug {
2043                    println!(
2044                        "[MeshStoreCore] Refusing request with invalid or expired quote {} from {}",
2045                        quote_id, from_peer
2046                    );
2047                }
2048                return;
2049            }
2050        }
2051
2052        let allow_peer_forwarding = {
2053            let selector = self.peer_selector.read().await;
2054            !self.should_refuse_requests_from_peer(&selector, from_peer)
2055        };
2056
2057        // Check local store
2058        if let Ok(Some(mut data)) = self.local_store.get(&hash).await {
2059            if self.should_drop_response(&hash) {
2060                if self.debug {
2061                    println!(
2062                        "[MeshStoreCore] Dropping response for {} due to actor profile",
2063                        hash_to_key(&hash)
2064                    );
2065                }
2066                return;
2067            }
2068
2069            let response_delay = self.response_send_delay(&hash, data.len());
2070            if !response_delay.is_zero() {
2071                tokio::time::sleep(response_delay).await;
2072            }
2073
2074            if self.should_corrupt_response(&hash) {
2075                if data.is_empty() {
2076                    data.push(0x80);
2077                } else {
2078                    data[0] ^= 0x80;
2079                }
2080            }
2081
2082            // Send response
2083            let res = create_response(&hash, data);
2084            let response_bytes = encode_response(&res);
2085            if let Some(channel) = self.signaling.get_channel(from_peer).await {
2086                let _ = channel.send(response_bytes).await;
2087            }
2088            return;
2089        }
2090
2091        if self.pending_requests.read().await.contains_key(&hash_key) {
2092            let _ = self.begin_forward_request(&hash_key, from_peer).await;
2093            return;
2094        }
2095
2096        if !self.begin_forward_request(&hash_key, from_peer).await {
2097            return;
2098        }
2099
2100        let from_peer = from_peer.to_string();
2101        let this = Arc::clone(self);
2102        let request_htl = req.htl;
2103        tokio::spawn(async move {
2104            let result = if allow_peer_forwarding {
2105                let context = MeshReadContext {
2106                    exclude_peer_id: Some(from_peer.clone()),
2107                    request_htl,
2108                };
2109                this.request_from_mesh_with_context(&hash, &context).await
2110            } else {
2111                if this.debug {
2112                    println!(
2113                        "[MeshStoreCore] Serving request from delinquent peer {} via read sources only",
2114                        from_peer
2115                    );
2116                }
2117                match this.request_from_read_sources(&hash).await {
2118                    RouteFetchOutcome::Hit(data) => Some(data),
2119                    RouteFetchOutcome::Miss | RouteFetchOutcome::Timeout => None,
2120                }
2121            };
2122            let requester_ids = this.take_forward_requesters(&hash_key).await;
2123            if let Some(data) = result {
2124                let res = create_response(&hash, data);
2125                let response_bytes = encode_response(&res);
2126                for requester_id in requester_ids {
2127                    if let Some(channel) = this.signaling.get_channel(&requester_id).await {
2128                        let _ = channel.send(response_bytes.clone()).await;
2129                    }
2130                }
2131            }
2132        });
2133    }
2134
2135    /// Handle incoming data message
2136    pub async fn handle_data_message(self: &Arc<Self>, from_peer: &str, data: &[u8]) {
2137        let parsed = match parse_message(data) {
2138            Some(m) => m,
2139            None => return,
2140        };
2141
2142        match parsed {
2143            DataMessage::Request(req) => {
2144                self.handle_request_message(from_peer, req).await;
2145            }
2146            DataMessage::Response(res) => {
2147                self.handle_response_message(from_peer, res).await;
2148            }
2149            DataMessage::QuoteRequest(req) => {
2150                self.handle_quote_request_message(from_peer, req).await;
2151            }
2152            DataMessage::QuoteResponse(res) => {
2153                self.handle_quote_response_message(from_peer, res).await;
2154            }
2155            DataMessage::Payment(_) | DataMessage::PaymentAck(_) | DataMessage::Chunk(_) => {}
2156        }
2157    }
2158}
2159
2160#[async_trait]
2161impl<S, R, F> Store for MeshStoreCore<S, R, F>
2162where
2163    S: Store + Send + Sync + 'static,
2164    R: SignalingTransport + Send + Sync + 'static,
2165    F: PeerLinkFactory + Send + Sync + 'static,
2166{
2167    async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
2168        self.local_store.put(hash, data).await
2169    }
2170
2171    async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
2172        // Try local first
2173        if let Some(data) = self.local_store.get(hash).await? {
2174            return Ok(Some(data));
2175        }
2176
2177        // Try peers
2178        Ok(self.request_from_mesh(hash).await)
2179    }
2180
2181    async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
2182        self.local_store.has(hash).await
2183    }
2184
2185    async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
2186        self.local_store.delete(hash).await
2187    }
2188}
2189
2190#[cfg(test)]
2191mod tests;
2192
2193/// Type alias for simulation store.
2194pub type SimMeshStore<S> =
2195    MeshStoreCore<S, crate::mock::MockRelayTransport, crate::mock::MockConnectionFactory>;
2196
2197/// Type alias for the default production core (Nostr signaling + WebRTC links).
2198pub type ProductionMeshStore<S> =
2199    MeshStoreCore<S, crate::nostr::NostrRelayTransport, crate::real_factory::WebRtcPeerLinkFactory>;