// hashtree_network/mesh_store_core.rs
1//! Shared routed mesh store core.
2//!
3//! This module provides a concrete store wrapper that works with any local storage
4//! backend plus any signaling transport and peer-link factory. Both production
5//! (Nostr websockets + WebRTC) and simulation (mocks) use this same code.
6
7use async_trait::async_trait;
8use futures::{stream::FuturesUnordered, FutureExt, StreamExt};
9use std::collections::hash_map::DefaultHasher;
10use std::collections::{HashMap, HashSet};
11use std::future::Future;
12use std::hash::{Hash as _, Hasher};
13use std::ops::Range;
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16use tokio::sync::{oneshot, Mutex, RwLock};
17
18use hashtree_core::{Hash, Store, StoreError};
19
20use crate::peer_selector::{PeerMetadataSnapshot, PeerSelector, SelectionStrategy};
21use crate::protocol::{
22    create_quote_request, create_quote_response_available, create_quote_response_unavailable,
23    create_request, create_request_with_quote, create_response, encode_quote_request,
24    encode_quote_response, encode_request, encode_response, hash_to_key, parse_message,
25    DataMessage, DataQuoteRequest, DataQuoteResponse,
26};
27use crate::signaling::MeshRouter;
28use crate::transport::{PeerLinkFactory, SignalingTransport, TransportError};
29use crate::types::{PeerHTLConfig, SignalingMessage, MAX_HTL};
30
// Keep the on-disk namespace stable across the crate rename so existing peer
// metadata does not disappear for users upgrading from the old package name.
// Hashed by `peer_metadata_pointer_slot_hash` to derive the fixed slot address.
const PEER_METADATA_POINTER_SLOT_KEY: &[u8] = b"hashtree-webrtc/peer-metadata/latest/v1";
34
35/// Pending request awaiting response
/// Pending request awaiting response
struct PendingRequest {
    /// Resolved with the payload (or `None`) once the request completes.
    response_tx: oneshot::Sender<Option<Vec<u8>>>,
    /// When the request was dispatched (presumably for latency/timeout
    /// accounting — the resolution site is not in this chunk).
    started_at: Instant,
    /// Peers this request was sent to; their in-flight counters are released
    /// via `release_queried_peer_requests` when the request settles.
    queried_peers: Vec<String>,
}
41
/// Pending paid-retrieval negotiation awaiting a peer's quote response.
struct PendingQuoteRequest {
    /// Resolved with the accepted quote, or `None` when negotiation fails.
    response_tx: oneshot::Sender<Option<NegotiatedQuote>>,
    /// Mint we asked peers to settle through, if any.
    preferred_mint_url: Option<String>,
    /// Maximum payment we offered, in sats.
    offered_payment_sat: u64,
}
47
/// A peer request we are currently resolving through the mesh/upstream.
struct PendingForwardRequest {
    /// Peers that asked us for this hash while resolution is in flight.
    requester_ids: HashSet<String>,
}
51
/// A non-peer read source (e.g. an upstream Blossom server) the mesh can
/// fall back to when connected peers cannot supply a block.
#[async_trait]
pub trait MeshReadSource: Send + Sync {
    /// Stable identifier; used as the key for per-source adaptive stats.
    fn id(&self) -> &str;

    /// Whether the source should currently be considered for fetches.
    /// Defaults to always available.
    fn is_available(&self) -> bool {
        true
    }

    /// Fetch the block for `hash`; `None` on miss or failure.
    async fn get(&self, hash: &Hash) -> Option<Vec<u8>>;
}
62
/// A quote accepted from a peer during paid-retrieval negotiation.
#[derive(Debug, Clone)]
struct NegotiatedQuote {
    /// Peer that issued the quote.
    peer_id: String,
    /// Peer-assigned identifier echoed back via `create_request_with_quote`.
    quote_id: u64,
    /// Mint the peer wants settlement through, if any.
    #[allow(dead_code)]
    mint_url: Option<String>,
}
70
/// A quote we issued to a peer; redeemable exactly once until expiry.
struct IssuedQuote {
    /// Instant after which `take_valid_quote` rejects the quote.
    expires_at: Instant,
    /// Quoted price, in sats.
    #[allow(dead_code)]
    payment_sat: u64,
    /// Mint we quoted settlement through, if any.
    #[allow(dead_code)]
    mint_url: Option<String>,
}
78
/// Rolling health/latency statistics for one read source or read route.
///
/// `srtt_ms`/`rttvar_ms` follow TCP-style smoothed-RTT naming; the exact
/// update rules live at the recording sites elsewhere in this file.
#[derive(Debug, Clone, Default)]
struct AdaptiveSourceStats {
    // Total fetch attempts issued.
    requests: u64,
    // Attempts that returned data.
    successes: u64,
    // Attempts where the source answered but lacked the block.
    misses: u64,
    // Attempts that errored.
    failures: u64,
    // Attempts that timed out.
    timeouts: u64,
    // Smoothed round-trip time; 0.0 means "no sample yet" (see
    // `source_latency_score`).
    srtt_ms: f64,
    // RTT variance estimate.
    rttvar_ms: f64,
    // Consecutive-failure backoff level (growth policy applied elsewhere).
    backoff_level: u32,
    // While set and in the future, the source scores NEG_INFINITY.
    backed_off_until: Option<Instant>,
    // Feeds the recency bonus in `adaptive_source_score`.
    last_success_at: Option<Instant>,
    last_failure_at: Option<Instant>,
}
93
/// Successful payload fetched from a non-peer read source.
#[derive(Debug, Clone)]
struct SourceFetchResult {
    data: Vec<u8>,
}
98
/// Deduplicates concurrent upstream fetches for the same hash: later callers
/// register a waiter instead of issuing a second fetch.
struct InflightSourceFetch {
    /// Senders notified with the shared result when the fetch completes.
    waiters: Vec<oneshot::Sender<Option<SourceFetchResult>>>,
}
102
/// Outcome of one fetch attempt against a single read source.
enum SourceFetchOutcome {
    /// The source returned the payload.
    Hit {
        source_id: String,
        data: Vec<u8>,
        /// Wall-clock fetch duration, for RTT accounting.
        elapsed_ms: u64,
    },
    /// The source answered but does not have the block.
    Miss {
        source_id: String,
    },
    /// The source errored or was unreachable.
    Failure {
        source_id: String,
    },
}
116
/// Initial backoff applied to a failing read source (growth per
/// `backoff_level` happens at the recording site, not in this chunk).
const INITIAL_SOURCE_BACKOFF_MS: u64 = 250;
/// Hard cap on source backoff regardless of consecutive failures.
const MAX_SOURCE_BACKOFF_MS: u64 = 10_000;
/// Score difference below which two sources are treated as tied.
const SOURCE_SCORE_TIE_DELTA: f64 = 0.15;
/// Window within which a past success earns the recency bonus in
/// `adaptive_source_score`.
const RECENT_SOURCE_SUCCESS_WINDOW: Duration = Duration::from_secs(60);
/// Rank penalty added per already-in-flight request on a peer (see
/// `ordered_connected_peers`).
const ACTIVE_PEER_REQUEST_RANK_PENALTY: usize = 3;
122
123fn source_reliability_score(stats: &AdaptiveSourceStats) -> f64 {
124    (stats.successes as f64 + 1.0) / (stats.requests as f64 + 2.0)
125}
126
127fn source_latency_score(stats: &AdaptiveSourceStats) -> f64 {
128    if stats.srtt_ms <= 0.0 {
129        return 0.5;
130    }
131    (500.0 / (stats.srtt_ms + 50.0)).min(1.0)
132}
133
134fn source_has_history(stats: &AdaptiveSourceStats) -> bool {
135    stats.requests > 0
136        || stats.successes > 0
137        || stats.misses > 0
138        || stats.failures > 0
139        || stats.timeouts > 0
140}
141
142fn adaptive_source_score(stats: &AdaptiveSourceStats, now: Instant) -> f64 {
143    if let Some(backed_off_until) = stats.backed_off_until {
144        if backed_off_until > now {
145            return f64::NEG_INFINITY;
146        }
147    }
148
149    let miss_penalty = if stats.requests > 0 {
150        (stats.misses as f64 / stats.requests as f64) * 0.15
151    } else {
152        0.0
153    };
154    let failure_penalty = if stats.requests > 0 {
155        ((stats.failures + stats.timeouts) as f64 / stats.requests as f64) * 0.3
156    } else {
157        0.0
158    };
159    let recency_bonus = if stats
160        .last_success_at
161        .is_some_and(|last| now.duration_since(last) < RECENT_SOURCE_SUCCESS_WINDOW)
162    {
163        0.1
164    } else {
165        0.0
166    };
167
168    0.6 * source_reliability_score(stats) + 0.3 * source_latency_score(stats) + recency_bonus
169        - miss_penalty
170        - failure_penalty
171}
172
/// Which retrieval path a read attempt takes.
#[derive(Clone)]
enum ReadRoute {
    /// Query the listed connected peers over the mesh.
    Peers(Vec<String>),
    /// Fall back to non-peer read sources (e.g. upstream servers).
    Sources,
}

impl ReadRoute {
    /// Stable identifier used to key per-route statistics.
    fn id(&self) -> &'static str {
        if matches!(self, ReadRoute::Peers(_)) {
            "peers"
        } else {
            "sources"
        }
    }
}
187
/// Per-request context for a mesh read.
#[derive(Debug, Clone)]
struct MeshReadContext {
    // Peer to skip when dispatching (e.g. the peer that forwarded the
    // request to us) — see the filter in `ordered_connected_peers`.
    exclude_peer_id: Option<String>,
    // Hops-to-live budget carried by the outgoing request.
    request_htl: u8,
}

impl Default for MeshReadContext {
    fn default() -> Self {
        Self {
            // Locally-originated reads query everyone with a full HTL budget.
            exclude_peer_id: None,
            request_htl: MAX_HTL,
        }
    }
}
202
/// Aggregate stats from draining currently available peer-link messages.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct DataPumpStats {
    /// Total messages handled in this drain pass.
    pub processed: usize,
    /// Data-request messages seen.
    pub request_messages: usize,
    /// Data-response messages seen.
    pub response_messages: usize,
    /// Quote-request messages seen.
    // NOTE(review): the quote counters are `u64` while the counters above are
    // `usize` — consider unifying the widths in a follow-up (field types are
    // public API, so not changed here).
    pub quote_request_messages: u64,
    /// Quote-response messages seen.
    pub quote_response_messages: u64,
    /// Total bytes across processed messages (presumably wire size —
    /// confirm at the recording site).
    pub processed_bytes: u64,
}
213
/// Request dispatch strategy for peer queries.
///
/// `MeshStoreCore` supports two practical retrieval modes:
/// - Flood (`usize::MAX` fanout): maximize success/latency at bandwidth cost.
/// - Staged hedging: probe a subset first, then expand.
#[derive(Debug, Clone, Copy)]
pub struct RequestDispatchConfig {
    /// Number of peers queried immediately.
    pub initial_fanout: usize,
    /// Number of additional peers to query on each hedge step.
    pub hedge_fanout: usize,
    /// Total peers allowed for this request.
    pub max_fanout: usize,
    /// Delay between hedge waves (ms). `0` means send all waves immediately.
    pub hedge_interval_ms: u64,
}

