Skip to main content

hashtree_network/
mesh_store_core.rs

1//! Shared routed mesh store core.
2//!
3//! This module provides a concrete store wrapper that works with any local storage
4//! backend plus any signaling transport and peer-link factory. Both production
5//! and simulation (mocks) use this same code.
6
7use async_trait::async_trait;
8use futures::{stream::FuturesUnordered, FutureExt, StreamExt};
9use std::collections::hash_map::DefaultHasher;
10use std::collections::{HashMap, HashSet, VecDeque};
11use std::future::Future;
12use std::hash::{Hash as _, Hasher};
13use std::ops::Range;
14use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
15use std::sync::Arc;
16use std::time::Duration;
17use tokio::sync::{oneshot, Mutex, RwLock};
18use tokio::time::Instant;
19
20use hashtree_core::{Hash, Store, StoreError};
21
22use crate::peer_selector::{PeerMetadataSnapshot, PeerSelector, SelectionStrategy};
23use crate::protocol::{
24    create_pubsub_frame, create_pubsub_interest, create_pubsub_inventory, create_pubsub_want,
25    create_quote_request, create_quote_response_available, create_quote_response_unavailable,
26    create_request, create_request_with_quote, create_response, encode_pubsub_frame,
27    encode_pubsub_interest, encode_pubsub_inventory, encode_pubsub_want, encode_quote_request,
28    encode_quote_response, encode_request, encode_response, hash_to_key, parse_message,
29    DataMessage, DataQuoteRequest, DataQuoteResponse, PubsubFrame, PubsubInterest, PubsubInventory,
30    PubsubWant,
31};
32use crate::pubsub_strategy::{
33    reciprocal_virtual_finish, select_reciprocal_outbound_job, OutboundJobCandidate,
34    PeerTrafficSnapshot, PubsubCandidate, PubsubSchedulerConfig,
35};
36use crate::signaling::MeshRouter;
37use crate::transport::{PeerLinkFactory, SignalingTransport, TransportError};
38use crate::types::{
39    should_forward_htl, PeerHTLConfig, SignalingMessage, TimedSeenSet, MAX_HTL, MESH_EVENT_POLICY,
40};
41
42// Keep the on-disk namespace stable across the crate rename so existing peer
43// metadata does not disappear for users upgrading from the old package name.
44const PEER_METADATA_POINTER_SLOT_KEY: &[u8] = b"hashtree-mesh/peer-metadata/latest/v1";
45const RECENT_FORWARD_MISS_CAPACITY: usize = 4096;
46const MIN_RECENT_FORWARD_MISS_TTL_MS: u64 = 250;
47const PUBSUB_SEEN_CAPACITY: usize = 16_384;
48const PUBSUB_INBOX_CAPACITY: usize = 4_096;
49const PUBSUB_FRAME_CACHE_CAPACITY: usize = 4_096;
50const PUBSUB_SEEN_TTL: Duration = Duration::from_secs(120);
51
52/// Pending request awaiting response
53struct PendingRequest {
54    response_tx: oneshot::Sender<Option<Vec<u8>>>,
55    started_at: Instant,
56    queried_peers: Vec<String>,
57}
58
59struct PendingQuoteRequest {
60    response_tx: oneshot::Sender<Option<NegotiatedQuote>>,
61    preferred_mint_url: Option<String>,
62    offered_payment_sat: u64,
63}
64
65struct PendingForwardRequest {
66    requester_ids: HashSet<String>,
67}
68
69type PeerWireStats = PeerTrafficSnapshot;
70
71struct PendingResponseSend {
72    job_id: u64,
73    peer_id: String,
74    bytes: Vec<u8>,
75    ready_at: Instant,
76    queue_sequence: u64,
77}
78
79#[async_trait]
80pub trait MeshReadSource: Send + Sync {
81    fn id(&self) -> &str;
82
83    fn is_available(&self) -> bool {
84        true
85    }
86
87    async fn get(&self, hash: &Hash) -> Option<Vec<u8>>;
88}
89
90#[derive(Debug, Clone)]
91struct NegotiatedQuote {
92    peer_id: String,
93    quote_id: u64,
94    #[allow(dead_code)]
95    mint_url: Option<String>,
96}
97
98struct IssuedQuote {
99    expires_at: Instant,
100    #[allow(dead_code)]
101    payment_sat: u64,
102    #[allow(dead_code)]
103    mint_url: Option<String>,
104}
105
106#[derive(Debug, Clone, Default)]
107struct AdaptiveSourceStats {
108    requests: u64,
109    successes: u64,
110    misses: u64,
111    failures: u64,
112    timeouts: u64,
113    srtt_ms: f64,
114    rttvar_ms: f64,
115    backoff_level: u32,
116    backed_off_until: Option<Instant>,
117    last_success_at: Option<Instant>,
118    last_failure_at: Option<Instant>,
119}
120
121#[derive(Debug, Clone)]
122enum RouteFetchOutcome {
123    Hit(Vec<u8>),
124    Miss,
125    Timeout,
126}
127
128struct InflightSourceFetch {
129    waiters: Vec<oneshot::Sender<RouteFetchOutcome>>,
130}
131
132enum SourceFetchOutcome {
133    Hit {
134        source_id: String,
135        data: Vec<u8>,
136        elapsed_ms: u64,
137    },
138    Miss {
139        source_id: String,
140    },
141    Failure {
142        source_id: String,
143    },
144}
145
146const INITIAL_SOURCE_BACKOFF_MS: u64 = 250;
147const MAX_SOURCE_BACKOFF_MS: u64 = 10_000;
148const SOURCE_SCORE_TIE_DELTA: f64 = 0.15;
149const RECENT_SOURCE_SUCCESS_WINDOW: Duration = Duration::from_secs(60);
150const ACTIVE_PEER_REQUEST_RANK_PENALTY: usize = 3;
151
152fn source_reliability_score(stats: &AdaptiveSourceStats) -> f64 {
153    (stats.successes as f64 + 1.0) / (stats.requests as f64 + 2.0)
154}
155
156fn source_latency_score(stats: &AdaptiveSourceStats) -> f64 {
157    if stats.srtt_ms <= 0.0 {
158        return 0.5;
159    }
160    (500.0 / (stats.srtt_ms + 50.0)).min(1.0)
161}
162
163fn source_has_history(stats: &AdaptiveSourceStats) -> bool {
164    stats.requests > 0
165        || stats.successes > 0
166        || stats.misses > 0
167        || stats.failures > 0
168        || stats.timeouts > 0
169}
170
171fn adaptive_source_score(stats: &AdaptiveSourceStats, now: Instant) -> f64 {
172    if let Some(backed_off_until) = stats.backed_off_until {
173        if backed_off_until > now {
174            return f64::NEG_INFINITY;
175        }
176    }
177
178    let miss_penalty = if stats.requests > 0 {
179        (stats.misses as f64 / stats.requests as f64) * 0.15
180    } else {
181        0.0
182    };
183    let failure_penalty = if stats.requests > 0 {
184        ((stats.failures + stats.timeouts) as f64 / stats.requests as f64) * 0.3
185    } else {
186        0.0
187    };
188    let recency_bonus = if stats
189        .last_success_at
190        .is_some_and(|last| now.duration_since(last) < RECENT_SOURCE_SUCCESS_WINDOW)
191    {
192        0.1
193    } else {
194        0.0
195    };
196
197    0.6 * source_reliability_score(stats) + 0.3 * source_latency_score(stats) + recency_bonus
198        - miss_penalty
199        - failure_penalty
200}
201
202fn peer_endpoint_has_history(stats: &crate::peer_selector::PeerStats) -> bool {
203    stats.requests_sent > 0 || stats.successes > 0 || stats.failures > 0 || stats.timeouts > 0
204}
205
206fn peer_endpoint_score(stats: &crate::peer_selector::PeerStats, now: Instant) -> f64 {
207    if stats.backed_off_until.is_some_and(|until| until > now) {
208        return f64::NEG_INFINITY;
209    }
210
211    let miss_penalty = 0.0;
212    let failure_penalty = if stats.requests_sent > 0 {
213        ((stats.failures + stats.timeouts) as f64 / stats.requests_sent as f64) * 0.3
214    } else {
215        0.0
216    };
217    let recency_bonus = if stats
218        .last_success
219        .is_some_and(|last| now.duration_since(last) < RECENT_SOURCE_SUCCESS_WINDOW)
220    {
221        0.1
222    } else {
223        0.0
224    };
225
226    0.6 * stats.success_rate()
227        + 0.3
228            * source_latency_score(&AdaptiveSourceStats {
229                srtt_ms: stats.srtt_ms,
230                ..AdaptiveSourceStats::default()
231            })
232        + recency_bonus
233        - miss_penalty
234        - failure_penalty
235}
236
237#[derive(Clone)]
238enum ReadRoute {
239    Peers(Vec<String>),
240    Sources,
241}
242
243impl ReadRoute {
244    fn id(&self) -> &'static str {
245        match self {
246            Self::Peers(_) => "peers",
247            Self::Sources => "sources",
248        }
249    }
250}
251
252struct RankedReadRoute {
253    route: ReadRoute,
254    best_endpoint_id: String,
255    score: f64,
256    has_history: bool,
257}
258
259fn ranked_route_kind(route: &ReadRoute) -> u8 {
260    match route {
261        ReadRoute::Sources => 0,
262        ReadRoute::Peers(_) => 1,
263    }
264}
265
266#[derive(Debug, Clone)]
267struct MeshReadContext {
268    exclude_peer_id: Option<String>,
269    request_htl: u8,
270}
271
272impl Default for MeshReadContext {
273    fn default() -> Self {
274        Self {
275            exclude_peer_id: None,
276            request_htl: MAX_HTL,
277        }
278    }
279}
280
281/// Aggregate stats from draining currently available peer-link messages.
282#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
283pub struct DataPumpStats {
284    pub processed: usize,
285    pub request_messages: usize,
286    pub response_messages: usize,
287    pub quote_request_messages: u64,
288    pub quote_response_messages: u64,
289    pub pubsub_interest_messages: u64,
290    pub pubsub_frame_messages: u64,
291    pub pubsub_inventory_messages: u64,
292    pub pubsub_want_messages: u64,
293    pub processed_bytes: u64,
294}
295
296/// Pubsub data delivered to a local subscription.
297#[derive(Debug, Clone, PartialEq, Eq)]
298pub struct PubsubEvent {
299    pub stream_id: String,
300    pub seq: u64,
301    pub origin_peer_id: String,
302    pub from_peer_id: String,
303    pub payload: Vec<u8>,
304}
305
306/// Send-side accounting from a pubsub publish or forwarded pubsub message.
307#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
308pub struct PubsubPublishStats {
309    pub selected_peers: usize,
310    pub sent_peers: usize,
311    pub sent_bytes: u64,
312    pub deferred_peers: usize,
313}
314
315/// Production pubsub delivery strategy.
316#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
317pub enum PubsubDeliveryMode {
318    /// Push full frames only along advertised interest routes.
319    InterestPush,
320    /// Flood small inventories by HTL and pull payloads back along want paths.
321    #[default]
322    HtlInvWant,
323}
324
325/// Request dispatch strategy for peer queries.
326///
327/// `MeshStoreCore` supports two practical retrieval modes:
328/// - Flood (`usize::MAX` fanout): maximize success/latency at bandwidth cost.
329/// - Staged hedging: probe a subset first, then expand.
330#[derive(Debug, Clone, Copy)]
331pub struct RequestDispatchConfig {
332    /// Number of peers queried immediately.
333    pub initial_fanout: usize,
334    /// Number of additional peers to query on each hedge step.
335    pub hedge_fanout: usize,
336    /// Total peers allowed for this request.
337    pub max_fanout: usize,
338    /// Delay between hedge waves (ms). `0` means send all waves immediately.
339    pub hedge_interval_ms: u64,
340}
341
342impl Default for RequestDispatchConfig {
343    fn default() -> Self {
344        Self {
345            initial_fanout: usize::MAX,
346            hedge_fanout: usize::MAX,
347            max_fanout: usize::MAX,
348            hedge_interval_ms: 0,
349        }
350    }
351}
352
353/// Normalize fanout config against current peer availability.
354pub fn normalize_dispatch_config(
355    dispatch: RequestDispatchConfig,
356    available_peers: usize,
357) -> RequestDispatchConfig {
358    let mut cfg = dispatch;
359    let cap = if cfg.max_fanout == 0 {
360        available_peers
361    } else {
362        cfg.max_fanout.min(available_peers)
363    };
364    cfg.max_fanout = cap;
365    cfg.initial_fanout = if cfg.initial_fanout == 0 {
366        1
367    } else {
368        cfg.initial_fanout.min(cap.max(1))
369    };
370    cfg.hedge_fanout = if cfg.hedge_fanout == 0 {
371        1
372    } else {
373        cfg.hedge_fanout.min(cap.max(1))
374    };
375    cfg
376}
377
378/// Build wave sizes for staged hedged dispatch.
379pub fn build_hedged_wave_plan(peer_count: usize, dispatch: RequestDispatchConfig) -> Vec<usize> {
380    if peer_count == 0 {
381        return Vec::new();
382    }
383    let cap = dispatch.max_fanout.min(peer_count);
384    if cap == 0 {
385        return Vec::new();
386    }
387
388    let mut plan = Vec::new();
389    let mut sent = 0usize;
390    let first = dispatch.initial_fanout.min(cap).max(1);
391    plan.push(first);
392    sent += first;
393
394    while sent < cap {
395        let next = dispatch.hedge_fanout.min(cap - sent).max(1);
396        plan.push(next);
397        sent += next;
398    }
399    plan
400}
401
402/// Outcome returned after waiting on a hedged dispatch wave.
403#[derive(Debug)]
404pub enum HedgedWaveAction<T> {
405    Continue,
406    Success(T),
407    Abort,
408}
409
410/// Run a staged hedged dispatch over peer index ranges.
411///
412/// This scheduler is shared by the reusable `MeshStoreCore` and the native
413/// `hashtree-cli` mesh path so tests and production use the same wave timing.
414pub async fn run_hedged_waves<T, SendWave, SendWaveFut, WaitWave, WaitWaveFut>(
415    peer_count: usize,
416    dispatch: RequestDispatchConfig,
417    request_timeout: Duration,
418    mut send_wave: SendWave,
419    mut wait_wave: WaitWave,
420) -> Option<T>
421where
422    SendWave: FnMut(Range<usize>) -> SendWaveFut,
423    SendWaveFut: Future<Output = usize>,
424    WaitWave: FnMut(Duration) -> WaitWaveFut,
425    WaitWaveFut: Future<Output = HedgedWaveAction<T>>,
426{
427    let dispatch = normalize_dispatch_config(dispatch, peer_count);
428    let wave_plan = build_hedged_wave_plan(peer_count, dispatch);
429    if wave_plan.is_empty() {
430        return None;
431    }
432
433    let deadline = Instant::now() + request_timeout;
434    let mut sent_total = 0usize;
435    let mut next_peer_idx = 0usize;
436
437    for (wave_idx, wave_size) in wave_plan.iter().copied().enumerate() {
438        let from = next_peer_idx;
439        let to = (next_peer_idx + wave_size).min(peer_count);
440        next_peer_idx = to;
441
442        if from == to {
443            continue;
444        }
445
446        sent_total += send_wave(from..to).await;
447        if sent_total == 0 {
448            if next_peer_idx >= peer_count {
449                break;
450            }
451            continue;
452        }
453
454        let now = Instant::now();
455        if now >= deadline {
456            break;
457        }
458        let remaining = deadline.saturating_duration_since(now);
459        let is_last_wave = wave_idx + 1 == wave_plan.len() || next_peer_idx >= peer_count;
460        let wait = if is_last_wave {
461            remaining
462        } else if dispatch.hedge_interval_ms == 0 {
463            Duration::ZERO
464        } else {
465            Duration::from_millis(dispatch.hedge_interval_ms).min(remaining)
466        };
467
468        if wait.is_zero() {
469            continue;
470        }
471
472        match wait_wave(wait).await {
473            HedgedWaveAction::Continue => {}
474            HedgedWaveAction::Success(value) => return Some(value),
475            HedgedWaveAction::Abort => break,
476        }
477    }
478
479    None
480}
481
482/// Keep selector membership aligned with currently connected peer IDs.
483pub async fn sync_selector_peers(selector: &RwLock<PeerSelector>, current_peer_ids: &[String]) {
484    let mut selector = selector.write().await;
485    let current: HashSet<&str> = current_peer_ids.iter().map(String::as_str).collect();
486    let known: Vec<String> = selector.all_stats().map(|s| s.peer_id.clone()).collect();
487    for peer_id in known {
488        if !current.contains(peer_id.as_str()) {
489            selector.remove_peer(&peer_id);
490        }
491    }
492    for peer_id in current_peer_ids {
493        selector.add_peer(peer_id.clone());
494    }
495}
496
497/// Response behavior profile for simulation/game-theory actors.
498///
499/// Defaults to honest behavior (always respond correctly, no extra delay).
500#[derive(Debug, Clone, Copy)]
501pub struct ResponseBehaviorConfig {
502    /// Probability that a node drops a response even when it has data.
503    pub drop_response_prob: f64,
504    /// Probability that a node responds with corrupted payload.
505    pub corrupt_response_prob: f64,
506    /// Baseline response delay before a peer starts sending any data.
507    pub extra_delay_ms: u64,
508    /// Additional delay before the first response byte becomes available.
509    pub first_byte_delay_ms: u64,
510    /// Sustained throughput for delivering large payloads. `0` disables size-based slowdown.
511    pub bytes_per_second: u64,
512    /// Probability that an otherwise honest response experiences an extra stall.
513    pub stall_response_prob: f64,
514    /// Extra delay injected when a stall event happens.
515    pub stall_delay_ms: u64,
516}
517
518impl Default for ResponseBehaviorConfig {
519    fn default() -> Self {
520        Self {
521            drop_response_prob: 0.0,
522            corrupt_response_prob: 0.0,
523            extra_delay_ms: 0,
524            first_byte_delay_ms: 0,
525            bytes_per_second: 0,
526            stall_response_prob: 0.0,
527            stall_delay_ms: 0,
528        }
529    }
530}
531
532impl ResponseBehaviorConfig {
533    fn normalized(self) -> Self {
534        Self {
535            drop_response_prob: self.drop_response_prob.clamp(0.0, 1.0),
536            corrupt_response_prob: self.corrupt_response_prob.clamp(0.0, 1.0),
537            extra_delay_ms: self.extra_delay_ms,
538            first_byte_delay_ms: self.first_byte_delay_ms,
539            bytes_per_second: self.bytes_per_second,
540            stall_response_prob: self.stall_response_prob.clamp(0.0, 1.0),
541            stall_delay_ms: self.stall_delay_ms,
542        }
543    }
544}
545
546/// Routing policy for request ordering + dispatch fanout.
547#[derive(Debug, Clone)]
548pub struct MeshRoutingConfig {
549    pub selection_strategy: SelectionStrategy,
550    pub fairness_enabled: bool,
551    /// Blend weight for payment-priority ranking in selector (`0.0` disables).
552    pub cashu_payment_weight: f64,
553    /// Refuse serving peers that have reached this many unpaid post-delivery settlements.
554    /// `0` disables refusal and only keeps metadata/downranking.
555    pub cashu_payment_default_block_threshold: u64,
556    /// Cashu mint URLs this node is willing to use for settlement.
557    pub cashu_accepted_mints: Vec<String>,
558    /// Preferred Cashu mint URL when initiating paid retrieval.
559    pub cashu_default_mint: Option<String>,
560    /// Baseline cap for accepting a peer-suggested mint outside the trusted set.
561    pub cashu_peer_suggested_mint_base_cap_sat: u64,
562    /// Additional sats allowed per successful delivery from that peer.
563    pub cashu_peer_suggested_mint_success_step_sat: u64,
564    /// Additional sats allowed per successful post-delivery payment received from that peer.
565    pub cashu_peer_suggested_mint_receipt_step_sat: u64,
566    /// Hard upper bound for any single peer-suggested mint quote we accept.
567    pub cashu_peer_suggested_mint_max_cap_sat: u64,
568    pub dispatch: RequestDispatchConfig,
569    pub response_behavior: ResponseBehaviorConfig,
570    pub pubsub_scheduler: PubsubSchedulerConfig,
571    pub pubsub_delivery_mode: PubsubDeliveryMode,
572}
573
574impl Default for MeshRoutingConfig {
575    fn default() -> Self {
576        Self {
577            selection_strategy: SelectionStrategy::Weighted,
578            fairness_enabled: true,
579            cashu_payment_weight: 0.0,
580            cashu_payment_default_block_threshold: 0,
581            cashu_accepted_mints: Vec::new(),
582            cashu_default_mint: None,
583            cashu_peer_suggested_mint_base_cap_sat: 0,
584            cashu_peer_suggested_mint_success_step_sat: 0,
585            cashu_peer_suggested_mint_receipt_step_sat: 0,
586            cashu_peer_suggested_mint_max_cap_sat: 0,
587            dispatch: RequestDispatchConfig::default(),
588            response_behavior: ResponseBehaviorConfig::default(),
589            pubsub_scheduler: PubsubSchedulerConfig::default(),
590            pubsub_delivery_mode: PubsubDeliveryMode::HtlInvWant,
591        }
592    }
593}
594
595/// Routed mesh store core that works with any storage backend and transport
596/// implementation.
597///
598/// This is the shared code between production and simulation.
599/// - Production: transport-specific crates compose `MeshStoreCore` with their links
600/// - Simulation: `MeshStoreCore<MemoryStore, MockRelayTransport, MockConnectionFactory>`
601pub struct MeshStoreCore<S, R, F>
602where
603    S: Store + Send + Sync + 'static,
604    R: SignalingTransport + Send + Sync + 'static,
605    F: PeerLinkFactory + Send + Sync + 'static,
606{
607    /// Local backing store
608    local_store: Arc<S>,
609    /// Mesh router (handles peer discovery and connection)
610    signaling: Arc<MeshRouter<R, F>>,
611    /// Per-peer HTL config
612    htl_configs: RwLock<HashMap<String, PeerHTLConfig>>,
613    /// Pending requests we sent
614    pending_requests: RwLock<HashMap<String, PendingRequest>>,
615    /// Pending quote negotiations keyed by requested hash.
616    pending_quotes: RwLock<HashMap<String, PendingQuoteRequest>>,
617    /// Forwarded peer requests currently being resolved through the mesh/upstream.
618    pending_forward_requests: RwLock<HashMap<String, PendingForwardRequest>>,
619    /// Bounded negative cache for recently forwarded misses/timeouts.
620    recent_forward_misses: Mutex<TimedSeenSet>,
621    /// Quotes we issued to peers and will accept exactly once until expiry.
622    issued_quotes: RwLock<HashMap<(String, String, u64), IssuedQuote>>,
623    /// Monotonic quote identifier generator.
624    next_quote_id: RwLock<u64>,
625    /// Non-peer read sources such as upstream Blossom servers.
626    read_sources: RwLock<HashMap<String, Arc<dyn MeshReadSource>>>,
627    /// Adaptive health stats for non-peer read sources.
628    read_source_stats: RwLock<HashMap<String, AdaptiveSourceStats>>,
629    /// Shared in-flight upstream reads keyed by hash.
630    inflight_source_fetches: Mutex<HashMap<String, InflightSourceFetch>>,
631    /// Adaptive selector for peer ordering.
632    peer_selector: RwLock<PeerSelector>,
633    /// Active per-peer in-flight reads so concurrent block fetches spread across peers.
634    peer_active_requests: RwLock<HashMap<String, usize>>,
635    /// Actual wire traffic stats used for upload-side reciprocity scheduling.
636    peer_wire_stats: RwLock<HashMap<String, PeerWireStats>>,
637    /// Streams this node wants delivered locally.
638    pubsub_local_interests: RwLock<HashSet<String>>,
639    /// Current sequence per local stream interest.
640    pubsub_local_interest_versions: RwLock<HashMap<String, u64>>,
641    /// Reverse pubsub routes: stream id -> peers with local/downstream interest.
642    pubsub_peer_interests: RwLock<HashMap<String, HashSet<String>>>,
643    /// Route owner for each downstream subscriber interest.
644    pubsub_interest_routes: RwLock<HashMap<(String, String), String>>,
645    /// Latest interest sequence observed per subscriber/stream.
646    pubsub_interest_versions: RwLock<HashMap<(String, String), u64>>,
647    /// Bounded dedupe for pubsub interest floods.
648    pubsub_seen_interests: Mutex<TimedSeenSet>,
649    /// Bounded dedupe for pubsub data frames.
650    pubsub_seen_frames: Mutex<TimedSeenSet>,
651    /// Bounded dedupe for pubsub inventory floods.
652    pubsub_seen_inventories: Mutex<TimedSeenSet>,
653    /// Bounded dedupe for pubsub wants by requesting peer.
654    pubsub_seen_wants: Mutex<TimedSeenSet>,
655    /// First upstream peer that announced each inventory key.
656    pubsub_inventory_routes: RwLock<HashMap<String, String>>,
657    /// Downstream peers waiting for a payload after sending a want.
658    pubsub_want_routes: RwLock<HashMap<String, HashSet<String>>>,
659    /// Dedupe for wants this node already sent upstream.
660    pubsub_upstream_wants: Mutex<TimedSeenSet>,
661    /// Small payload cache for serving wants after inventory-first announcements.
662    pubsub_frame_cache: Mutex<VecDeque<(String, PubsubFrame)>>,
663    /// Local pubsub delivery inbox.
664    pubsub_inbox: Mutex<VecDeque<PubsubEvent>>,
665    /// Per stream/peer deferred counts for aging pubsub strategies.
666    pubsub_deferred_counts: RwLock<HashMap<(String, String), u64>>,
667    /// Monotonic sequence for locally originated pubsub interest updates.
668    next_pubsub_interest_seq: AtomicU64,
669    /// Pending content responses waiting for upload arbitration.
670    pending_response_sends: Mutex<Vec<PendingResponseSend>>,
671    /// Upload response scheduler state.
672    response_scheduler_running: AtomicBool,
673    /// Monotonic id for queued response sends.
674    next_response_job_id: AtomicU64,
675    /// Routing/dispatch configuration.
676    routing: MeshRoutingConfig,
677    /// Request timeout
678    request_timeout: Duration,
679    /// Debug mode
680    debug: bool,
681    /// Running flag
682    running: RwLock<bool>,
683}
684
685impl<S, R, F> MeshStoreCore<S, R, F>
686where
687    S: Store + Send + Sync + 'static,
688    R: SignalingTransport + Send + Sync + 'static,
689    F: PeerLinkFactory + Send + Sync + 'static,
690{
691    /// Create a new routed mesh store core.
692    pub fn new(
693        local_store: Arc<S>,
694        signaling: Arc<MeshRouter<R, F>>,
695        request_timeout: Duration,
696        debug: bool,
697    ) -> Self {
698        Self::new_with_routing(
699            local_store,
700            signaling,
701            request_timeout,
702            debug,
703            Default::default(),
704        )
705    }
706
707    /// Create a new routed mesh store core with explicit routing configuration.
708    pub fn new_with_routing(
709        local_store: Arc<S>,
710        signaling: Arc<MeshRouter<R, F>>,
711        request_timeout: Duration,
712        debug: bool,
713        routing: MeshRoutingConfig,
714    ) -> Self {
715        let mut selector = PeerSelector::with_strategy(routing.selection_strategy);
716        selector.set_fairness(routing.fairness_enabled);
717        selector.set_cashu_payment_weight(routing.cashu_payment_weight);
718        Self {
719            local_store,
720            signaling,
721            htl_configs: RwLock::new(HashMap::new()),
722            pending_requests: RwLock::new(HashMap::new()),
723            pending_quotes: RwLock::new(HashMap::new()),
724            pending_forward_requests: RwLock::new(HashMap::new()),
725            recent_forward_misses: Mutex::new(TimedSeenSet::new(
726                RECENT_FORWARD_MISS_CAPACITY,
727                Self::recent_forward_miss_ttl(request_timeout),
728            )),
729            issued_quotes: RwLock::new(HashMap::new()),
730            next_quote_id: RwLock::new(1),
731            read_sources: RwLock::new(HashMap::new()),
732            read_source_stats: RwLock::new(HashMap::new()),
733            inflight_source_fetches: Mutex::new(HashMap::new()),
734            peer_selector: RwLock::new(selector),
735            peer_active_requests: RwLock::new(HashMap::new()),
736            peer_wire_stats: RwLock::new(HashMap::new()),
737            pubsub_local_interests: RwLock::new(HashSet::new()),
738            pubsub_local_interest_versions: RwLock::new(HashMap::new()),
739            pubsub_peer_interests: RwLock::new(HashMap::new()),
740            pubsub_interest_routes: RwLock::new(HashMap::new()),
741            pubsub_interest_versions: RwLock::new(HashMap::new()),
742            pubsub_seen_interests: Mutex::new(TimedSeenSet::new(
743                PUBSUB_SEEN_CAPACITY,
744                PUBSUB_SEEN_TTL,
745            )),
746            pubsub_seen_frames: Mutex::new(TimedSeenSet::new(
747                PUBSUB_SEEN_CAPACITY,
748                PUBSUB_SEEN_TTL,
749            )),
750            pubsub_seen_inventories: Mutex::new(TimedSeenSet::new(
751                PUBSUB_SEEN_CAPACITY,
752                PUBSUB_SEEN_TTL,
753            )),
754            pubsub_seen_wants: Mutex::new(TimedSeenSet::new(PUBSUB_SEEN_CAPACITY, PUBSUB_SEEN_TTL)),
755            pubsub_inventory_routes: RwLock::new(HashMap::new()),
756            pubsub_want_routes: RwLock::new(HashMap::new()),
757            pubsub_upstream_wants: Mutex::new(TimedSeenSet::new(
758                PUBSUB_SEEN_CAPACITY,
759                PUBSUB_SEEN_TTL,
760            )),
761            pubsub_frame_cache: Mutex::new(VecDeque::new()),
762            pubsub_inbox: Mutex::new(VecDeque::new()),
763            pubsub_deferred_counts: RwLock::new(HashMap::new()),
764            next_pubsub_interest_seq: AtomicU64::new(1),
765            pending_response_sends: Mutex::new(Vec::new()),
766            response_scheduler_running: AtomicBool::new(false),
767            next_response_job_id: AtomicU64::new(1),
768            routing,
769            request_timeout,
770            debug,
771            running: RwLock::new(false),
772        }
773    }
774
775    fn recent_forward_miss_ttl(request_timeout: Duration) -> Duration {
776        let ttl_ms = request_timeout
777            .as_millis()
778            .saturating_mul(2)
779            .max(MIN_RECENT_FORWARD_MISS_TTL_MS as u128)
780            .min(u64::MAX as u128) as u64;
781        Duration::from_millis(ttl_ms)
782    }
783
784    /// Start the store (begin listening for messages)
785    pub async fn start(&self) -> Result<(), TransportError> {
786        *self.running.write().await = true;
787
788        // Send initial hello
789        self.signaling.send_hello(vec![]).await?;
790
791        Ok(())
792    }
793
794    /// Stop the store
795    pub async fn stop(&self) {
796        *self.running.write().await = false;
797    }
798
799    /// Process incoming signaling message
800    pub async fn process_signaling(&self, msg: SignalingMessage) -> Result<(), TransportError> {
801        // When a new peer connects, initialize their HTL config
802        let peer_id = msg.peer_id().to_string();
803        {
804            let mut configs = self.htl_configs.write().await;
805            if !configs.contains_key(&peer_id) {
806                configs.insert(peer_id.clone(), PeerHTLConfig::random());
807            }
808        }
809        self.peer_selector.write().await.add_peer(peer_id.clone());
810
811        let result = self.signaling.handle_message(msg).await;
812        if result.is_ok() {
813            self.announce_pubsub_interests_to_peer(&peer_id).await;
814        }
815        result
816    }
817
818    /// Get signaling manager reference
819    pub fn signaling(&self) -> &Arc<MeshRouter<R, F>> {
820        &self.signaling
821    }
822
823    fn response_behavior(&self) -> ResponseBehaviorConfig {
824        self.routing.response_behavior.normalized()
825    }
826
827    async fn record_peer_wire_sent(&self, peer_id: &str, bytes: u64) {
828        if bytes == 0 {
829            return;
830        }
831        let mut stats = self.peer_wire_stats.write().await;
832        let entry = stats.entry(peer_id.to_string()).or_default();
833        entry.bytes_sent = entry.bytes_sent.saturating_add(bytes);
834    }
835
836    async fn record_peer_wire_received(&self, peer_id: &str, bytes: u64) {
837        if bytes == 0 {
838            return;
839        }
840        let mut stats = self.peer_wire_stats.write().await;
841        let entry = stats.entry(peer_id.to_string()).or_default();
842        entry.bytes_received = entry.bytes_received.saturating_add(bytes);
843    }
844
845    /// Record ingress from a peer that matched local or downstream demand.
846    ///
847    /// Raw bytes are tracked separately in `record_peer_wire_received`; this
848    /// counter is the reciprocity signal used by shared outbound scheduling.
849    pub async fn record_useful_bytes_received_from_peer(&self, peer_id: &str, bytes: u64) {
850        if bytes == 0 {
851            return;
852        }
853        let mut stats = self.peer_wire_stats.write().await;
854        let entry = stats.entry(peer_id.to_string()).or_default();
855        entry.useful_bytes_received = entry.useful_bytes_received.saturating_add(bytes);
856    }
857
858    /// Snapshot peer traffic for production pubsub scheduling or diagnostics.
859    pub async fn peer_traffic_snapshot(&self, peer_id: &str) -> PeerTrafficSnapshot {
860        self.peer_wire_stats
861            .read()
862            .await
863            .get(peer_id)
864            .copied()
865            .unwrap_or_default()
866    }
867
868    /// Snapshot all known peer traffic for production pubsub scheduling.
869    pub async fn peer_traffic_snapshots(&self) -> HashMap<String, PeerTrafficSnapshot> {
870        self.peer_wire_stats.read().await.clone()
871    }
872
873    fn pubsub_key(origin_peer_id: &str, stream_id: &str, seq: u64) -> String {
874        format!("{origin_peer_id}:{stream_id}:{seq}")
875    }
876
877    fn pubsub_frame_key(frame: &PubsubFrame) -> String {
878        Self::pubsub_key(&frame.origin_peer_id, &frame.stream_id, frame.seq)
879    }
880
881    fn pubsub_interest_key(interest: &PubsubInterest) -> String {
882        format!(
883            "{}:{}:{}:{}",
884            interest.subscriber_peer_id, interest.stream_id, interest.seq, interest.active
885        )
886    }
887
888    fn next_pubsub_interest_seq(&self) -> u64 {
889        self.next_pubsub_interest_seq
890            .fetch_add(1, Ordering::Relaxed)
891    }
892
893    async fn record_peer_pubsub_wire_sent(&self, peer_id: &str, bytes: u64, bandwidth_debt: f64) {
894        if bytes == 0 {
895            return;
896        }
897        let mut stats = self.peer_wire_stats.write().await;
898        let entry = stats.entry(peer_id.to_string()).or_default();
899        entry.bytes_sent = entry.bytes_sent.saturating_add(bytes);
900        entry.bandwidth_debt = bandwidth_debt;
901    }
902
903    async fn send_pubsub_interest_to_peers(
904        &self,
905        interest: &PubsubInterest,
906        exclude_peer_id: Option<&str>,
907    ) -> PubsubPublishStats {
908        if !should_forward_htl(interest.htl) {
909            return PubsubPublishStats::default();
910        }
911
912        let mut peer_ids = self.signaling.peer_ids().await;
913        peer_ids.sort();
914        peer_ids.retain(|peer_id| exclude_peer_id.is_none_or(|exclude| peer_id != exclude));
915
916        let bytes = encode_pubsub_interest(interest);
917        let mut stats = PubsubPublishStats {
918            selected_peers: peer_ids.len(),
919            ..Default::default()
920        };
921        for peer_id in peer_ids {
922            let Some(channel) = self.signaling.get_channel(&peer_id).await else {
923                continue;
924            };
925            if channel.send(bytes.clone()).await.is_ok() {
926                stats.sent_peers += 1;
927                stats.sent_bytes = stats.sent_bytes.saturating_add(bytes.len() as u64);
928                self.record_peer_wire_sent(&peer_id, bytes.len() as u64)
929                    .await;
930            }
931        }
932        stats
933    }
934
935    async fn announce_pubsub_interests_to_peer(&self, peer_id: &str) {
936        let mut interests = self
937            .pubsub_local_interests
938            .read()
939            .await
940            .iter()
941            .cloned()
942            .collect::<Vec<_>>();
943        interests.sort();
944        if interests.is_empty() {
945            return;
946        }
947
948        let interests = {
949            let versions = self.pubsub_local_interest_versions.read().await;
950            interests
951                .into_iter()
952                .filter_map(|stream_id| {
953                    versions
954                        .get(&stream_id)
955                        .copied()
956                        .map(|seq| (stream_id, seq))
957                })
958                .collect::<Vec<_>>()
959        };
960
961        for (stream_id, seq) in interests {
962            let interest = create_pubsub_interest(
963                stream_id,
964                self.signaling.peer_id().to_string(),
965                seq,
966                true,
967                MAX_HTL,
968            );
969            let Some(channel) = self.signaling.get_channel(peer_id).await else {
970                continue;
971            };
972            let bytes = encode_pubsub_interest(&interest);
973            if channel.send(bytes.clone()).await.is_ok() {
974                self.record_peer_wire_sent(peer_id, bytes.len() as u64)
975                    .await;
976            }
977        }
978    }
979
980    fn remove_pubsub_peer_interest(
981        peer_interests: &mut HashMap<String, HashSet<String>>,
982        routes: &HashMap<(String, String), String>,
983        stream_id: &str,
984        peer_id: &str,
985    ) {
986        let still_has_route = routes
987            .iter()
988            .any(|((stream, _subscriber), peer)| stream == stream_id && peer == peer_id);
989        if still_has_route {
990            return;
991        }
992        if let Some(peers) = peer_interests.get_mut(stream_id) {
993            peers.remove(peer_id);
994            if peers.is_empty() {
995                peer_interests.remove(stream_id);
996            }
997        }
998    }
999
1000    async fn apply_pubsub_interest_route(
1001        &self,
1002        from_peer: &str,
1003        interest: &PubsubInterest,
1004    ) -> bool {
1005        if interest.stream_id.is_empty() || interest.subscriber_peer_id.is_empty() {
1006            return false;
1007        }
1008        if interest.subscriber_peer_id == self.signaling.peer_id() {
1009            return false;
1010        }
1011
1012        let interest_key = Self::pubsub_interest_key(interest);
1013        if !self
1014            .pubsub_seen_interests
1015            .lock()
1016            .await
1017            .insert_if_new(interest_key)
1018        {
1019            return false;
1020        }
1021
1022        let route_key = (
1023            interest.stream_id.clone(),
1024            interest.subscriber_peer_id.clone(),
1025        );
1026        {
1027            let mut versions = self.pubsub_interest_versions.write().await;
1028            if versions
1029                .get(&route_key)
1030                .is_some_and(|latest| *latest >= interest.seq)
1031            {
1032                return false;
1033            }
1034            versions.insert(route_key.clone(), interest.seq);
1035        }
1036
1037        let mut peer_interests = self.pubsub_peer_interests.write().await;
1038        let mut routes = self.pubsub_interest_routes.write().await;
1039        if interest.active {
1040            if let Some(previous_peer) = routes.insert(route_key, from_peer.to_string()) {
1041                if previous_peer != from_peer {
1042                    Self::remove_pubsub_peer_interest(
1043                        &mut peer_interests,
1044                        &routes,
1045                        &interest.stream_id,
1046                        &previous_peer,
1047                    );
1048                }
1049            }
1050            peer_interests
1051                .entry(interest.stream_id.clone())
1052                .or_default()
1053                .insert(from_peer.to_string());
1054        } else if let Some(previous_peer) = routes.remove(&route_key) {
1055            Self::remove_pubsub_peer_interest(
1056                &mut peer_interests,
1057                &routes,
1058                &interest.stream_id,
1059                &previous_peer,
1060            );
1061        } else {
1062            Self::remove_pubsub_peer_interest(
1063                &mut peer_interests,
1064                &routes,
1065                &interest.stream_id,
1066                from_peer,
1067            );
1068        }
1069
1070        true
1071    }
1072
1073    async fn interested_pubsub_peers(
1074        &self,
1075        stream_id: &str,
1076        exclude_peer_id: Option<&str>,
1077    ) -> Vec<String> {
1078        let connected = self
1079            .signaling
1080            .peer_ids()
1081            .await
1082            .into_iter()
1083            .collect::<HashSet<_>>();
1084        let mut peers = self
1085            .pubsub_peer_interests
1086            .read()
1087            .await
1088            .get(stream_id)
1089            .map(|peers| peers.iter().cloned().collect::<Vec<_>>())
1090            .unwrap_or_default();
1091        peers.retain(|peer_id| {
1092            connected.contains(peer_id) && exclude_peer_id.is_none_or(|exclude| peer_id != exclude)
1093        });
1094        peers.sort();
1095        peers
1096    }
1097
1098    async fn decrement_pubsub_htl_for_peer(&self, peer_id: &str, htl: u8) -> u8 {
1099        let htl_config = {
1100            let configs = self.htl_configs.read().await;
1101            configs
1102                .get(peer_id)
1103                .cloned()
1104                .unwrap_or_else(PeerHTLConfig::random)
1105        };
1106        htl_config.decrement_with_policy(htl, &MESH_EVENT_POLICY)
1107    }
1108
1109    async fn send_pubsub_inventory_to_peers(
1110        &self,
1111        inv: &PubsubInventory,
1112        peer_ids: &[String],
1113    ) -> PubsubPublishStats {
1114        if peer_ids.is_empty() || !should_forward_htl(inv.htl) {
1115            return PubsubPublishStats::default();
1116        }
1117
1118        let mut stats = PubsubPublishStats {
1119            selected_peers: peer_ids.len(),
1120            ..Default::default()
1121        };
1122        for peer_id in peer_ids {
1123            let send_htl = self.decrement_pubsub_htl_for_peer(peer_id, inv.htl).await;
1124            if !should_forward_htl(send_htl) {
1125                continue;
1126            }
1127            let Some(channel) = self.signaling.get_channel(peer_id).await else {
1128                continue;
1129            };
1130            let mut outgoing = inv.clone();
1131            outgoing.htl = send_htl;
1132            let bytes = encode_pubsub_inventory(&outgoing);
1133            let message_bytes = bytes.len() as u64;
1134            if channel.send(bytes).await.is_ok() {
1135                stats.sent_peers += 1;
1136                stats.sent_bytes = stats.sent_bytes.saturating_add(message_bytes);
1137                self.record_peer_wire_sent(peer_id, message_bytes).await;
1138            }
1139        }
1140        stats
1141    }
1142
1143    async fn flood_pubsub_inventory(
1144        &self,
1145        inv: &PubsubInventory,
1146        exclude_peer_id: Option<&str>,
1147    ) -> PubsubPublishStats {
1148        let mut peer_ids = self.signaling.peer_ids().await;
1149        peer_ids.sort();
1150        peer_ids.retain(|peer_id| exclude_peer_id.is_none_or(|exclude| peer_id != exclude));
1151        self.send_pubsub_inventory_to_peers(inv, &peer_ids).await
1152    }
1153
1154    async fn send_pubsub_want_to_peer(&self, want: &PubsubWant, peer_id: &str) -> bool {
1155        let Some(channel) = self.signaling.get_channel(peer_id).await else {
1156            return false;
1157        };
1158        let bytes = encode_pubsub_want(want);
1159        let message_bytes = bytes.len() as u64;
1160        match channel.send(bytes).await {
1161            Ok(()) => {
1162                self.record_peer_wire_sent(peer_id, message_bytes).await;
1163                true
1164            }
1165            Err(_) => false,
1166        }
1167    }
1168
1169    async fn send_pubsub_want_upstream(
1170        &self,
1171        key: &str,
1172        want: &PubsubWant,
1173        exclude_peer_id: Option<&str>,
1174    ) -> bool {
1175        let upstream = {
1176            let routes = self.pubsub_inventory_routes.read().await;
1177            routes.get(key).cloned()
1178        };
1179        let Some(upstream) = upstream else {
1180            return false;
1181        };
1182        if exclude_peer_id.is_some_and(|exclude| exclude == upstream) {
1183            return false;
1184        }
1185        let want_key = format!("{key}:{upstream}");
1186        if !self
1187            .pubsub_upstream_wants
1188            .lock()
1189            .await
1190            .insert_if_new(want_key)
1191        {
1192            return false;
1193        }
1194        self.send_pubsub_want_to_peer(want, &upstream).await
1195    }
1196
1197    async fn cache_pubsub_frame(&self, key: String, frame: PubsubFrame) {
1198        let mut cache = self.pubsub_frame_cache.lock().await;
1199        if let Some(index) = cache.iter().position(|(cached_key, _)| cached_key == &key) {
1200            cache.remove(index);
1201        }
1202        cache.push_back((key, frame));
1203        while cache.len() > PUBSUB_FRAME_CACHE_CAPACITY {
1204            cache.pop_front();
1205        }
1206    }
1207
1208    async fn cached_pubsub_frame(&self, key: &str) -> Option<PubsubFrame> {
1209        self.pubsub_frame_cache
1210            .lock()
1211            .await
1212            .iter()
1213            .find_map(|(cached_key, frame)| {
1214                if cached_key == key {
1215                    Some(frame.clone())
1216                } else {
1217                    None
1218                }
1219            })
1220    }
1221
1222    async fn remember_pubsub_want_peer(&self, key: String, from_peer: &str) -> bool {
1223        let mut routes = self.pubsub_want_routes.write().await;
1224        routes.entry(key).or_default().insert(from_peer.to_string())
1225    }
1226
1227    async fn take_pubsub_want_peers(
1228        &self,
1229        key: &str,
1230        exclude_peer_id: Option<&str>,
1231    ) -> Vec<String> {
1232        let connected = self
1233            .signaling
1234            .peer_ids()
1235            .await
1236            .into_iter()
1237            .collect::<HashSet<_>>();
1238        let mut peers = self
1239            .pubsub_want_routes
1240            .write()
1241            .await
1242            .remove(key)
1243            .map(|peers| peers.into_iter().collect::<Vec<_>>())
1244            .unwrap_or_default();
1245        peers.retain(|peer_id| {
1246            connected.contains(peer_id) && exclude_peer_id.is_none_or(|exclude| peer_id != exclude)
1247        });
1248        peers.sort();
1249        peers
1250    }
1251
1252    async fn select_pubsub_peers(
1253        &self,
1254        stream_id: &str,
1255        seq: u64,
1256        message_bytes: u64,
1257        peer_ids: &[String],
1258    ) -> (Vec<String>, Vec<String>) {
1259        let traffic = self.peer_wire_stats.read().await;
1260        let deferred_counts = self.pubsub_deferred_counts.read().await;
1261        let candidates = peer_ids
1262            .iter()
1263            .map(|peer_id| PubsubCandidate {
1264                peer_id: peer_id.clone(),
1265                traffic: traffic.get(peer_id).copied().unwrap_or_default(),
1266                deferred_count: deferred_counts
1267                    .get(&(stream_id.to_string(), peer_id.clone()))
1268                    .copied()
1269                    .unwrap_or_default(),
1270            })
1271            .collect::<Vec<_>>();
1272        drop(deferred_counts);
1273        drop(traffic);
1274
1275        let selection = self.routing.pubsub_scheduler.select(
1276            stream_id,
1277            seq,
1278            self.signaling.peer_id(),
1279            message_bytes,
1280            &candidates,
1281        );
1282
1283        {
1284            let mut deferred_counts = self.pubsub_deferred_counts.write().await;
1285            for peer_id in &selection.deferred {
1286                *deferred_counts
1287                    .entry((stream_id.to_string(), peer_id.clone()))
1288                    .or_insert(0) += 1;
1289            }
1290            for peer_id in &selection.selected {
1291                deferred_counts.remove(&(stream_id.to_string(), peer_id.clone()));
1292            }
1293        }
1294
1295        (selection.selected, selection.deferred)
1296    }
1297
1298    async fn send_pubsub_frame_to_peers(
1299        &self,
1300        frame: &PubsubFrame,
1301        peer_ids: &[String],
1302    ) -> PubsubPublishStats {
1303        if peer_ids.is_empty() || !should_forward_htl(frame.htl) {
1304            return PubsubPublishStats::default();
1305        }
1306
1307        let bytes = encode_pubsub_frame(frame);
1308        let message_bytes = bytes.len() as u64;
1309        let (selected, deferred) = self
1310            .select_pubsub_peers(&frame.stream_id, frame.seq, message_bytes, peer_ids)
1311            .await;
1312        let mut stats = PubsubPublishStats {
1313            selected_peers: selected.len(),
1314            deferred_peers: deferred.len(),
1315            ..Default::default()
1316        };
1317
1318        for peer_id in selected {
1319            let Some(channel) = self.signaling.get_channel(&peer_id).await else {
1320                continue;
1321            };
1322            let snapshot = self.peer_traffic_snapshot(&peer_id).await;
1323            let bandwidth_debt = reciprocal_virtual_finish(snapshot, message_bytes);
1324            if channel.send(bytes.clone()).await.is_ok() {
1325                stats.sent_peers += 1;
1326                stats.sent_bytes = stats.sent_bytes.saturating_add(message_bytes);
1327                self.record_peer_pubsub_wire_sent(&peer_id, message_bytes, bandwidth_debt)
1328                    .await;
1329            }
1330        }
1331
1332        stats
1333    }
1334
1335    async fn enqueue_pubsub_event(&self, event: PubsubEvent) {
1336        let mut inbox = self.pubsub_inbox.lock().await;
1337        inbox.push_back(event);
1338        while inbox.len() > PUBSUB_INBOX_CAPACITY {
1339            inbox.pop_front();
1340        }
1341    }
1342
1343    /// Subscribe this node to a pubsub stream and advertise that interest.
1344    pub async fn subscribe_pubsub(
1345        self: &Arc<Self>,
1346        stream_id: impl Into<String>,
1347    ) -> PubsubPublishStats {
1348        let stream_id = stream_id.into();
1349        if stream_id.is_empty() {
1350            return PubsubPublishStats::default();
1351        }
1352        self.pubsub_local_interests
1353            .write()
1354            .await
1355            .insert(stream_id.clone());
1356        let seq = {
1357            let mut versions = self.pubsub_local_interest_versions.write().await;
1358            match versions.get(&stream_id).copied() {
1359                Some(seq) => seq,
1360                None => {
1361                    let seq = self.next_pubsub_interest_seq();
1362                    versions.insert(stream_id.clone(), seq);
1363                    seq
1364                }
1365            }
1366        };
1367        let interest = create_pubsub_interest(
1368            stream_id,
1369            self.signaling.peer_id().to_string(),
1370            seq,
1371            true,
1372            MAX_HTL,
1373        );
1374        self.send_pubsub_interest_to_peers(&interest, None).await
1375    }
1376
1377    /// Stop local delivery for a pubsub stream and advertise the withdrawn interest.
1378    pub async fn unsubscribe_pubsub(
1379        self: &Arc<Self>,
1380        stream_id: impl Into<String>,
1381    ) -> PubsubPublishStats {
1382        let stream_id = stream_id.into();
1383        if stream_id.is_empty() {
1384            return PubsubPublishStats::default();
1385        }
1386        self.pubsub_local_interests.write().await.remove(&stream_id);
1387        self.pubsub_local_interest_versions
1388            .write()
1389            .await
1390            .remove(&stream_id);
1391        let interest = create_pubsub_interest(
1392            stream_id,
1393            self.signaling.peer_id().to_string(),
1394            self.next_pubsub_interest_seq(),
1395            false,
1396            MAX_HTL,
1397        );
1398        self.send_pubsub_interest_to_peers(&interest, None).await
1399    }
1400
1401    /// Publish bytes on a pubsub stream through the configured mesh delivery mode.
1402    pub async fn publish_pubsub(
1403        self: &Arc<Self>,
1404        stream_id: impl Into<String>,
1405        seq: u64,
1406        payload: Vec<u8>,
1407    ) -> PubsubPublishStats {
1408        let stream_id = stream_id.into();
1409        if stream_id.is_empty() {
1410            return PubsubPublishStats::default();
1411        }
1412        let payload_bytes = payload.len() as u64;
1413        let frame = create_pubsub_frame(
1414            stream_id.clone(),
1415            seq,
1416            self.signaling.peer_id().to_string(),
1417            payload.clone(),
1418            MAX_HTL,
1419        );
1420        let frame_key = Self::pubsub_frame_key(&frame);
1421        self.pubsub_seen_frames
1422            .lock()
1423            .await
1424            .insert_if_new(frame_key.clone());
1425        self.cache_pubsub_frame(frame_key, frame.clone()).await;
1426
1427        if self
1428            .pubsub_local_interests
1429            .read()
1430            .await
1431            .contains(&stream_id)
1432        {
1433            self.enqueue_pubsub_event(PubsubEvent {
1434                stream_id: stream_id.clone(),
1435                seq,
1436                origin_peer_id: self.signaling.peer_id().to_string(),
1437                from_peer_id: self.signaling.peer_id().to_string(),
1438                payload,
1439            })
1440            .await;
1441        }
1442
1443        match self.routing.pubsub_delivery_mode {
1444            PubsubDeliveryMode::InterestPush => {
1445                let peers = self.interested_pubsub_peers(&stream_id, None).await;
1446                self.send_pubsub_frame_to_peers(&frame, &peers).await
1447            }
1448            PubsubDeliveryMode::HtlInvWant => {
1449                let inv = create_pubsub_inventory(
1450                    stream_id,
1451                    seq,
1452                    self.signaling.peer_id().to_string(),
1453                    payload_bytes,
1454                    MESH_EVENT_POLICY.max_htl,
1455                );
1456                self.flood_pubsub_inventory(&inv, None).await
1457            }
1458        }
1459    }
1460
1461    /// Drain locally delivered pubsub events.
1462    pub async fn drain_pubsub_events(&self) -> Vec<PubsubEvent> {
1463        self.pubsub_inbox.lock().await.drain(..).collect()
1464    }
1465
1466    /// Connected peers that currently have local or downstream interest in a stream.
1467    pub async fn pubsub_interest_peers(&self, stream_id: &str) -> Vec<String> {
1468        self.interested_pubsub_peers(stream_id, None).await
1469    }
1470
1471    fn choose_ready_response_job(
1472        ready_jobs: &[(u64, String, usize, Instant, u64)],
1473        stats: &HashMap<String, PeerWireStats>,
1474    ) -> Option<(u64, f64)> {
1475        let jobs = ready_jobs
1476            .iter()
1477            .map(|job| OutboundJobCandidate {
1478                job_id: job.0,
1479                peer_id: job.1.clone(),
1480                message_bytes: job.2 as u64,
1481                queue_sequence: job.4,
1482            })
1483            .collect::<Vec<_>>();
1484        select_reciprocal_outbound_job(&jobs, |peer_id| {
1485            stats.get(peer_id).copied().unwrap_or_default()
1486        })
1487        .map(|choice| (choice.job_id, choice.virtual_finish))
1488    }
1489
1490    async fn enqueue_response_send(
1491        self: &Arc<Self>,
1492        peer_id: String,
1493        bytes: Vec<u8>,
1494        ready_at: Instant,
1495    ) {
1496        let job_id = self.next_response_job_id.fetch_add(1, Ordering::Relaxed);
1497        {
1498            let mut queue = self.pending_response_sends.lock().await;
1499            queue.push(PendingResponseSend {
1500                job_id,
1501                peer_id,
1502                bytes,
1503                ready_at,
1504                queue_sequence: job_id,
1505            });
1506        }
1507
1508        if self
1509            .response_scheduler_running
1510            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
1511            .is_ok()
1512        {
1513            let this = Arc::clone(self);
1514            tokio::spawn(async move {
1515                this.run_response_scheduler().await;
1516            });
1517        }
1518    }
1519
1520    async fn run_response_scheduler(self: Arc<Self>) {
1521        loop {
1522            let snapshot = {
1523                let queue = self.pending_response_sends.lock().await;
1524                if queue.is_empty() {
1525                    self.response_scheduler_running
1526                        .store(false, Ordering::Release);
1527                    return;
1528                }
1529                queue
1530                    .iter()
1531                    .map(|job| {
1532                        (
1533                            job.job_id,
1534                            job.peer_id.clone(),
1535                            job.bytes.len(),
1536                            job.ready_at,
1537                            job.queue_sequence,
1538                        )
1539                    })
1540                    .collect::<Vec<_>>()
1541            };
1542
1543            let now = Instant::now();
1544            let mut earliest_ready_at: Option<Instant> = None;
1545            let mut ready_jobs = Vec::new();
1546            for job in &snapshot {
1547                if job.3 <= now {
1548                    ready_jobs.push(job.clone());
1549                } else {
1550                    earliest_ready_at = Some(match earliest_ready_at {
1551                        Some(current) => current.min(job.3),
1552                        None => job.3,
1553                    });
1554                }
1555            }
1556
1557            if ready_jobs.is_empty() {
1558                if let Some(ready_at) = earliest_ready_at {
1559                    tokio::time::sleep(ready_at.saturating_duration_since(Instant::now())).await;
1560                    continue;
1561                }
1562                self.response_scheduler_running
1563                    .store(false, Ordering::Release);
1564                return;
1565            }
1566
1567            let (selected_job_id, selected_finish) = {
1568                let stats = self.peer_wire_stats.read().await;
1569                Self::choose_ready_response_job(&ready_jobs, &stats).expect("ready response job")
1570            };
1571
1572            let selected = {
1573                let mut queue = self.pending_response_sends.lock().await;
1574                let Some(index) = queue.iter().position(|job| job.job_id == selected_job_id) else {
1575                    continue;
1576                };
1577                queue.swap_remove(index)
1578            };
1579
1580            let sent = if let Some(channel) = self.signaling.get_channel(&selected.peer_id).await {
1581                channel.send(selected.bytes.clone()).await.is_ok()
1582            } else {
1583                false
1584            };
1585
1586            let queued_peers = {
1587                let queue = self.pending_response_sends.lock().await;
1588                queue
1589                    .iter()
1590                    .map(|job| job.peer_id.clone())
1591                    .collect::<HashSet<_>>()
1592            };
1593            let mut stats = self.peer_wire_stats.write().await;
1594            let entry = stats.entry(selected.peer_id.clone()).or_default();
1595            if sent {
1596                entry.bytes_sent = entry.bytes_sent.saturating_add(selected.bytes.len() as u64);
1597                entry.bandwidth_debt = selected_finish;
1598            }
1599            if queued_peers.is_empty() {
1600                for peer_stats in stats.values_mut() {
1601                    peer_stats.bandwidth_debt = 0.0;
1602                }
1603            } else {
1604                let floor = queued_peers
1605                    .iter()
1606                    .filter_map(|peer_id| stats.get(peer_id).map(|peer| peer.bandwidth_debt))
1607                    .fold(f64::INFINITY, f64::min);
1608                if floor.is_finite() && floor > 0.0 {
1609                    for peer_id in queued_peers {
1610                        if let Some(peer_stats) = stats.get_mut(&peer_id) {
1611                            peer_stats.bandwidth_debt =
1612                                (peer_stats.bandwidth_debt - floor).max(0.0);
1613                        }
1614                    }
1615                }
1616            }
1617        }
1618    }
1619
1620    fn deterministic_actor_draw_for(peer_id: &str, hash: &Hash, salt: u64) -> f64 {
1621        let mut hasher = DefaultHasher::new();
1622        peer_id.hash(&mut hasher);
1623        hash.hash(&mut hasher);
1624        salt.hash(&mut hasher);
1625        let v = hasher.finish();
1626        (v as f64) / (u64::MAX as f64)
1627    }
1628
1629    fn deterministic_actor_draw(&self, hash: &Hash, salt: u64) -> f64 {
1630        Self::deterministic_actor_draw_for(self.signaling.peer_id(), hash, salt)
1631    }
1632
1633    fn peer_metadata_pointer_slot_hash() -> Hash {
1634        hashtree_core::sha256(PEER_METADATA_POINTER_SLOT_KEY)
1635    }
1636
1637    fn decode_hash_hex(hash_hex: &str) -> Result<Hash, StoreError> {
1638        let bytes = hex::decode(hash_hex)
1639            .map_err(|e| StoreError::Other(format!("Invalid hash hex: {e}")))?;
1640        if bytes.len() != 32 {
1641            return Err(StoreError::Other(format!(
1642                "Invalid hash length {}, expected 32 bytes",
1643                bytes.len()
1644            )));
1645        }
1646        let mut hash = [0u8; 32];
1647        hash.copy_from_slice(&bytes);
1648        Ok(hash)
1649    }
1650
1651    fn should_drop_response(&self, hash: &Hash) -> bool {
1652        let p = self.response_behavior().drop_response_prob;
1653        if p <= 0.0 {
1654            return false;
1655        }
1656        self.deterministic_actor_draw(hash, 0xD0_D0_D0_D0_D0_D0_D0_D0) < p
1657    }
1658
1659    fn should_corrupt_response(&self, hash: &Hash) -> bool {
1660        let p = self.response_behavior().corrupt_response_prob;
1661        if p <= 0.0 {
1662            return false;
1663        }
1664        self.deterministic_actor_draw(hash, 0xC0_C0_C0_C0_C0_C0_C0_C0) < p
1665    }
1666
1667    fn should_stall_response(&self, hash: &Hash) -> bool {
1668        let p = self.response_behavior().stall_response_prob;
1669        if p <= 0.0 {
1670            return false;
1671        }
1672        self.deterministic_actor_draw(hash, 0x5A_11_5A_11_5A_11_5A_11) < p
1673    }
1674
1675    fn response_send_delay(&self, hash: &Hash, payload_len: usize) -> Duration {
1676        let behavior = self.response_behavior();
1677        let mut total_ms = behavior
1678            .extra_delay_ms
1679            .saturating_add(behavior.first_byte_delay_ms);
1680
1681        if behavior.bytes_per_second > 0 && payload_len > 0 {
1682            let throughput_ms = ((payload_len as u128) * 1000)
1683                .div_ceil(behavior.bytes_per_second as u128)
1684                .min(u64::MAX as u128) as u64;
1685            total_ms = total_ms.saturating_add(throughput_ms);
1686        }
1687
1688        if behavior.stall_delay_ms > 0 && self.should_stall_response(hash) {
1689            total_ms = total_ms.saturating_add(behavior.stall_delay_ms);
1690        }
1691
1692        Duration::from_millis(total_ms)
1693    }
1694
1695    async fn ordered_connected_peers(&self, exclude_peer_id: Option<&str>) -> Vec<String> {
1696        let current_peer_ids = self.signaling.peer_ids().await;
1697        if current_peer_ids.is_empty() {
1698            return Vec::new();
1699        }
1700
1701        sync_selector_peers(&self.peer_selector, &current_peer_ids).await;
1702        let hash_get_peer_ids: HashSet<String> = self
1703            .signaling
1704            .hash_get_peer_ids()
1705            .await
1706            .into_iter()
1707            .collect();
1708        let mut candidate_peer_ids: Vec<String> = current_peer_ids
1709            .into_iter()
1710            .filter(|peer_id| hash_get_peer_ids.contains(peer_id))
1711            .filter(|peer_id| exclude_peer_id.is_none_or(|exclude| peer_id != exclude))
1712            .collect();
1713        if candidate_peer_ids.is_empty() {
1714            return Vec::new();
1715        }
1716
1717        let current_set: HashSet<&str> = candidate_peer_ids.iter().map(String::as_str).collect();
1718        let mut selector = self.peer_selector.write().await;
1719        let mut selector_order = selector.select_peers();
1720        selector_order.retain(|peer_id| current_set.contains(peer_id.as_str()));
1721        if selector_order.is_empty() {
1722            let mut fallback = candidate_peer_ids;
1723            fallback.sort();
1724            return fallback;
1725        }
1726        let backed_off: HashMap<String, bool> = candidate_peer_ids
1727            .iter()
1728            .map(|peer_id| (peer_id.clone(), selector.is_peer_backed_off(peer_id)))
1729            .collect();
1730        drop(selector);
1731
1732        let rank: HashMap<&str, usize> = selector_order
1733            .iter()
1734            .enumerate()
1735            .map(|(idx, peer_id)| (peer_id.as_str(), idx))
1736            .collect();
1737        let active = self.peer_active_requests.read().await;
1738        candidate_peer_ids.sort_by(|left, right| {
1739            let left_backed_off = backed_off.get(left).copied().unwrap_or(false);
1740            let right_backed_off = backed_off.get(right).copied().unwrap_or(false);
1741            if left_backed_off != right_backed_off {
1742                return if left_backed_off {
1743                    std::cmp::Ordering::Greater
1744                } else {
1745                    std::cmp::Ordering::Less
1746                };
1747            }
1748            let left_rank = rank.get(left.as_str()).copied().unwrap_or(usize::MAX / 2);
1749            let right_rank = rank.get(right.as_str()).copied().unwrap_or(usize::MAX / 2);
1750            let left_load = active.get(left).copied().unwrap_or(0);
1751            let right_load = active.get(right).copied().unwrap_or(0);
1752            (left_rank + left_load.saturating_mul(ACTIVE_PEER_REQUEST_RANK_PENALTY))
1753                .cmp(&(right_rank + right_load.saturating_mul(ACTIVE_PEER_REQUEST_RANK_PENALTY)))
1754                .then_with(|| left.cmp(right))
1755        });
1756        candidate_peer_ids
1757    }
1758
1759    async fn reserve_peer_request(&self, peer_id: &str) {
1760        let mut active = self.peer_active_requests.write().await;
1761        *active.entry(peer_id.to_string()).or_insert(0) += 1;
1762    }
1763
1764    async fn release_peer_request(&self, peer_id: &str) {
1765        let mut active = self.peer_active_requests.write().await;
1766        let Some(count) = active.get_mut(peer_id) else {
1767            return;
1768        };
1769        if *count <= 1 {
1770            active.remove(peer_id);
1771        } else {
1772            *count -= 1;
1773        }
1774    }
1775
1776    async fn release_queried_peer_requests(&self, peer_ids: &[String]) {
1777        for peer_id in peer_ids {
1778            self.release_peer_request(peer_id).await;
1779        }
1780    }
1781
1782    fn requested_quote_mint(&self) -> Option<&str> {
1783        if let Some(default_mint) = self.routing.cashu_default_mint.as_deref() {
1784            if self.routing.cashu_accepted_mints.is_empty()
1785                || self
1786                    .routing
1787                    .cashu_accepted_mints
1788                    .iter()
1789                    .any(|mint| mint == default_mint)
1790            {
1791                return Some(default_mint);
1792            }
1793        }
1794
1795        self.routing
1796            .cashu_accepted_mints
1797            .first()
1798            .map(String::as_str)
1799    }
1800
1801    fn choose_quote_mint(&self, requested_mint: Option<&str>) -> Option<String> {
1802        if let Some(requested_mint) = requested_mint {
1803            if self.accepts_quote_mint(Some(requested_mint)) {
1804                return Some(requested_mint.to_string());
1805            }
1806        }
1807        if let Some(default_mint) = self.routing.cashu_default_mint.as_ref() {
1808            return Some(default_mint.clone());
1809        }
1810        if let Some(first_mint) = self.routing.cashu_accepted_mints.first() {
1811            return Some(first_mint.clone());
1812        }
1813        requested_mint.map(str::to_string)
1814    }
1815
1816    fn accepts_quote_mint(&self, mint_url: Option<&str>) -> bool {
1817        if self.routing.cashu_accepted_mints.is_empty() {
1818            return true;
1819        }
1820
1821        let Some(mint_url) = mint_url else {
1822            return false;
1823        };
1824        self.routing
1825            .cashu_accepted_mints
1826            .iter()
1827            .any(|mint| mint == mint_url)
1828    }
1829
1830    fn trusts_quote_mint(&self, mint_url: Option<&str>) -> bool {
1831        let Some(mint_url) = mint_url else {
1832            return self.routing.cashu_default_mint.is_none()
1833                && self.routing.cashu_accepted_mints.is_empty();
1834        };
1835        self.routing.cashu_default_mint.as_deref() == Some(mint_url)
1836            || self
1837                .routing
1838                .cashu_accepted_mints
1839                .iter()
1840                .any(|mint| mint == mint_url)
1841    }
1842
1843    async fn peer_suggested_mint_cap_sat(&self, peer_id: &str) -> u64 {
1844        let base = self.routing.cashu_peer_suggested_mint_base_cap_sat;
1845        if base == 0 {
1846            return 0;
1847        }
1848
1849        let selector = self.peer_selector.read().await;
1850        let Some(stats) = selector.get_stats(peer_id) else {
1851            let max_cap = self.routing.cashu_peer_suggested_mint_max_cap_sat;
1852            return if max_cap > 0 { base.min(max_cap) } else { base };
1853        };
1854
1855        if stats.cashu_payment_defaults > 0
1856            && stats.cashu_payment_defaults >= stats.cashu_payment_receipts
1857        {
1858            return 0;
1859        }
1860
1861        let success_bonus = stats
1862            .successes
1863            .saturating_mul(self.routing.cashu_peer_suggested_mint_success_step_sat);
1864        let receipt_bonus = stats
1865            .cashu_payment_receipts
1866            .saturating_mul(self.routing.cashu_peer_suggested_mint_receipt_step_sat);
1867        let mut cap = base
1868            .saturating_add(success_bonus)
1869            .saturating_add(receipt_bonus);
1870        let max_cap = self.routing.cashu_peer_suggested_mint_max_cap_sat;
1871        if max_cap > 0 {
1872            cap = cap.min(max_cap);
1873        }
1874        cap
1875    }
1876
1877    async fn should_accept_quote_response(
1878        &self,
1879        from_peer: &str,
1880        preferred_mint_url: Option<&str>,
1881        offered_payment_sat: u64,
1882        res: &DataQuoteResponse,
1883    ) -> bool {
1884        let Some(payment_sat) = res.p else {
1885            return false;
1886        };
1887        if payment_sat > offered_payment_sat {
1888            return false;
1889        }
1890
1891        let response_mint = res.m.as_deref();
1892        if response_mint == preferred_mint_url {
1893            return true;
1894        }
1895        if self.trusts_quote_mint(response_mint) {
1896            return true;
1897        }
1898        if response_mint.is_none() {
1899            return false;
1900        }
1901
1902        payment_sat <= self.peer_suggested_mint_cap_sat(from_peer).await
1903    }
1904
1905    async fn issue_quote(
1906        &self,
1907        peer_id: &str,
1908        hash_key: &str,
1909        payment_sat: u64,
1910        ttl_ms: u32,
1911        mint_url: Option<&str>,
1912    ) -> u64 {
1913        let quote_id = {
1914            let mut next = self.next_quote_id.write().await;
1915            let quote_id = *next;
1916            *next = next.saturating_add(1);
1917            quote_id
1918        };
1919
1920        let expires_at = Instant::now() + Duration::from_millis(ttl_ms as u64);
1921        self.issued_quotes.write().await.insert(
1922            (peer_id.to_string(), hash_key.to_string(), quote_id),
1923            IssuedQuote {
1924                expires_at,
1925                payment_sat,
1926                mint_url: mint_url.map(str::to_string),
1927            },
1928        );
1929        quote_id
1930    }
1931
1932    async fn take_valid_quote(&self, peer_id: &str, hash_key: &str, quote_id: u64) -> bool {
1933        let key = (peer_id.to_string(), hash_key.to_string(), quote_id);
1934        let Some(quote) = self.issued_quotes.write().await.remove(&key) else {
1935            return false;
1936        };
1937        quote.expires_at > Instant::now()
1938    }
1939
1940    async fn send_request_to_peer(
1941        &self,
1942        peer_id: &str,
1943        hash: &Hash,
1944        request_htl: u8,
1945        quote_id: Option<u64>,
1946    ) -> bool {
1947        if !should_forward_htl(request_htl) {
1948            return false;
1949        }
1950
1951        let channel = match self.signaling.get_channel(peer_id).await {
1952            Some(c) => c,
1953            None => return false,
1954        };
1955
1956        let htl_config = {
1957            let configs = self.htl_configs.read().await;
1958            configs
1959                .get(peer_id)
1960                .cloned()
1961                .unwrap_or_else(PeerHTLConfig::random)
1962        };
1963
1964        let send_htl = htl_config.decrement(request_htl);
1965        let req = match quote_id {
1966            Some(quote_id) => create_request_with_quote(hash, send_htl, quote_id),
1967            None => create_request(hash, send_htl),
1968        };
1969        let request_bytes = encode_request(&req);
1970        let request_len = request_bytes.len() as u64;
1971
1972        {
1973            let mut selector = self.peer_selector.write().await;
1974            selector.record_request(peer_id, request_len);
1975        }
1976
1977        match channel.send(request_bytes).await {
1978            Ok(()) => {
1979                self.record_peer_wire_sent(peer_id, request_len).await;
1980                true
1981            }
1982            Err(_) => {
1983                self.peer_selector.write().await.record_failure(peer_id);
1984                false
1985            }
1986        }
1987    }
1988
1989    async fn send_quote_request_to_peer(
1990        &self,
1991        peer_id: &str,
1992        hash: &Hash,
1993        payment_sat: u64,
1994        ttl_ms: u32,
1995        mint_url: Option<&str>,
1996    ) -> bool {
1997        let channel = match self.signaling.get_channel(peer_id).await {
1998            Some(c) => c,
1999            None => return false,
2000        };
2001
2002        let req = create_quote_request(hash, ttl_ms, payment_sat, mint_url);
2003        let request_bytes = encode_quote_request(&req);
2004        let request_len = request_bytes.len() as u64;
2005
2006        match channel.send(request_bytes).await {
2007            Ok(()) => {
2008                self.record_peer_wire_sent(peer_id, request_len).await;
2009                true
2010            }
2011            Err(_) => false,
2012        }
2013    }
2014
2015    pub async fn set_read_sources(&self, sources: Vec<Arc<dyn MeshReadSource>>) {
2016        let mut by_id = HashMap::new();
2017        let mut stats = self.read_source_stats.write().await;
2018        for source in sources {
2019            let source_id = source.id().to_string();
2020            by_id.insert(source_id.clone(), source);
2021            stats
2022                .entry(source_id)
2023                .or_insert_with(AdaptiveSourceStats::default);
2024        }
2025        *self.read_sources.write().await = by_id;
2026    }
2027
2028    async fn record_read_source_request(&self, source_id: &str) {
2029        let mut stats = self.read_source_stats.write().await;
2030        stats
2031            .entry(source_id.to_string())
2032            .or_insert_with(AdaptiveSourceStats::default)
2033            .requests += 1;
2034    }
2035
2036    async fn record_read_source_miss(&self, source_id: &str) {
2037        let mut stats = self.read_source_stats.write().await;
2038        stats
2039            .entry(source_id.to_string())
2040            .or_insert_with(AdaptiveSourceStats::default)
2041            .misses += 1;
2042    }
2043
2044    async fn record_read_source_success(&self, source_id: &str, elapsed_ms: u64) {
2045        let now = Instant::now();
2046        let mut stats = self.read_source_stats.write().await;
2047        let stats = stats
2048            .entry(source_id.to_string())
2049            .or_insert_with(AdaptiveSourceStats::default);
2050        stats.successes += 1;
2051        stats.last_success_at = Some(now);
2052        stats.backoff_level = 0;
2053        stats.backed_off_until = None;
2054        if stats.srtt_ms <= 0.0 {
2055            stats.srtt_ms = elapsed_ms as f64;
2056            stats.rttvar_ms = elapsed_ms as f64 / 2.0;
2057            return;
2058        }
2059        let elapsed = elapsed_ms as f64;
2060        stats.rttvar_ms = 0.75 * stats.rttvar_ms + 0.25 * (stats.srtt_ms - elapsed).abs();
2061        stats.srtt_ms = 0.875 * stats.srtt_ms + 0.125 * elapsed;
2062    }
2063
2064    async fn record_read_source_failure(&self, source_id: &str) {
2065        let now = Instant::now();
2066        let mut stats = self.read_source_stats.write().await;
2067        let stats = stats
2068            .entry(source_id.to_string())
2069            .or_insert_with(AdaptiveSourceStats::default);
2070        stats.failures += 1;
2071        stats.last_failure_at = Some(now);
2072        Self::apply_source_backoff(stats, now);
2073    }
2074
2075    async fn record_read_source_timeout(&self, source_id: &str) {
2076        let now = Instant::now();
2077        let mut stats = self.read_source_stats.write().await;
2078        let stats = stats
2079            .entry(source_id.to_string())
2080            .or_insert_with(AdaptiveSourceStats::default);
2081        stats.timeouts += 1;
2082        stats.last_failure_at = Some(now);
2083        Self::apply_source_backoff(stats, now);
2084    }
2085
2086    fn apply_source_backoff(stats: &mut AdaptiveSourceStats, now: Instant) {
2087        stats.backoff_level = stats.backoff_level.saturating_add(1);
2088        let backoff_ms = (INITIAL_SOURCE_BACKOFF_MS
2089            .saturating_mul(2u64.saturating_pow(stats.backoff_level.saturating_sub(1))))
2090        .min(MAX_SOURCE_BACKOFF_MS);
2091        stats.backed_off_until = Some(now + Duration::from_millis(backoff_ms));
2092    }
2093
2094    async fn ordered_read_sources(&self) -> Vec<Arc<dyn MeshReadSource>> {
2095        let sources = self.read_sources.read().await;
2096        if sources.is_empty() {
2097            return Vec::new();
2098        }
2099
2100        let mut available: Vec<Arc<dyn MeshReadSource>> = sources
2101            .values()
2102            .filter(|source| source.is_available())
2103            .cloned()
2104            .collect();
2105        if available.is_empty() {
2106            return Vec::new();
2107        }
2108
2109        let now = Instant::now();
2110        let stats = self.read_source_stats.read().await;
2111        let mut healthy: Vec<Arc<dyn MeshReadSource>> = available
2112            .iter()
2113            .filter(|source| {
2114                stats
2115                    .get(source.id())
2116                    .and_then(|s| s.backed_off_until)
2117                    .is_none_or(|until| until <= now)
2118            })
2119            .cloned()
2120            .collect();
2121        if !healthy.is_empty() {
2122            available = std::mem::take(&mut healthy);
2123        }
2124
2125        available.sort_by(|left, right| {
2126            let left_stats = stats.get(left.id()).cloned().unwrap_or_default();
2127            let right_stats = stats.get(right.id()).cloned().unwrap_or_default();
2128            adaptive_source_score(&right_stats, now)
2129                .partial_cmp(&adaptive_source_score(&left_stats, now))
2130                .unwrap_or(std::cmp::Ordering::Equal)
2131                .then_with(|| left.id().cmp(right.id()))
2132        });
2133        available
2134    }
2135
2136    async fn should_probe_multiple_read_sources(
2137        &self,
2138        ordered_sources: &[Arc<dyn MeshReadSource>],
2139    ) -> bool {
2140        if ordered_sources.len() <= 1 {
2141            return false;
2142        }
2143        let stats = self.read_source_stats.read().await;
2144        let best = stats
2145            .get(ordered_sources[0].id())
2146            .cloned()
2147            .unwrap_or_default();
2148        let second = stats
2149            .get(ordered_sources[1].id())
2150            .cloned()
2151            .unwrap_or_default();
2152        if !source_has_history(&best) || !source_has_history(&second) {
2153            return false;
2154        }
2155        let now = Instant::now();
2156        adaptive_source_score(&best, now) - adaptive_source_score(&second, now)
2157            < SOURCE_SCORE_TIE_DELTA
2158    }
2159
2160    async fn source_dispatch_for(&self, source_count: usize) -> RequestDispatchConfig {
2161        if source_count == 0 {
2162            return self.routing.dispatch;
2163        }
2164        let ordered_sources = self.ordered_read_sources().await;
2165        let probe_multiple = self
2166            .should_probe_multiple_read_sources(&ordered_sources)
2167            .await;
2168        let initial_fanout = if probe_multiple {
2169            source_count.min(2)
2170        } else {
2171            1
2172        };
2173        RequestDispatchConfig {
2174            initial_fanout,
2175            hedge_fanout: self.routing.dispatch.hedge_fanout,
2176            max_fanout: self.routing.dispatch.max_fanout.min(source_count),
2177            hedge_interval_ms: self.routing.dispatch.hedge_interval_ms,
2178        }
2179    }
2180
2181    /// Get peer count
2182    pub async fn peer_count(&self) -> usize {
2183        self.signaling.peer_count().await
2184    }
2185
2186    /// Get connected mesh peer IDs.
2187    pub async fn peer_ids(&self) -> Vec<String> {
2188        self.signaling.peer_ids().await
2189    }
2190
2191    /// Check if we need more peers
2192    pub async fn needs_peers(&self) -> bool {
2193        self.signaling.needs_peers().await
2194    }
2195
2196    /// Re-broadcast hello to refresh discovery as topology changes.
2197    pub async fn send_hello(&self) -> Result<(), TransportError> {
2198        self.signaling.send_hello(vec![]).await
2199    }
2200
2201    /// Drain all currently available peer-link messages and handle them.
2202    ///
2203    /// This keeps the message pump logic shared between simulation and the
2204    /// default production wrapper instead of duplicating per-channel loops.
2205    pub async fn drain_available_data_messages(self: &Arc<Self>) -> DataPumpStats {
2206        let mut stats = DataPumpStats::default();
2207        let peer_ids = self.signaling.peer_ids().await;
2208        for peer_id in peer_ids {
2209            let Some(channel) = self.signaling.get_channel(&peer_id).await else {
2210                continue;
2211            };
2212
2213            while let Some(data) = channel.try_recv() {
2214                stats.processed += 1;
2215                stats.processed_bytes += data.len() as u64;
2216                if let Some(msg) = parse_message(&data) {
2217                    match msg {
2218                        DataMessage::Request(_) => stats.request_messages += 1,
2219                        DataMessage::Response(_) => stats.response_messages += 1,
2220                        DataMessage::QuoteRequest(_) => stats.quote_request_messages += 1,
2221                        DataMessage::QuoteResponse(_) => stats.quote_response_messages += 1,
2222                        DataMessage::PubsubInterest(_) => stats.pubsub_interest_messages += 1,
2223                        DataMessage::PubsubFrame(_) => stats.pubsub_frame_messages += 1,
2224                        DataMessage::PubsubInventory(_) => stats.pubsub_inventory_messages += 1,
2225                        DataMessage::PubsubWant(_) => stats.pubsub_want_messages += 1,
2226                        DataMessage::Payment(_)
2227                        | DataMessage::PaymentAck(_)
2228                        | DataMessage::Chunk(_)
2229                        | DataMessage::PeerHints(_) => {}
2230                    }
2231                }
2232                self.handle_data_message(&peer_id, &data).await;
2233            }
2234        }
2235        stats
2236    }
2237
2238    /// Apply an out-of-band payment credit to a peer's routing priority.
2239    pub async fn record_cashu_payment_for_peer(&self, peer_id: &str, amount_sat: u64) {
2240        self.peer_selector
2241            .write()
2242            .await
2243            .record_cashu_payment(peer_id, amount_sat);
2244    }
2245
2246    /// Record a post-delivery payment we received from a peer.
2247    pub async fn record_cashu_receipt_from_peer(&self, peer_id: &str, amount_sat: u64) {
2248        self.peer_selector
2249            .write()
2250            .await
2251            .record_cashu_receipt(peer_id, amount_sat);
2252    }
2253
2254    /// Record that a peer failed to pay after we delivered successfully.
2255    pub async fn record_cashu_payment_default_from_peer(&self, peer_id: &str) {
2256        self.peer_selector
2257            .write()
2258            .await
2259            .record_cashu_payment_default(peer_id);
2260    }
2261
2262    /// Snapshot routing/selection summary for inspection/debugging.
2263    pub async fn selector_summary(&self) -> crate::peer_selector::SelectorSummary {
2264        self.peer_selector.read().await.summary()
2265    }
2266
2267    fn should_refuse_requests_from_peer(&self, selector: &PeerSelector, peer_id: &str) -> bool {
2268        selector.is_peer_blocked_for_payment_defaults(
2269            peer_id,
2270            self.routing.cashu_payment_default_block_threshold,
2271        )
2272    }
2273
2274    /// Export live peer metadata for inspection/debugging.
2275    pub async fn peer_metadata_snapshot(&self) -> PeerMetadataSnapshot {
2276        self.peer_selector
2277            .read()
2278            .await
2279            .export_peer_metadata_snapshot()
2280    }
2281
2282    /// Snapshot current peer metadata and persist it into `local_store`.
2283    ///
2284    /// Uses content-addressed storage for the snapshot body and a reserved
2285    /// mutable pointer slot for the "latest snapshot hash".
2286    pub async fn persist_peer_metadata(&self) -> Result<Hash, StoreError> {
2287        let snapshot = self
2288            .peer_selector
2289            .read()
2290            .await
2291            .export_peer_metadata_snapshot();
2292        let bytes = serde_json::to_vec(&snapshot).map_err(|e| {
2293            StoreError::Other(format!("Failed to encode peer metadata snapshot: {e}"))
2294        })?;
2295        let snapshot_hash = hashtree_core::sha256(&bytes);
2296        let _ = self.local_store.put(snapshot_hash, bytes).await?;
2297
2298        let pointer_slot = Self::peer_metadata_pointer_slot_hash();
2299        let pointer_bytes = hex::encode(snapshot_hash).into_bytes();
2300        let _ = self.local_store.delete(&pointer_slot).await?;
2301        let _ = self.local_store.put(pointer_slot, pointer_bytes).await?;
2302
2303        Ok(snapshot_hash)
2304    }
2305
2306    /// Load persisted peer metadata from `local_store` if available.
2307    pub async fn load_peer_metadata(&self) -> Result<bool, StoreError> {
2308        let pointer_slot = Self::peer_metadata_pointer_slot_hash();
2309        let Some(pointer_bytes) = self.local_store.get(&pointer_slot).await? else {
2310            return Ok(false);
2311        };
2312        let pointer_hex = std::str::from_utf8(&pointer_bytes).map_err(|e| {
2313            StoreError::Other(format!("Peer metadata pointer is not valid UTF-8: {e}"))
2314        })?;
2315        let snapshot_hash = Self::decode_hash_hex(pointer_hex.trim())?;
2316
2317        let Some(snapshot_bytes) = self.local_store.get(&snapshot_hash).await? else {
2318            return Ok(false);
2319        };
2320        let snapshot: PeerMetadataSnapshot =
2321            serde_json::from_slice(&snapshot_bytes).map_err(|e| {
2322                StoreError::Other(format!("Failed to decode peer metadata snapshot: {e}"))
2323            })?;
2324        self.peer_selector
2325            .write()
2326            .await
2327            .import_peer_metadata_snapshot(&snapshot);
2328        Ok(true)
2329    }
2330
2331    /// Request data from peers after negotiating a paid quote.
2332    ///
2333    /// If quote negotiation fails or the quoted peer does not deliver, the store
2334    /// falls back to the normal unpaid retrieval path to preserve liveness.
2335    pub async fn get_with_quote(
2336        &self,
2337        hash: &Hash,
2338        payment_sat: u64,
2339        quote_ttl: Duration,
2340    ) -> Result<Option<Vec<u8>>, StoreError> {
2341        if let Some(data) = self.local_store.get(hash).await? {
2342            return Ok(Some(data));
2343        }
2344        Ok(self
2345            .request_from_peers_with_quote(hash, payment_sat, quote_ttl)
2346            .await)
2347    }
2348
2349    async fn request_from_peers_with_quote(
2350        &self,
2351        hash: &Hash,
2352        payment_sat: u64,
2353        quote_ttl: Duration,
2354    ) -> Option<Vec<u8>> {
2355        let ordered_peer_ids = self.ordered_connected_peers(None).await;
2356        if ordered_peer_ids.is_empty() {
2357            return None;
2358        }
2359
2360        if let Some(quote) = self
2361            .request_quote_from_peers(hash, payment_sat, quote_ttl, &ordered_peer_ids)
2362            .await
2363        {
2364            if let Some(data) = self
2365                .request_from_single_peer(hash, &quote.peer_id, MAX_HTL, Some(quote.quote_id))
2366                .await
2367            {
2368                return Some(data);
2369            }
2370        }
2371
2372        self.request_from_mesh(hash).await
2373    }
2374
2375    async fn request_quote_from_peers(
2376        &self,
2377        hash: &Hash,
2378        payment_sat: u64,
2379        quote_ttl: Duration,
2380        ordered_peer_ids: &[String],
2381    ) -> Option<NegotiatedQuote> {
2382        if ordered_peer_ids.is_empty() {
2383            return None;
2384        }
2385        let ttl_ms = quote_ttl.as_millis().min(u32::MAX as u128) as u32;
2386        if ttl_ms == 0 {
2387            return None;
2388        }
2389        let requested_mint = self.requested_quote_mint().map(str::to_string);
2390
2391        let hash_key = hash_to_key(hash);
2392        let (tx, rx) = oneshot::channel();
2393        self.pending_quotes.write().await.insert(
2394            hash_key.clone(),
2395            PendingQuoteRequest {
2396                response_tx: tx,
2397                preferred_mint_url: requested_mint.clone(),
2398                offered_payment_sat: payment_sat,
2399            },
2400        );
2401
2402        let rx = Arc::new(Mutex::new(rx));
2403        let result = run_hedged_waves(
2404            ordered_peer_ids.len(),
2405            self.routing.dispatch,
2406            self.request_timeout,
2407            |range| {
2408                let wave_peer_ids = ordered_peer_ids[range].to_vec();
2409                let requested_mint = requested_mint.clone();
2410                let hash = *hash;
2411                async move {
2412                    let mut sent = 0usize;
2413                    for peer_id in wave_peer_ids {
2414                        if self
2415                            .send_quote_request_to_peer(
2416                                &peer_id,
2417                                &hash,
2418                                payment_sat,
2419                                ttl_ms,
2420                                requested_mint.as_deref(),
2421                            )
2422                            .await
2423                        {
2424                            sent += 1;
2425                        }
2426                    }
2427                    sent
2428                }
2429            },
2430            |wait| {
2431                let rx = rx.clone();
2432                async move {
2433                    let mut rx = rx.lock().await;
2434                    match tokio::time::timeout(wait, &mut *rx).await {
2435                        Ok(Ok(Some(quote))) => HedgedWaveAction::Success(quote),
2436                        Ok(Ok(None)) | Ok(Err(_)) => HedgedWaveAction::Abort,
2437                        Err(_) => HedgedWaveAction::Continue,
2438                    }
2439                }
2440            },
2441        )
2442        .await;
2443        let _ = self.pending_quotes.write().await.remove(&hash_key);
2444        result
2445    }
2446
2447    async fn request_from_single_peer(
2448        &self,
2449        hash: &Hash,
2450        peer_id: &str,
2451        request_htl: u8,
2452        quote_id: Option<u64>,
2453    ) -> Option<Vec<u8>> {
2454        let hash_key = hash_to_key(hash);
2455        let (tx, rx) = oneshot::channel();
2456        self.pending_requests.write().await.insert(
2457            hash_key.clone(),
2458            PendingRequest {
2459                response_tx: tx,
2460                started_at: Instant::now(),
2461                queried_peers: vec![peer_id.to_string()],
2462            },
2463        );
2464
2465        let mut rx = rx;
2466        if !self
2467            .send_request_to_peer(peer_id, hash, request_htl, quote_id)
2468            .await
2469        {
2470            let _ = self.pending_requests.write().await.remove(&hash_key);
2471            return None;
2472        }
2473        self.reserve_peer_request(peer_id).await;
2474
2475        if let Ok(Ok(Some(data))) = tokio::time::timeout(self.request_timeout, &mut rx).await {
2476            if hashtree_core::sha256(&data) == *hash {
2477                let _ = self.local_store.put(*hash, data.clone()).await;
2478                return Some(data);
2479            }
2480        }
2481
2482        if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
2483            self.release_queried_peer_requests(&pending.queried_peers)
2484                .await;
2485            for peer_id in pending.queried_peers {
2486                self.peer_selector.write().await.record_timeout(&peer_id);
2487            }
2488        }
2489        let _ = self.take_forward_requesters(&hash_key).await;
2490        None
2491    }
2492
2493    async fn request_from_ordered_peers(
2494        &self,
2495        hash: &Hash,
2496        ordered_peer_ids: &[String],
2497        request_htl: u8,
2498    ) -> RouteFetchOutcome {
2499        let hash_key = hash_to_key(hash);
2500        let (tx, rx) = oneshot::channel();
2501        self.pending_requests.write().await.insert(
2502            hash_key.clone(),
2503            PendingRequest {
2504                response_tx: tx,
2505                started_at: Instant::now(),
2506                queried_peers: Vec::new(),
2507            },
2508        );
2509
2510        let rx = Arc::new(Mutex::new(rx));
2511        let result = run_hedged_waves(
2512            ordered_peer_ids.len(),
2513            self.routing.dispatch,
2514            self.request_timeout,
2515            |range| {
2516                let wave_peer_ids = ordered_peer_ids[range].to_vec();
2517                let hash = *hash;
2518                let hash_key = hash_key.clone();
2519                async move {
2520                    let mut sent = 0usize;
2521                    for peer_id in wave_peer_ids {
2522                        if self
2523                            .send_request_to_peer(&peer_id, &hash, request_htl, None)
2524                            .await
2525                        {
2526                            sent += 1;
2527                            self.reserve_peer_request(&peer_id).await;
2528                            if let Some(pending) =
2529                                self.pending_requests.write().await.get_mut(&hash_key)
2530                            {
2531                                pending.queried_peers.push(peer_id);
2532                            }
2533                        }
2534                    }
2535                    sent
2536                }
2537            },
2538            |wait| {
2539                let rx = rx.clone();
2540                async move {
2541                    let mut rx = rx.lock().await;
2542                    match tokio::time::timeout(wait, &mut *rx).await {
2543                        Ok(Ok(Some(data))) if hashtree_core::sha256(&data) == *hash => {
2544                            HedgedWaveAction::Success(data)
2545                        }
2546                        Ok(Ok(Some(_))) => HedgedWaveAction::Continue,
2547                        Ok(Ok(None)) | Ok(Err(_)) => HedgedWaveAction::Abort,
2548                        Err(_) => HedgedWaveAction::Continue,
2549                    }
2550                }
2551            },
2552        )
2553        .await;
2554
2555        let Some(data) = result else {
2556            if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
2557                self.release_queried_peer_requests(&pending.queried_peers)
2558                    .await;
2559                for peer_id in pending.queried_peers {
2560                    self.peer_selector.write().await.record_timeout(&peer_id);
2561                }
2562            }
2563            let _ = self.take_forward_requesters(&hash_key).await;
2564            return RouteFetchOutcome::Timeout;
2565        };
2566
2567        let _ = self.local_store.put(*hash, data.clone()).await;
2568        RouteFetchOutcome::Hit(data)
2569    }
2570
2571    async fn request_from_read_sources_inner(&self, hash: &Hash) -> RouteFetchOutcome {
2572        let ordered_sources = self.ordered_read_sources().await;
2573        if ordered_sources.is_empty() {
2574            return RouteFetchOutcome::Miss;
2575        }
2576
2577        let dispatch = normalize_dispatch_config(
2578            self.source_dispatch_for(ordered_sources.len()).await,
2579            ordered_sources.len(),
2580        );
2581        let wave_plan = build_hedged_wave_plan(ordered_sources.len(), dispatch);
2582        if wave_plan.is_empty() {
2583            return RouteFetchOutcome::Miss;
2584        }
2585
2586        let deadline = Instant::now() + self.request_timeout;
2587        let mut pending = FuturesUnordered::new();
2588        let mut pending_source_ids = HashSet::new();
2589        let mut saw_timeout = false;
2590        let mut next_source_idx = 0usize;
2591
2592        for (wave_idx, wave_size) in wave_plan.iter().copied().enumerate() {
2593            let from = next_source_idx;
2594            let to = (next_source_idx + wave_size).min(ordered_sources.len());
2595            next_source_idx = to;
2596
2597            for source in &ordered_sources[from..to] {
2598                let source = Arc::clone(source);
2599                let source_id = source.id().to_string();
2600                self.record_read_source_request(&source_id).await;
2601                pending_source_ids.insert(source_id.clone());
2602                let hash = *hash;
2603                pending.push(tokio::spawn(async move {
2604                    let started_at = Instant::now();
2605                    let result = std::panic::AssertUnwindSafe(source.get(&hash))
2606                        .catch_unwind()
2607                        .await;
2608                    match result {
2609                        Ok(Some(data)) => SourceFetchOutcome::Hit {
2610                            source_id,
2611                            data,
2612                            elapsed_ms: started_at.elapsed().as_millis().max(1) as u64,
2613                        },
2614                        Ok(None) => SourceFetchOutcome::Miss { source_id },
2615                        Err(_) => SourceFetchOutcome::Failure { source_id },
2616                    }
2617                }));
2618            }
2619
2620            let is_last_wave =
2621                wave_idx + 1 == wave_plan.len() || next_source_idx >= ordered_sources.len();
2622            let window_end = if is_last_wave {
2623                deadline
2624            } else {
2625                (Instant::now() + Duration::from_millis(dispatch.hedge_interval_ms)).min(deadline)
2626            };
2627
2628            while Instant::now() < window_end {
2629                let remaining = window_end.saturating_duration_since(Instant::now());
2630                let Some(result) = tokio::time::timeout(remaining, pending.next())
2631                    .await
2632                    .ok()
2633                    .flatten()
2634                else {
2635                    break;
2636                };
2637                let Ok(outcome) = result else {
2638                    continue;
2639                };
2640                match outcome {
2641                    SourceFetchOutcome::Hit {
2642                        source_id,
2643                        data,
2644                        elapsed_ms,
2645                    } => {
2646                        pending_source_ids.remove(&source_id);
2647                        self.record_read_source_success(&source_id, elapsed_ms)
2648                            .await;
2649                        return RouteFetchOutcome::Hit(data);
2650                    }
2651                    SourceFetchOutcome::Miss { source_id } => {
2652                        pending_source_ids.remove(&source_id);
2653                        self.record_read_source_miss(&source_id).await;
2654                    }
2655                    SourceFetchOutcome::Failure { source_id } => {
2656                        pending_source_ids.remove(&source_id);
2657                        self.record_read_source_failure(&source_id).await;
2658                    }
2659                }
2660            }
2661
2662            if Instant::now() >= deadline {
2663                break;
2664            }
2665        }
2666
2667        for source_id in pending_source_ids {
2668            saw_timeout = true;
2669            self.record_read_source_timeout(&source_id).await;
2670        }
2671        if saw_timeout {
2672            RouteFetchOutcome::Timeout
2673        } else {
2674            RouteFetchOutcome::Miss
2675        }
2676    }
2677
2678    async fn request_from_read_sources(&self, hash: &Hash) -> RouteFetchOutcome {
2679        let hash_key = hash_to_key(hash);
2680        let existing_wait = {
2681            let mut inflight = self.inflight_source_fetches.lock().await;
2682            if let Some(existing) = inflight.get_mut(&hash_key) {
2683                let (tx, rx) = oneshot::channel();
2684                existing.waiters.push(tx);
2685                Some(rx)
2686            } else {
2687                inflight.insert(
2688                    hash_key.clone(),
2689                    InflightSourceFetch {
2690                        waiters: Vec::new(),
2691                    },
2692                );
2693                None
2694            }
2695        };
2696
2697        if let Some(wait) = existing_wait {
2698            return wait.await.unwrap_or(RouteFetchOutcome::Timeout);
2699        }
2700
2701        let result = self.request_from_read_sources_inner(hash).await;
2702        if let RouteFetchOutcome::Hit(hit) = &result {
2703            let _ = self.local_store.put(*hash, hit.clone()).await;
2704        }
2705        self.complete_inflight_source_fetch(&hash_key, result.clone())
2706            .await;
2707
2708        result
2709    }
2710
2711    async fn complete_inflight_source_fetch(&self, hash_key: &str, result: RouteFetchOutcome) {
2712        let waiters = self
2713            .inflight_source_fetches
2714            .lock()
2715            .await
2716            .remove(hash_key)
2717            .map(|inflight| inflight.waiters)
2718            .unwrap_or_default();
2719        for waiter in waiters {
2720            let _ = waiter.send(result.clone());
2721        }
2722    }
2723
2724    async fn cancel_pending_peer_route(&self, hash: &Hash) {
2725        let hash_key = hash_to_key(hash);
2726        if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
2727            self.release_queried_peer_requests(&pending.queried_peers)
2728                .await;
2729        }
2730    }
2731
2732    async fn cancel_losing_route(&self, hash: &Hash, route: &ReadRoute, winner_data: &[u8]) {
2733        match route {
2734            ReadRoute::Peers(_) => self.cancel_pending_peer_route(hash).await,
2735            ReadRoute::Sources => {
2736                let hash_key = hash_to_key(hash);
2737                self.complete_inflight_source_fetch(
2738                    &hash_key,
2739                    RouteFetchOutcome::Hit(winner_data.to_vec()),
2740                )
2741                .await;
2742            }
2743        }
2744    }
2745
2746    async fn ranked_read_routes(&self, context: &MeshReadContext) -> Vec<RankedReadRoute> {
2747        let mut routes = Vec::new();
2748        let ordered_peers = if should_forward_htl(context.request_htl) {
2749            self.ordered_connected_peers(context.exclude_peer_id.as_deref())
2750                .await
2751        } else {
2752            Vec::new()
2753        };
2754        if !ordered_peers.is_empty() {
2755            let best_peer_id = ordered_peers[0].clone();
2756            let selector = self.peer_selector.read().await;
2757            let best_peer = selector.get_stats(&best_peer_id).cloned();
2758            let now = Instant::now();
2759            let (score, has_history) = match best_peer.as_ref() {
2760                Some(stats) => (
2761                    peer_endpoint_score(stats, now),
2762                    peer_endpoint_has_history(stats),
2763                ),
2764                None => (0.0, false),
2765            };
2766            routes.push(RankedReadRoute {
2767                route: ReadRoute::Peers(ordered_peers),
2768                best_endpoint_id: format!("peer:{best_peer_id}"),
2769                score,
2770                has_history,
2771            });
2772        }
2773        let ordered_sources = self.ordered_read_sources().await;
2774        if let Some(best_source) = ordered_sources.first() {
2775            let stats = self.read_source_stats.read().await;
2776            let best_source_stats = stats.get(best_source.id()).cloned().unwrap_or_default();
2777            let now = Instant::now();
2778            routes.push(RankedReadRoute {
2779                route: ReadRoute::Sources,
2780                best_endpoint_id: format!("source:{}", best_source.id()),
2781                score: adaptive_source_score(&best_source_stats, now),
2782                has_history: source_has_history(&best_source_stats),
2783            });
2784        }
2785        if routes.len() <= 1 {
2786            return routes;
2787        }
2788
2789        routes.sort_by(|left, right| {
2790            right
2791                .score
2792                .partial_cmp(&left.score)
2793                .unwrap_or(std::cmp::Ordering::Equal)
2794                .then_with(|| ranked_route_kind(&left.route).cmp(&ranked_route_kind(&right.route)))
2795                .then_with(|| left.best_endpoint_id.cmp(&right.best_endpoint_id))
2796                .then_with(|| left.route.id().cmp(right.route.id()))
2797        });
2798        routes
2799    }
2800
2801    fn should_probe_multiple_routes(&self, routes: &[RankedReadRoute]) -> bool {
2802        if routes.len() <= 1 {
2803            return false;
2804        }
2805        if !routes[0].has_history || !routes[1].has_history {
2806            return false;
2807        }
2808        (routes[0].score - routes[1].score) < SOURCE_SCORE_TIE_DELTA
2809    }
2810
2811    async fn run_read_route(
2812        &self,
2813        hash: &Hash,
2814        route: &ReadRoute,
2815        context: &MeshReadContext,
2816    ) -> RouteFetchOutcome {
2817        match route {
2818            ReadRoute::Peers(peer_ids) => {
2819                self.request_from_ordered_peers(hash, peer_ids, context.request_htl)
2820                    .await
2821            }
2822            ReadRoute::Sources => self.request_from_read_sources(hash).await,
2823        }
2824    }
2825
2826    async fn request_from_mesh_with_context(
2827        &self,
2828        hash: &Hash,
2829        context: &MeshReadContext,
2830    ) -> Option<Vec<u8>> {
2831        let routes = self.ranked_read_routes(context).await;
2832        match routes.as_slice() {
2833            [] => None,
2834            [ranked] => match self.run_read_route(hash, &ranked.route, context).await {
2835                RouteFetchOutcome::Hit(data) => Some(data),
2836                RouteFetchOutcome::Miss | RouteFetchOutcome::Timeout => None,
2837            },
2838            [first, second, ..] => {
2839                if self.should_probe_multiple_routes(&routes) {
2840                    let first_fut = self.run_read_route(hash, &first.route, context);
2841                    let second_fut = self.run_read_route(hash, &second.route, context);
2842                    tokio::pin!(first_fut);
2843                    tokio::pin!(second_fut);
2844                    let mut first_done = false;
2845                    let mut second_done = false;
2846                    loop {
2847                        tokio::select! {
2848                            result = &mut first_fut, if !first_done => {
2849                                first_done = true;
2850                                if let RouteFetchOutcome::Hit(data) = result {
2851                                    if !second_done {
2852                                        self.cancel_losing_route(hash, &second.route, &data).await;
2853                                    }
2854                                    return Some(data);
2855                                }
2856                            }
2857                            result = &mut second_fut, if !second_done => {
2858                                second_done = true;
2859                                if let RouteFetchOutcome::Hit(data) = result {
2860                                    if !first_done {
2861                                        self.cancel_losing_route(hash, &first.route, &data).await;
2862                                    }
2863                                    return Some(data);
2864                                }
2865                            }
2866                            else => break,
2867                        }
2868                        if first_done && second_done {
2869                            break;
2870                        }
2871                    }
2872                    None
2873                } else {
2874                    match self.run_read_route(hash, &first.route, context).await {
2875                        RouteFetchOutcome::Hit(data) => return Some(data),
2876                        RouteFetchOutcome::Miss | RouteFetchOutcome::Timeout => {}
2877                    }
2878                    for ranked in routes.iter().skip(1) {
2879                        match self.run_read_route(hash, &ranked.route, context).await {
2880                            RouteFetchOutcome::Hit(data) => return Some(data),
2881                            RouteFetchOutcome::Miss | RouteFetchOutcome::Timeout => {}
2882                        }
2883                    }
2884                    None
2885                }
2886            }
2887        }
2888    }
2889
2890    async fn request_from_mesh(&self, hash: &Hash) -> Option<Vec<u8>> {
2891        self.request_from_mesh_with_context(hash, &MeshReadContext::default())
2892            .await
2893    }
2894
2895    async fn begin_forward_request(&self, hash_key: &str, requester_id: &str) -> bool {
2896        let mut pending = self.pending_forward_requests.write().await;
2897        if let Some(existing) = pending.get_mut(hash_key) {
2898            existing.requester_ids.insert(requester_id.to_string());
2899            return false;
2900        }
2901
2902        let mut requester_ids = HashSet::new();
2903        requester_ids.insert(requester_id.to_string());
2904        pending.insert(
2905            hash_key.to_string(),
2906            PendingForwardRequest { requester_ids },
2907        );
2908        true
2909    }
2910
2911    async fn was_recent_forward_miss(&self, hash_key: &str) -> bool {
2912        self.recent_forward_misses.lock().await.contains(hash_key)
2913    }
2914
2915    async fn mark_recent_forward_miss(&self, hash_key: &str) {
2916        let _ = self
2917            .recent_forward_misses
2918            .lock()
2919            .await
2920            .insert_if_new(hash_key.to_string());
2921    }
2922
2923    async fn take_forward_requesters(&self, hash_key: &str) -> Vec<String> {
2924        self.pending_forward_requests
2925            .write()
2926            .await
2927            .remove(hash_key)
2928            .map(|pending| pending.requester_ids.into_iter().collect())
2929            .unwrap_or_default()
2930    }
2931
2932    async fn complete_pending_response(
2933        self: &Arc<Self>,
2934        from_peer: &str,
2935        hash: &Hash,
2936        hash_key: String,
2937        payload: Vec<u8>,
2938    ) {
2939        if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
2940            self.release_queried_peer_requests(&pending.queried_peers)
2941                .await;
2942            let rtt_ms = pending.started_at.elapsed().as_millis() as u64;
2943            self.peer_selector.write().await.record_success(
2944                from_peer,
2945                rtt_ms,
2946                payload.len() as u64,
2947            );
2948            let forward_requesters = self.take_forward_requesters(&hash_key).await;
2949            let response_bytes = if forward_requesters.is_empty() {
2950                None
2951            } else {
2952                Some(encode_response(&create_response(hash, payload.clone())))
2953            };
2954            let _ = pending.response_tx.send(Some(payload));
2955            if let Some(response_bytes) = response_bytes {
2956                for requester_id in forward_requesters {
2957                    Arc::clone(self)
2958                        .enqueue_response_send(requester_id, response_bytes.clone(), Instant::now())
2959                        .await;
2960                }
2961            }
2962        }
2963    }
2964
2965    async fn handle_quote_response_message(&self, from_peer: &str, res: DataQuoteResponse) {
2966        if !res.a {
2967            return;
2968        }
2969
2970        let Some(quote_id) = res.q else {
2971            return;
2972        };
2973
2974        let hash_key = hash_to_key(&res.h);
2975        let (preferred_mint_url, offered_payment_sat) = {
2976            let pending_quotes = self.pending_quotes.read().await;
2977            let Some(pending) = pending_quotes.get(&hash_key) else {
2978                return;
2979            };
2980            (
2981                pending.preferred_mint_url.clone(),
2982                pending.offered_payment_sat,
2983            )
2984        };
2985        if !self
2986            .should_accept_quote_response(
2987                from_peer,
2988                preferred_mint_url.as_deref(),
2989                offered_payment_sat,
2990                &res,
2991            )
2992            .await
2993        {
2994            return;
2995        }
2996        let mut pending_quotes = self.pending_quotes.write().await;
2997        if let Some(pending) = pending_quotes.remove(&hash_key) {
2998            let _ = pending.response_tx.send(Some(NegotiatedQuote {
2999                peer_id: from_peer.to_string(),
3000                quote_id,
3001                mint_url: res.m,
3002            }));
3003        }
3004    }
3005
3006    async fn handle_response_message(
3007        self: &Arc<Self>,
3008        from_peer: &str,
3009        res: crate::protocol::DataResponse,
3010    ) {
3011        let hash_key = hash_to_key(&res.h);
3012        let hash = match crate::protocol::bytes_to_hash(&res.h) {
3013            Some(h) => h,
3014            None => return,
3015        };
3016
3017        // Ignore malformed/corrupt payload and keep waiting for a valid response.
3018        if hashtree_core::sha256(&res.d) != hash {
3019            self.peer_selector.write().await.record_failure(from_peer);
3020            if self.debug {
3021                println!("[MeshStoreCore] Ignoring invalid response payload for {hash_key}");
3022            }
3023            return;
3024        }
3025
3026        self.record_useful_bytes_received_from_peer(from_peer, res.d.len() as u64)
3027            .await;
3028        self.complete_pending_response(from_peer, &hash, hash_key, res.d)
3029            .await;
3030    }
3031
3032    async fn handle_quote_request_message(&self, from_peer: &str, req: DataQuoteRequest) {
3033        let hash = match crate::protocol::bytes_to_hash(&req.h) {
3034            Some(h) => h,
3035            None => return,
3036        };
3037        let hash_key = hash_to_key(&hash);
3038
3039        {
3040            let selector = self.peer_selector.read().await;
3041            if self.should_refuse_requests_from_peer(&selector, from_peer) {
3042                if self.debug {
3043                    println!(
3044                        "[MeshStoreCore] Refusing quote request from delinquent peer {}",
3045                        from_peer
3046                    );
3047                }
3048                return;
3049            }
3050        }
3051
3052        let chosen_mint = self.choose_quote_mint(req.m.as_deref());
3053        let can_serve = self.local_store.has(&hash).await.ok().unwrap_or(false)
3054            && !self.should_drop_response(&hash)
3055            && !self.should_corrupt_response(&hash);
3056
3057        let res = if can_serve {
3058            let quote_id = self
3059                .issue_quote(from_peer, &hash_key, req.p, req.t, chosen_mint.as_deref())
3060                .await;
3061            create_quote_response_available(&hash, quote_id, req.p, req.t, chosen_mint.as_deref())
3062        } else {
3063            create_quote_response_unavailable(&hash)
3064        };
3065        let response_bytes = encode_quote_response(&res);
3066        if let Some(channel) = self.signaling.get_channel(from_peer).await {
3067            if channel.send(response_bytes.clone()).await.is_ok() {
3068                self.record_peer_wire_sent(from_peer, response_bytes.len() as u64)
3069                    .await;
3070            }
3071        }
3072    }
3073
3074    async fn handle_request_message(
3075        self: &Arc<Self>,
3076        from_peer: &str,
3077        req: crate::protocol::DataRequest,
3078    ) {
3079        let hash = match crate::protocol::bytes_to_hash(&req.h) {
3080            Some(h) => h,
3081            None => return,
3082        };
3083        let hash_key = hash_to_key(&hash);
3084
3085        if let Some(quote_id) = req.q {
3086            if !self.take_valid_quote(from_peer, &hash_key, quote_id).await {
3087                if self.debug {
3088                    println!(
3089                        "[MeshStoreCore] Refusing request with invalid or expired quote {} from {}",
3090                        quote_id, from_peer
3091                    );
3092                }
3093                return;
3094            }
3095        }
3096
3097        let allow_peer_forwarding = {
3098            let selector = self.peer_selector.read().await;
3099            !self.should_refuse_requests_from_peer(&selector, from_peer)
3100        };
3101
3102        // Check local store
3103        if let Ok(Some(mut data)) = self.local_store.get(&hash).await {
3104            if self.should_drop_response(&hash) {
3105                if self.debug {
3106                    println!(
3107                        "[MeshStoreCore] Dropping response for {} due to actor profile",
3108                        hash_to_key(&hash)
3109                    );
3110                }
3111                return;
3112            }
3113
3114            let response_delay = self.response_send_delay(&hash, data.len());
3115            if self.should_corrupt_response(&hash) {
3116                if data.is_empty() {
3117                    data.push(0x80);
3118                } else {
3119                    data[0] ^= 0x80;
3120                }
3121            }
3122
3123            // Send response
3124            let res = create_response(&hash, data);
3125            let response_bytes = encode_response(&res);
3126            let ready_at = Instant::now() + response_delay;
3127            Arc::clone(self)
3128                .enqueue_response_send(from_peer.to_string(), response_bytes, ready_at)
3129                .await;
3130            return;
3131        }
3132
3133        if self.pending_requests.read().await.contains_key(&hash_key) {
3134            let _ = self.begin_forward_request(&hash_key, from_peer).await;
3135            return;
3136        }
3137
3138        if self.was_recent_forward_miss(&hash_key).await {
3139            if self.debug {
3140                println!(
3141                    "[MeshStoreCore] Suppressing recently missed forwarded request for {}",
3142                    hash_key
3143                );
3144            }
3145            return;
3146        }
3147
3148        if !self.begin_forward_request(&hash_key, from_peer).await {
3149            return;
3150        }
3151
3152        let from_peer = from_peer.to_string();
3153        let this = Arc::clone(self);
3154        let request_htl = req.htl;
3155        tokio::spawn(async move {
3156            let result = if allow_peer_forwarding {
3157                let context = MeshReadContext {
3158                    exclude_peer_id: Some(from_peer.clone()),
3159                    request_htl,
3160                };
3161                this.request_from_mesh_with_context(&hash, &context).await
3162            } else {
3163                if this.debug {
3164                    println!(
3165                        "[MeshStoreCore] Serving request from delinquent peer {} via read sources only",
3166                        from_peer
3167                    );
3168                }
3169                match this.request_from_read_sources(&hash).await {
3170                    RouteFetchOutcome::Hit(data) => Some(data),
3171                    RouteFetchOutcome::Miss | RouteFetchOutcome::Timeout => None,
3172                }
3173            };
3174            let requester_ids = this.take_forward_requesters(&hash_key).await;
3175            if let Some(data) = result {
3176                let ready_at = Instant::now() + this.response_send_delay(&hash, data.len());
3177                let res = create_response(&hash, data);
3178                let response_bytes = encode_response(&res);
3179                for requester_id in requester_ids {
3180                    Arc::clone(&this)
3181                        .enqueue_response_send(requester_id, response_bytes.clone(), ready_at)
3182                        .await;
3183                }
3184            } else {
3185                this.mark_recent_forward_miss(&hash_key).await;
3186            }
3187        });
3188    }
3189
3190    async fn handle_pubsub_interest_message(
3191        self: &Arc<Self>,
3192        from_peer: &str,
3193        mut interest: PubsubInterest,
3194    ) {
3195        if !self.apply_pubsub_interest_route(from_peer, &interest).await {
3196            return;
3197        }
3198
3199        if interest.htl <= 1 {
3200            return;
3201        }
3202        interest.htl = interest.htl.saturating_sub(1);
3203        let _ = self
3204            .send_pubsub_interest_to_peers(&interest, Some(from_peer))
3205            .await;
3206    }
3207
3208    async fn handle_pubsub_frame_message(
3209        self: &Arc<Self>,
3210        from_peer: &str,
3211        mut frame: PubsubFrame,
3212        wire_bytes: usize,
3213    ) {
3214        if frame.stream_id.is_empty() || frame.origin_peer_id.is_empty() {
3215            return;
3216        }
3217        if frame.origin_peer_id == self.signaling.peer_id() {
3218            return;
3219        }
3220
3221        let frame_key = Self::pubsub_frame_key(&frame);
3222        if !self
3223            .pubsub_seen_frames
3224            .lock()
3225            .await
3226            .insert_if_new(frame_key.clone())
3227        {
3228            return;
3229        }
3230        self.cache_pubsub_frame(frame_key.clone(), frame.clone())
3231            .await;
3232
3233        let local_interested = self
3234            .pubsub_local_interests
3235            .read()
3236            .await
3237            .contains(&frame.stream_id);
3238        let mut downstream_peers = if frame.htl > 1 {
3239            match self.routing.pubsub_delivery_mode {
3240                PubsubDeliveryMode::InterestPush => {
3241                    let mut peers = self
3242                        .interested_pubsub_peers(&frame.stream_id, Some(from_peer))
3243                        .await;
3244                    peers.extend(
3245                        self.take_pubsub_want_peers(&frame_key, Some(from_peer))
3246                            .await,
3247                    );
3248                    peers.sort();
3249                    peers.dedup();
3250                    peers
3251                }
3252                PubsubDeliveryMode::HtlInvWant => {
3253                    self.take_pubsub_want_peers(&frame_key, Some(from_peer))
3254                        .await
3255                }
3256            }
3257        } else {
3258            Vec::new()
3259        };
3260        downstream_peers.retain(|peer_id| peer_id != from_peer);
3261
3262        if local_interested || !downstream_peers.is_empty() {
3263            self.record_useful_bytes_received_from_peer(from_peer, wire_bytes as u64)
3264                .await;
3265        }
3266
3267        if local_interested {
3268            self.enqueue_pubsub_event(PubsubEvent {
3269                stream_id: frame.stream_id.clone(),
3270                seq: frame.seq,
3271                origin_peer_id: frame.origin_peer_id.clone(),
3272                from_peer_id: from_peer.to_string(),
3273                payload: frame.payload.clone(),
3274            })
3275            .await;
3276        }
3277
3278        if downstream_peers.is_empty() {
3279            return;
3280        }
3281
3282        frame.htl = frame.htl.saturating_sub(1);
3283        let _ = self
3284            .send_pubsub_frame_to_peers(&frame, &downstream_peers)
3285            .await;
3286    }
3287
3288    async fn handle_pubsub_inventory_message(
3289        self: &Arc<Self>,
3290        from_peer: &str,
3291        inv: PubsubInventory,
3292        wire_bytes: usize,
3293    ) {
3294        if inv.stream_id.is_empty() || inv.origin_peer_id.is_empty() {
3295            return;
3296        }
3297        if inv.origin_peer_id == self.signaling.peer_id() {
3298            return;
3299        }
3300
3301        let key = Self::pubsub_key(&inv.origin_peer_id, &inv.stream_id, inv.seq);
3302        if !self
3303            .pubsub_seen_inventories
3304            .lock()
3305            .await
3306            .insert_if_new(key.clone())
3307        {
3308            return;
3309        }
3310        {
3311            let mut routes = self.pubsub_inventory_routes.write().await;
3312            routes
3313                .entry(key.clone())
3314                .or_insert_with(|| from_peer.to_string());
3315        }
3316
3317        let local_interested = self
3318            .pubsub_local_interests
3319            .read()
3320            .await
3321            .contains(&inv.stream_id);
3322        let downstream_peers = self
3323            .interested_pubsub_peers(&inv.stream_id, Some(from_peer))
3324            .await;
3325        if local_interested || !downstream_peers.is_empty() {
3326            self.record_useful_bytes_received_from_peer(from_peer, wire_bytes as u64)
3327                .await;
3328            let want =
3329                create_pubsub_want(inv.stream_id.clone(), inv.seq, inv.origin_peer_id.clone());
3330            let _ = self.send_pubsub_want_upstream(&key, &want, None).await;
3331        }
3332
3333        if !should_forward_htl(inv.htl) {
3334            return;
3335        }
3336        let _ = self.flood_pubsub_inventory(&inv, Some(from_peer)).await;
3337    }
3338
3339    async fn handle_pubsub_want_message(
3340        self: &Arc<Self>,
3341        from_peer: &str,
3342        want: PubsubWant,
3343        wire_bytes: usize,
3344    ) {
3345        if want.stream_id.is_empty() || want.origin_peer_id.is_empty() {
3346            return;
3347        }
3348        if want.origin_peer_id == from_peer {
3349            return;
3350        }
3351
3352        let key = Self::pubsub_key(&want.origin_peer_id, &want.stream_id, want.seq);
3353        let want_key = format!("{from_peer}:{key}");
3354        if !self.pubsub_seen_wants.lock().await.insert_if_new(want_key) {
3355            return;
3356        }
3357
3358        if let Some(frame) = self.cached_pubsub_frame(&key).await {
3359            self.record_useful_bytes_received_from_peer(from_peer, wire_bytes as u64)
3360                .await;
3361            let peers = vec![from_peer.to_string()];
3362            let _ = self.send_pubsub_frame_to_peers(&frame, &peers).await;
3363            return;
3364        }
3365
3366        let has_upstream_route = self.pubsub_inventory_routes.read().await.contains_key(&key);
3367        if !has_upstream_route {
3368            return;
3369        }
3370
3371        if self.remember_pubsub_want_peer(key.clone(), from_peer).await {
3372            self.record_useful_bytes_received_from_peer(from_peer, wire_bytes as u64)
3373                .await;
3374        }
3375        let _ = self
3376            .send_pubsub_want_upstream(&key, &want, Some(from_peer))
3377            .await;
3378    }
3379
3380    /// Handle incoming data message
3381    pub async fn handle_data_message(self: &Arc<Self>, from_peer: &str, data: &[u8]) {
3382        self.record_peer_wire_received(from_peer, data.len() as u64)
3383            .await;
3384        let parsed = match parse_message(data) {
3385            Some(m) => m,
3386            None => return,
3387        };
3388
3389        match parsed {
3390            DataMessage::Request(req) => {
3391                self.handle_request_message(from_peer, req).await;
3392            }
3393            DataMessage::Response(res) => {
3394                self.handle_response_message(from_peer, res).await;
3395            }
3396            DataMessage::QuoteRequest(req) => {
3397                self.handle_quote_request_message(from_peer, req).await;
3398            }
3399            DataMessage::QuoteResponse(res) => {
3400                self.handle_quote_response_message(from_peer, res).await;
3401            }
3402            DataMessage::PubsubInterest(interest) => {
3403                self.handle_pubsub_interest_message(from_peer, interest)
3404                    .await;
3405            }
3406            DataMessage::PubsubFrame(frame) => {
3407                self.handle_pubsub_frame_message(from_peer, frame, data.len())
3408                    .await;
3409            }
3410            DataMessage::PubsubInventory(inv) => {
3411                self.handle_pubsub_inventory_message(from_peer, inv, data.len())
3412                    .await;
3413            }
3414            DataMessage::PubsubWant(want) => {
3415                self.handle_pubsub_want_message(from_peer, want, data.len())
3416                    .await;
3417            }
3418            DataMessage::Payment(_)
3419            | DataMessage::PaymentAck(_)
3420            | DataMessage::Chunk(_)
3421            | DataMessage::PeerHints(_) => {}
3422        }
3423    }
3424}
3425
3426#[async_trait]
3427impl<S, R, F> Store for MeshStoreCore<S, R, F>
3428where
3429    S: Store + Send + Sync + 'static,
3430    R: SignalingTransport + Send + Sync + 'static,
3431    F: PeerLinkFactory + Send + Sync + 'static,
3432{
3433    async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
3434        self.local_store.put(hash, data).await
3435    }
3436
3437    async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
3438        // Try local first
3439        if let Some(data) = self.local_store.get(hash).await? {
3440            return Ok(Some(data));
3441        }
3442
3443        // Try peers
3444        Ok(self.request_from_mesh(hash).await)
3445    }
3446
3447    async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
3448        self.local_store.has(hash).await
3449    }
3450
3451    async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
3452        self.local_store.delete(hash).await
3453    }
3454}
3455
3456#[cfg(test)]
3457mod tests;
3458
3459/// Type alias for simulation store.
3460pub type SimMeshStore<S> =
3461    MeshStoreCore<S, crate::mock::MockRelayTransport, crate::mock::MockConnectionFactory>;