Skip to main content

hashtree_network/
mesh_store_core.rs

1//! Shared routed mesh store core.
2//!
3//! This module provides a concrete store wrapper that works with any local storage
4//! backend plus any signaling transport and peer-link factory. Both production
5//! (Nostr websockets + WebRTC) and simulation (mocks) use this same code.
6
7use async_trait::async_trait;
8use futures::{stream::FuturesUnordered, FutureExt, StreamExt};
9use std::collections::hash_map::DefaultHasher;
10use std::collections::{HashMap, HashSet};
11use std::future::Future;
12use std::hash::{Hash as _, Hasher};
13use std::ops::Range;
14use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
15use std::sync::Arc;
16use std::time::{Duration, Instant};
17use tokio::sync::{oneshot, Mutex, RwLock};
18
19use hashtree_core::{Hash, Store, StoreError};
20
21use crate::peer_selector::{PeerMetadataSnapshot, PeerSelector, SelectionStrategy};
22use crate::protocol::{
23    create_quote_request, create_quote_response_available, create_quote_response_unavailable,
24    create_request, create_request_with_quote, create_response, encode_quote_request,
25    encode_quote_response, encode_request, encode_response, hash_to_key, parse_message,
26    DataMessage, DataQuoteRequest, DataQuoteResponse,
27};
28use crate::signaling::MeshRouter;
29use crate::transport::{PeerLinkFactory, SignalingTransport, TransportError};
30use crate::types::{PeerHTLConfig, SignalingMessage, MAX_HTL};
31
32// Keep the on-disk namespace stable across the crate rename so existing peer
33// metadata does not disappear for users upgrading from the old package name.
34const PEER_METADATA_POINTER_SLOT_KEY: &[u8] = b"hashtree-webrtc/peer-metadata/latest/v1";
35
36/// Pending request awaiting response
37struct PendingRequest {
38    response_tx: oneshot::Sender<Option<Vec<u8>>>,
39    started_at: Instant,
40    queried_peers: Vec<String>,
41}
42
43struct PendingQuoteRequest {
44    response_tx: oneshot::Sender<Option<NegotiatedQuote>>,
45    preferred_mint_url: Option<String>,
46    offered_payment_sat: u64,
47}
48
49struct PendingForwardRequest {
50    requester_ids: HashSet<String>,
51}
52
53#[derive(Debug, Clone, Default)]
54struct PeerWireStats {
55    bytes_sent: u64,
56    bytes_received: u64,
57    bandwidth_debt: f64,
58}
59
60struct PendingResponseSend {
61    job_id: u64,
62    peer_id: String,
63    bytes: Vec<u8>,
64    ready_at: Instant,
65    queue_sequence: u64,
66}
67
68#[async_trait]
69pub trait MeshReadSource: Send + Sync {
70    fn id(&self) -> &str;
71
72    fn is_available(&self) -> bool {
73        true
74    }
75
76    async fn get(&self, hash: &Hash) -> Option<Vec<u8>>;
77}
78
79#[derive(Debug, Clone)]
80struct NegotiatedQuote {
81    peer_id: String,
82    quote_id: u64,
83    #[allow(dead_code)]
84    mint_url: Option<String>,
85}
86
87struct IssuedQuote {
88    expires_at: Instant,
89    #[allow(dead_code)]
90    payment_sat: u64,
91    #[allow(dead_code)]
92    mint_url: Option<String>,
93}
94
95#[derive(Debug, Clone, Default)]
96struct AdaptiveSourceStats {
97    requests: u64,
98    successes: u64,
99    misses: u64,
100    failures: u64,
101    timeouts: u64,
102    srtt_ms: f64,
103    rttvar_ms: f64,
104    backoff_level: u32,
105    backed_off_until: Option<Instant>,
106    last_success_at: Option<Instant>,
107    last_failure_at: Option<Instant>,
108}
109
110#[derive(Debug, Clone)]
111enum RouteFetchOutcome {
112    Hit(Vec<u8>),
113    Miss,
114    Timeout,
115}
116
117struct InflightSourceFetch {
118    waiters: Vec<oneshot::Sender<RouteFetchOutcome>>,
119}
120
121enum SourceFetchOutcome {
122    Hit {
123        source_id: String,
124        data: Vec<u8>,
125        elapsed_ms: u64,
126    },
127    Miss {
128        source_id: String,
129    },
130    Failure {
131        source_id: String,
132    },
133}
134
135const INITIAL_SOURCE_BACKOFF_MS: u64 = 250;
136const MAX_SOURCE_BACKOFF_MS: u64 = 10_000;
137const SOURCE_SCORE_TIE_DELTA: f64 = 0.15;
138const RECENT_SOURCE_SUCCESS_WINDOW: Duration = Duration::from_secs(60);
139const ACTIVE_PEER_REQUEST_RANK_PENALTY: usize = 3;
140
141fn source_reliability_score(stats: &AdaptiveSourceStats) -> f64 {
142    (stats.successes as f64 + 1.0) / (stats.requests as f64 + 2.0)
143}
144
145fn source_latency_score(stats: &AdaptiveSourceStats) -> f64 {
146    if stats.srtt_ms <= 0.0 {
147        return 0.5;
148    }
149    (500.0 / (stats.srtt_ms + 50.0)).min(1.0)
150}
151
152fn source_has_history(stats: &AdaptiveSourceStats) -> bool {
153    stats.requests > 0
154        || stats.successes > 0
155        || stats.misses > 0
156        || stats.failures > 0
157        || stats.timeouts > 0
158}
159
160fn adaptive_source_score(stats: &AdaptiveSourceStats, now: Instant) -> f64 {
161    if let Some(backed_off_until) = stats.backed_off_until {
162        if backed_off_until > now {
163            return f64::NEG_INFINITY;
164        }
165    }
166
167    let miss_penalty = if stats.requests > 0 {
168        (stats.misses as f64 / stats.requests as f64) * 0.15
169    } else {
170        0.0
171    };
172    let failure_penalty = if stats.requests > 0 {
173        ((stats.failures + stats.timeouts) as f64 / stats.requests as f64) * 0.3
174    } else {
175        0.0
176    };
177    let recency_bonus = if stats
178        .last_success_at
179        .is_some_and(|last| now.duration_since(last) < RECENT_SOURCE_SUCCESS_WINDOW)
180    {
181        0.1
182    } else {
183        0.0
184    };
185
186    0.6 * source_reliability_score(stats) + 0.3 * source_latency_score(stats) + recency_bonus
187        - miss_penalty
188        - failure_penalty
189}
190
191#[derive(Clone)]
192enum ReadRoute {
193    Peers(Vec<String>),
194    Sources,
195}
196
197impl ReadRoute {
198    fn id(&self) -> &'static str {
199        match self {
200            Self::Peers(_) => "peers",
201            Self::Sources => "sources",
202        }
203    }
204}
205
206#[derive(Debug, Clone)]
207struct MeshReadContext {
208    exclude_peer_id: Option<String>,
209    request_htl: u8,
210}
211
212impl Default for MeshReadContext {
213    fn default() -> Self {
214        Self {
215            exclude_peer_id: None,
216            request_htl: MAX_HTL,
217        }
218    }
219}
220
221/// Aggregate stats from draining currently available peer-link messages.
222#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
223pub struct DataPumpStats {
224    pub processed: usize,
225    pub request_messages: usize,
226    pub response_messages: usize,
227    pub quote_request_messages: u64,
228    pub quote_response_messages: u64,
229    pub processed_bytes: u64,
230}
231
232/// Request dispatch strategy for peer queries.
233///
234/// `MeshStoreCore` supports two practical retrieval modes:
235/// - Flood (`usize::MAX` fanout): maximize success/latency at bandwidth cost.
236/// - Staged hedging: probe a subset first, then expand.
237#[derive(Debug, Clone, Copy)]
238pub struct RequestDispatchConfig {
239    /// Number of peers queried immediately.
240    pub initial_fanout: usize,
241    /// Number of additional peers to query on each hedge step.
242    pub hedge_fanout: usize,
243    /// Total peers allowed for this request.
244    pub max_fanout: usize,
245    /// Delay between hedge waves (ms). `0` means send all waves immediately.
246    pub hedge_interval_ms: u64,
247}
248
249impl Default for RequestDispatchConfig {
250    fn default() -> Self {
251        Self {
252            initial_fanout: usize::MAX,
253            hedge_fanout: usize::MAX,
254            max_fanout: usize::MAX,
255            hedge_interval_ms: 0,
256        }
257    }
258}
259
260/// Normalize fanout config against current peer availability.
261pub fn normalize_dispatch_config(
262    dispatch: RequestDispatchConfig,
263    available_peers: usize,
264) -> RequestDispatchConfig {
265    let mut cfg = dispatch;
266    let cap = if cfg.max_fanout == 0 {
267        available_peers
268    } else {
269        cfg.max_fanout.min(available_peers)
270    };
271    cfg.max_fanout = cap;
272    cfg.initial_fanout = if cfg.initial_fanout == 0 {
273        1
274    } else {
275        cfg.initial_fanout.min(cap.max(1))
276    };
277    cfg.hedge_fanout = if cfg.hedge_fanout == 0 {
278        1
279    } else {
280        cfg.hedge_fanout.min(cap.max(1))
281    };
282    cfg
283}
284
285/// Build wave sizes for staged hedged dispatch.
286pub fn build_hedged_wave_plan(peer_count: usize, dispatch: RequestDispatchConfig) -> Vec<usize> {
287    if peer_count == 0 {
288        return Vec::new();
289    }
290    let cap = dispatch.max_fanout.min(peer_count);
291    if cap == 0 {
292        return Vec::new();
293    }
294
295    let mut plan = Vec::new();
296    let mut sent = 0usize;
297    let first = dispatch.initial_fanout.min(cap).max(1);
298    plan.push(first);
299    sent += first;
300
301    while sent < cap {
302        let next = dispatch.hedge_fanout.min(cap - sent).max(1);
303        plan.push(next);
304        sent += next;
305    }
306    plan
307}
308
309/// Outcome returned after waiting on a hedged dispatch wave.
310#[derive(Debug)]
311pub enum HedgedWaveAction<T> {
312    Continue,
313    Success(T),
314    Abort,
315}
316
317/// Run a staged hedged dispatch over peer index ranges.
318///
319/// This scheduler is shared by the reusable `MeshStoreCore` and the native
320/// `hashtree-cli` mesh path so tests and production use the same wave timing.
321pub async fn run_hedged_waves<T, SendWave, SendWaveFut, WaitWave, WaitWaveFut>(
322    peer_count: usize,
323    dispatch: RequestDispatchConfig,
324    request_timeout: Duration,
325    mut send_wave: SendWave,
326    mut wait_wave: WaitWave,
327) -> Option<T>
328where
329    SendWave: FnMut(Range<usize>) -> SendWaveFut,
330    SendWaveFut: Future<Output = usize>,
331    WaitWave: FnMut(Duration) -> WaitWaveFut,
332    WaitWaveFut: Future<Output = HedgedWaveAction<T>>,
333{
334    let dispatch = normalize_dispatch_config(dispatch, peer_count);
335    let wave_plan = build_hedged_wave_plan(peer_count, dispatch);
336    if wave_plan.is_empty() {
337        return None;
338    }
339
340    let deadline = Instant::now() + request_timeout;
341    let mut sent_total = 0usize;
342    let mut next_peer_idx = 0usize;
343
344    for (wave_idx, wave_size) in wave_plan.iter().copied().enumerate() {
345        let from = next_peer_idx;
346        let to = (next_peer_idx + wave_size).min(peer_count);
347        next_peer_idx = to;
348
349        if from == to {
350            continue;
351        }
352
353        sent_total += send_wave(from..to).await;
354        if sent_total == 0 {
355            if next_peer_idx >= peer_count {
356                break;
357            }
358            continue;
359        }
360
361        let now = Instant::now();
362        if now >= deadline {
363            break;
364        }
365        let remaining = deadline.saturating_duration_since(now);
366        let is_last_wave = wave_idx + 1 == wave_plan.len() || next_peer_idx >= peer_count;
367        let wait = if is_last_wave {
368            remaining
369        } else if dispatch.hedge_interval_ms == 0 {
370            Duration::ZERO
371        } else {
372            Duration::from_millis(dispatch.hedge_interval_ms).min(remaining)
373        };
374
375        if wait.is_zero() {
376            continue;
377        }
378
379        match wait_wave(wait).await {
380            HedgedWaveAction::Continue => {}
381            HedgedWaveAction::Success(value) => return Some(value),
382            HedgedWaveAction::Abort => break,
383        }
384    }
385
386    None
387}
388
389/// Keep selector membership aligned with currently connected peer IDs.
390pub async fn sync_selector_peers(selector: &RwLock<PeerSelector>, current_peer_ids: &[String]) {
391    let mut selector = selector.write().await;
392    let current: HashSet<&str> = current_peer_ids.iter().map(String::as_str).collect();
393    let known: Vec<String> = selector.all_stats().map(|s| s.peer_id.clone()).collect();
394    for peer_id in known {
395        if !current.contains(peer_id.as_str()) {
396            selector.remove_peer(&peer_id);
397        }
398    }
399    for peer_id in current_peer_ids {
400        selector.add_peer(peer_id.clone());
401    }
402}
403
404/// Response behavior profile for simulation/game-theory actors.
405///
406/// Defaults to honest behavior (always respond correctly, no extra delay).
407#[derive(Debug, Clone, Copy)]
408pub struct ResponseBehaviorConfig {
409    /// Probability that a node drops a response even when it has data.
410    pub drop_response_prob: f64,
411    /// Probability that a node responds with corrupted payload.
412    pub corrupt_response_prob: f64,
413    /// Baseline response delay before a peer starts sending any data.
414    pub extra_delay_ms: u64,
415    /// Additional delay before the first response byte becomes available.
416    pub first_byte_delay_ms: u64,
417    /// Sustained throughput for delivering large payloads. `0` disables size-based slowdown.
418    pub bytes_per_second: u64,
419    /// Probability that an otherwise honest response experiences an extra stall.
420    pub stall_response_prob: f64,
421    /// Extra delay injected when a stall event happens.
422    pub stall_delay_ms: u64,
423}
424
425impl Default for ResponseBehaviorConfig {
426    fn default() -> Self {
427        Self {
428            drop_response_prob: 0.0,
429            corrupt_response_prob: 0.0,
430            extra_delay_ms: 0,
431            first_byte_delay_ms: 0,
432            bytes_per_second: 0,
433            stall_response_prob: 0.0,
434            stall_delay_ms: 0,
435        }
436    }
437}
438
439impl ResponseBehaviorConfig {
440    fn normalized(self) -> Self {
441        Self {
442            drop_response_prob: self.drop_response_prob.clamp(0.0, 1.0),
443            corrupt_response_prob: self.corrupt_response_prob.clamp(0.0, 1.0),
444            extra_delay_ms: self.extra_delay_ms,
445            first_byte_delay_ms: self.first_byte_delay_ms,
446            bytes_per_second: self.bytes_per_second,
447            stall_response_prob: self.stall_response_prob.clamp(0.0, 1.0),
448            stall_delay_ms: self.stall_delay_ms,
449        }
450    }
451}
452
453/// Routing policy for request ordering + dispatch fanout.
454#[derive(Debug, Clone)]
455pub struct MeshRoutingConfig {
456    pub selection_strategy: SelectionStrategy,
457    pub fairness_enabled: bool,
458    /// Blend weight for payment-priority ranking in selector (`0.0` disables).
459    pub cashu_payment_weight: f64,
460    /// Refuse serving peers that have reached this many unpaid post-delivery settlements.
461    /// `0` disables refusal and only keeps metadata/downranking.
462    pub cashu_payment_default_block_threshold: u64,
463    /// Cashu mint URLs this node is willing to use for settlement.
464    pub cashu_accepted_mints: Vec<String>,
465    /// Preferred Cashu mint URL when initiating paid retrieval.
466    pub cashu_default_mint: Option<String>,
467    /// Baseline cap for accepting a peer-suggested mint outside the trusted set.
468    pub cashu_peer_suggested_mint_base_cap_sat: u64,
469    /// Additional sats allowed per successful delivery from that peer.
470    pub cashu_peer_suggested_mint_success_step_sat: u64,
471    /// Additional sats allowed per successful post-delivery payment received from that peer.
472    pub cashu_peer_suggested_mint_receipt_step_sat: u64,
473    /// Hard upper bound for any single peer-suggested mint quote we accept.
474    pub cashu_peer_suggested_mint_max_cap_sat: u64,
475    pub dispatch: RequestDispatchConfig,
476    pub response_behavior: ResponseBehaviorConfig,
477}
478
479impl Default for MeshRoutingConfig {
480    fn default() -> Self {
481        Self {
482            selection_strategy: SelectionStrategy::Weighted,
483            fairness_enabled: true,
484            cashu_payment_weight: 0.0,
485            cashu_payment_default_block_threshold: 0,
486            cashu_accepted_mints: Vec::new(),
487            cashu_default_mint: None,
488            cashu_peer_suggested_mint_base_cap_sat: 0,
489            cashu_peer_suggested_mint_success_step_sat: 0,
490            cashu_peer_suggested_mint_receipt_step_sat: 0,
491            cashu_peer_suggested_mint_max_cap_sat: 0,
492            dispatch: RequestDispatchConfig::default(),
493            response_behavior: ResponseBehaviorConfig::default(),
494        }
495    }
496}
497
498/// Routed mesh store core that works with any storage backend and transport
499/// implementation.
500///
501/// This is the shared code between production and simulation.
502/// - Production: `MeshStoreCore<LmdbStore, NostrRelayTransport, WebRtcPeerLinkFactory>`
503/// - Simulation: `MeshStoreCore<MemoryStore, MockRelayTransport, MockConnectionFactory>`
504pub struct MeshStoreCore<S, R, F>
505where
506    S: Store + Send + Sync + 'static,
507    R: SignalingTransport + Send + Sync + 'static,
508    F: PeerLinkFactory + Send + Sync + 'static,
509{
510    /// Local backing store
511    local_store: Arc<S>,
512    /// Mesh router (handles peer discovery and connection)
513    signaling: Arc<MeshRouter<R, F>>,
514    /// Per-peer HTL config
515    htl_configs: RwLock<HashMap<String, PeerHTLConfig>>,
516    /// Pending requests we sent
517    pending_requests: RwLock<HashMap<String, PendingRequest>>,
518    /// Pending quote negotiations keyed by requested hash.
519    pending_quotes: RwLock<HashMap<String, PendingQuoteRequest>>,
520    /// Forwarded peer requests currently being resolved through the mesh/upstream.
521    pending_forward_requests: RwLock<HashMap<String, PendingForwardRequest>>,
522    /// Quotes we issued to peers and will accept exactly once until expiry.
523    issued_quotes: RwLock<HashMap<(String, String, u64), IssuedQuote>>,
524    /// Monotonic quote identifier generator.
525    next_quote_id: RwLock<u64>,
526    /// Non-peer read sources such as upstream Blossom servers.
527    read_sources: RwLock<HashMap<String, Arc<dyn MeshReadSource>>>,
528    /// Adaptive health stats for non-peer read sources.
529    read_source_stats: RwLock<HashMap<String, AdaptiveSourceStats>>,
530    /// Shared in-flight upstream reads keyed by hash.
531    inflight_source_fetches: Mutex<HashMap<String, InflightSourceFetch>>,
532    /// Adaptive route stats for choosing peers vs upstream sources.
533    read_route_stats: RwLock<HashMap<String, AdaptiveSourceStats>>,
534    /// Adaptive selector for peer ordering.
535    peer_selector: RwLock<PeerSelector>,
536    /// Active per-peer in-flight reads so concurrent block fetches spread across peers.
537    peer_active_requests: RwLock<HashMap<String, usize>>,
538    /// Actual wire traffic stats used for upload-side reciprocity scheduling.
539    peer_wire_stats: RwLock<HashMap<String, PeerWireStats>>,
540    /// Pending content responses waiting for upload arbitration.
541    pending_response_sends: Mutex<Vec<PendingResponseSend>>,
542    /// Upload response scheduler state.
543    response_scheduler_running: AtomicBool,
544    /// Monotonic id for queued response sends.
545    next_response_job_id: AtomicU64,
546    /// Routing/dispatch configuration.
547    routing: MeshRoutingConfig,
548    /// Request timeout
549    request_timeout: Duration,
550    /// Debug mode
551    debug: bool,
552    /// Running flag
553    running: RwLock<bool>,
554}
555
556impl<S, R, F> MeshStoreCore<S, R, F>
557where
558    S: Store + Send + Sync + 'static,
559    R: SignalingTransport + Send + Sync + 'static,
560    F: PeerLinkFactory + Send + Sync + 'static,
561{
562    /// Create a new routed mesh store core.
563    pub fn new(
564        local_store: Arc<S>,
565        signaling: Arc<MeshRouter<R, F>>,
566        request_timeout: Duration,
567        debug: bool,
568    ) -> Self {
569        Self::new_with_routing(
570            local_store,
571            signaling,
572            request_timeout,
573            debug,
574            Default::default(),
575        )
576    }
577
578    /// Create a new routed mesh store core with explicit routing configuration.
579    pub fn new_with_routing(
580        local_store: Arc<S>,
581        signaling: Arc<MeshRouter<R, F>>,
582        request_timeout: Duration,
583        debug: bool,
584        routing: MeshRoutingConfig,
585    ) -> Self {
586        let mut selector = PeerSelector::with_strategy(routing.selection_strategy);
587        selector.set_fairness(routing.fairness_enabled);
588        selector.set_cashu_payment_weight(routing.cashu_payment_weight);
589        Self {
590            local_store,
591            signaling,
592            htl_configs: RwLock::new(HashMap::new()),
593            pending_requests: RwLock::new(HashMap::new()),
594            pending_quotes: RwLock::new(HashMap::new()),
595            pending_forward_requests: RwLock::new(HashMap::new()),
596            issued_quotes: RwLock::new(HashMap::new()),
597            next_quote_id: RwLock::new(1),
598            read_sources: RwLock::new(HashMap::new()),
599            read_source_stats: RwLock::new(HashMap::new()),
600            inflight_source_fetches: Mutex::new(HashMap::new()),
601            read_route_stats: RwLock::new(HashMap::new()),
602            peer_selector: RwLock::new(selector),
603            peer_active_requests: RwLock::new(HashMap::new()),
604            peer_wire_stats: RwLock::new(HashMap::new()),
605            pending_response_sends: Mutex::new(Vec::new()),
606            response_scheduler_running: AtomicBool::new(false),
607            next_response_job_id: AtomicU64::new(1),
608            routing,
609            request_timeout,
610            debug,
611            running: RwLock::new(false),
612        }
613    }
614
615    /// Start the store (begin listening for messages)
616    pub async fn start(&self) -> Result<(), TransportError> {
617        *self.running.write().await = true;
618
619        // Send initial hello
620        self.signaling.send_hello(vec![]).await?;
621
622        Ok(())
623    }
624
625    /// Stop the store
626    pub async fn stop(&self) {
627        *self.running.write().await = false;
628    }
629
630    /// Process incoming signaling message
631    pub async fn process_signaling(&self, msg: SignalingMessage) -> Result<(), TransportError> {
632        // When a new peer connects, initialize their HTL config
633        let peer_id = msg.peer_id().to_string();
634        {
635            let mut configs = self.htl_configs.write().await;
636            if !configs.contains_key(&peer_id) {
637                configs.insert(peer_id.clone(), PeerHTLConfig::random());
638            }
639        }
640        self.peer_selector.write().await.add_peer(peer_id);
641
642        self.signaling.handle_message(msg).await
643    }
644
645    /// Get signaling manager reference
646    pub fn signaling(&self) -> &Arc<MeshRouter<R, F>> {
647        &self.signaling
648    }
649
650    fn response_behavior(&self) -> ResponseBehaviorConfig {
651        self.routing.response_behavior.normalized()
652    }
653
654    async fn record_peer_wire_sent(&self, peer_id: &str, bytes: u64) {
655        if bytes == 0 {
656            return;
657        }
658        let mut stats = self.peer_wire_stats.write().await;
659        let entry = stats.entry(peer_id.to_string()).or_default();
660        entry.bytes_sent = entry.bytes_sent.saturating_add(bytes);
661    }
662
663    async fn record_peer_wire_received(&self, peer_id: &str, bytes: u64) {
664        if bytes == 0 {
665            return;
666        }
667        let mut stats = self.peer_wire_stats.write().await;
668        let entry = stats.entry(peer_id.to_string()).or_default();
669        entry.bytes_received = entry.bytes_received.saturating_add(bytes);
670    }
671
672    fn peer_upload_weight(stats: &PeerWireStats) -> f64 {
673        let raw_ratio = (stats.bytes_received.saturating_add(1024) as f64)
674            / (stats.bytes_sent.saturating_add(1024) as f64);
675        let bounded_ratio = raw_ratio / (1.0 + raw_ratio);
676        0.5 + 1.5 * bounded_ratio
677    }
678
679    fn choose_ready_response_job(
680        ready_jobs: &[(u64, String, usize, Instant, u64)],
681        stats: &HashMap<String, PeerWireStats>,
682    ) -> Option<(u64, f64)> {
683        ready_jobs
684            .iter()
685            .map(|job| {
686                let peer_stats = stats.get(&job.1).cloned().unwrap_or_default();
687                let finish = peer_stats.bandwidth_debt
688                    + (job.2 as f64 / Self::peer_upload_weight(&peer_stats));
689                (job.0, job.1.as_str(), job.4, finish)
690            })
691            .min_by(|left, right| {
692                left.3
693                    .partial_cmp(&right.3)
694                    .unwrap_or(std::cmp::Ordering::Equal)
695                    .then_with(|| left.2.cmp(&right.2))
696                    .then_with(|| left.1.cmp(right.1))
697            })
698            .map(|choice| (choice.0, choice.3))
699    }
700
701    async fn enqueue_response_send(
702        self: &Arc<Self>,
703        peer_id: String,
704        bytes: Vec<u8>,
705        ready_at: Instant,
706    ) {
707        let job_id = self.next_response_job_id.fetch_add(1, Ordering::Relaxed);
708        {
709            let mut queue = self.pending_response_sends.lock().await;
710            queue.push(PendingResponseSend {
711                job_id,
712                peer_id,
713                bytes,
714                ready_at,
715                queue_sequence: job_id,
716            });
717        }
718
719        if self
720            .response_scheduler_running
721            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
722            .is_ok()
723        {
724            let this = Arc::clone(self);
725            tokio::spawn(async move {
726                this.run_response_scheduler().await;
727            });
728        }
729    }
730
731    async fn run_response_scheduler(self: Arc<Self>) {
732        loop {
733            let snapshot = {
734                let queue = self.pending_response_sends.lock().await;
735                if queue.is_empty() {
736                    self.response_scheduler_running
737                        .store(false, Ordering::Release);
738                    return;
739                }
740                queue
741                    .iter()
742                    .map(|job| {
743                        (
744                            job.job_id,
745                            job.peer_id.clone(),
746                            job.bytes.len(),
747                            job.ready_at,
748                            job.queue_sequence,
749                        )
750                    })
751                    .collect::<Vec<_>>()
752            };
753
754            let now = Instant::now();
755            let mut earliest_ready_at: Option<Instant> = None;
756            let mut ready_jobs = Vec::new();
757            for job in &snapshot {
758                if job.3 <= now {
759                    ready_jobs.push(job.clone());
760                } else {
761                    earliest_ready_at = Some(match earliest_ready_at {
762                        Some(current) => current.min(job.3),
763                        None => job.3,
764                    });
765                }
766            }
767
768            if ready_jobs.is_empty() {
769                if let Some(ready_at) = earliest_ready_at {
770                    tokio::time::sleep(ready_at.saturating_duration_since(Instant::now())).await;
771                    continue;
772                }
773                self.response_scheduler_running
774                    .store(false, Ordering::Release);
775                return;
776            }
777
778            let (selected_job_id, selected_finish) = {
779                let stats = self.peer_wire_stats.read().await;
780                Self::choose_ready_response_job(&ready_jobs, &stats).expect("ready response job")
781            };
782
783            let selected = {
784                let mut queue = self.pending_response_sends.lock().await;
785                let Some(index) = queue.iter().position(|job| job.job_id == selected_job_id) else {
786                    continue;
787                };
788                queue.swap_remove(index)
789            };
790
791            let sent = if let Some(channel) = self.signaling.get_channel(&selected.peer_id).await {
792                channel.send(selected.bytes.clone()).await.is_ok()
793            } else {
794                false
795            };
796
797            let queued_peers = {
798                let queue = self.pending_response_sends.lock().await;
799                queue
800                    .iter()
801                    .map(|job| job.peer_id.clone())
802                    .collect::<HashSet<_>>()
803            };
804            let mut stats = self.peer_wire_stats.write().await;
805            let entry = stats.entry(selected.peer_id.clone()).or_default();
806            if sent {
807                entry.bytes_sent = entry.bytes_sent.saturating_add(selected.bytes.len() as u64);
808                entry.bandwidth_debt = selected_finish;
809            }
810            if queued_peers.is_empty() {
811                for peer_stats in stats.values_mut() {
812                    peer_stats.bandwidth_debt = 0.0;
813                }
814            } else {
815                let floor = queued_peers
816                    .iter()
817                    .filter_map(|peer_id| stats.get(peer_id).map(|peer| peer.bandwidth_debt))
818                    .fold(f64::INFINITY, f64::min);
819                if floor.is_finite() && floor > 0.0 {
820                    for peer_id in queued_peers {
821                        if let Some(peer_stats) = stats.get_mut(&peer_id) {
822                            peer_stats.bandwidth_debt =
823                                (peer_stats.bandwidth_debt - floor).max(0.0);
824                        }
825                    }
826                }
827            }
828        }
829    }
830
831    fn deterministic_actor_draw_for(peer_id: &str, hash: &Hash, salt: u64) -> f64 {
832        let mut hasher = DefaultHasher::new();
833        peer_id.hash(&mut hasher);
834        hash.hash(&mut hasher);
835        salt.hash(&mut hasher);
836        let v = hasher.finish();
837        (v as f64) / (u64::MAX as f64)
838    }
839
840    fn deterministic_actor_draw(&self, hash: &Hash, salt: u64) -> f64 {
841        Self::deterministic_actor_draw_for(self.signaling.peer_id(), hash, salt)
842    }
843
844    fn peer_metadata_pointer_slot_hash() -> Hash {
845        hashtree_core::sha256(PEER_METADATA_POINTER_SLOT_KEY)
846    }
847
848    fn decode_hash_hex(hash_hex: &str) -> Result<Hash, StoreError> {
849        let bytes = hex::decode(hash_hex)
850            .map_err(|e| StoreError::Other(format!("Invalid hash hex: {e}")))?;
851        if bytes.len() != 32 {
852            return Err(StoreError::Other(format!(
853                "Invalid hash length {}, expected 32 bytes",
854                bytes.len()
855            )));
856        }
857        let mut hash = [0u8; 32];
858        hash.copy_from_slice(&bytes);
859        Ok(hash)
860    }
861
862    fn should_drop_response(&self, hash: &Hash) -> bool {
863        let p = self.response_behavior().drop_response_prob;
864        if p <= 0.0 {
865            return false;
866        }
867        self.deterministic_actor_draw(hash, 0xD0_D0_D0_D0_D0_D0_D0_D0) < p
868    }
869
870    fn should_corrupt_response(&self, hash: &Hash) -> bool {
871        let p = self.response_behavior().corrupt_response_prob;
872        if p <= 0.0 {
873            return false;
874        }
875        self.deterministic_actor_draw(hash, 0xC0_C0_C0_C0_C0_C0_C0_C0) < p
876    }
877
878    fn should_stall_response(&self, hash: &Hash) -> bool {
879        let p = self.response_behavior().stall_response_prob;
880        if p <= 0.0 {
881            return false;
882        }
883        self.deterministic_actor_draw(hash, 0x5A_11_5A_11_5A_11_5A_11) < p
884    }
885
886    fn response_send_delay(&self, hash: &Hash, payload_len: usize) -> Duration {
887        let behavior = self.response_behavior();
888        let mut total_ms = behavior
889            .extra_delay_ms
890            .saturating_add(behavior.first_byte_delay_ms);
891
892        if behavior.bytes_per_second > 0 && payload_len > 0 {
893            let throughput_ms = ((payload_len as u128) * 1000)
894                .div_ceil(behavior.bytes_per_second as u128)
895                .min(u64::MAX as u128) as u64;
896            total_ms = total_ms.saturating_add(throughput_ms);
897        }
898
899        if behavior.stall_delay_ms > 0 && self.should_stall_response(hash) {
900            total_ms = total_ms.saturating_add(behavior.stall_delay_ms);
901        }
902
903        Duration::from_millis(total_ms)
904    }
905
906    async fn ordered_connected_peers(&self, exclude_peer_id: Option<&str>) -> Vec<String> {
907        let current_peer_ids = self.signaling.peer_ids().await;
908        if current_peer_ids.is_empty() {
909            return Vec::new();
910        }
911
912        sync_selector_peers(&self.peer_selector, &current_peer_ids).await;
913        let mut candidate_peer_ids: Vec<String> = current_peer_ids
914            .into_iter()
915            .filter(|peer_id| exclude_peer_id.is_none_or(|exclude| peer_id != exclude))
916            .collect();
917        if candidate_peer_ids.is_empty() {
918            return Vec::new();
919        }
920
921        let current_set: HashSet<&str> = candidate_peer_ids.iter().map(String::as_str).collect();
922        let mut selector = self.peer_selector.write().await;
923        let mut selector_order = selector.select_peers();
924        selector_order.retain(|peer_id| current_set.contains(peer_id.as_str()));
925        if selector_order.is_empty() {
926            let mut fallback = candidate_peer_ids;
927            fallback.sort();
928            return fallback;
929        }
930        let backed_off: HashMap<String, bool> = candidate_peer_ids
931            .iter()
932            .map(|peer_id| (peer_id.clone(), selector.is_peer_backed_off(peer_id)))
933            .collect();
934        drop(selector);
935
936        let rank: HashMap<&str, usize> = selector_order
937            .iter()
938            .enumerate()
939            .map(|(idx, peer_id)| (peer_id.as_str(), idx))
940            .collect();
941        let active = self.peer_active_requests.read().await;
942        candidate_peer_ids.sort_by(|left, right| {
943            let left_backed_off = backed_off.get(left).copied().unwrap_or(false);
944            let right_backed_off = backed_off.get(right).copied().unwrap_or(false);
945            if left_backed_off != right_backed_off {
946                return if left_backed_off {
947                    std::cmp::Ordering::Greater
948                } else {
949                    std::cmp::Ordering::Less
950                };
951            }
952            let left_rank = rank.get(left.as_str()).copied().unwrap_or(usize::MAX / 2);
953            let right_rank = rank.get(right.as_str()).copied().unwrap_or(usize::MAX / 2);
954            let left_load = active.get(left).copied().unwrap_or(0);
955            let right_load = active.get(right).copied().unwrap_or(0);
956            (left_rank + left_load.saturating_mul(ACTIVE_PEER_REQUEST_RANK_PENALTY))
957                .cmp(&(right_rank + right_load.saturating_mul(ACTIVE_PEER_REQUEST_RANK_PENALTY)))
958                .then_with(|| left.cmp(right))
959        });
960        candidate_peer_ids
961    }
962
963    async fn reserve_peer_request(&self, peer_id: &str) {
964        let mut active = self.peer_active_requests.write().await;
965        *active.entry(peer_id.to_string()).or_insert(0) += 1;
966    }
967
968    async fn release_peer_request(&self, peer_id: &str) {
969        let mut active = self.peer_active_requests.write().await;
970        let Some(count) = active.get_mut(peer_id) else {
971            return;
972        };
973        if *count <= 1 {
974            active.remove(peer_id);
975        } else {
976            *count -= 1;
977        }
978    }
979
980    async fn release_queried_peer_requests(&self, peer_ids: &[String]) {
981        for peer_id in peer_ids {
982            self.release_peer_request(peer_id).await;
983        }
984    }
985
986    fn requested_quote_mint(&self) -> Option<&str> {
987        if let Some(default_mint) = self.routing.cashu_default_mint.as_deref() {
988            if self.routing.cashu_accepted_mints.is_empty()
989                || self
990                    .routing
991                    .cashu_accepted_mints
992                    .iter()
993                    .any(|mint| mint == default_mint)
994            {
995                return Some(default_mint);
996            }
997        }
998
999        self.routing
1000            .cashu_accepted_mints
1001            .first()
1002            .map(String::as_str)
1003    }
1004
1005    fn choose_quote_mint(&self, requested_mint: Option<&str>) -> Option<String> {
1006        if let Some(requested_mint) = requested_mint {
1007            if self.accepts_quote_mint(Some(requested_mint)) {
1008                return Some(requested_mint.to_string());
1009            }
1010        }
1011        if let Some(default_mint) = self.routing.cashu_default_mint.as_ref() {
1012            return Some(default_mint.clone());
1013        }
1014        if let Some(first_mint) = self.routing.cashu_accepted_mints.first() {
1015            return Some(first_mint.clone());
1016        }
1017        requested_mint.map(str::to_string)
1018    }
1019
1020    fn accepts_quote_mint(&self, mint_url: Option<&str>) -> bool {
1021        if self.routing.cashu_accepted_mints.is_empty() {
1022            return true;
1023        }
1024
1025        let Some(mint_url) = mint_url else {
1026            return false;
1027        };
1028        self.routing
1029            .cashu_accepted_mints
1030            .iter()
1031            .any(|mint| mint == mint_url)
1032    }
1033
1034    fn trusts_quote_mint(&self, mint_url: Option<&str>) -> bool {
1035        let Some(mint_url) = mint_url else {
1036            return self.routing.cashu_default_mint.is_none()
1037                && self.routing.cashu_accepted_mints.is_empty();
1038        };
1039        self.routing.cashu_default_mint.as_deref() == Some(mint_url)
1040            || self
1041                .routing
1042                .cashu_accepted_mints
1043                .iter()
1044                .any(|mint| mint == mint_url)
1045    }
1046
1047    async fn peer_suggested_mint_cap_sat(&self, peer_id: &str) -> u64 {
1048        let base = self.routing.cashu_peer_suggested_mint_base_cap_sat;
1049        if base == 0 {
1050            return 0;
1051        }
1052
1053        let selector = self.peer_selector.read().await;
1054        let Some(stats) = selector.get_stats(peer_id) else {
1055            let max_cap = self.routing.cashu_peer_suggested_mint_max_cap_sat;
1056            return if max_cap > 0 { base.min(max_cap) } else { base };
1057        };
1058
1059        if stats.cashu_payment_defaults > 0
1060            && stats.cashu_payment_defaults >= stats.cashu_payment_receipts
1061        {
1062            return 0;
1063        }
1064
1065        let success_bonus = stats
1066            .successes
1067            .saturating_mul(self.routing.cashu_peer_suggested_mint_success_step_sat);
1068        let receipt_bonus = stats
1069            .cashu_payment_receipts
1070            .saturating_mul(self.routing.cashu_peer_suggested_mint_receipt_step_sat);
1071        let mut cap = base
1072            .saturating_add(success_bonus)
1073            .saturating_add(receipt_bonus);
1074        let max_cap = self.routing.cashu_peer_suggested_mint_max_cap_sat;
1075        if max_cap > 0 {
1076            cap = cap.min(max_cap);
1077        }
1078        cap
1079    }
1080
1081    async fn should_accept_quote_response(
1082        &self,
1083        from_peer: &str,
1084        preferred_mint_url: Option<&str>,
1085        offered_payment_sat: u64,
1086        res: &DataQuoteResponse,
1087    ) -> bool {
1088        let Some(payment_sat) = res.p else {
1089            return false;
1090        };
1091        if payment_sat > offered_payment_sat {
1092            return false;
1093        }
1094
1095        let response_mint = res.m.as_deref();
1096        if response_mint == preferred_mint_url {
1097            return true;
1098        }
1099        if self.trusts_quote_mint(response_mint) {
1100            return true;
1101        }
1102        if response_mint.is_none() {
1103            return false;
1104        }
1105
1106        payment_sat <= self.peer_suggested_mint_cap_sat(from_peer).await
1107    }
1108
1109    async fn issue_quote(
1110        &self,
1111        peer_id: &str,
1112        hash_key: &str,
1113        payment_sat: u64,
1114        ttl_ms: u32,
1115        mint_url: Option<&str>,
1116    ) -> u64 {
1117        let quote_id = {
1118            let mut next = self.next_quote_id.write().await;
1119            let quote_id = *next;
1120            *next = next.saturating_add(1);
1121            quote_id
1122        };
1123
1124        let expires_at = Instant::now() + Duration::from_millis(ttl_ms as u64);
1125        self.issued_quotes.write().await.insert(
1126            (peer_id.to_string(), hash_key.to_string(), quote_id),
1127            IssuedQuote {
1128                expires_at,
1129                payment_sat,
1130                mint_url: mint_url.map(str::to_string),
1131            },
1132        );
1133        quote_id
1134    }
1135
1136    async fn take_valid_quote(&self, peer_id: &str, hash_key: &str, quote_id: u64) -> bool {
1137        let key = (peer_id.to_string(), hash_key.to_string(), quote_id);
1138        let Some(quote) = self.issued_quotes.write().await.remove(&key) else {
1139            return false;
1140        };
1141        quote.expires_at > Instant::now()
1142    }
1143
1144    async fn send_request_to_peer(
1145        &self,
1146        peer_id: &str,
1147        hash: &Hash,
1148        request_htl: u8,
1149        quote_id: Option<u64>,
1150    ) -> bool {
1151        let channel = match self.signaling.get_channel(peer_id).await {
1152            Some(c) => c,
1153            None => return false,
1154        };
1155
1156        let htl_config = {
1157            let configs = self.htl_configs.read().await;
1158            configs
1159                .get(peer_id)
1160                .cloned()
1161                .unwrap_or_else(PeerHTLConfig::random)
1162        };
1163
1164        let send_htl = htl_config.decrement(request_htl);
1165        let req = match quote_id {
1166            Some(quote_id) => create_request_with_quote(hash, send_htl, quote_id),
1167            None => create_request(hash, send_htl),
1168        };
1169        let request_bytes = encode_request(&req);
1170        let request_len = request_bytes.len() as u64;
1171
1172        {
1173            let mut selector = self.peer_selector.write().await;
1174            selector.record_request(peer_id, request_len);
1175        }
1176
1177        match channel.send(request_bytes).await {
1178            Ok(()) => {
1179                self.record_peer_wire_sent(peer_id, request_len).await;
1180                true
1181            }
1182            Err(_) => {
1183                self.peer_selector.write().await.record_failure(peer_id);
1184                false
1185            }
1186        }
1187    }
1188
1189    async fn send_quote_request_to_peer(
1190        &self,
1191        peer_id: &str,
1192        hash: &Hash,
1193        payment_sat: u64,
1194        ttl_ms: u32,
1195        mint_url: Option<&str>,
1196    ) -> bool {
1197        let channel = match self.signaling.get_channel(peer_id).await {
1198            Some(c) => c,
1199            None => return false,
1200        };
1201
1202        let req = create_quote_request(hash, ttl_ms, payment_sat, mint_url);
1203        let request_bytes = encode_quote_request(&req);
1204        let request_len = request_bytes.len() as u64;
1205
1206        match channel.send(request_bytes).await {
1207            Ok(()) => {
1208                self.record_peer_wire_sent(peer_id, request_len).await;
1209                true
1210            }
1211            Err(_) => false,
1212        }
1213    }
1214
1215    pub async fn set_read_sources(&self, sources: Vec<Arc<dyn MeshReadSource>>) {
1216        let mut by_id = HashMap::new();
1217        let mut stats = self.read_source_stats.write().await;
1218        for source in sources {
1219            let source_id = source.id().to_string();
1220            by_id.insert(source_id.clone(), source);
1221            stats
1222                .entry(source_id)
1223                .or_insert_with(AdaptiveSourceStats::default);
1224        }
1225        *self.read_sources.write().await = by_id;
1226    }
1227
1228    fn route_stats_for<'a>(
1229        stats: &'a mut HashMap<String, AdaptiveSourceStats>,
1230        route_id: &str,
1231    ) -> &'a mut AdaptiveSourceStats {
1232        stats
1233            .entry(route_id.to_string())
1234            .or_insert_with(AdaptiveSourceStats::default)
1235    }
1236
1237    async fn record_route_request(&self, route_id: &str) {
1238        let mut stats = self.read_route_stats.write().await;
1239        Self::route_stats_for(&mut stats, route_id).requests += 1;
1240    }
1241
1242    async fn record_route_success(&self, route_id: &str, elapsed_ms: u64) {
1243        let now = Instant::now();
1244        let mut stats = self.read_route_stats.write().await;
1245        let stats = Self::route_stats_for(&mut stats, route_id);
1246        stats.successes += 1;
1247        stats.last_success_at = Some(now);
1248        stats.backoff_level = 0;
1249        stats.backed_off_until = None;
1250        if stats.srtt_ms <= 0.0 {
1251            stats.srtt_ms = elapsed_ms as f64;
1252            stats.rttvar_ms = elapsed_ms as f64 / 2.0;
1253            return;
1254        }
1255        let elapsed = elapsed_ms as f64;
1256        stats.rttvar_ms = 0.75 * stats.rttvar_ms + 0.25 * (stats.srtt_ms - elapsed).abs();
1257        stats.srtt_ms = 0.875 * stats.srtt_ms + 0.125 * elapsed;
1258    }
1259
1260    async fn record_route_miss(&self, route_id: &str) {
1261        let mut stats = self.read_route_stats.write().await;
1262        Self::route_stats_for(&mut stats, route_id).misses += 1;
1263    }
1264
1265    async fn record_route_timeout(&self, route_id: &str) {
1266        let mut stats = self.read_route_stats.write().await;
1267        Self::route_stats_for(&mut stats, route_id).timeouts += 1;
1268    }
1269
1270    async fn record_read_source_request(&self, source_id: &str) {
1271        let mut stats = self.read_source_stats.write().await;
1272        stats
1273            .entry(source_id.to_string())
1274            .or_insert_with(AdaptiveSourceStats::default)
1275            .requests += 1;
1276    }
1277
1278    async fn record_read_source_miss(&self, source_id: &str) {
1279        let mut stats = self.read_source_stats.write().await;
1280        stats
1281            .entry(source_id.to_string())
1282            .or_insert_with(AdaptiveSourceStats::default)
1283            .misses += 1;
1284    }
1285
1286    async fn record_read_source_success(&self, source_id: &str, elapsed_ms: u64) {
1287        let now = Instant::now();
1288        let mut stats = self.read_source_stats.write().await;
1289        let stats = stats
1290            .entry(source_id.to_string())
1291            .or_insert_with(AdaptiveSourceStats::default);
1292        stats.successes += 1;
1293        stats.last_success_at = Some(now);
1294        stats.backoff_level = 0;
1295        stats.backed_off_until = None;
1296        if stats.srtt_ms <= 0.0 {
1297            stats.srtt_ms = elapsed_ms as f64;
1298            stats.rttvar_ms = elapsed_ms as f64 / 2.0;
1299            return;
1300        }
1301        let elapsed = elapsed_ms as f64;
1302        stats.rttvar_ms = 0.75 * stats.rttvar_ms + 0.25 * (stats.srtt_ms - elapsed).abs();
1303        stats.srtt_ms = 0.875 * stats.srtt_ms + 0.125 * elapsed;
1304    }
1305
1306    async fn record_read_source_failure(&self, source_id: &str) {
1307        let now = Instant::now();
1308        let mut stats = self.read_source_stats.write().await;
1309        let stats = stats
1310            .entry(source_id.to_string())
1311            .or_insert_with(AdaptiveSourceStats::default);
1312        stats.failures += 1;
1313        stats.last_failure_at = Some(now);
1314        Self::apply_source_backoff(stats, now);
1315    }
1316
1317    async fn record_read_source_timeout(&self, source_id: &str) {
1318        let now = Instant::now();
1319        let mut stats = self.read_source_stats.write().await;
1320        let stats = stats
1321            .entry(source_id.to_string())
1322            .or_insert_with(AdaptiveSourceStats::default);
1323        stats.timeouts += 1;
1324        stats.last_failure_at = Some(now);
1325        Self::apply_source_backoff(stats, now);
1326    }
1327
1328    fn apply_source_backoff(stats: &mut AdaptiveSourceStats, now: Instant) {
1329        stats.backoff_level = stats.backoff_level.saturating_add(1);
1330        let backoff_ms = (INITIAL_SOURCE_BACKOFF_MS
1331            .saturating_mul(2u64.saturating_pow(stats.backoff_level.saturating_sub(1))))
1332        .min(MAX_SOURCE_BACKOFF_MS);
1333        stats.backed_off_until = Some(now + Duration::from_millis(backoff_ms));
1334    }
1335
1336    async fn ordered_read_sources(&self) -> Vec<Arc<dyn MeshReadSource>> {
1337        let sources = self.read_sources.read().await;
1338        if sources.is_empty() {
1339            return Vec::new();
1340        }
1341
1342        let mut available: Vec<Arc<dyn MeshReadSource>> = sources
1343            .values()
1344            .filter(|source| source.is_available())
1345            .cloned()
1346            .collect();
1347        if available.is_empty() {
1348            return Vec::new();
1349        }
1350
1351        let now = Instant::now();
1352        let stats = self.read_source_stats.read().await;
1353        let mut healthy: Vec<Arc<dyn MeshReadSource>> = available
1354            .iter()
1355            .filter(|source| {
1356                stats
1357                    .get(source.id())
1358                    .and_then(|s| s.backed_off_until)
1359                    .is_none_or(|until| until <= now)
1360            })
1361            .cloned()
1362            .collect();
1363        if !healthy.is_empty() {
1364            available = std::mem::take(&mut healthy);
1365        }
1366
1367        available.sort_by(|left, right| {
1368            let left_stats = stats.get(left.id()).cloned().unwrap_or_default();
1369            let right_stats = stats.get(right.id()).cloned().unwrap_or_default();
1370            adaptive_source_score(&right_stats, now)
1371                .partial_cmp(&adaptive_source_score(&left_stats, now))
1372                .unwrap_or(std::cmp::Ordering::Equal)
1373                .then_with(|| left.id().cmp(right.id()))
1374        });
1375        available
1376    }
1377
1378    async fn should_probe_multiple_read_sources(
1379        &self,
1380        ordered_sources: &[Arc<dyn MeshReadSource>],
1381    ) -> bool {
1382        if ordered_sources.len() <= 1 {
1383            return false;
1384        }
1385        let stats = self.read_source_stats.read().await;
1386        let best = stats
1387            .get(ordered_sources[0].id())
1388            .cloned()
1389            .unwrap_or_default();
1390        let second = stats
1391            .get(ordered_sources[1].id())
1392            .cloned()
1393            .unwrap_or_default();
1394        if !source_has_history(&best) || !source_has_history(&second) {
1395            return true;
1396        }
1397        let now = Instant::now();
1398        adaptive_source_score(&best, now) - adaptive_source_score(&second, now)
1399            < SOURCE_SCORE_TIE_DELTA
1400    }
1401
1402    async fn source_dispatch_for(&self, source_count: usize) -> RequestDispatchConfig {
1403        if source_count == 0 {
1404            return self.routing.dispatch;
1405        }
1406        let ordered_sources = self.ordered_read_sources().await;
1407        let probe_multiple = self
1408            .should_probe_multiple_read_sources(&ordered_sources)
1409            .await;
1410        RequestDispatchConfig {
1411            initial_fanout: if probe_multiple {
1412                source_count.min(2)
1413            } else {
1414                1
1415            },
1416            hedge_fanout: self.routing.dispatch.hedge_fanout,
1417            max_fanout: self.routing.dispatch.max_fanout.min(source_count),
1418            hedge_interval_ms: self.routing.dispatch.hedge_interval_ms,
1419        }
1420    }
1421
1422    /// Get peer count
1423    pub async fn peer_count(&self) -> usize {
1424        self.signaling.peer_count().await
1425    }
1426
1427    /// Check if we need more peers
1428    pub async fn needs_peers(&self) -> bool {
1429        self.signaling.needs_peers().await
1430    }
1431
1432    /// Re-broadcast hello to refresh discovery as topology changes.
1433    pub async fn send_hello(&self) -> Result<(), TransportError> {
1434        self.signaling.send_hello(vec![]).await
1435    }
1436
1437    /// Drain all currently available peer-link messages and handle them.
1438    ///
1439    /// This keeps the message pump logic shared between simulation and the
1440    /// default production wrapper instead of duplicating per-channel loops.
1441    pub async fn drain_available_data_messages(self: &Arc<Self>) -> DataPumpStats {
1442        let mut stats = DataPumpStats::default();
1443        let peer_ids = self.signaling.peer_ids().await;
1444        for peer_id in peer_ids {
1445            let Some(channel) = self.signaling.get_channel(&peer_id).await else {
1446                continue;
1447            };
1448
1449            while let Some(data) = channel.try_recv() {
1450                stats.processed += 1;
1451                stats.processed_bytes += data.len() as u64;
1452                if let Some(msg) = parse_message(&data) {
1453                    match msg {
1454                        DataMessage::Request(_) => stats.request_messages += 1,
1455                        DataMessage::Response(_) => stats.response_messages += 1,
1456                        DataMessage::QuoteRequest(_) => stats.quote_request_messages += 1,
1457                        DataMessage::QuoteResponse(_) => stats.quote_response_messages += 1,
1458                        DataMessage::Payment(_)
1459                        | DataMessage::PaymentAck(_)
1460                        | DataMessage::Chunk(_) => {}
1461                    }
1462                }
1463                self.handle_data_message(&peer_id, &data).await;
1464            }
1465        }
1466        stats
1467    }
1468
1469    /// Apply an out-of-band payment credit to a peer's routing priority.
1470    pub async fn record_cashu_payment_for_peer(&self, peer_id: &str, amount_sat: u64) {
1471        self.peer_selector
1472            .write()
1473            .await
1474            .record_cashu_payment(peer_id, amount_sat);
1475    }
1476
1477    /// Record a post-delivery payment we received from a peer.
1478    pub async fn record_cashu_receipt_from_peer(&self, peer_id: &str, amount_sat: u64) {
1479        self.peer_selector
1480            .write()
1481            .await
1482            .record_cashu_receipt(peer_id, amount_sat);
1483    }
1484
1485    /// Record that a peer failed to pay after we delivered successfully.
1486    pub async fn record_cashu_payment_default_from_peer(&self, peer_id: &str) {
1487        self.peer_selector
1488            .write()
1489            .await
1490            .record_cashu_payment_default(peer_id);
1491    }
1492
1493    /// Snapshot routing/selection summary for inspection/debugging.
1494    pub async fn selector_summary(&self) -> crate::peer_selector::SelectorSummary {
1495        self.peer_selector.read().await.summary()
1496    }
1497
1498    fn should_refuse_requests_from_peer(&self, selector: &PeerSelector, peer_id: &str) -> bool {
1499        selector.is_peer_blocked_for_payment_defaults(
1500            peer_id,
1501            self.routing.cashu_payment_default_block_threshold,
1502        )
1503    }
1504
1505    /// Export live peer metadata for inspection/debugging.
1506    pub async fn peer_metadata_snapshot(&self) -> PeerMetadataSnapshot {
1507        self.peer_selector
1508            .read()
1509            .await
1510            .export_peer_metadata_snapshot()
1511    }
1512
1513    /// Snapshot current peer metadata and persist it into `local_store`.
1514    ///
1515    /// Uses content-addressed storage for the snapshot body and a reserved
1516    /// mutable pointer slot for the "latest snapshot hash".
1517    pub async fn persist_peer_metadata(&self) -> Result<Hash, StoreError> {
1518        let snapshot = self
1519            .peer_selector
1520            .read()
1521            .await
1522            .export_peer_metadata_snapshot();
1523        let bytes = serde_json::to_vec(&snapshot).map_err(|e| {
1524            StoreError::Other(format!("Failed to encode peer metadata snapshot: {e}"))
1525        })?;
1526        let snapshot_hash = hashtree_core::sha256(&bytes);
1527        let _ = self.local_store.put(snapshot_hash, bytes).await?;
1528
1529        let pointer_slot = Self::peer_metadata_pointer_slot_hash();
1530        let pointer_bytes = hex::encode(snapshot_hash).into_bytes();
1531        let _ = self.local_store.delete(&pointer_slot).await?;
1532        let _ = self.local_store.put(pointer_slot, pointer_bytes).await?;
1533
1534        Ok(snapshot_hash)
1535    }
1536
1537    /// Load persisted peer metadata from `local_store` if available.
1538    pub async fn load_peer_metadata(&self) -> Result<bool, StoreError> {
1539        let pointer_slot = Self::peer_metadata_pointer_slot_hash();
1540        let Some(pointer_bytes) = self.local_store.get(&pointer_slot).await? else {
1541            return Ok(false);
1542        };
1543        let pointer_hex = std::str::from_utf8(&pointer_bytes).map_err(|e| {
1544            StoreError::Other(format!("Peer metadata pointer is not valid UTF-8: {e}"))
1545        })?;
1546        let snapshot_hash = Self::decode_hash_hex(pointer_hex.trim())?;
1547
1548        let Some(snapshot_bytes) = self.local_store.get(&snapshot_hash).await? else {
1549            return Ok(false);
1550        };
1551        let snapshot: PeerMetadataSnapshot =
1552            serde_json::from_slice(&snapshot_bytes).map_err(|e| {
1553                StoreError::Other(format!("Failed to decode peer metadata snapshot: {e}"))
1554            })?;
1555        self.peer_selector
1556            .write()
1557            .await
1558            .import_peer_metadata_snapshot(&snapshot);
1559        Ok(true)
1560    }
1561
1562    /// Request data from peers after negotiating a paid quote.
1563    ///
1564    /// If quote negotiation fails or the quoted peer does not deliver, the store
1565    /// falls back to the normal unpaid retrieval path to preserve liveness.
1566    pub async fn get_with_quote(
1567        &self,
1568        hash: &Hash,
1569        payment_sat: u64,
1570        quote_ttl: Duration,
1571    ) -> Result<Option<Vec<u8>>, StoreError> {
1572        if let Some(data) = self.local_store.get(hash).await? {
1573            return Ok(Some(data));
1574        }
1575        Ok(self
1576            .request_from_peers_with_quote(hash, payment_sat, quote_ttl)
1577            .await)
1578    }
1579
1580    async fn request_from_peers_with_quote(
1581        &self,
1582        hash: &Hash,
1583        payment_sat: u64,
1584        quote_ttl: Duration,
1585    ) -> Option<Vec<u8>> {
1586        let ordered_peer_ids = self.ordered_connected_peers(None).await;
1587        if ordered_peer_ids.is_empty() {
1588            return None;
1589        }
1590
1591        if let Some(quote) = self
1592            .request_quote_from_peers(hash, payment_sat, quote_ttl, &ordered_peer_ids)
1593            .await
1594        {
1595            if let Some(data) = self
1596                .request_from_single_peer(hash, &quote.peer_id, MAX_HTL, Some(quote.quote_id))
1597                .await
1598            {
1599                return Some(data);
1600            }
1601        }
1602
1603        self.request_from_mesh(hash).await
1604    }
1605
1606    async fn request_quote_from_peers(
1607        &self,
1608        hash: &Hash,
1609        payment_sat: u64,
1610        quote_ttl: Duration,
1611        ordered_peer_ids: &[String],
1612    ) -> Option<NegotiatedQuote> {
1613        if ordered_peer_ids.is_empty() {
1614            return None;
1615        }
1616        let ttl_ms = quote_ttl.as_millis().min(u32::MAX as u128) as u32;
1617        if ttl_ms == 0 {
1618            return None;
1619        }
1620        let requested_mint = self.requested_quote_mint().map(str::to_string);
1621
1622        let hash_key = hash_to_key(hash);
1623        let (tx, rx) = oneshot::channel();
1624        self.pending_quotes.write().await.insert(
1625            hash_key.clone(),
1626            PendingQuoteRequest {
1627                response_tx: tx,
1628                preferred_mint_url: requested_mint.clone(),
1629                offered_payment_sat: payment_sat,
1630            },
1631        );
1632
1633        let rx = Arc::new(Mutex::new(rx));
1634        let result = run_hedged_waves(
1635            ordered_peer_ids.len(),
1636            self.routing.dispatch,
1637            self.request_timeout,
1638            |range| {
1639                let wave_peer_ids = ordered_peer_ids[range].to_vec();
1640                let requested_mint = requested_mint.clone();
1641                let hash = *hash;
1642                async move {
1643                    let mut sent = 0usize;
1644                    for peer_id in wave_peer_ids {
1645                        if self
1646                            .send_quote_request_to_peer(
1647                                &peer_id,
1648                                &hash,
1649                                payment_sat,
1650                                ttl_ms,
1651                                requested_mint.as_deref(),
1652                            )
1653                            .await
1654                        {
1655                            sent += 1;
1656                        }
1657                    }
1658                    sent
1659                }
1660            },
1661            |wait| {
1662                let rx = rx.clone();
1663                async move {
1664                    let mut rx = rx.lock().await;
1665                    match tokio::time::timeout(wait, &mut *rx).await {
1666                        Ok(Ok(Some(quote))) => HedgedWaveAction::Success(quote),
1667                        Ok(Ok(None)) | Ok(Err(_)) => HedgedWaveAction::Abort,
1668                        Err(_) => HedgedWaveAction::Continue,
1669                    }
1670                }
1671            },
1672        )
1673        .await;
1674        let _ = self.pending_quotes.write().await.remove(&hash_key);
1675        result
1676    }
1677
1678    async fn request_from_single_peer(
1679        &self,
1680        hash: &Hash,
1681        peer_id: &str,
1682        request_htl: u8,
1683        quote_id: Option<u64>,
1684    ) -> Option<Vec<u8>> {
1685        let hash_key = hash_to_key(hash);
1686        let (tx, rx) = oneshot::channel();
1687        self.pending_requests.write().await.insert(
1688            hash_key.clone(),
1689            PendingRequest {
1690                response_tx: tx,
1691                started_at: Instant::now(),
1692                queried_peers: vec![peer_id.to_string()],
1693            },
1694        );
1695
1696        let mut rx = rx;
1697        if !self
1698            .send_request_to_peer(peer_id, hash, request_htl, quote_id)
1699            .await
1700        {
1701            let _ = self.pending_requests.write().await.remove(&hash_key);
1702            return None;
1703        }
1704        self.reserve_peer_request(peer_id).await;
1705
1706        if let Ok(Ok(Some(data))) = tokio::time::timeout(self.request_timeout, &mut rx).await {
1707            if hashtree_core::sha256(&data) == *hash {
1708                let _ = self.local_store.put(*hash, data.clone()).await;
1709                return Some(data);
1710            }
1711        }
1712
1713        if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
1714            self.release_queried_peer_requests(&pending.queried_peers)
1715                .await;
1716            for peer_id in pending.queried_peers {
1717                self.peer_selector.write().await.record_timeout(&peer_id);
1718            }
1719        }
1720        let _ = self.take_forward_requesters(&hash_key).await;
1721        None
1722    }
1723
1724    async fn request_from_ordered_peers(
1725        &self,
1726        hash: &Hash,
1727        ordered_peer_ids: &[String],
1728        request_htl: u8,
1729    ) -> RouteFetchOutcome {
1730        let hash_key = hash_to_key(hash);
1731        let (tx, rx) = oneshot::channel();
1732        self.pending_requests.write().await.insert(
1733            hash_key.clone(),
1734            PendingRequest {
1735                response_tx: tx,
1736                started_at: Instant::now(),
1737                queried_peers: Vec::new(),
1738            },
1739        );
1740
1741        let rx = Arc::new(Mutex::new(rx));
1742        let result = run_hedged_waves(
1743            ordered_peer_ids.len(),
1744            self.routing.dispatch,
1745            self.request_timeout,
1746            |range| {
1747                let wave_peer_ids = ordered_peer_ids[range].to_vec();
1748                let hash = *hash;
1749                let hash_key = hash_key.clone();
1750                async move {
1751                    let mut sent = 0usize;
1752                    for peer_id in wave_peer_ids {
1753                        if self
1754                            .send_request_to_peer(&peer_id, &hash, request_htl, None)
1755                            .await
1756                        {
1757                            sent += 1;
1758                            self.reserve_peer_request(&peer_id).await;
1759                            if let Some(pending) =
1760                                self.pending_requests.write().await.get_mut(&hash_key)
1761                            {
1762                                pending.queried_peers.push(peer_id);
1763                            }
1764                        }
1765                    }
1766                    sent
1767                }
1768            },
1769            |wait| {
1770                let rx = rx.clone();
1771                async move {
1772                    let mut rx = rx.lock().await;
1773                    match tokio::time::timeout(wait, &mut *rx).await {
1774                        Ok(Ok(Some(data))) if hashtree_core::sha256(&data) == *hash => {
1775                            HedgedWaveAction::Success(data)
1776                        }
1777                        Ok(Ok(Some(_))) => HedgedWaveAction::Continue,
1778                        Ok(Ok(None)) | Ok(Err(_)) => HedgedWaveAction::Abort,
1779                        Err(_) => HedgedWaveAction::Continue,
1780                    }
1781                }
1782            },
1783        )
1784        .await;
1785
1786        let Some(data) = result else {
1787            if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
1788                self.release_queried_peer_requests(&pending.queried_peers)
1789                    .await;
1790                for peer_id in pending.queried_peers {
1791                    self.peer_selector.write().await.record_timeout(&peer_id);
1792                }
1793            }
1794            let _ = self.take_forward_requesters(&hash_key).await;
1795            return RouteFetchOutcome::Timeout;
1796        };
1797
1798        let _ = self.local_store.put(*hash, data.clone()).await;
1799        RouteFetchOutcome::Hit(data)
1800    }
1801
1802    async fn request_from_read_sources_inner(&self, hash: &Hash) -> RouteFetchOutcome {
1803        let ordered_sources = self.ordered_read_sources().await;
1804        if ordered_sources.is_empty() {
1805            return RouteFetchOutcome::Miss;
1806        }
1807
1808        let dispatch = normalize_dispatch_config(
1809            self.source_dispatch_for(ordered_sources.len()).await,
1810            ordered_sources.len(),
1811        );
1812        let wave_plan = build_hedged_wave_plan(ordered_sources.len(), dispatch);
1813        if wave_plan.is_empty() {
1814            return RouteFetchOutcome::Miss;
1815        }
1816
1817        let deadline = Instant::now() + self.request_timeout;
1818        let mut pending = FuturesUnordered::new();
1819        let mut pending_source_ids = HashSet::new();
1820        let mut saw_timeout = false;
1821        let mut next_source_idx = 0usize;
1822
1823        for (wave_idx, wave_size) in wave_plan.iter().copied().enumerate() {
1824            let from = next_source_idx;
1825            let to = (next_source_idx + wave_size).min(ordered_sources.len());
1826            next_source_idx = to;
1827
1828            for source in ordered_sources[from..to].iter().cloned() {
1829                let source_id = source.id().to_string();
1830                self.record_read_source_request(&source_id).await;
1831                pending_source_ids.insert(source_id.clone());
1832                let hash = *hash;
1833                pending.push(tokio::spawn(async move {
1834                    let started_at = Instant::now();
1835                    let result = std::panic::AssertUnwindSafe(source.get(&hash))
1836                        .catch_unwind()
1837                        .await;
1838                    match result {
1839                        Ok(Some(data)) => SourceFetchOutcome::Hit {
1840                            source_id,
1841                            data,
1842                            elapsed_ms: started_at.elapsed().as_millis().max(1) as u64,
1843                        },
1844                        Ok(None) => SourceFetchOutcome::Miss { source_id },
1845                        Err(_) => SourceFetchOutcome::Failure { source_id },
1846                    }
1847                }));
1848            }
1849
1850            let is_last_wave =
1851                wave_idx + 1 == wave_plan.len() || next_source_idx >= ordered_sources.len();
1852            let window_end = if is_last_wave {
1853                deadline
1854            } else {
1855                (Instant::now() + Duration::from_millis(dispatch.hedge_interval_ms)).min(deadline)
1856            };
1857
1858            while Instant::now() < window_end {
1859                let remaining = window_end.saturating_duration_since(Instant::now());
1860                let Some(result) = tokio::time::timeout(remaining, pending.next())
1861                    .await
1862                    .ok()
1863                    .flatten()
1864                else {
1865                    break;
1866                };
1867                let Ok(outcome) = result else {
1868                    continue;
1869                };
1870                match outcome {
1871                    SourceFetchOutcome::Hit {
1872                        source_id,
1873                        data,
1874                        elapsed_ms,
1875                    } => {
1876                        pending_source_ids.remove(&source_id);
1877                        self.record_read_source_success(&source_id, elapsed_ms)
1878                            .await;
1879                        return RouteFetchOutcome::Hit(data);
1880                    }
1881                    SourceFetchOutcome::Miss { source_id } => {
1882                        pending_source_ids.remove(&source_id);
1883                        self.record_read_source_miss(&source_id).await;
1884                    }
1885                    SourceFetchOutcome::Failure { source_id } => {
1886                        pending_source_ids.remove(&source_id);
1887                        self.record_read_source_failure(&source_id).await;
1888                    }
1889                }
1890            }
1891
1892            if Instant::now() >= deadline {
1893                break;
1894            }
1895        }
1896
1897        for source_id in pending_source_ids {
1898            saw_timeout = true;
1899            self.record_read_source_timeout(&source_id).await;
1900        }
1901        if saw_timeout {
1902            RouteFetchOutcome::Timeout
1903        } else {
1904            RouteFetchOutcome::Miss
1905        }
1906    }
1907
1908    async fn request_from_read_sources(&self, hash: &Hash) -> RouteFetchOutcome {
1909        let hash_key = hash_to_key(hash);
1910        let existing_wait = {
1911            let mut inflight = self.inflight_source_fetches.lock().await;
1912            if let Some(existing) = inflight.get_mut(&hash_key) {
1913                let (tx, rx) = oneshot::channel();
1914                existing.waiters.push(tx);
1915                Some(rx)
1916            } else {
1917                inflight.insert(
1918                    hash_key.clone(),
1919                    InflightSourceFetch {
1920                        waiters: Vec::new(),
1921                    },
1922                );
1923                None
1924            }
1925        };
1926
1927        if let Some(wait) = existing_wait {
1928            return wait.await.unwrap_or(RouteFetchOutcome::Timeout);
1929        }
1930
1931        let result = self.request_from_read_sources_inner(hash).await;
1932        if let RouteFetchOutcome::Hit(hit) = &result {
1933            let _ = self.local_store.put(*hash, hit.clone()).await;
1934        }
1935
1936        let waiters = self
1937            .inflight_source_fetches
1938            .lock()
1939            .await
1940            .remove(&hash_key)
1941            .map(|inflight| inflight.waiters)
1942            .unwrap_or_default();
1943        for waiter in waiters {
1944            let _ = waiter.send(result.clone());
1945        }
1946
1947        result
1948    }
1949
1950    async fn available_read_routes(&self, context: &MeshReadContext) -> Vec<ReadRoute> {
1951        let mut routes = Vec::new();
1952        let ordered_peers = self
1953            .ordered_connected_peers(context.exclude_peer_id.as_deref())
1954            .await;
1955        if !ordered_peers.is_empty() {
1956            routes.push(ReadRoute::Peers(ordered_peers));
1957        }
1958        if !self.ordered_read_sources().await.is_empty() {
1959            routes.push(ReadRoute::Sources);
1960        }
1961        if routes.len() <= 1 {
1962            return routes;
1963        }
1964
1965        let now = Instant::now();
1966        let stats = self.read_route_stats.read().await;
1967        routes.sort_by(|left, right| {
1968            let left_stats = stats.get(left.id()).cloned().unwrap_or_default();
1969            let right_stats = stats.get(right.id()).cloned().unwrap_or_default();
1970            adaptive_source_score(&right_stats, now)
1971                .partial_cmp(&adaptive_source_score(&left_stats, now))
1972                .unwrap_or(std::cmp::Ordering::Equal)
1973                .then_with(|| left.id().cmp(right.id()))
1974        });
1975        routes
1976    }
1977
1978    async fn should_probe_multiple_routes(&self, routes: &[ReadRoute]) -> bool {
1979        if routes.len() <= 1 {
1980            return false;
1981        }
1982        let stats = self.read_route_stats.read().await;
1983        let best = stats.get(routes[0].id()).cloned().unwrap_or_default();
1984        let second = stats.get(routes[1].id()).cloned().unwrap_or_default();
1985        if !source_has_history(&best) || !source_has_history(&second) {
1986            return true;
1987        }
1988        let now = Instant::now();
1989        adaptive_source_score(&best, now) - adaptive_source_score(&second, now)
1990            < SOURCE_SCORE_TIE_DELTA
1991    }
1992
1993    async fn run_read_route(
1994        &self,
1995        hash: &Hash,
1996        route: &ReadRoute,
1997        context: &MeshReadContext,
1998    ) -> RouteFetchOutcome {
1999        let route_id = route.id();
2000        self.record_route_request(route_id).await;
2001        let started_at = Instant::now();
2002        let result = match route {
2003            ReadRoute::Peers(peer_ids) => {
2004                self.request_from_ordered_peers(hash, peer_ids, context.request_htl)
2005                    .await
2006            }
2007            ReadRoute::Sources => self.request_from_read_sources(hash).await,
2008        };
2009        match &result {
2010            RouteFetchOutcome::Hit(_) => {
2011                self.record_route_success(route_id, started_at.elapsed().as_millis().max(1) as u64)
2012                    .await;
2013            }
2014            RouteFetchOutcome::Miss => {
2015                self.record_route_miss(route_id).await;
2016            }
2017            RouteFetchOutcome::Timeout => {
2018                self.record_route_timeout(route_id).await;
2019            }
2020        }
2021        result
2022    }
2023
2024    async fn request_from_mesh_with_context(
2025        &self,
2026        hash: &Hash,
2027        context: &MeshReadContext,
2028    ) -> Option<Vec<u8>> {
2029        let routes = self.available_read_routes(context).await;
2030        match routes.as_slice() {
2031            [] => None,
2032            [route] => match self.run_read_route(hash, route, context).await {
2033                RouteFetchOutcome::Hit(data) => Some(data),
2034                RouteFetchOutcome::Miss | RouteFetchOutcome::Timeout => None,
2035            },
2036            [first, second, ..] => {
2037                if self.should_probe_multiple_routes(&routes).await {
2038                    let first_fut = self.run_read_route(hash, first, context);
2039                    let second_fut = self.run_read_route(hash, second, context);
2040                    tokio::pin!(first_fut);
2041                    tokio::pin!(second_fut);
2042                    let mut first_done = false;
2043                    let mut second_done = false;
2044                    loop {
2045                        tokio::select! {
2046                            result = &mut first_fut, if !first_done => {
2047                                first_done = true;
2048                                if let RouteFetchOutcome::Hit(data) = result {
2049                                    return Some(data);
2050                                }
2051                            }
2052                            result = &mut second_fut, if !second_done => {
2053                                second_done = true;
2054                                if let RouteFetchOutcome::Hit(data) = result {
2055                                    return Some(data);
2056                                }
2057                            }
2058                            else => break,
2059                        }
2060                        if first_done && second_done {
2061                            break;
2062                        }
2063                    }
2064                    None
2065                } else {
2066                    match self.run_read_route(hash, first, context).await {
2067                        RouteFetchOutcome::Hit(data) => return Some(data),
2068                        RouteFetchOutcome::Miss | RouteFetchOutcome::Timeout => {}
2069                    }
2070                    match self.run_read_route(hash, second, context).await {
2071                        RouteFetchOutcome::Hit(data) => Some(data),
2072                        RouteFetchOutcome::Miss | RouteFetchOutcome::Timeout => None,
2073                    }
2074                }
2075            }
2076        }
2077    }
2078
2079    async fn request_from_mesh(&self, hash: &Hash) -> Option<Vec<u8>> {
2080        self.request_from_mesh_with_context(hash, &MeshReadContext::default())
2081            .await
2082    }
2083
2084    async fn begin_forward_request(&self, hash_key: &str, requester_id: &str) -> bool {
2085        let mut pending = self.pending_forward_requests.write().await;
2086        if let Some(existing) = pending.get_mut(hash_key) {
2087            existing.requester_ids.insert(requester_id.to_string());
2088            return false;
2089        }
2090
2091        let mut requester_ids = HashSet::new();
2092        requester_ids.insert(requester_id.to_string());
2093        pending.insert(
2094            hash_key.to_string(),
2095            PendingForwardRequest { requester_ids },
2096        );
2097        true
2098    }
2099
2100    async fn take_forward_requesters(&self, hash_key: &str) -> Vec<String> {
2101        self.pending_forward_requests
2102            .write()
2103            .await
2104            .remove(hash_key)
2105            .map(|pending| pending.requester_ids.into_iter().collect())
2106            .unwrap_or_default()
2107    }
2108
2109    async fn complete_pending_response(
2110        self: &Arc<Self>,
2111        from_peer: &str,
2112        hash: &Hash,
2113        hash_key: String,
2114        payload: Vec<u8>,
2115    ) {
2116        if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
2117            self.release_queried_peer_requests(&pending.queried_peers)
2118                .await;
2119            let rtt_ms = pending.started_at.elapsed().as_millis() as u64;
2120            self.peer_selector.write().await.record_success(
2121                from_peer,
2122                rtt_ms,
2123                payload.len() as u64,
2124            );
2125            let forward_requesters = self.take_forward_requesters(&hash_key).await;
2126            let response_bytes = if forward_requesters.is_empty() {
2127                None
2128            } else {
2129                Some(encode_response(&create_response(hash, payload.clone())))
2130            };
2131            let _ = pending.response_tx.send(Some(payload));
2132            if let Some(response_bytes) = response_bytes {
2133                for requester_id in forward_requesters {
2134                    Arc::clone(&self)
2135                        .enqueue_response_send(requester_id, response_bytes.clone(), Instant::now())
2136                        .await;
2137                }
2138            }
2139        }
2140    }
2141
2142    async fn handle_quote_response_message(&self, from_peer: &str, res: DataQuoteResponse) {
2143        if !res.a {
2144            return;
2145        }
2146
2147        let Some(quote_id) = res.q else {
2148            return;
2149        };
2150
2151        let hash_key = hash_to_key(&res.h);
2152        let (preferred_mint_url, offered_payment_sat) = {
2153            let pending_quotes = self.pending_quotes.read().await;
2154            let Some(pending) = pending_quotes.get(&hash_key) else {
2155                return;
2156            };
2157            (
2158                pending.preferred_mint_url.clone(),
2159                pending.offered_payment_sat,
2160            )
2161        };
2162        if !self
2163            .should_accept_quote_response(
2164                from_peer,
2165                preferred_mint_url.as_deref(),
2166                offered_payment_sat,
2167                &res,
2168            )
2169            .await
2170        {
2171            return;
2172        }
2173        let mut pending_quotes = self.pending_quotes.write().await;
2174        if let Some(pending) = pending_quotes.remove(&hash_key) {
2175            let _ = pending.response_tx.send(Some(NegotiatedQuote {
2176                peer_id: from_peer.to_string(),
2177                quote_id,
2178                mint_url: res.m,
2179            }));
2180        }
2181    }
2182
2183    async fn handle_response_message(
2184        self: &Arc<Self>,
2185        from_peer: &str,
2186        res: crate::protocol::DataResponse,
2187    ) {
2188        let hash_key = hash_to_key(&res.h);
2189        let hash = match crate::protocol::bytes_to_hash(&res.h) {
2190            Some(h) => h,
2191            None => return,
2192        };
2193
2194        // Ignore malformed/corrupt payload and keep waiting for a valid response.
2195        if hashtree_core::sha256(&res.d) != hash {
2196            self.peer_selector.write().await.record_failure(from_peer);
2197            if self.debug {
2198                println!("[MeshStoreCore] Ignoring invalid response payload for {hash_key}");
2199            }
2200            return;
2201        }
2202
2203        self.complete_pending_response(from_peer, &hash, hash_key, res.d)
2204            .await;
2205    }
2206
2207    async fn handle_quote_request_message(&self, from_peer: &str, req: DataQuoteRequest) {
2208        let hash = match crate::protocol::bytes_to_hash(&req.h) {
2209            Some(h) => h,
2210            None => return,
2211        };
2212        let hash_key = hash_to_key(&hash);
2213
2214        {
2215            let selector = self.peer_selector.read().await;
2216            if self.should_refuse_requests_from_peer(&selector, from_peer) {
2217                if self.debug {
2218                    println!(
2219                        "[MeshStoreCore] Refusing quote request from delinquent peer {}",
2220                        from_peer
2221                    );
2222                }
2223                return;
2224            }
2225        }
2226
2227        let chosen_mint = self.choose_quote_mint(req.m.as_deref());
2228        let can_serve = self.local_store.has(&hash).await.ok().unwrap_or(false)
2229            && !self.should_drop_response(&hash)
2230            && !self.should_corrupt_response(&hash);
2231
2232        let res = if can_serve {
2233            let quote_id = self
2234                .issue_quote(from_peer, &hash_key, req.p, req.t, chosen_mint.as_deref())
2235                .await;
2236            create_quote_response_available(&hash, quote_id, req.p, req.t, chosen_mint.as_deref())
2237        } else {
2238            create_quote_response_unavailable(&hash)
2239        };
2240        let response_bytes = encode_quote_response(&res);
2241        if let Some(channel) = self.signaling.get_channel(from_peer).await {
2242            if channel.send(response_bytes.clone()).await.is_ok() {
2243                self.record_peer_wire_sent(from_peer, response_bytes.len() as u64)
2244                    .await;
2245            }
2246        }
2247    }
2248
2249    async fn handle_request_message(
2250        self: &Arc<Self>,
2251        from_peer: &str,
2252        req: crate::protocol::DataRequest,
2253    ) {
2254        let hash = match crate::protocol::bytes_to_hash(&req.h) {
2255            Some(h) => h,
2256            None => return,
2257        };
2258        let hash_key = hash_to_key(&hash);
2259
2260        if let Some(quote_id) = req.q {
2261            if !self.take_valid_quote(from_peer, &hash_key, quote_id).await {
2262                if self.debug {
2263                    println!(
2264                        "[MeshStoreCore] Refusing request with invalid or expired quote {} from {}",
2265                        quote_id, from_peer
2266                    );
2267                }
2268                return;
2269            }
2270        }
2271
2272        let allow_peer_forwarding = {
2273            let selector = self.peer_selector.read().await;
2274            !self.should_refuse_requests_from_peer(&selector, from_peer)
2275        };
2276
2277        // Check local store
2278        if let Ok(Some(mut data)) = self.local_store.get(&hash).await {
2279            if self.should_drop_response(&hash) {
2280                if self.debug {
2281                    println!(
2282                        "[MeshStoreCore] Dropping response for {} due to actor profile",
2283                        hash_to_key(&hash)
2284                    );
2285                }
2286                return;
2287            }
2288
2289            let response_delay = self.response_send_delay(&hash, data.len());
2290            if self.should_corrupt_response(&hash) {
2291                if data.is_empty() {
2292                    data.push(0x80);
2293                } else {
2294                    data[0] ^= 0x80;
2295                }
2296            }
2297
2298            // Send response
2299            let res = create_response(&hash, data);
2300            let response_bytes = encode_response(&res);
2301            let ready_at = Instant::now() + response_delay;
2302            Arc::clone(self)
2303                .enqueue_response_send(from_peer.to_string(), response_bytes, ready_at)
2304                .await;
2305            return;
2306        }
2307
2308        if self.pending_requests.read().await.contains_key(&hash_key) {
2309            let _ = self.begin_forward_request(&hash_key, from_peer).await;
2310            return;
2311        }
2312
2313        if !self.begin_forward_request(&hash_key, from_peer).await {
2314            return;
2315        }
2316
2317        let from_peer = from_peer.to_string();
2318        let this = Arc::clone(self);
2319        let request_htl = req.htl;
2320        tokio::spawn(async move {
2321            let result = if allow_peer_forwarding {
2322                let context = MeshReadContext {
2323                    exclude_peer_id: Some(from_peer.clone()),
2324                    request_htl,
2325                };
2326                this.request_from_mesh_with_context(&hash, &context).await
2327            } else {
2328                if this.debug {
2329                    println!(
2330                        "[MeshStoreCore] Serving request from delinquent peer {} via read sources only",
2331                        from_peer
2332                    );
2333                }
2334                match this.request_from_read_sources(&hash).await {
2335                    RouteFetchOutcome::Hit(data) => Some(data),
2336                    RouteFetchOutcome::Miss | RouteFetchOutcome::Timeout => None,
2337                }
2338            };
2339            let requester_ids = this.take_forward_requesters(&hash_key).await;
2340            if let Some(data) = result {
2341                let ready_at = Instant::now() + this.response_send_delay(&hash, data.len());
2342                let res = create_response(&hash, data);
2343                let response_bytes = encode_response(&res);
2344                for requester_id in requester_ids {
2345                    Arc::clone(&this)
2346                        .enqueue_response_send(requester_id, response_bytes.clone(), ready_at)
2347                        .await;
2348                }
2349            }
2350        });
2351    }
2352
2353    /// Handle incoming data message
2354    pub async fn handle_data_message(self: &Arc<Self>, from_peer: &str, data: &[u8]) {
2355        self.record_peer_wire_received(from_peer, data.len() as u64)
2356            .await;
2357        let parsed = match parse_message(data) {
2358            Some(m) => m,
2359            None => return,
2360        };
2361
2362        match parsed {
2363            DataMessage::Request(req) => {
2364                self.handle_request_message(from_peer, req).await;
2365            }
2366            DataMessage::Response(res) => {
2367                self.handle_response_message(from_peer, res).await;
2368            }
2369            DataMessage::QuoteRequest(req) => {
2370                self.handle_quote_request_message(from_peer, req).await;
2371            }
2372            DataMessage::QuoteResponse(res) => {
2373                self.handle_quote_response_message(from_peer, res).await;
2374            }
2375            DataMessage::Payment(_) | DataMessage::PaymentAck(_) | DataMessage::Chunk(_) => {}
2376        }
2377    }
2378}
2379
2380#[async_trait]
2381impl<S, R, F> Store for MeshStoreCore<S, R, F>
2382where
2383    S: Store + Send + Sync + 'static,
2384    R: SignalingTransport + Send + Sync + 'static,
2385    F: PeerLinkFactory + Send + Sync + 'static,
2386{
2387    async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
2388        self.local_store.put(hash, data).await
2389    }
2390
2391    async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
2392        // Try local first
2393        if let Some(data) = self.local_store.get(hash).await? {
2394            return Ok(Some(data));
2395        }
2396
2397        // Try peers
2398        Ok(self.request_from_mesh(hash).await)
2399    }
2400
2401    async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
2402        self.local_store.has(hash).await
2403    }
2404
2405    async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
2406        self.local_store.delete(hash).await
2407    }
2408}
2409
2410#[cfg(test)]
2411mod tests;
2412
2413/// Type alias for simulation store.
2414pub type SimMeshStore<S> =
2415    MeshStoreCore<S, crate::mock::MockRelayTransport, crate::mock::MockConnectionFactory>;
2416
2417/// Type alias for the default production core (Nostr signaling + WebRTC links).
2418pub type ProductionMeshStore<S> =
2419    MeshStoreCore<S, crate::nostr::NostrRelayTransport, crate::real_factory::WebRtcPeerLinkFactory>;