impl Default for RequestDispatchConfig {
    /// Flood mode: every fanout unbounded, hedge waves fire back-to-back.
    fn default() -> Self {
        let unbounded = usize::MAX;
        Self {
            initial_fanout: unbounded,
            hedge_fanout: unbounded,
            max_fanout: unbounded,
            hedge_interval_ms: 0,
        }
    }
}
241
242/// Normalize fanout config against current peer availability.
243pub fn normalize_dispatch_config(
244    dispatch: RequestDispatchConfig,
245    available_peers: usize,
246) -> RequestDispatchConfig {
247    let mut cfg = dispatch;
248    let cap = if cfg.max_fanout == 0 {
249        available_peers
250    } else {
251        cfg.max_fanout.min(available_peers)
252    };
253    cfg.max_fanout = cap;
254    cfg.initial_fanout = if cfg.initial_fanout == 0 {
255        1
256    } else {
257        cfg.initial_fanout.min(cap.max(1))
258    };
259    cfg.hedge_fanout = if cfg.hedge_fanout == 0 {
260        1
261    } else {
262        cfg.hedge_fanout.min(cap.max(1))
263    };
264    cfg
265}
266
267/// Build wave sizes for staged hedged dispatch.
268pub fn build_hedged_wave_plan(peer_count: usize, dispatch: RequestDispatchConfig) -> Vec<usize> {
269    if peer_count == 0 {
270        return Vec::new();
271    }
272    let cap = dispatch.max_fanout.min(peer_count);
273    if cap == 0 {
274        return Vec::new();
275    }
276
277    let mut plan = Vec::new();
278    let mut sent = 0usize;
279    let first = dispatch.initial_fanout.min(cap).max(1);
280    plan.push(first);
281    sent += first;
282
283    while sent < cap {
284        let next = dispatch.hedge_fanout.min(cap - sent).max(1);
285        plan.push(next);
286        sent += next;
287    }
288    plan
289}
290
/// Outcome returned after waiting on a hedged dispatch wave.
#[derive(Debug)]
pub enum HedgedWaveAction<T> {
    /// Nothing decisive yet; dispatch the next wave.
    Continue,
    /// A usable result arrived; stop and return it.
    Success(T),
    /// Stop dispatching without a result.
    Abort,
}
298
/// Run a staged hedged dispatch over peer index ranges.
///
/// This scheduler is shared by the reusable `MeshStoreCore` and the native
/// `hashtree-cli` mesh path so tests and production use the same wave timing.
///
/// `send_wave` dispatches requests for a peer index range and returns how
/// many were actually sent; `wait_wave` waits up to the given duration for
/// responses and tells the scheduler whether to continue, succeed, or abort.
/// Returns `None` when the plan is empty, the deadline passes, or the last
/// wave's wait ends without a `Success`.
pub async fn run_hedged_waves<T, SendWave, SendWaveFut, WaitWave, WaitWaveFut>(
    peer_count: usize,
    dispatch: RequestDispatchConfig,
    request_timeout: Duration,
    mut send_wave: SendWave,
    mut wait_wave: WaitWave,
) -> Option<T>
where
    SendWave: FnMut(Range<usize>) -> SendWaveFut,
    SendWaveFut: Future<Output = usize>,
    WaitWave: FnMut(Duration) -> WaitWaveFut,
    WaitWaveFut: Future<Output = HedgedWaveAction<T>>,
{
    let dispatch = normalize_dispatch_config(dispatch, peer_count);
    let wave_plan = build_hedged_wave_plan(peer_count, dispatch);
    if wave_plan.is_empty() {
        return None;
    }

    // One deadline shared by all waves.
    let deadline = Instant::now() + request_timeout;
    let mut sent_total = 0usize;
    let mut next_peer_idx = 0usize;

    for (wave_idx, wave_size) in wave_plan.iter().copied().enumerate() {
        let from = next_peer_idx;
        let to = (next_peer_idx + wave_size).min(peer_count);
        next_peer_idx = to;

        if from == to {
            continue;
        }

        sent_total += send_wave(from..to).await;
        if sent_total == 0 {
            // Nothing dispatched anywhere yet — don't wait on zero requests.
            if next_peer_idx >= peer_count {
                break;
            }
            continue;
        }

        let now = Instant::now();
        if now >= deadline {
            break;
        }
        let remaining = deadline.saturating_duration_since(now);
        let is_last_wave = wave_idx + 1 == wave_plan.len() || next_peer_idx >= peer_count;
        let wait = if is_last_wave {
            // Final wave: spend the whole remaining time budget waiting.
            remaining
        } else if dispatch.hedge_interval_ms == 0 {
            // Zero interval means fire all waves back-to-back.
            Duration::ZERO
        } else {
            Duration::from_millis(dispatch.hedge_interval_ms).min(remaining)
        };

        if wait.is_zero() {
            continue;
        }

        match wait_wave(wait).await {
            HedgedWaveAction::Continue => {}
            HedgedWaveAction::Success(value) => return Some(value),
            HedgedWaveAction::Abort => break,
        }
    }

    None
}
370
371/// Keep selector membership aligned with currently connected peer IDs.
372pub async fn sync_selector_peers(selector: &RwLock<PeerSelector>, current_peer_ids: &[String]) {
373    let mut selector = selector.write().await;
374    let current: HashSet<&str> = current_peer_ids.iter().map(String::as_str).collect();
375    let known: Vec<String> = selector.all_stats().map(|s| s.peer_id.clone()).collect();
376    for peer_id in known {
377        if !current.contains(peer_id.as_str()) {
378            selector.remove_peer(&peer_id);
379        }
380    }
381    for peer_id in current_peer_ids {
382        selector.add_peer(peer_id.clone());
383    }
384}
385
/// Response behavior profile for simulation/game-theory actors.
///
/// Defaults to honest behavior (always respond correctly, no extra delay).
/// The hand-written `Default` impl was identical to the derived one
/// (all-zero fields), so it is now derived.
#[derive(Debug, Clone, Copy, Default)]
pub struct ResponseBehaviorConfig {
    /// Probability that a node drops a response even when it has data.
    pub drop_response_prob: f64,
    /// Probability that a node responds with corrupted payload.
    pub corrupt_response_prob: f64,
    /// Optional response delay to model slow/incompetent peers.
    pub extra_delay_ms: u64,
}

impl ResponseBehaviorConfig {
    /// Clamp both probabilities into `[0.0, 1.0]` so downstream draw
    /// comparisons behave sanely even with misconfigured inputs.
    fn normalized(self) -> Self {
        Self {
            drop_response_prob: self.drop_response_prob.clamp(0.0, 1.0),
            corrupt_response_prob: self.corrupt_response_prob.clamp(0.0, 1.0),
            extra_delay_ms: self.extra_delay_ms,
        }
    }
}
418
/// Routing policy for request ordering + dispatch fanout.
#[derive(Debug, Clone)]
pub struct MeshRoutingConfig {
    /// How the peer selector orders candidate peers.
    pub selection_strategy: SelectionStrategy,
    /// Enable the selector's fairness adjustments.
    pub fairness_enabled: bool,
    /// Blend weight for payment-priority ranking in selector (`0.0` disables).
    pub cashu_payment_weight: f64,
    /// Refuse serving peers that have reached this many unpaid post-delivery settlements.
    /// `0` disables refusal and only keeps metadata/downranking.
    pub cashu_payment_default_block_threshold: u64,
    /// Cashu mint URLs this node is willing to use for settlement.
    pub cashu_accepted_mints: Vec<String>,
    /// Preferred Cashu mint URL when initiating paid retrieval.
    pub cashu_default_mint: Option<String>,
    /// Baseline cap for accepting a peer-suggested mint outside the trusted set.
    pub cashu_peer_suggested_mint_base_cap_sat: u64,
    /// Additional sats allowed per successful delivery from that peer.
    pub cashu_peer_suggested_mint_success_step_sat: u64,
    /// Additional sats allowed per successful post-delivery payment received from that peer.
    pub cashu_peer_suggested_mint_receipt_step_sat: u64,
    /// Hard upper bound for any single peer-suggested mint quote we accept.
    pub cashu_peer_suggested_mint_max_cap_sat: u64,
    /// Fanout/hedging configuration for peer queries.
    pub dispatch: RequestDispatchConfig,
    /// Simulated misbehavior profile (honest by default).
    pub response_behavior: ResponseBehaviorConfig,
}
444
impl Default for MeshRoutingConfig {
    /// Defaults: weighted selection with fairness on, every Cashu payment
    /// feature disabled (zero weights/caps, no mints), flood dispatch, and
    /// honest response behavior.
    fn default() -> Self {
        Self {
            selection_strategy: SelectionStrategy::Weighted,
            fairness_enabled: true,
            cashu_payment_weight: 0.0,
            cashu_payment_default_block_threshold: 0,
            cashu_accepted_mints: Vec::new(),
            cashu_default_mint: None,
            cashu_peer_suggested_mint_base_cap_sat: 0,
            cashu_peer_suggested_mint_success_step_sat: 0,
            cashu_peer_suggested_mint_receipt_step_sat: 0,
            cashu_peer_suggested_mint_max_cap_sat: 0,
            dispatch: RequestDispatchConfig::default(),
            response_behavior: ResponseBehaviorConfig::default(),
        }
    }
}
463
/// Routed mesh store core that works with any storage backend and transport
/// implementation.
///
/// This is the shared code between production and simulation.
/// - Production: `MeshStoreCore<LmdbStore, NostrRelayTransport, WebRtcPeerLinkFactory>`
/// - Simulation: `MeshStoreCore<MemoryStore, MockRelayTransport, MockConnectionFactory>`
pub struct MeshStoreCore<S, R, F>
where
    S: Store + Send + Sync + 'static,
    R: SignalingTransport + Send + Sync + 'static,
    F: PeerLinkFactory + Send + Sync + 'static,
{
    /// Local backing store
    local_store: Arc<S>,
    /// Mesh router (handles peer discovery and connection)
    signaling: Arc<MeshRouter<R, F>>,
    /// Per-peer HTL config (created lazily per peer in `process_signaling`)
    htl_configs: RwLock<HashMap<String, PeerHTLConfig>>,
    /// Pending requests we sent, keyed by hash
    pending_requests: RwLock<HashMap<String, PendingRequest>>,
    /// Pending quote negotiations keyed by requested hash.
    pending_quotes: RwLock<HashMap<String, PendingQuoteRequest>>,
    /// Forwarded peer requests currently being resolved through the mesh/upstream.
    pending_forward_requests: RwLock<HashMap<String, PendingForwardRequest>>,
    /// Quotes we issued to peers and will accept exactly once until expiry,
    /// keyed by `(peer_id, hash_key, quote_id)`.
    issued_quotes: RwLock<HashMap<(String, String, u64), IssuedQuote>>,
    /// Monotonic quote identifier generator (starts at 1).
    next_quote_id: RwLock<u64>,
    /// Non-peer read sources such as upstream Blossom servers.
    read_sources: RwLock<HashMap<String, Arc<dyn MeshReadSource>>>,
    /// Adaptive health stats for non-peer read sources.
    read_source_stats: RwLock<HashMap<String, AdaptiveSourceStats>>,
    /// Shared in-flight upstream reads keyed by hash.
    inflight_source_fetches: Mutex<HashMap<String, InflightSourceFetch>>,
    /// Adaptive route stats for choosing peers vs upstream sources.
    read_route_stats: RwLock<HashMap<String, AdaptiveSourceStats>>,
    /// Adaptive selector for peer ordering.
    peer_selector: RwLock<PeerSelector>,
    /// Active per-peer in-flight reads so concurrent block fetches spread across peers.
    peer_active_requests: RwLock<HashMap<String, usize>>,
    /// Routing/dispatch configuration.
    routing: MeshRoutingConfig,
    /// Request timeout
    request_timeout: Duration,
    /// Debug mode
    debug: bool,
    /// Running flag, toggled by `start`/`stop`
    running: RwLock<bool>,
}
513
514impl<S, R, F> MeshStoreCore<S, R, F>
515where
516    S: Store + Send + Sync + 'static,
517    R: SignalingTransport + Send + Sync + 'static,
518    F: PeerLinkFactory + Send + Sync + 'static,
519{
520    /// Create a new routed mesh store core.
521    pub fn new(
522        local_store: Arc<S>,
523        signaling: Arc<MeshRouter<R, F>>,
524        request_timeout: Duration,
525        debug: bool,
526    ) -> Self {
527        Self::new_with_routing(
528            local_store,
529            signaling,
530            request_timeout,
531            debug,
532            Default::default(),
533        )
534    }
535
    /// Create a new routed mesh store core with explicit routing configuration.
    ///
    /// Seeds the peer selector from the routing policy (strategy, fairness,
    /// payment weight) before any peers are known; all other state starts empty.
    pub fn new_with_routing(
        local_store: Arc<S>,
        signaling: Arc<MeshRouter<R, F>>,
        request_timeout: Duration,
        debug: bool,
        routing: MeshRoutingConfig,
    ) -> Self {
        let mut selector = PeerSelector::with_strategy(routing.selection_strategy);
        selector.set_fairness(routing.fairness_enabled);
        selector.set_cashu_payment_weight(routing.cashu_payment_weight);
        Self {
            local_store,
            signaling,
            htl_configs: RwLock::new(HashMap::new()),
            pending_requests: RwLock::new(HashMap::new()),
            pending_quotes: RwLock::new(HashMap::new()),
            pending_forward_requests: RwLock::new(HashMap::new()),
            issued_quotes: RwLock::new(HashMap::new()),
            // Quote IDs start at 1 so 0 can never collide with a real quote.
            next_quote_id: RwLock::new(1),
            read_sources: RwLock::new(HashMap::new()),
            read_source_stats: RwLock::new(HashMap::new()),
            inflight_source_fetches: Mutex::new(HashMap::new()),
            read_route_stats: RwLock::new(HashMap::new()),
            peer_selector: RwLock::new(selector),
            peer_active_requests: RwLock::new(HashMap::new()),
            routing,
            request_timeout,
            debug,
            running: RwLock::new(false),
        }
    }
568
    /// Start the store (begin listening for messages).
    ///
    /// Sets the running flag and announces ourselves on the signaling layer.
    pub async fn start(&self) -> Result<(), TransportError> {
        *self.running.write().await = true;

        // Send initial hello (empty list — see `MeshRouter::send_hello` for
        // what the payload advertises).
        self.signaling.send_hello(vec![]).await?;

        Ok(())
    }
578
    /// Stop the store by clearing the running flag (presumably polled by the
    /// message-processing loops — those are outside this chunk).
    pub async fn stop(&self) {
        *self.running.write().await = false;
    }
583
584    /// Process incoming signaling message
585    pub async fn process_signaling(&self, msg: SignalingMessage) -> Result<(), TransportError> {
586        // When a new peer connects, initialize their HTL config
587        let peer_id = msg.peer_id().to_string();
588        {
589            let mut configs = self.htl_configs.write().await;
590            if !configs.contains_key(&peer_id) {
591                configs.insert(peer_id.clone(), PeerHTLConfig::random());
592            }
593        }
594        self.peer_selector.write().await.add_peer(peer_id);
595
596        self.signaling.handle_message(msg).await
597    }
598
    /// Get signaling manager reference (the mesh router handling peer
    /// discovery/connections).
    pub fn signaling(&self) -> &Arc<MeshRouter<R, F>> {
        &self.signaling
    }
603
    /// Configured response-behavior profile with probabilities clamped to [0, 1].
    fn response_behavior(&self) -> ResponseBehaviorConfig {
        self.routing.response_behavior.normalized()
    }
607
608    fn deterministic_actor_draw_for(peer_id: &str, hash: &Hash, salt: u64) -> f64 {
609        let mut hasher = DefaultHasher::new();
610        peer_id.hash(&mut hasher);
611        hash.hash(&mut hasher);
612        salt.hash(&mut hasher);
613        let v = hasher.finish();
614        (v as f64) / (u64::MAX as f64)
615    }
616
    /// Deterministic draw keyed by our own peer id — see
    /// `deterministic_actor_draw_for`.
    fn deterministic_actor_draw(&self, hash: &Hash, salt: u64) -> f64 {
        Self::deterministic_actor_draw_for(self.signaling.peer_id(), hash, salt)
    }
620
    /// Fixed storage slot (sha256 of a stable key) addressing the latest
    /// peer-metadata snapshot pointer.
    fn peer_metadata_pointer_slot_hash() -> Hash {
        hashtree_core::sha256(PEER_METADATA_POINTER_SLOT_KEY)
    }
624
625    fn decode_hash_hex(hash_hex: &str) -> Result<Hash, StoreError> {
626        let bytes = hex::decode(hash_hex)
627            .map_err(|e| StoreError::Other(format!("Invalid hash hex: {e}")))?;
628        if bytes.len() != 32 {
629            return Err(StoreError::Other(format!(
630                "Invalid hash length {}, expected 32 bytes",
631                bytes.len()
632            )));
633        }
634        let mut hash = [0u8; 32];
635        hash.copy_from_slice(&bytes);
636        Ok(hash)
637    }
638
639    fn should_drop_response(&self, hash: &Hash) -> bool {
640        let p = self.response_behavior().drop_response_prob;
641        if p <= 0.0 {
642            return false;
643        }
644        self.deterministic_actor_draw(hash, 0xD0_D0_D0_D0_D0_D0_D0_D0) < p
645    }
646
647    fn should_corrupt_response(&self, hash: &Hash) -> bool {
648        let p = self.response_behavior().corrupt_response_prob;
649        if p <= 0.0 {
650            return false;
651        }
652        self.deterministic_actor_draw(hash, 0xC0_C0_C0_C0_C0_C0_C0_C0) < p
653    }
654
    /// Connected peers ordered by desirability for the next request.
    ///
    /// Ordering: healthy peers before backed-off ones, then by selector rank
    /// plus a load penalty per in-flight request, with peer id as the final
    /// tiebreak. `exclude_peer_id` (e.g. the peer that forwarded the request
    /// to us) is filtered out entirely.
    async fn ordered_connected_peers(&self, exclude_peer_id: Option<&str>) -> Vec<String> {
        let current_peer_ids = self.signaling.peer_ids().await;
        if current_peer_ids.is_empty() {
            return Vec::new();
        }

        // Keep selector membership in sync with the live connection set first.
        sync_selector_peers(&self.peer_selector, &current_peer_ids).await;
        let mut candidate_peer_ids: Vec<String> = current_peer_ids
            .into_iter()
            .filter(|peer_id| exclude_peer_id.is_none_or(|exclude| peer_id != exclude))
            .collect();
        if candidate_peer_ids.is_empty() {
            return Vec::new();
        }

        let current_set: HashSet<&str> = candidate_peer_ids.iter().map(String::as_str).collect();
        let mut selector = self.peer_selector.write().await;
        let mut selector_order = selector.select_peers();
        selector_order.retain(|peer_id| current_set.contains(peer_id.as_str()));
        if selector_order.is_empty() {
            // Selector has no usable opinion yet: deterministic alphabetical order.
            let mut fallback = candidate_peer_ids;
            fallback.sort();
            return fallback;
        }
        let backed_off: HashMap<String, bool> = candidate_peer_ids
            .iter()
            .map(|peer_id| (peer_id.clone(), selector.is_peer_backed_off(peer_id)))
            .collect();
        drop(selector);

        // Peers unknown to the selector get a large sentinel rank; usize::MAX / 2
        // leaves headroom for the load penalty added below.
        let rank: HashMap<&str, usize> = selector_order
            .iter()
            .enumerate()
            .map(|(idx, peer_id)| (peer_id.as_str(), idx))
            .collect();
        let active = self.peer_active_requests.read().await;
        candidate_peer_ids.sort_by(|left, right| {
            let left_backed_off = backed_off.get(left).copied().unwrap_or(false);
            let right_backed_off = backed_off.get(right).copied().unwrap_or(false);
            if left_backed_off != right_backed_off {
                // Backed-off peers always sort behind healthy ones.
                return if left_backed_off {
                    std::cmp::Ordering::Greater
                } else {
                    std::cmp::Ordering::Less
                };
            }
            let left_rank = rank.get(left.as_str()).copied().unwrap_or(usize::MAX / 2);
            let right_rank = rank.get(right.as_str()).copied().unwrap_or(usize::MAX / 2);
            let left_load = active.get(left).copied().unwrap_or(0);
            let right_load = active.get(right).copied().unwrap_or(0);
            (left_rank + left_load.saturating_mul(ACTIVE_PEER_REQUEST_RANK_PENALTY))
                .cmp(&(right_rank + right_load.saturating_mul(ACTIVE_PEER_REQUEST_RANK_PENALTY)))
                .then_with(|| left.cmp(right))
        });
        candidate_peer_ids
    }
711
712    async fn reserve_peer_request(&self, peer_id: &str) {
713        let mut active = self.peer_active_requests.write().await;
714        *active.entry(peer_id.to_string()).or_insert(0) += 1;
715    }
716
717    async fn release_peer_request(&self, peer_id: &str) {
718        let mut active = self.peer_active_requests.write().await;
719        let Some(count) = active.get_mut(peer_id) else {
720            return;
721        };
722        if *count <= 1 {
723            active.remove(peer_id);
724        } else {
725            *count -= 1;
726        }
727    }
728
    /// Release the in-flight counter for every peer a request was sent to.
    async fn release_queried_peer_requests(&self, peer_ids: &[String]) {
        for peer_id in peer_ids {
            self.release_peer_request(peer_id).await;
        }
    }
734
735    fn requested_quote_mint(&self) -> Option<&str> {
736        if let Some(default_mint) = self.routing.cashu_default_mint.as_deref() {
737            if self.routing.cashu_accepted_mints.is_empty()
738                || self
739                    .routing
740                    .cashu_accepted_mints
741                    .iter()
742                    .any(|mint| mint == default_mint)
743            {
744                return Some(default_mint);
745            }
746        }
747
748        self.routing
749            .cashu_accepted_mints
750            .first()
751            .map(String::as_str)
752    }
753
754    fn choose_quote_mint(&self, requested_mint: Option<&str>) -> Option<String> {
755        if let Some(requested_mint) = requested_mint {
756            if self.accepts_quote_mint(Some(requested_mint)) {
757                return Some(requested_mint.to_string());
758            }
759        }
760        if let Some(default_mint) = self.routing.cashu_default_mint.as_ref() {
761            return Some(default_mint.clone());
762        }
763        if let Some(first_mint) = self.routing.cashu_accepted_mints.first() {
764            return Some(first_mint.clone());
765        }
766        requested_mint.map(str::to_string)
767    }
768
769    fn accepts_quote_mint(&self, mint_url: Option<&str>) -> bool {
770        if self.routing.cashu_accepted_mints.is_empty() {
771            return true;
772        }
773
774        let Some(mint_url) = mint_url else {
775            return false;
776        };
777        self.routing
778            .cashu_accepted_mints
779            .iter()
780            .any(|mint| mint == mint_url)
781    }
782
783    fn trusts_quote_mint(&self, mint_url: Option<&str>) -> bool {
784        let Some(mint_url) = mint_url else {
785            return self.routing.cashu_default_mint.is_none()
786                && self.routing.cashu_accepted_mints.is_empty();
787        };
788        self.routing.cashu_default_mint.as_deref() == Some(mint_url)
789            || self
790                .routing
791                .cashu_accepted_mints
792                .iter()
793                .any(|mint| mint == mint_url)
794    }
795
    /// Maximum sats we will pay against a mint suggested by `peer_id` that is
    /// outside our trusted set.
    ///
    /// Starts from the configured base cap and grows with the peer's track
    /// record (successful deliveries and received settlements), clamped by
    /// the configured hard maximum. Returns `0` when the feature is disabled
    /// (base cap of 0) or when the peer has defaulted on payments at least as
    /// often as it has paid.
    async fn peer_suggested_mint_cap_sat(&self, peer_id: &str) -> u64 {
        let base = self.routing.cashu_peer_suggested_mint_base_cap_sat;
        if base == 0 {
            return 0;
        }

        let selector = self.peer_selector.read().await;
        let Some(stats) = selector.get_stats(peer_id) else {
            // Unknown peer: no earned bonus, just the (clamped) base cap.
            let max_cap = self.routing.cashu_peer_suggested_mint_max_cap_sat;
            return if max_cap > 0 { base.min(max_cap) } else { base };
        };

        // A peer that defaults as often as it pays gets no credit at all.
        if stats.cashu_payment_defaults > 0
            && stats.cashu_payment_defaults >= stats.cashu_payment_receipts
        {
            return 0;
        }

        let success_bonus = stats
            .successes
            .saturating_mul(self.routing.cashu_peer_suggested_mint_success_step_sat);
        let receipt_bonus = stats
            .cashu_payment_receipts
            .saturating_mul(self.routing.cashu_peer_suggested_mint_receipt_step_sat);
        let mut cap = base
            .saturating_add(success_bonus)
            .saturating_add(receipt_bonus);
        let max_cap = self.routing.cashu_peer_suggested_mint_max_cap_sat;
        if max_cap > 0 {
            cap = cap.min(max_cap);
        }
        cap
    }
829
    /// Decide whether a peer's quote response is acceptable to us.
    ///
    /// Requires a stated price at or below our offer. The mint is then
    /// accepted when it matches exactly what we asked for, is already in our
    /// trusted set, or — for a peer-suggested mint — the price fits within
    /// that peer's earned cap (`peer_suggested_mint_cap_sat`). A response
    /// with no price, or with no mint that we neither requested nor trust,
    /// is rejected.
    async fn should_accept_quote_response(
        &self,
        from_peer: &str,
        preferred_mint_url: Option<&str>,
        offered_payment_sat: u64,
        res: &DataQuoteResponse,
    ) -> bool {
        let Some(payment_sat) = res.p else {
            return false;
        };
        if payment_sat > offered_payment_sat {
            return false;
        }

        let response_mint = res.m.as_deref();
        if response_mint == preferred_mint_url {
            return true;
        }
        if self.trusts_quote_mint(response_mint) {
            return true;
        }
        if response_mint.is_none() {
            // Untrusted "no mint" cannot fall through to the suggested-mint cap.
            return false;
        }

        payment_sat <= self.peer_suggested_mint_cap_sat(from_peer).await
    }
857
858    async fn issue_quote(
859        &self,
860        peer_id: &str,
861        hash_key: &str,
862        payment_sat: u64,
863        ttl_ms: u32,
864        mint_url: Option<&str>,
865    ) -> u64 {
866        let quote_id = {
867            let mut next = self.next_quote_id.write().await;
868            let quote_id = *next;
869            *next = next.saturating_add(1);
870            quote_id
871        };
872
873        let expires_at = Instant::now() + Duration::from_millis(ttl_ms as u64);
874        self.issued_quotes.write().await.insert(
875            (peer_id.to_string(), hash_key.to_string(), quote_id),
876            IssuedQuote {
877                expires_at,
878                payment_sat,
879                mint_url: mint_url.map(str::to_string),
880            },
881        );
882        quote_id
883    }
884
885    async fn take_valid_quote(&self, peer_id: &str, hash_key: &str, quote_id: u64) -> bool {
886        let key = (peer_id.to_string(), hash_key.to_string(), quote_id);
887        let Some(quote) = self.issued_quotes.write().await.remove(&key) else {
888            return false;
889        };
890        quote.expires_at > Instant::now()
891    }
892
893    async fn send_request_to_peer(
894        &self,
895        peer_id: &str,
896        hash: &Hash,
897        request_htl: u8,
898        quote_id: Option<u64>,
899    ) -> bool {
900        let channel = match self.signaling.get_channel(peer_id).await {
901            Some(c) => c,
902            None => return false,
903        };
904
905        let htl_config = {
906            let configs = self.htl_configs.read().await;
907            configs
908                .get(peer_id)
909                .cloned()
910                .unwrap_or_else(PeerHTLConfig::random)
911        };
912
913        let send_htl = htl_config.decrement(request_htl);
914        let req = match quote_id {
915            Some(quote_id) => create_request_with_quote(hash, send_htl, quote_id),
916            None => create_request(hash, send_htl),
917        };
918        let request_bytes = encode_request(&req);
919
920        {
921            let mut selector = self.peer_selector.write().await;
922            selector.record_request(peer_id, request_bytes.len() as u64);
923        }
924
925        match channel.send(request_bytes).await {
926            Ok(()) => true,
927            Err(_) => {
928                self.peer_selector.write().await.record_failure(peer_id);
929                false
930            }
931        }
932    }
933
934    async fn send_quote_request_to_peer(
935        &self,
936        peer_id: &str,
937        hash: &Hash,
938        payment_sat: u64,
939        ttl_ms: u32,
940        mint_url: Option<&str>,
941    ) -> bool {
942        let channel = match self.signaling.get_channel(peer_id).await {
943            Some(c) => c,
944            None => return false,
945        };
946
947        let req = create_quote_request(hash, ttl_ms, payment_sat, mint_url);
948        let request_bytes = encode_quote_request(&req);
949
950        match channel.send(request_bytes).await {
951            Ok(()) => true,
952            Err(_) => false,
953        }
954    }
955
956    pub async fn set_read_sources(&self, sources: Vec<Arc<dyn MeshReadSource>>) {
957        let mut by_id = HashMap::new();
958        let mut stats = self.read_source_stats.write().await;
959        for source in sources {
960            let source_id = source.id().to_string();
961            by_id.insert(source_id.clone(), source);
962            stats
963                .entry(source_id)
964                .or_insert_with(AdaptiveSourceStats::default);
965        }
966        *self.read_sources.write().await = by_id;
967    }
968
969    fn route_stats_for<'a>(
970        stats: &'a mut HashMap<String, AdaptiveSourceStats>,
971        route_id: &str,
972    ) -> &'a mut AdaptiveSourceStats {
973        stats
974            .entry(route_id.to_string())
975            .or_insert_with(AdaptiveSourceStats::default)
976    }
977
978    async fn record_route_request(&self, route_id: &str) {
979        let mut stats = self.read_route_stats.write().await;
980        Self::route_stats_for(&mut stats, route_id).requests += 1;
981    }
982
    /// Record a successful fetch over a route and fold the observed latency
    /// into the route's smoothed RTT estimate (RFC 6298-style EWMA).
    async fn record_route_success(&self, route_id: &str, elapsed_ms: u64) {
        let now = Instant::now();
        let mut stats = self.read_route_stats.write().await;
        let stats = Self::route_stats_for(&mut stats, route_id);
        stats.successes += 1;
        stats.last_success_at = Some(now);
        // Any success fully resets the failure backoff.
        stats.backoff_level = 0;
        stats.backed_off_until = None;
        // First sample seeds the estimator directly.
        if stats.srtt_ms <= 0.0 {
            stats.srtt_ms = elapsed_ms as f64;
            stats.rttvar_ms = elapsed_ms as f64 / 2.0;
            return;
        }
        let elapsed = elapsed_ms as f64;
        // Classic TCP smoothing constants: beta = 1/4 for variance, alpha = 1/8 for SRTT.
        stats.rttvar_ms = 0.75 * stats.rttvar_ms + 0.25 * (stats.srtt_ms - elapsed).abs();
        stats.srtt_ms = 0.875 * stats.srtt_ms + 0.125 * elapsed;
    }
1000
1001    async fn record_route_miss(&self, route_id: &str) {
1002        let mut stats = self.read_route_stats.write().await;
1003        Self::route_stats_for(&mut stats, route_id).misses += 1;
1004    }
1005
1006    async fn record_read_source_request(&self, source_id: &str) {
1007        let mut stats = self.read_source_stats.write().await;
1008        stats
1009            .entry(source_id.to_string())
1010            .or_insert_with(AdaptiveSourceStats::default)
1011            .requests += 1;
1012    }
1013
1014    async fn record_read_source_miss(&self, source_id: &str) {
1015        let mut stats = self.read_source_stats.write().await;
1016        stats
1017            .entry(source_id.to_string())
1018            .or_insert_with(AdaptiveSourceStats::default)
1019            .misses += 1;
1020    }
1021
    /// Record a successful fetch from a read source and fold the latency into
    /// its smoothed RTT estimate (RFC 6298-style EWMA, same scheme as
    /// `record_route_success`).
    async fn record_read_source_success(&self, source_id: &str, elapsed_ms: u64) {
        let now = Instant::now();
        let mut stats = self.read_source_stats.write().await;
        let stats = stats
            .entry(source_id.to_string())
            .or_insert_with(AdaptiveSourceStats::default);
        stats.successes += 1;
        stats.last_success_at = Some(now);
        // Any success fully resets the failure backoff.
        stats.backoff_level = 0;
        stats.backed_off_until = None;
        // First sample seeds the estimator directly.
        if stats.srtt_ms <= 0.0 {
            stats.srtt_ms = elapsed_ms as f64;
            stats.rttvar_ms = elapsed_ms as f64 / 2.0;
            return;
        }
        let elapsed = elapsed_ms as f64;
        // Classic TCP smoothing constants: beta = 1/4 for variance, alpha = 1/8 for SRTT.
        stats.rttvar_ms = 0.75 * stats.rttvar_ms + 0.25 * (stats.srtt_ms - elapsed).abs();
        stats.srtt_ms = 0.875 * stats.srtt_ms + 0.125 * elapsed;
    }
1041
1042    async fn record_read_source_failure(&self, source_id: &str) {
1043        let now = Instant::now();
1044        let mut stats = self.read_source_stats.write().await;
1045        let stats = stats
1046            .entry(source_id.to_string())
1047            .or_insert_with(AdaptiveSourceStats::default);
1048        stats.failures += 1;
1049        stats.last_failure_at = Some(now);
1050        Self::apply_source_backoff(stats, now);
1051    }
1052
1053    async fn record_read_source_timeout(&self, source_id: &str) {
1054        let now = Instant::now();
1055        let mut stats = self.read_source_stats.write().await;
1056        let stats = stats
1057            .entry(source_id.to_string())
1058            .or_insert_with(AdaptiveSourceStats::default);
1059        stats.timeouts += 1;
1060        stats.last_failure_at = Some(now);
1061        Self::apply_source_backoff(stats, now);
1062    }
1063
1064    fn apply_source_backoff(stats: &mut AdaptiveSourceStats, now: Instant) {
1065        stats.backoff_level = stats.backoff_level.saturating_add(1);
1066        let backoff_ms = (INITIAL_SOURCE_BACKOFF_MS
1067            .saturating_mul(2u64.saturating_pow(stats.backoff_level.saturating_sub(1))))
1068        .min(MAX_SOURCE_BACKOFF_MS);
1069        stats.backed_off_until = Some(now + Duration::from_millis(backoff_ms));
1070    }
1071
    /// Rank the configured read sources for the next fetch attempt.
    ///
    /// Sources reporting themselves unavailable are dropped. Sources inside a
    /// backoff window are then filtered out — unless *every* available source
    /// is backed off, in which case all of them are kept rather than stalling
    /// the fetch entirely. The result is sorted by descending adaptive score,
    /// with the source id as a deterministic tie-breaker.
    async fn ordered_read_sources(&self) -> Vec<Arc<dyn MeshReadSource>> {
        let sources = self.read_sources.read().await;
        if sources.is_empty() {
            return Vec::new();
        }

        let mut available: Vec<Arc<dyn MeshReadSource>> = sources
            .values()
            .filter(|source| source.is_available())
            .cloned()
            .collect();
        if available.is_empty() {
            return Vec::new();
        }

        let now = Instant::now();
        let stats = self.read_source_stats.read().await;
        // A source with no stats entry or no backoff deadline counts as healthy.
        let mut healthy: Vec<Arc<dyn MeshReadSource>> = available
            .iter()
            .filter(|source| {
                stats
                    .get(source.id())
                    .and_then(|s| s.backed_off_until)
                    .is_none_or(|until| until <= now)
            })
            .cloned()
            .collect();
        if !healthy.is_empty() {
            available = std::mem::take(&mut healthy);
        }

        // Higher score first (note reversed operands), id as tie-breaker.
        available.sort_by(|left, right| {
            let left_stats = stats.get(left.id()).cloned().unwrap_or_default();
            let right_stats = stats.get(right.id()).cloned().unwrap_or_default();
            adaptive_source_score(&right_stats, now)
                .partial_cmp(&adaptive_source_score(&left_stats, now))
                .unwrap_or(std::cmp::Ordering::Equal)
                .then_with(|| left.id().cmp(right.id()))
        });
        available
    }
1113
1114    async fn should_probe_multiple_read_sources(
1115        &self,
1116        ordered_sources: &[Arc<dyn MeshReadSource>],
1117    ) -> bool {
1118        if ordered_sources.len() <= 1 {
1119            return false;
1120        }
1121        let stats = self.read_source_stats.read().await;
1122        let best = stats
1123            .get(ordered_sources[0].id())
1124            .cloned()
1125            .unwrap_or_default();
1126        let second = stats
1127            .get(ordered_sources[1].id())
1128            .cloned()
1129            .unwrap_or_default();
1130        if !source_has_history(&best) || !source_has_history(&second) {
1131            return true;
1132        }
1133        let now = Instant::now();
1134        adaptive_source_score(&best, now) - adaptive_source_score(&second, now)
1135            < SOURCE_SCORE_TIE_DELTA
1136    }
1137
1138    async fn source_dispatch_for(&self, source_count: usize) -> RequestDispatchConfig {
1139        if source_count == 0 {
1140            return self.routing.dispatch;
1141        }
1142        let ordered_sources = self.ordered_read_sources().await;
1143        let probe_multiple = self
1144            .should_probe_multiple_read_sources(&ordered_sources)
1145            .await;
1146        RequestDispatchConfig {
1147            initial_fanout: if probe_multiple {
1148                source_count.min(2)
1149            } else {
1150                1
1151            },
1152            hedge_fanout: self.routing.dispatch.hedge_fanout,
1153            max_fanout: self.routing.dispatch.max_fanout.min(source_count),
1154            hedge_interval_ms: self.routing.dispatch.hedge_interval_ms,
1155        }
1156    }
1157
1158    /// Get peer count
1159    pub async fn peer_count(&self) -> usize {
1160        self.signaling.peer_count().await
1161    }
1162
1163    /// Check if we need more peers
1164    pub async fn needs_peers(&self) -> bool {
1165        self.signaling.needs_peers().await
1166    }
1167
1168    /// Re-broadcast hello to refresh discovery as topology changes.
1169    pub async fn send_hello(&self) -> Result<(), TransportError> {
1170        self.signaling.send_hello(vec![]).await
1171    }
1172
1173    /// Drain all currently available peer-link messages and handle them.
1174    ///
1175    /// This keeps the message pump logic shared between simulation and the
1176    /// default production wrapper instead of duplicating per-channel loops.
1177    pub async fn drain_available_data_messages(self: &Arc<Self>) -> DataPumpStats {
1178        let mut stats = DataPumpStats::default();
1179        let peer_ids = self.signaling.peer_ids().await;
1180        for peer_id in peer_ids {
1181            let Some(channel) = self.signaling.get_channel(&peer_id).await else {
1182                continue;
1183            };
1184
1185            while let Some(data) = channel.try_recv() {
1186                stats.processed += 1;
1187                stats.processed_bytes += data.len() as u64;
1188                if let Some(msg) = parse_message(&data) {
1189                    match msg {
1190                        DataMessage::Request(_) => stats.request_messages += 1,
1191                        DataMessage::Response(_) => stats.response_messages += 1,
1192                        DataMessage::QuoteRequest(_) => stats.quote_request_messages += 1,
1193                        DataMessage::QuoteResponse(_) => stats.quote_response_messages += 1,
1194                        DataMessage::Payment(_)
1195                        | DataMessage::PaymentAck(_)
1196                        | DataMessage::Chunk(_) => {}
1197                    }
1198                }
1199                self.handle_data_message(&peer_id, &data).await;
1200            }
1201        }
1202        stats
1203    }
1204
1205    /// Apply an out-of-band payment credit to a peer's routing priority.
1206    pub async fn record_cashu_payment_for_peer(&self, peer_id: &str, amount_sat: u64) {
1207        self.peer_selector
1208            .write()
1209            .await
1210            .record_cashu_payment(peer_id, amount_sat);
1211    }
1212
1213    /// Record a post-delivery payment we received from a peer.
1214    pub async fn record_cashu_receipt_from_peer(&self, peer_id: &str, amount_sat: u64) {
1215        self.peer_selector
1216            .write()
1217            .await
1218            .record_cashu_receipt(peer_id, amount_sat);
1219    }
1220
1221    /// Record that a peer failed to pay after we delivered successfully.
1222    pub async fn record_cashu_payment_default_from_peer(&self, peer_id: &str) {
1223        self.peer_selector
1224            .write()
1225            .await
1226            .record_cashu_payment_default(peer_id);
1227    }
1228
1229    /// Snapshot routing/selection summary for inspection/debugging.
1230    pub async fn selector_summary(&self) -> crate::peer_selector::SelectorSummary {
1231        self.peer_selector.read().await.summary()
1232    }
1233
1234    fn should_refuse_requests_from_peer(&self, selector: &PeerSelector, peer_id: &str) -> bool {
1235        selector.is_peer_blocked_for_payment_defaults(
1236            peer_id,
1237            self.routing.cashu_payment_default_block_threshold,
1238        )
1239    }
1240
1241    /// Export live peer metadata for inspection/debugging.
1242    pub async fn peer_metadata_snapshot(&self) -> PeerMetadataSnapshot {
1243        self.peer_selector
1244            .read()
1245            .await
1246            .export_peer_metadata_snapshot()
1247    }
1248
1249    /// Snapshot current peer metadata and persist it into `local_store`.
1250    ///
1251    /// Uses content-addressed storage for the snapshot body and a reserved
1252    /// mutable pointer slot for the "latest snapshot hash".
1253    pub async fn persist_peer_metadata(&self) -> Result<Hash, StoreError> {
1254        let snapshot = self
1255            .peer_selector
1256            .read()
1257            .await
1258            .export_peer_metadata_snapshot();
1259        let bytes = serde_json::to_vec(&snapshot).map_err(|e| {
1260            StoreError::Other(format!("Failed to encode peer metadata snapshot: {e}"))
1261        })?;
1262        let snapshot_hash = hashtree_core::sha256(&bytes);
1263        let _ = self.local_store.put(snapshot_hash, bytes).await?;
1264
1265        let pointer_slot = Self::peer_metadata_pointer_slot_hash();
1266        let pointer_bytes = hex::encode(snapshot_hash).into_bytes();
1267        let _ = self.local_store.delete(&pointer_slot).await?;
1268        let _ = self.local_store.put(pointer_slot, pointer_bytes).await?;
1269
1270        Ok(snapshot_hash)
1271    }
1272
    /// Load persisted peer metadata from `local_store` if available.
    ///
    /// Follows the two-level layout written by `persist_peer_metadata`: the
    /// reserved pointer slot holds the hex hash of the latest snapshot, whose
    /// body is then fetched, decoded, and imported into the peer selector.
    /// Returns `Ok(false)` when either level is missing, `Ok(true)` on a
    /// successful import.
    pub async fn load_peer_metadata(&self) -> Result<bool, StoreError> {
        let pointer_slot = Self::peer_metadata_pointer_slot_hash();
        let Some(pointer_bytes) = self.local_store.get(&pointer_slot).await? else {
            return Ok(false);
        };
        let pointer_hex = std::str::from_utf8(&pointer_bytes).map_err(|e| {
            StoreError::Other(format!("Peer metadata pointer is not valid UTF-8: {e}"))
        })?;
        let snapshot_hash = Self::decode_hash_hex(pointer_hex.trim())?;

        // Pointer may dangle if the body was pruned; treat that as "no snapshot".
        let Some(snapshot_bytes) = self.local_store.get(&snapshot_hash).await? else {
            return Ok(false);
        };
        let snapshot: PeerMetadataSnapshot =
            serde_json::from_slice(&snapshot_bytes).map_err(|e| {
                StoreError::Other(format!("Failed to decode peer metadata snapshot: {e}"))
            })?;
        self.peer_selector
            .write()
            .await
            .import_peer_metadata_snapshot(&snapshot);
        Ok(true)
    }
1297
1298    /// Request data from peers after negotiating a paid quote.
1299    ///
1300    /// If quote negotiation fails or the quoted peer does not deliver, the store
1301    /// falls back to the normal unpaid retrieval path to preserve liveness.
1302    pub async fn get_with_quote(
1303        &self,
1304        hash: &Hash,
1305        payment_sat: u64,
1306        quote_ttl: Duration,
1307    ) -> Result<Option<Vec<u8>>, StoreError> {
1308        if let Some(data) = self.local_store.get(hash).await? {
1309            return Ok(Some(data));
1310        }
1311        Ok(self
1312            .request_from_peers_with_quote(hash, payment_sat, quote_ttl)
1313            .await)
1314    }
1315
1316    async fn request_from_peers_with_quote(
1317        &self,
1318        hash: &Hash,
1319        payment_sat: u64,
1320        quote_ttl: Duration,
1321    ) -> Option<Vec<u8>> {
1322        let ordered_peer_ids = self.ordered_connected_peers(None).await;
1323        if ordered_peer_ids.is_empty() {
1324            return None;
1325        }
1326
1327        if let Some(quote) = self
1328            .request_quote_from_peers(hash, payment_sat, quote_ttl, &ordered_peer_ids)
1329            .await
1330        {
1331            if let Some(data) = self
1332                .request_from_single_peer(hash, &quote.peer_id, MAX_HTL, Some(quote.quote_id))
1333                .await
1334            {
1335                return Some(data);
1336            }
1337        }
1338
1339        self.request_from_mesh(hash).await
1340    }
1341
    /// Broadcast quote requests to `ordered_peer_ids` in hedged waves and
    /// return the first acceptable quote, if any.
    ///
    /// Registers a pending-quote entry keyed by the hash so the message
    /// handler can deliver the winning quote over the oneshot channel. The
    /// entry is removed before returning, win or lose.
    async fn request_quote_from_peers(
        &self,
        hash: &Hash,
        payment_sat: u64,
        quote_ttl: Duration,
        ordered_peer_ids: &[String],
    ) -> Option<NegotiatedQuote> {
        if ordered_peer_ids.is_empty() {
            return None;
        }
        // Clamp the TTL to the wire format's u32 milliseconds; a zero TTL
        // cannot produce a redeemable quote, so bail early.
        let ttl_ms = quote_ttl.as_millis().min(u32::MAX as u128) as u32;
        if ttl_ms == 0 {
            return None;
        }
        let requested_mint = self.requested_quote_mint().map(str::to_string);

        let hash_key = hash_to_key(hash);
        let (tx, rx) = oneshot::channel();
        self.pending_quotes.write().await.insert(
            hash_key.clone(),
            PendingQuoteRequest {
                response_tx: tx,
                preferred_mint_url: requested_mint.clone(),
                offered_payment_sat: payment_sat,
            },
        );

        // The wait closure is re-entered once per wave, so the receiver is
        // shared behind a mutex rather than moved into a single closure call.
        let rx = Arc::new(Mutex::new(rx));
        let result = run_hedged_waves(
            ordered_peer_ids.len(),
            self.routing.dispatch,
            self.request_timeout,
            |range| {
                // Send closure: fire quote requests at this wave's slice of
                // the ranked peer list; report how many actually went out.
                let wave_peer_ids = ordered_peer_ids[range].to_vec();
                let requested_mint = requested_mint.clone();
                let hash = *hash;
                async move {
                    let mut sent = 0usize;
                    for peer_id in wave_peer_ids {
                        if self
                            .send_quote_request_to_peer(
                                &peer_id,
                                &hash,
                                payment_sat,
                                ttl_ms,
                                requested_mint.as_deref(),
                            )
                            .await
                        {
                            sent += 1;
                        }
                    }
                    sent
                }
            },
            |wait| {
                // Wait closure: Success on a delivered quote, Abort when the
                // pending entry resolved empty or the sender was dropped,
                // Continue (next wave) on timeout.
                let rx = rx.clone();
                async move {
                    let mut rx = rx.lock().await;
                    match tokio::time::timeout(wait, &mut *rx).await {
                        Ok(Ok(Some(quote))) => HedgedWaveAction::Success(quote),
                        Ok(Ok(None)) | Ok(Err(_)) => HedgedWaveAction::Abort,
                        Err(_) => HedgedWaveAction::Continue,
                    }
                }
            },
        )
        .await;
        let _ = self.pending_quotes.write().await.remove(&hash_key);
        result
    }
1413
1414    async fn request_from_single_peer(
1415        &self,
1416        hash: &Hash,
1417        peer_id: &str,
1418        request_htl: u8,
1419        quote_id: Option<u64>,
1420    ) -> Option<Vec<u8>> {
1421        let hash_key = hash_to_key(hash);
1422        let (tx, rx) = oneshot::channel();
1423        self.pending_requests.write().await.insert(
1424            hash_key.clone(),
1425            PendingRequest {
1426                response_tx: tx,
1427                started_at: Instant::now(),
1428                queried_peers: vec![peer_id.to_string()],
1429            },
1430        );
1431
1432        let mut rx = rx;
1433        if !self
1434            .send_request_to_peer(peer_id, hash, request_htl, quote_id)
1435            .await
1436        {
1437            let _ = self.pending_requests.write().await.remove(&hash_key);
1438            return None;
1439        }
1440        self.reserve_peer_request(peer_id).await;
1441
1442        if let Ok(Ok(Some(data))) = tokio::time::timeout(self.request_timeout, &mut rx).await {
1443            if hashtree_core::sha256(&data) == *hash {
1444                let _ = self.local_store.put(*hash, data.clone()).await;
1445                return Some(data);
1446            }
1447        }
1448
1449        if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
1450            self.release_queried_peer_requests(&pending.queried_peers)
1451                .await;
1452            for peer_id in pending.queried_peers {
1453                self.peer_selector.write().await.record_timeout(&peer_id);
1454            }
1455        }
1456        let _ = self.take_forward_requesters(&hash_key).await;
1457        None
1458    }
1459
    /// Request `hash` from the ranked peer list using hedged waves.
    ///
    /// A single pending-request entry collects the first response; peers are
    /// added to its `queried_peers` list as requests actually go out so that
    /// reservations can be released and timeouts recorded on failure. Valid
    /// data is cached in the local store before being returned.
    async fn request_from_ordered_peers(
        &self,
        hash: &Hash,
        ordered_peer_ids: &[String],
        request_htl: u8,
    ) -> Option<Vec<u8>> {
        let hash_key = hash_to_key(hash);
        let (tx, rx) = oneshot::channel();
        self.pending_requests.write().await.insert(
            hash_key.clone(),
            PendingRequest {
                response_tx: tx,
                started_at: Instant::now(),
                // Filled in incrementally below as sends succeed.
                queried_peers: Vec::new(),
            },
        );

        // The wait closure runs once per wave, so the receiver is shared
        // behind a mutex instead of being moved into one closure call.
        let rx = Arc::new(Mutex::new(rx));
        let result = run_hedged_waves(
            ordered_peer_ids.len(),
            self.routing.dispatch,
            self.request_timeout,
            |range| {
                // Send closure: dispatch this wave's slice of peers, reserve a
                // slot per successful send, and track who was queried.
                let wave_peer_ids = ordered_peer_ids[range].to_vec();
                let hash = *hash;
                let hash_key = hash_key.clone();
                async move {
                    let mut sent = 0usize;
                    for peer_id in wave_peer_ids {
                        if self
                            .send_request_to_peer(&peer_id, &hash, request_htl, None)
                            .await
                        {
                            sent += 1;
                            self.reserve_peer_request(&peer_id).await;
                            if let Some(pending) =
                                self.pending_requests.write().await.get_mut(&hash_key)
                            {
                                pending.queried_peers.push(peer_id);
                            }
                        }
                    }
                    sent
                }
            },
            |wait| {
                // Wait closure: accept only hash-verified data; data that
                // fails verification keeps waiting (Continue), a closed or
                // empty channel aborts, and a timeout moves to the next wave.
                let rx = rx.clone();
                async move {
                    let mut rx = rx.lock().await;
                    match tokio::time::timeout(wait, &mut *rx).await {
                        Ok(Ok(Some(data))) if hashtree_core::sha256(&data) == *hash => {
                            HedgedWaveAction::Success(data)
                        }
                        Ok(Ok(Some(_))) => HedgedWaveAction::Continue,
                        Ok(Ok(None)) | Ok(Err(_)) => HedgedWaveAction::Abort,
                        Err(_) => HedgedWaveAction::Continue,
                    }
                }
            },
        )
        .await;

        let Some(data) = result else {
            // No usable response: release reservations and record timeouts
            // for every peer that was actually queried.
            if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
                self.release_queried_peer_requests(&pending.queried_peers)
                    .await;
                for peer_id in pending.queried_peers {
                    self.peer_selector.write().await.record_timeout(&peer_id);
                }
            }
            let _ = self.take_forward_requesters(&hash_key).await;
            return None;
        };

        let _ = self.local_store.put(*hash, data.clone()).await;
        Some(data)
    }
1537
    /// Fetch `hash` from the ranked read sources using hedged waves.
    ///
    /// Each wave spawns fetch tasks for its slice of sources, then drains
    /// completions until either the wave's hedge window or the overall
    /// deadline expires. The first hash of data wins; misses and failures are
    /// recorded per source, and any source still outstanding at the end is
    /// charged a timeout.
    async fn request_from_read_sources_inner(&self, hash: &Hash) -> Option<SourceFetchResult> {
        let ordered_sources = self.ordered_read_sources().await;
        if ordered_sources.is_empty() {
            return None;
        }

        let dispatch = normalize_dispatch_config(
            self.source_dispatch_for(ordered_sources.len()).await,
            ordered_sources.len(),
        );
        let wave_plan = build_hedged_wave_plan(ordered_sources.len(), dispatch);
        if wave_plan.is_empty() {
            return None;
        }

        let deadline = Instant::now() + self.request_timeout;
        let mut pending = FuturesUnordered::new();
        // Sources with a spawned task whose outcome has not been seen yet.
        let mut pending_source_ids = HashSet::new();
        let mut next_source_idx = 0usize;

        for (wave_idx, wave_size) in wave_plan.iter().copied().enumerate() {
            let from = next_source_idx;
            let to = (next_source_idx + wave_size).min(ordered_sources.len());
            next_source_idx = to;

            for source in ordered_sources[from..to].iter().cloned() {
                let source_id = source.id().to_string();
                self.record_read_source_request(&source_id).await;
                pending_source_ids.insert(source_id.clone());
                let hash = *hash;
                pending.push(tokio::spawn(async move {
                    let started_at = Instant::now();
                    // catch_unwind so one panicking source counts as a
                    // failure instead of poisoning the whole fetch.
                    let result = std::panic::AssertUnwindSafe(source.get(&hash))
                        .catch_unwind()
                        .await;
                    match result {
                        Ok(Some(data)) => SourceFetchOutcome::Hit {
                            source_id,
                            data,
                            // max(1) keeps sub-millisecond hits from reading as 0.
                            elapsed_ms: started_at.elapsed().as_millis().max(1) as u64,
                        },
                        Ok(None) => SourceFetchOutcome::Miss { source_id },
                        Err(_) => SourceFetchOutcome::Failure { source_id },
                    }
                }));
            }

            // The last wave waits until the full deadline; earlier waves only
            // wait out their hedge interval before adding more sources.
            let is_last_wave =
                wave_idx + 1 == wave_plan.len() || next_source_idx >= ordered_sources.len();
            let window_end = if is_last_wave {
                deadline
            } else {
                (Instant::now() + Duration::from_millis(dispatch.hedge_interval_ms)).min(deadline)
            };

            while Instant::now() < window_end {
                let remaining = window_end.saturating_duration_since(Instant::now());
                let Some(result) = tokio::time::timeout(remaining, pending.next())
                    .await
                    .ok()
                    .flatten()
                else {
                    // Window elapsed or no tasks left: move to the next wave.
                    break;
                };
                // A JoinError (task cancelled/panicked at the join level) is skipped.
                let Ok(outcome) = result else {
                    continue;
                };
                match outcome {
                    SourceFetchOutcome::Hit {
                        source_id,
                        data,
                        elapsed_ms,
                    } => {
                        pending_source_ids.remove(&source_id);
                        self.record_read_source_success(&source_id, elapsed_ms)
                            .await;
                        return Some(SourceFetchResult { data });
                    }
                    SourceFetchOutcome::Miss { source_id } => {
                        pending_source_ids.remove(&source_id);
                        self.record_read_source_miss(&source_id).await;
                    }
                    SourceFetchOutcome::Failure { source_id } => {
                        pending_source_ids.remove(&source_id);
                        self.record_read_source_failure(&source_id).await;
                    }
                }
            }

            if Instant::now() >= deadline {
                break;
            }
        }

        // Whatever never reported back within the deadline counts as a timeout.
        for source_id in pending_source_ids {
            self.record_read_source_timeout(&source_id).await;
        }
        None
    }
1637
    /// Fetch `hash` from the read sources with single-flight deduplication.
    ///
    /// If another fetch for the same hash is already in flight, this call
    /// parks on a waiter channel and reuses that fetch's result. Otherwise it
    /// becomes the leader: it performs the fetch, caches a hit in the local
    /// store, and fans the result out to any waiters that accumulated.
    async fn request_from_read_sources(&self, hash: &Hash) -> Option<Vec<u8>> {
        let hash_key = hash_to_key(hash);
        let existing_wait = {
            let mut inflight = self.inflight_source_fetches.lock().await;
            if let Some(existing) = inflight.get_mut(&hash_key) {
                // A fetch is already running: register as a waiter.
                let (tx, rx) = oneshot::channel();
                existing.waiters.push(tx);
                Some(rx)
            } else {
                // Become the leader for this hash.
                inflight.insert(
                    hash_key.clone(),
                    InflightSourceFetch {
                        waiters: Vec::new(),
                    },
                );
                None
            }
        };

        if let Some(wait) = existing_wait {
            // A dropped sender (leader aborted) degrades to a miss here.
            return wait.await.ok().flatten().map(|result| result.data);
        }

        let result = self.request_from_read_sources_inner(hash).await;
        if let Some(hit) = result.as_ref() {
            let _ = self.local_store.put(*hash, hit.data.clone()).await;
        }

        // Remove the inflight entry first so late arrivals start a fresh
        // fetch, then notify everyone who piggybacked on this one.
        let waiters = self
            .inflight_source_fetches
            .lock()
            .await
            .remove(&hash_key)
            .map(|inflight| inflight.waiters)
            .unwrap_or_default();
        for waiter in waiters {
            let _ = waiter.send(result.clone());
        }

        result.map(|hit| hit.data)
    }
1679
    /// List the read routes (connected peers, auxiliary sources) currently
    /// usable for `context`, ordered by descending adaptive route score.
    ///
    /// The peers route carries its already-ranked peer list; the sources
    /// route is a marker resolved later. With at most one route, no stats
    /// lookup or sorting is needed.
    async fn available_read_routes(&self, context: &MeshReadContext) -> Vec<ReadRoute> {
        let mut routes = Vec::new();
        let ordered_peers = self
            .ordered_connected_peers(context.exclude_peer_id.as_deref())
            .await;
        if !ordered_peers.is_empty() {
            routes.push(ReadRoute::Peers(ordered_peers));
        }
        if !self.ordered_read_sources().await.is_empty() {
            routes.push(ReadRoute::Sources);
        }
        if routes.len() <= 1 {
            return routes;
        }

        let now = Instant::now();
        let stats = self.read_route_stats.read().await;
        // Higher score first (note reversed operands), route id as tie-breaker —
        // same comparator shape as `ordered_read_sources`.
        routes.sort_by(|left, right| {
            let left_stats = stats.get(left.id()).cloned().unwrap_or_default();
            let right_stats = stats.get(right.id()).cloned().unwrap_or_default();
            adaptive_source_score(&right_stats, now)
                .partial_cmp(&adaptive_source_score(&left_stats, now))
                .unwrap_or(std::cmp::Ordering::Equal)
                .then_with(|| left.id().cmp(right.id()))
        });
        routes
    }
1707
1708    async fn should_probe_multiple_routes(&self, routes: &[ReadRoute]) -> bool {
1709        if routes.len() <= 1 {
1710            return false;
1711        }
1712        let stats = self.read_route_stats.read().await;
1713        let best = stats.get(routes[0].id()).cloned().unwrap_or_default();
1714        let second = stats.get(routes[1].id()).cloned().unwrap_or_default();
1715        if !source_has_history(&best) || !source_has_history(&second) {
1716            return true;
1717        }
1718        let now = Instant::now();
1719        adaptive_source_score(&best, now) - adaptive_source_score(&second, now)
1720            < SOURCE_SCORE_TIE_DELTA
1721    }
1722
1723    async fn run_read_route(
1724        &self,
1725        hash: &Hash,
1726        route: &ReadRoute,
1727        context: &MeshReadContext,
1728    ) -> Option<Vec<u8>> {
1729        let route_id = route.id();
1730        self.record_route_request(route_id).await;
1731        let started_at = Instant::now();
1732        let result = match route {
1733            ReadRoute::Peers(peer_ids) => {
1734                self.request_from_ordered_peers(hash, peer_ids, context.request_htl)
1735                    .await
1736            }
1737            ReadRoute::Sources => self.request_from_read_sources(hash).await,
1738        };
1739        if result.is_some() {
1740            self.record_route_success(route_id, started_at.elapsed().as_millis().max(1) as u64)
1741                .await;
1742        } else {
1743            self.record_route_miss(route_id).await;
1744        }
1745        result
1746    }
1747
1748    async fn request_from_mesh_with_context(
1749        &self,
1750        hash: &Hash,
1751        context: &MeshReadContext,
1752    ) -> Option<Vec<u8>> {
1753        let routes = self.available_read_routes(context).await;
1754        match routes.as_slice() {
1755            [] => None,
1756            [route] => self.run_read_route(hash, route, context).await,
1757            [first, second, ..] => {
1758                if self.should_probe_multiple_routes(&routes).await {
1759                    let first_fut = self.run_read_route(hash, first, context);
1760                    let second_fut = self.run_read_route(hash, second, context);
1761                    tokio::pin!(first_fut);
1762                    tokio::pin!(second_fut);
1763                    let mut first_done = false;
1764                    let mut second_done = false;
1765                    loop {
1766                        tokio::select! {
1767                            result = &mut first_fut, if !first_done => {
1768                                first_done = true;
1769                                if result.is_some() {
1770                                    return result;
1771                                }
1772                            }
1773                            result = &mut second_fut, if !second_done => {
1774                                second_done = true;
1775                                if result.is_some() {
1776                                    return result;
1777                                }
1778                            }
1779                            else => break,
1780                        }
1781                        if first_done && second_done {
1782                            break;
1783                        }
1784                    }
1785                    None
1786                } else {
1787                    if let Some(data) = self.run_read_route(hash, first, context).await {
1788                        return Some(data);
1789                    }
1790                    self.run_read_route(hash, second, context).await
1791                }
1792            }
1793        }
1794    }
1795
1796    async fn request_from_mesh(&self, hash: &Hash) -> Option<Vec<u8>> {
1797        self.request_from_mesh_with_context(hash, &MeshReadContext::default())
1798            .await
1799    }
1800
1801    async fn begin_forward_request(&self, hash_key: &str, requester_id: &str) -> bool {
1802        let mut pending = self.pending_forward_requests.write().await;
1803        if let Some(existing) = pending.get_mut(hash_key) {
1804            existing.requester_ids.insert(requester_id.to_string());
1805            return false;
1806        }
1807
1808        let mut requester_ids = HashSet::new();
1809        requester_ids.insert(requester_id.to_string());
1810        pending.insert(
1811            hash_key.to_string(),
1812            PendingForwardRequest { requester_ids },
1813        );
1814        true
1815    }
1816
1817    async fn take_forward_requesters(&self, hash_key: &str) -> Vec<String> {
1818        self.pending_forward_requests
1819            .write()
1820            .await
1821            .remove(hash_key)
1822            .map(|pending| pending.requester_ids.into_iter().collect())
1823            .unwrap_or_default()
1824    }
1825
    /// Resolve a locally pending request for `hash` with a verified payload:
    /// release per-peer request slots, credit the serving peer's stats,
    /// answer the local waiter, and relay the payload to any peers whose
    /// forwarded requests were coalesced onto this one.
    async fn complete_pending_response(
        &self,
        from_peer: &str,
        hash: &Hash,
        hash_key: String,
        payload: Vec<u8>,
    ) {
        // A late or duplicate response finds no pending entry and is dropped.
        if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
            self.release_queried_peer_requests(&pending.queried_peers)
                .await;
            let rtt_ms = pending.started_at.elapsed().as_millis() as u64;
            self.peer_selector.write().await.record_success(
                from_peer,
                rtt_ms,
                payload.len() as u64,
            );
            // Encode the relay frame before `payload` is moved to the local
            // waiter below, and only when someone actually awaits a forward.
            let forward_requesters = self.take_forward_requesters(&hash_key).await;
            let response_bytes = if forward_requesters.is_empty() {
                None
            } else {
                Some(encode_response(&create_response(hash, payload.clone())))
            };
            // The local waiter may have gone away; a failed send is fine.
            let _ = pending.response_tx.send(Some(payload));
            if let Some(response_bytes) = response_bytes {
                for requester_id in forward_requesters {
                    // Requesters that disconnected meanwhile are skipped;
                    // send failures are intentionally ignored (best effort).
                    if let Some(channel) = self.signaling.get_channel(&requester_id).await {
                        let _ = channel.send(response_bytes.clone()).await;
                    }
                }
            }
        }
    }
1858
    /// Handle an incoming quote response. Only affirmative responses that
    /// carry a quote id and match a locally pending quote negotiation are
    /// considered; an acceptable one resolves that negotiation.
    async fn handle_quote_response_message(&self, from_peer: &str, res: DataQuoteResponse) {
        // `a` is the availability flag; refusals are ignored outright.
        if !res.a {
            return;
        }

        let Some(quote_id) = res.q else {
            return;
        };

        let hash_key = hash_to_key(&res.h);
        // Snapshot the negotiation terms under the read lock; the lock must
        // be dropped before the awaited acceptance check below.
        let (preferred_mint_url, offered_payment_sat) = {
            let pending_quotes = self.pending_quotes.read().await;
            let Some(pending) = pending_quotes.get(&hash_key) else {
                return;
            };
            (
                pending.preferred_mint_url.clone(),
                pending.offered_payment_sat,
            )
        };
        if !self
            .should_accept_quote_response(
                from_peer,
                preferred_mint_url.as_deref(),
                offered_payment_sat,
                &res,
            )
            .await
        {
            return;
        }
        // Re-check under the write lock: the entry may have been resolved or
        // withdrawn by a competing response while no lock was held.
        let mut pending_quotes = self.pending_quotes.write().await;
        if let Some(pending) = pending_quotes.remove(&hash_key) {
            let _ = pending.response_tx.send(Some(NegotiatedQuote {
                peer_id: from_peer.to_string(),
                quote_id,
                mint_url: res.m,
            }));
        }
    }
1899
1900    async fn handle_response_message(&self, from_peer: &str, res: crate::protocol::DataResponse) {
1901        let hash_key = hash_to_key(&res.h);
1902        let hash = match crate::protocol::bytes_to_hash(&res.h) {
1903            Some(h) => h,
1904            None => return,
1905        };
1906
1907        // Ignore malformed/corrupt payload and keep waiting for a valid response.
1908        if hashtree_core::sha256(&res.d) != hash {
1909            self.peer_selector.write().await.record_failure(from_peer);
1910            if self.debug {
1911                println!("[MeshStoreCore] Ignoring invalid response payload for {hash_key}");
1912            }
1913            return;
1914        }
1915
1916        self.complete_pending_response(from_peer, &hash, hash_key, res.d)
1917            .await;
1918    }
1919
1920    async fn handle_quote_request_message(&self, from_peer: &str, req: DataQuoteRequest) {
1921        let hash = match crate::protocol::bytes_to_hash(&req.h) {
1922            Some(h) => h,
1923            None => return,
1924        };
1925        let hash_key = hash_to_key(&hash);
1926
1927        {
1928            let selector = self.peer_selector.read().await;
1929            if self.should_refuse_requests_from_peer(&selector, from_peer) {
1930                if self.debug {
1931                    println!(
1932                        "[MeshStoreCore] Refusing quote request from delinquent peer {}",
1933                        from_peer
1934                    );
1935                }
1936                return;
1937            }
1938        }
1939
1940        let chosen_mint = self.choose_quote_mint(req.m.as_deref());
1941        let can_serve = self.local_store.has(&hash).await.ok().unwrap_or(false)
1942            && !self.should_drop_response(&hash)
1943            && !self.should_corrupt_response(&hash);
1944
1945        let res = if can_serve {
1946            let quote_id = self
1947                .issue_quote(from_peer, &hash_key, req.p, req.t, chosen_mint.as_deref())
1948                .await;
1949            create_quote_response_available(&hash, quote_id, req.p, req.t, chosen_mint.as_deref())
1950        } else {
1951            create_quote_response_unavailable(&hash)
1952        };
1953        let response_bytes = encode_quote_response(&res);
1954        if let Some(channel) = self.signaling.get_channel(from_peer).await {
1955            let _ = channel.send(response_bytes).await;
1956        }
1957    }
1958
    /// Serve a data request from a peer. Local hits are answered directly
    /// (subject to the actor profile's delay/drop/corrupt behavior); local
    /// misses are coalesced per hash and forwarded through the mesh on a
    /// background task, with the answer fanned out to all waiting requesters.
    async fn handle_request_message(
        self: &Arc<Self>,
        from_peer: &str,
        req: crate::protocol::DataRequest,
    ) {
        let hash = match crate::protocol::bytes_to_hash(&req.h) {
            Some(h) => h,
            None => return,
        };
        let hash_key = hash_to_key(&hash);

        // Scope the selector read lock so it is released before the awaits
        // below that may need the write half.
        {
            let selector = self.peer_selector.read().await;
            if self.should_refuse_requests_from_peer(&selector, from_peer) {
                if self.debug {
                    println!(
                        "[MeshStoreCore] Refusing request from delinquent peer {}",
                        from_peer
                    );
                }
                return;
            }
        }

        // A request that references a quote must present a valid, unexpired
        // one. Note the quote is consumed by `take_valid_quote` even though
        // serving can still fail further down.
        if let Some(quote_id) = req.q {
            if !self.take_valid_quote(from_peer, &hash_key, quote_id).await {
                if self.debug {
                    println!(
                        "[MeshStoreCore] Refusing request with invalid or expired quote {} from {}",
                        quote_id, from_peer
                    );
                }
                return;
            }
        }

        // Check local store
        if let Ok(Some(mut data)) = self.local_store.get(&hash).await {
            // Simulated bad actor: silently withhold the response.
            if self.should_drop_response(&hash) {
                if self.debug {
                    println!(
                        "[MeshStoreCore] Dropping response for {} due to actor profile",
                        hash_to_key(&hash)
                    );
                }
                return;
            }

            // Simulated slow responder.
            let behavior = self.response_behavior();
            if behavior.extra_delay_ms > 0 {
                tokio::time::sleep(Duration::from_millis(behavior.extra_delay_ms)).await;
            }

            // Simulated corrupt responder: flip the top bit of the first
            // byte (or fabricate one for an empty payload) so the receiver's
            // hash check fails.
            if self.should_corrupt_response(&hash) {
                if data.is_empty() {
                    data.push(0x80);
                } else {
                    data[0] ^= 0x80;
                }
            }

            // Send response
            let res = create_response(&hash, data);
            let response_bytes = encode_response(&res);
            if let Some(channel) = self.signaling.get_channel(from_peer).await {
                let _ = channel.send(response_bytes).await;
            }
            return;
        }

        // We are already fetching this hash for ourselves: piggyback the
        // requester on that fetch instead of starting a forward.
        // NOTE(review): if the pending request completes between this check
        // and the registration, this requester may never be answered —
        // confirm requesters time out on their side.
        if self.pending_requests.read().await.contains_key(&hash_key) {
            let _ = self.begin_forward_request(&hash_key, from_peer).await;
            return;
        }

        // First forward for this hash wins; duplicates just join the set.
        if !self.begin_forward_request(&hash_key, from_peer).await {
            return;
        }

        // Fetch from the rest of the mesh in the background, excluding the
        // requester itself and propagating its hops-to-live.
        let from_peer = from_peer.to_string();
        let this = Arc::clone(self);
        tokio::spawn(async move {
            let context = MeshReadContext {
                exclude_peer_id: Some(from_peer.clone()),
                request_htl: req.htl,
            };
            let result = this.request_from_mesh_with_context(&hash, &context).await;
            let requester_ids = this.take_forward_requesters(&hash_key).await;
            // On a mesh miss the requesters are drained without a reply —
            // no negative acknowledgement is sent.
            if let Some(data) = result {
                let res = create_response(&hash, data);
                let response_bytes = encode_response(&res);
                for requester_id in requester_ids {
                    if let Some(channel) = this.signaling.get_channel(&requester_id).await {
                        let _ = channel.send(response_bytes.clone()).await;
                    }
                }
            }
        });
    }
2058
2059    /// Handle incoming data message
2060    pub async fn handle_data_message(self: &Arc<Self>, from_peer: &str, data: &[u8]) {
2061        let parsed = match parse_message(data) {
2062            Some(m) => m,
2063            None => return,
2064        };
2065
2066        match parsed {
2067            DataMessage::Request(req) => {
2068                self.handle_request_message(from_peer, req).await;
2069            }
2070            DataMessage::Response(res) => {
2071                self.handle_response_message(from_peer, res).await;
2072            }
2073            DataMessage::QuoteRequest(req) => {
2074                self.handle_quote_request_message(from_peer, req).await;
2075            }
2076            DataMessage::QuoteResponse(res) => {
2077                self.handle_quote_response_message(from_peer, res).await;
2078            }
2079            DataMessage::Payment(_) | DataMessage::PaymentAck(_) | DataMessage::Chunk(_) => {}
2080        }
2081    }
2082}
2083
2084#[async_trait]
2085impl<S, R, F> Store for MeshStoreCore<S, R, F>
2086where
2087    S: Store + Send + Sync + 'static,
2088    R: SignalingTransport + Send + Sync + 'static,
2089    F: PeerLinkFactory + Send + Sync + 'static,
2090{
2091    async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
2092        self.local_store.put(hash, data).await
2093    }
2094
2095    async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
2096        // Try local first
2097        if let Some(data) = self.local_store.get(hash).await? {
2098            return Ok(Some(data));
2099        }
2100
2101        // Try peers
2102        Ok(self.request_from_mesh(hash).await)
2103    }
2104
2105    async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
2106        self.local_store.has(hash).await
2107    }
2108
2109    async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
2110        self.local_store.delete(hash).await
2111    }
2112}
2113
#[cfg(test)]
mod tests;

/// Type alias for simulation store.
///
/// Pairs any local storage backend with the crate's mock signaling transport
/// and mock peer-link factory (see `crate::mock`).
pub type SimMeshStore<S> =
    MeshStoreCore<S, crate::mock::MockRelayTransport, crate::mock::MockConnectionFactory>;

/// Type alias for the default production core (Nostr signaling + WebRTC links).
pub type ProductionMeshStore<S> =
    MeshStoreCore<S, crate::nostr::NostrRelayTransport, crate::real_factory::WebRtcPeerLinkFactory>;