Skip to main content

hashtree_network/
mesh_store_core.rs

1//! Shared routed mesh store core.
2//!
3//! This module provides a concrete store wrapper that works with any local storage
4//! backend plus any signaling transport and peer-link factory. Both production
5//! (Nostr websockets + WebRTC) and simulation (mocks) use this same code.
6
7use async_trait::async_trait;
8use futures::{stream::FuturesUnordered, FutureExt, StreamExt};
9use std::collections::hash_map::DefaultHasher;
10use std::collections::{HashMap, HashSet, VecDeque};
11use std::future::Future;
12use std::hash::{Hash as _, Hasher};
13use std::ops::Range;
14use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
15use std::sync::Arc;
16use std::time::Duration;
17use tokio::sync::{oneshot, Mutex, RwLock};
18use tokio::time::Instant;
19
20use hashtree_core::{Hash, Store, StoreError};
21
22use crate::peer_selector::{PeerMetadataSnapshot, PeerSelector, SelectionStrategy};
23use crate::protocol::{
24    create_pubsub_frame, create_pubsub_interest, create_pubsub_inventory, create_pubsub_want,
25    create_quote_request, create_quote_response_available, create_quote_response_unavailable,
26    create_request, create_request_with_quote, create_response, encode_pubsub_frame,
27    encode_pubsub_interest, encode_pubsub_inventory, encode_pubsub_want, encode_quote_request,
28    encode_quote_response, encode_request, encode_response, hash_to_key, parse_message,
29    DataMessage, DataQuoteRequest, DataQuoteResponse, PubsubFrame, PubsubInterest, PubsubInventory,
30    PubsubWant,
31};
32use crate::pubsub_strategy::{
33    reciprocal_virtual_finish, select_reciprocal_outbound_job, OutboundJobCandidate,
34    PeerTrafficSnapshot, PubsubCandidate, PubsubSchedulerConfig,
35};
36use crate::signaling::MeshRouter;
37use crate::transport::{PeerLinkFactory, SignalingTransport, TransportError};
38use crate::types::{
39    should_forward_htl, PeerHTLConfig, SignalingMessage, TimedSeenSet, MAX_HTL, MESH_EVENT_POLICY,
40};
41
// Keep the on-disk namespace stable across the crate rename so existing peer
// metadata does not disappear for users upgrading from the old package name.
const PEER_METADATA_POINTER_SLOT_KEY: &[u8] = b"hashtree-webrtc/peer-metadata/latest/v1";
// Capacity handed to the `TimedSeenSet` behind `recent_forward_misses`.
const RECENT_FORWARD_MISS_CAPACITY: usize = 4096;
// Lower bound, in milliseconds, that `recent_forward_miss_ttl` applies to the TTL.
const MIN_RECENT_FORWARD_MISS_TTL_MS: u64 = 250;
// Capacity shared by the pubsub dedupe `TimedSeenSet`s built in `new_with_routing`.
const PUBSUB_SEEN_CAPACITY: usize = 16_384;
// NOTE(review): presumably bounds `pubsub_inbox`; the enforcing code is not in this section.
const PUBSUB_INBOX_CAPACITY: usize = 4_096;
// NOTE(review): presumably bounds `pubsub_frame_cache`; the enforcing code is not in this section.
const PUBSUB_FRAME_CACHE_CAPACITY: usize = 4_096;
// TTL shared by the pubsub dedupe `TimedSeenSet`s built in `new_with_routing`.
const PUBSUB_SEEN_TTL: Duration = Duration::from_secs(120);
51
/// Pending request awaiting response.
///
/// Stored as the value type of the `pending_requests` map on `MeshStoreCore`.
struct PendingRequest {
    /// Sender half of the one-shot channel that carries `Some(bytes)` or `None`.
    response_tx: oneshot::Sender<Option<Vec<u8>>>,
    /// NOTE(review): presumably the instant the request was issued — confirm at the call site.
    started_at: Instant,
    /// NOTE(review): presumably the peers already asked for this data — confirm at the call site.
    queried_peers: Vec<String>,
}
58
/// Pending quote negotiation; value type of the `pending_quotes` map on `MeshStoreCore`.
struct PendingQuoteRequest {
    /// Sender half of the one-shot channel that carries the negotiated quote, or `None`.
    response_tx: oneshot::Sender<Option<NegotiatedQuote>>,
    /// Optional mint URL associated with this quote request.
    preferred_mint_url: Option<String>,
    /// Offered payment amount (sats, going by the `_sat` suffix).
    offered_payment_sat: u64,
}
64
/// Forwarded peer request; value type of the `pending_forward_requests` map on `MeshStoreCore`.
struct PendingForwardRequest {
    /// NOTE(review): presumably the peers waiting on this forwarded request — confirm at the call site.
    requester_ids: HashSet<String>,
}
68
/// Per-peer wire traffic counters; an alias of `PeerTrafficSnapshot` used by `peer_wire_stats`.
type PeerWireStats = PeerTrafficSnapshot;
70
/// Queued response send; element type of `pending_response_sends` on `MeshStoreCore`.
struct PendingResponseSend {
    /// Identifier of this queued send.
    job_id: u64,
    /// Peer the bytes are destined for.
    peer_id: String,
    /// Bytes to send.
    bytes: Vec<u8>,
    /// NOTE(review): presumably the earliest instant the send may go out — confirm in the scheduler.
    ready_at: Instant,
    /// NOTE(review): presumably an enqueue-order tie-breaker — confirm in the scheduler.
    queue_sequence: u64,
}
78
/// A read source that `MeshStoreCore` can hold in `read_sources` as
/// `Arc<dyn MeshReadSource>`.
#[async_trait]
pub trait MeshReadSource: Send + Sync {
    /// Identifier of this source.
    fn id(&self) -> &str;

    /// Whether the source is currently usable. The default implementation
    /// always returns `true`.
    fn is_available(&self) -> bool {
        true
    }

    /// Looks up `hash` in this source, returning its bytes or `None`.
    async fn get(&self, hash: &Hash) -> Option<Vec<u8>>;
}
89
/// Quote delivered through `PendingQuoteRequest::response_tx`.
#[derive(Debug, Clone)]
struct NegotiatedQuote {
    /// Peer associated with the quote.
    peer_id: String,
    /// Identifier of the quote.
    quote_id: u64,
    /// Optional mint URL; not read anywhere yet, hence the `dead_code` allowance.
    #[allow(dead_code)]
    mint_url: Option<String>,
}
97
/// Quote issued to a peer; value type of the `issued_quotes` map on `MeshStoreCore`.
struct IssuedQuote {
    /// Instant at which the quote expires.
    expires_at: Instant,
    /// Payment amount (sats, going by the `_sat` suffix); currently unread.
    #[allow(dead_code)]
    payment_sat: u64,
    /// Optional mint URL; currently unread.
    #[allow(dead_code)]
    mint_url: Option<String>,
}
105
/// Adaptive health stats for one read source; value type of `read_source_stats`.
///
/// The counters and timestamps feed `adaptive_source_score`. The code that
/// updates them is outside this section.
#[derive(Debug, Clone, Default)]
struct AdaptiveSourceStats {
    /// Denominator of the reliability score and of the miss/failure penalty ratios.
    requests: u64,
    /// Numerator of the reliability score.
    successes: u64,
    /// Numerator of the miss penalty ratio.
    misses: u64,
    /// Summed with `timeouts` for the failure penalty ratio.
    failures: u64,
    /// Summed with `failures` for the failure penalty ratio.
    timeouts: u64,
    /// Round-trip estimate in milliseconds; values `<= 0.0` score as "no sample".
    srtt_ms: f64,
    /// NOTE(review): presumably the RTT variance estimate paired with `srtt_ms` — confirm.
    rttvar_ms: f64,
    /// NOTE(review): presumably the current backoff step — confirm in the update code.
    backoff_level: u32,
    /// While this instant is in the future the source scores `f64::NEG_INFINITY`.
    backed_off_until: Option<Instant>,
    /// A success within `RECENT_SOURCE_SUCCESS_WINDOW` earns the recency bonus.
    last_success_at: Option<Instant>,
    /// NOTE(review): presumably the instant of the most recent failure — confirm.
    last_failure_at: Option<Instant>,
}
120
/// Result of a fetch attempt; also the value delivered to `InflightSourceFetch` waiters.
#[derive(Debug, Clone)]
enum RouteFetchOutcome {
    /// Data was found; carries the bytes.
    Hit(Vec<u8>),
    /// No data was found.
    Miss,
    /// The attempt timed out.
    Timeout,
}
127
/// Shared in-flight read; value type of the `inflight_source_fetches` map.
struct InflightSourceFetch {
    /// One-shot senders, one per caller waiting on this fetch's outcome.
    waiters: Vec<oneshot::Sender<RouteFetchOutcome>>,
}
131
/// Outcome of a single read-source fetch; every variant names the source involved.
enum SourceFetchOutcome {
    /// The source returned data.
    Hit {
        source_id: String,
        /// The returned bytes.
        data: Vec<u8>,
        /// Elapsed time of the fetch in milliseconds.
        elapsed_ms: u64,
    },
    /// The source had no data.
    Miss {
        source_id: String,
    },
    /// The fetch failed.
    Failure {
        source_id: String,
    },
}
145
// NOTE(review): presumably the first backoff step for a failing read source, in ms — the
// backoff code is outside this section.
const INITIAL_SOURCE_BACKOFF_MS: u64 = 250;
// NOTE(review): presumably the ceiling for read-source backoff, in ms — confirm in the backoff code.
const MAX_SOURCE_BACKOFF_MS: u64 = 10_000;
// NOTE(review): presumably the score difference below which two routes count as tied — confirm.
const SOURCE_SCORE_TIE_DELTA: f64 = 0.15;
// A success more recent than this window earns the recency bonus in the score functions.
const RECENT_SOURCE_SUCCESS_WINDOW: Duration = Duration::from_secs(60);
// NOTE(review): presumably a rank penalty per active in-flight request on a peer — confirm.
const ACTIVE_PEER_REQUEST_RANK_PENALTY: usize = 3;
151
152fn source_reliability_score(stats: &AdaptiveSourceStats) -> f64 {
153    (stats.successes as f64 + 1.0) / (stats.requests as f64 + 2.0)
154}
155
156fn source_latency_score(stats: &AdaptiveSourceStats) -> f64 {
157    if stats.srtt_ms <= 0.0 {
158        return 0.5;
159    }
160    (500.0 / (stats.srtt_ms + 50.0)).min(1.0)
161}
162
163fn source_has_history(stats: &AdaptiveSourceStats) -> bool {
164    stats.requests > 0
165        || stats.successes > 0
166        || stats.misses > 0
167        || stats.failures > 0
168        || stats.timeouts > 0
169}
170
171fn adaptive_source_score(stats: &AdaptiveSourceStats, now: Instant) -> f64 {
172    if let Some(backed_off_until) = stats.backed_off_until {
173        if backed_off_until > now {
174            return f64::NEG_INFINITY;
175        }
176    }
177
178    let miss_penalty = if stats.requests > 0 {
179        (stats.misses as f64 / stats.requests as f64) * 0.15
180    } else {
181        0.0
182    };
183    let failure_penalty = if stats.requests > 0 {
184        ((stats.failures + stats.timeouts) as f64 / stats.requests as f64) * 0.3
185    } else {
186        0.0
187    };
188    let recency_bonus = if stats
189        .last_success_at
190        .is_some_and(|last| now.duration_since(last) < RECENT_SOURCE_SUCCESS_WINDOW)
191    {
192        0.1
193    } else {
194        0.0
195    };
196
197    0.6 * source_reliability_score(stats) + 0.3 * source_latency_score(stats) + recency_bonus
198        - miss_penalty
199        - failure_penalty
200}
201
202fn peer_endpoint_has_history(stats: &crate::peer_selector::PeerStats) -> bool {
203    stats.requests_sent > 0 || stats.successes > 0 || stats.failures > 0 || stats.timeouts > 0
204}
205
206fn peer_endpoint_score(stats: &crate::peer_selector::PeerStats, now: Instant) -> f64 {
207    if stats.backed_off_until.is_some_and(|until| until > now) {
208        return f64::NEG_INFINITY;
209    }
210
211    let miss_penalty = 0.0;
212    let failure_penalty = if stats.requests_sent > 0 {
213        ((stats.failures + stats.timeouts) as f64 / stats.requests_sent as f64) * 0.3
214    } else {
215        0.0
216    };
217    let recency_bonus = if stats
218        .last_success
219        .is_some_and(|last| now.duration_since(last) < RECENT_SOURCE_SUCCESS_WINDOW)
220    {
221        0.1
222    } else {
223        0.0
224    };
225
226    0.6 * stats.success_rate()
227        + 0.3
228            * source_latency_score(&AdaptiveSourceStats {
229                srtt_ms: stats.srtt_ms,
230                ..AdaptiveSourceStats::default()
231            })
232        + recency_bonus
233        - miss_penalty
234        - failure_penalty
235}
236
/// A route a read can take.
#[derive(Clone)]
enum ReadRoute {
    /// Read from peers; carries a list of peer ids.
    Peers(Vec<String>),
    /// Read from the registered read sources.
    Sources,
}
242
243impl ReadRoute {
244    fn id(&self) -> &'static str {
245        match self {
246            Self::Peers(_) => "peers",
247            Self::Sources => "sources",
248        }
249    }
250}
251
/// A `ReadRoute` together with the data used to rank it.
struct RankedReadRoute {
    /// The route being ranked.
    route: ReadRoute,
    /// NOTE(review): presumably the id of the best-scoring endpoint on this route — confirm.
    best_endpoint_id: String,
    /// Score assigned to the route.
    score: f64,
    /// Whether the route has recorded history.
    has_history: bool,
}
258
259fn ranked_route_kind(route: &ReadRoute) -> u8 {
260    match route {
261        ReadRoute::Sources => 0,
262        ReadRoute::Peers(_) => 1,
263    }
264}
265
/// Per-read context. The default excludes no peer and uses `MAX_HTL`.
#[derive(Debug, Clone)]
struct MeshReadContext {
    /// Peer to leave out of the read, if any.
    exclude_peer_id: Option<String>,
    /// HTL value attached to the request.
    request_htl: u8,
}
271
272impl Default for MeshReadContext {
273    fn default() -> Self {
274        Self {
275            exclude_peer_id: None,
276            request_htl: MAX_HTL,
277        }
278    }
279}
280
/// Aggregate stats from draining currently available peer-link messages.
///
/// NOTE(review): the code that fills these counters is outside this section;
/// the field notes below follow the field names.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct DataPumpStats {
    /// Total messages processed.
    pub processed: usize,
    /// Request messages processed.
    pub request_messages: usize,
    /// Response messages processed.
    pub response_messages: usize,
    /// Quote request messages processed.
    pub quote_request_messages: u64,
    /// Quote response messages processed.
    pub quote_response_messages: u64,
    /// Pubsub interest messages processed.
    pub pubsub_interest_messages: u64,
    /// Pubsub frame messages processed.
    pub pubsub_frame_messages: u64,
    /// Pubsub inventory messages processed.
    pub pubsub_inventory_messages: u64,
    /// Pubsub want messages processed.
    pub pubsub_want_messages: u64,
    /// Total bytes processed.
    pub processed_bytes: u64,
}
295
/// Pubsub data delivered to a local subscription.
///
/// Element type of the `pubsub_inbox` queue on `MeshStoreCore`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PubsubEvent {
    /// Stream the payload belongs to.
    pub stream_id: String,
    /// Sequence number of the payload.
    pub seq: u64,
    /// Peer the payload originated from.
    pub origin_peer_id: String,
    /// NOTE(review): presumably the directly connected peer that delivered it — confirm.
    pub from_peer_id: String,
    /// The payload bytes.
    pub payload: Vec<u8>,
}
305
/// Send-side accounting from a pubsub publish or forwarded pubsub message.
///
/// NOTE(review): the code that fills these counters is outside this section;
/// the field notes below follow the field names.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct PubsubPublishStats {
    /// Peers selected as targets.
    pub selected_peers: usize,
    /// Peers a message was actually sent to.
    pub sent_peers: usize,
    /// Total bytes sent.
    pub sent_bytes: u64,
    /// Peers whose delivery was deferred.
    pub deferred_peers: usize,
}
314
/// Production pubsub delivery strategy.
///
/// The default is [`PubsubDeliveryMode::HtlInvWant`].
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum PubsubDeliveryMode {
    /// Push full frames only along advertised interest routes.
    InterestPush,
    /// Flood small inventories by HTL and pull payloads back along want paths.
    #[default]
    HtlInvWant,
}
329
/// Request dispatch strategy for peer queries.
///
/// `MeshStoreCore` supports two practical retrieval modes:
/// - Flood (`usize::MAX` fanout): maximize success/latency at bandwidth cost.
/// - Staged hedging: probe a subset first, then expand.
///
/// The default is flood. Values are clamped against the number of available
/// peers by `normalize_dispatch_config` before a wave plan is built.
#[derive(Debug, Clone, Copy)]
pub struct RequestDispatchConfig {
    /// Number of peers queried immediately. `0` is normalized to `1`.
    pub initial_fanout: usize,
    /// Number of additional peers to query on each hedge step. `0` is normalized to `1`.
    pub hedge_fanout: usize,
    /// Total peers allowed for this request. `0` is normalized to "all available peers".
    pub max_fanout: usize,
    /// Delay between hedge waves (ms). `0` means send all waves immediately.
    pub hedge_interval_ms: u64,
}
346
347impl Default for RequestDispatchConfig {
348    fn default() -> Self {
349        Self {
350            initial_fanout: usize::MAX,
351            hedge_fanout: usize::MAX,
352            max_fanout: usize::MAX,
353            hedge_interval_ms: 0,
354        }
355    }
356}
357
358/// Normalize fanout config against current peer availability.
359pub fn normalize_dispatch_config(
360    dispatch: RequestDispatchConfig,
361    available_peers: usize,
362) -> RequestDispatchConfig {
363    let mut cfg = dispatch;
364    let cap = if cfg.max_fanout == 0 {
365        available_peers
366    } else {
367        cfg.max_fanout.min(available_peers)
368    };
369    cfg.max_fanout = cap;
370    cfg.initial_fanout = if cfg.initial_fanout == 0 {
371        1
372    } else {
373        cfg.initial_fanout.min(cap.max(1))
374    };
375    cfg.hedge_fanout = if cfg.hedge_fanout == 0 {
376        1
377    } else {
378        cfg.hedge_fanout.min(cap.max(1))
379    };
380    cfg
381}
382
383/// Build wave sizes for staged hedged dispatch.
384pub fn build_hedged_wave_plan(peer_count: usize, dispatch: RequestDispatchConfig) -> Vec<usize> {
385    if peer_count == 0 {
386        return Vec::new();
387    }
388    let cap = dispatch.max_fanout.min(peer_count);
389    if cap == 0 {
390        return Vec::new();
391    }
392
393    let mut plan = Vec::new();
394    let mut sent = 0usize;
395    let first = dispatch.initial_fanout.min(cap).max(1);
396    plan.push(first);
397    sent += first;
398
399    while sent < cap {
400        let next = dispatch.hedge_fanout.min(cap - sent).max(1);
401        plan.push(next);
402        sent += next;
403    }
404    plan
405}
406
/// Outcome returned after waiting on a hedged dispatch wave.
#[derive(Debug)]
pub enum HedgedWaveAction<T> {
    /// No result yet; `run_hedged_waves` moves on to the next wave.
    Continue,
    /// A result arrived; `run_hedged_waves` returns it as `Some(value)`.
    Success(T),
    /// Stop dispatching; `run_hedged_waves` leaves its loop and returns `None`.
    Abort,
}
414
415/// Run a staged hedged dispatch over peer index ranges.
416///
417/// This scheduler is shared by the reusable `MeshStoreCore` and the native
418/// `hashtree-cli` mesh path so tests and production use the same wave timing.
419pub async fn run_hedged_waves<T, SendWave, SendWaveFut, WaitWave, WaitWaveFut>(
420    peer_count: usize,
421    dispatch: RequestDispatchConfig,
422    request_timeout: Duration,
423    mut send_wave: SendWave,
424    mut wait_wave: WaitWave,
425) -> Option<T>
426where
427    SendWave: FnMut(Range<usize>) -> SendWaveFut,
428    SendWaveFut: Future<Output = usize>,
429    WaitWave: FnMut(Duration) -> WaitWaveFut,
430    WaitWaveFut: Future<Output = HedgedWaveAction<T>>,
431{
432    let dispatch = normalize_dispatch_config(dispatch, peer_count);
433    let wave_plan = build_hedged_wave_plan(peer_count, dispatch);
434    if wave_plan.is_empty() {
435        return None;
436    }
437
438    let deadline = Instant::now() + request_timeout;
439    let mut sent_total = 0usize;
440    let mut next_peer_idx = 0usize;
441
442    for (wave_idx, wave_size) in wave_plan.iter().copied().enumerate() {
443        let from = next_peer_idx;
444        let to = (next_peer_idx + wave_size).min(peer_count);
445        next_peer_idx = to;
446
447        if from == to {
448            continue;
449        }
450
451        sent_total += send_wave(from..to).await;
452        if sent_total == 0 {
453            if next_peer_idx >= peer_count {
454                break;
455            }
456            continue;
457        }
458
459        let now = Instant::now();
460        if now >= deadline {
461            break;
462        }
463        let remaining = deadline.saturating_duration_since(now);
464        let is_last_wave = wave_idx + 1 == wave_plan.len() || next_peer_idx >= peer_count;
465        let wait = if is_last_wave {
466            remaining
467        } else if dispatch.hedge_interval_ms == 0 {
468            Duration::ZERO
469        } else {
470            Duration::from_millis(dispatch.hedge_interval_ms).min(remaining)
471        };
472
473        if wait.is_zero() {
474            continue;
475        }
476
477        match wait_wave(wait).await {
478            HedgedWaveAction::Continue => {}
479            HedgedWaveAction::Success(value) => return Some(value),
480            HedgedWaveAction::Abort => break,
481        }
482    }
483
484    None
485}
486
487/// Keep selector membership aligned with currently connected peer IDs.
488pub async fn sync_selector_peers(selector: &RwLock<PeerSelector>, current_peer_ids: &[String]) {
489    let mut selector = selector.write().await;
490    let current: HashSet<&str> = current_peer_ids.iter().map(String::as_str).collect();
491    let known: Vec<String> = selector.all_stats().map(|s| s.peer_id.clone()).collect();
492    for peer_id in known {
493        if !current.contains(peer_id.as_str()) {
494            selector.remove_peer(&peer_id);
495        }
496    }
497    for peer_id in current_peer_ids {
498        selector.add_peer(peer_id.clone());
499    }
500}
501
/// Response behavior profile for simulation/game-theory actors.
///
/// Defaults to honest behavior (always respond correctly, no extra delay):
/// every probability is `0.0` and every delay or rate is `0`, which is what the
/// derived `Default` produces.
#[derive(Debug, Clone, Copy, Default)]
pub struct ResponseBehaviorConfig {
    /// Probability that a node drops a response even when it has data.
    pub drop_response_prob: f64,
    /// Probability that a node responds with corrupted payload.
    pub corrupt_response_prob: f64,
    /// Baseline response delay before a peer starts sending any data.
    pub extra_delay_ms: u64,
    /// Additional delay before the first response byte becomes available.
    pub first_byte_delay_ms: u64,
    /// Sustained throughput for delivering large payloads. `0` disables size-based slowdown.
    pub bytes_per_second: u64,
    /// Probability that an otherwise honest response experiences an extra stall.
    pub stall_response_prob: f64,
    /// Extra delay injected when a stall event happens.
    pub stall_delay_ms: u64,
}
536
537impl ResponseBehaviorConfig {
538    fn normalized(self) -> Self {
539        Self {
540            drop_response_prob: self.drop_response_prob.clamp(0.0, 1.0),
541            corrupt_response_prob: self.corrupt_response_prob.clamp(0.0, 1.0),
542            extra_delay_ms: self.extra_delay_ms,
543            first_byte_delay_ms: self.first_byte_delay_ms,
544            bytes_per_second: self.bytes_per_second,
545            stall_response_prob: self.stall_response_prob.clamp(0.0, 1.0),
546            stall_delay_ms: self.stall_delay_ms,
547        }
548    }
549}
550
/// Routing policy for request ordering + dispatch fanout.
///
/// `MeshStoreCore::new_with_routing` applies `selection_strategy`,
/// `fairness_enabled` and `cashu_payment_weight` to the peer selector and keeps
/// the whole config for later use.
#[derive(Debug, Clone)]
pub struct MeshRoutingConfig {
    /// Strategy passed to `PeerSelector::with_strategy`.
    pub selection_strategy: SelectionStrategy,
    /// Passed to `PeerSelector::set_fairness`.
    pub fairness_enabled: bool,
    /// Blend weight for payment-priority ranking in selector (`0.0` disables).
    pub cashu_payment_weight: f64,
    /// Refuse serving peers that have reached this many unpaid post-delivery settlements.
    /// `0` disables refusal and only keeps metadata/downranking.
    pub cashu_payment_default_block_threshold: u64,
    /// Cashu mint URLs this node is willing to use for settlement.
    pub cashu_accepted_mints: Vec<String>,
    /// Preferred Cashu mint URL when initiating paid retrieval.
    pub cashu_default_mint: Option<String>,
    /// Baseline cap for accepting a peer-suggested mint outside the trusted set.
    pub cashu_peer_suggested_mint_base_cap_sat: u64,
    /// Additional sats allowed per successful delivery from that peer.
    pub cashu_peer_suggested_mint_success_step_sat: u64,
    /// Additional sats allowed per successful post-delivery payment received from that peer.
    pub cashu_peer_suggested_mint_receipt_step_sat: u64,
    /// Hard upper bound for any single peer-suggested mint quote we accept.
    pub cashu_peer_suggested_mint_max_cap_sat: u64,
    /// Fanout and hedging configuration for peer queries.
    pub dispatch: RequestDispatchConfig,
    /// Response behavior profile; normalized on each read by `response_behavior()`.
    pub response_behavior: ResponseBehaviorConfig,
    /// Configuration for the pubsub scheduler.
    pub pubsub_scheduler: PubsubSchedulerConfig,
    /// Pubsub delivery strategy.
    pub pubsub_delivery_mode: PubsubDeliveryMode,
}
578
579impl Default for MeshRoutingConfig {
580    fn default() -> Self {
581        Self {
582            selection_strategy: SelectionStrategy::Weighted,
583            fairness_enabled: true,
584            cashu_payment_weight: 0.0,
585            cashu_payment_default_block_threshold: 0,
586            cashu_accepted_mints: Vec::new(),
587            cashu_default_mint: None,
588            cashu_peer_suggested_mint_base_cap_sat: 0,
589            cashu_peer_suggested_mint_success_step_sat: 0,
590            cashu_peer_suggested_mint_receipt_step_sat: 0,
591            cashu_peer_suggested_mint_max_cap_sat: 0,
592            dispatch: RequestDispatchConfig::default(),
593            response_behavior: ResponseBehaviorConfig::default(),
594            pubsub_scheduler: PubsubSchedulerConfig::default(),
595            pubsub_delivery_mode: PubsubDeliveryMode::HtlInvWant,
596        }
597    }
598}
599
/// Routed mesh store core that works with any storage backend and transport
/// implementation.
///
/// This is the shared code between production and simulation.
/// - Production: `MeshStoreCore<LmdbStore, NostrRelayTransport, WebRtcPeerLinkFactory>`
/// - Simulation: `MeshStoreCore<MemoryStore, MockRelayTransport, MockConnectionFactory>`
///
/// All mutable state sits behind async locks or atomics, so the methods take `&self`.
pub struct MeshStoreCore<S, R, F>
where
    S: Store + Send + Sync + 'static,
    R: SignalingTransport + Send + Sync + 'static,
    F: PeerLinkFactory + Send + Sync + 'static,
{
    /// Local backing store
    local_store: Arc<S>,
    /// Mesh router (handles peer discovery and connection)
    signaling: Arc<MeshRouter<R, F>>,
    /// Per-peer HTL config, keyed by peer id; filled in by `process_signaling`.
    htl_configs: RwLock<HashMap<String, PeerHTLConfig>>,
    /// Pending requests we sent
    pending_requests: RwLock<HashMap<String, PendingRequest>>,
    /// Pending quote negotiations keyed by requested hash.
    pending_quotes: RwLock<HashMap<String, PendingQuoteRequest>>,
    /// Forwarded peer requests currently being resolved through the mesh/upstream.
    pending_forward_requests: RwLock<HashMap<String, PendingForwardRequest>>,
    /// Bounded negative cache for recently forwarded misses/timeouts.
    recent_forward_misses: Mutex<TimedSeenSet>,
    /// Quotes we issued to peers and will accept exactly once until expiry.
    issued_quotes: RwLock<HashMap<(String, String, u64), IssuedQuote>>,
    /// Monotonic quote identifier generator; starts at `1`.
    next_quote_id: RwLock<u64>,
    /// Non-peer read sources such as upstream Blossom servers.
    read_sources: RwLock<HashMap<String, Arc<dyn MeshReadSource>>>,
    /// Adaptive health stats for non-peer read sources.
    read_source_stats: RwLock<HashMap<String, AdaptiveSourceStats>>,
    /// Shared in-flight upstream reads keyed by hash.
    inflight_source_fetches: Mutex<HashMap<String, InflightSourceFetch>>,
    /// Adaptive selector for peer ordering.
    peer_selector: RwLock<PeerSelector>,
    /// Active per-peer in-flight reads so concurrent block fetches spread across peers.
    peer_active_requests: RwLock<HashMap<String, usize>>,
    /// Actual wire traffic stats used for upload-side reciprocity scheduling.
    peer_wire_stats: RwLock<HashMap<String, PeerWireStats>>,
    /// Streams this node wants delivered locally.
    pubsub_local_interests: RwLock<HashSet<String>>,
    /// Current sequence per local stream interest.
    pubsub_local_interest_versions: RwLock<HashMap<String, u64>>,
    /// Reverse pubsub routes: stream id -> peers with local/downstream interest.
    pubsub_peer_interests: RwLock<HashMap<String, HashSet<String>>>,
    /// Route owner for each downstream subscriber interest.
    pubsub_interest_routes: RwLock<HashMap<(String, String), String>>,
    /// Latest interest sequence observed per subscriber/stream.
    pubsub_interest_versions: RwLock<HashMap<(String, String), u64>>,
    /// Bounded dedupe for pubsub interest floods.
    pubsub_seen_interests: Mutex<TimedSeenSet>,
    /// Bounded dedupe for pubsub data frames.
    pubsub_seen_frames: Mutex<TimedSeenSet>,
    /// Bounded dedupe for pubsub inventory floods.
    pubsub_seen_inventories: Mutex<TimedSeenSet>,
    /// Bounded dedupe for pubsub wants by requesting peer.
    pubsub_seen_wants: Mutex<TimedSeenSet>,
    /// First upstream peer that announced each inventory key.
    pubsub_inventory_routes: RwLock<HashMap<String, String>>,
    /// Downstream peers waiting for a payload after sending a want.
    pubsub_want_routes: RwLock<HashMap<String, HashSet<String>>>,
    /// Dedupe for wants this node already sent upstream.
    pubsub_upstream_wants: Mutex<TimedSeenSet>,
    /// Small payload cache for serving wants after inventory-first announcements.
    pubsub_frame_cache: Mutex<VecDeque<(String, PubsubFrame)>>,
    /// Local pubsub delivery inbox.
    pubsub_inbox: Mutex<VecDeque<PubsubEvent>>,
    /// Per stream/peer deferred counts for aging pubsub strategies.
    pubsub_deferred_counts: RwLock<HashMap<(String, String), u64>>,
    /// Monotonic sequence for locally originated pubsub interest updates; starts at `1`.
    next_pubsub_interest_seq: AtomicU64,
    /// Pending content responses waiting for upload arbitration.
    pending_response_sends: Mutex<Vec<PendingResponseSend>>,
    /// Upload response scheduler state.
    response_scheduler_running: AtomicBool,
    /// Monotonic id for queued response sends; starts at `1`.
    next_response_job_id: AtomicU64,
    /// Routing/dispatch configuration.
    routing: MeshRoutingConfig,
    /// Request timeout; also determines the TTL of `recent_forward_misses`.
    request_timeout: Duration,
    /// Debug flag supplied at construction.
    debug: bool,
    /// Running flag: set to `true` by `start` and `false` by `stop`.
    running: RwLock<bool>,
}
689
690impl<S, R, F> MeshStoreCore<S, R, F>
691where
692    S: Store + Send + Sync + 'static,
693    R: SignalingTransport + Send + Sync + 'static,
694    F: PeerLinkFactory + Send + Sync + 'static,
695{
696    /// Create a new routed mesh store core.
697    pub fn new(
698        local_store: Arc<S>,
699        signaling: Arc<MeshRouter<R, F>>,
700        request_timeout: Duration,
701        debug: bool,
702    ) -> Self {
703        Self::new_with_routing(
704            local_store,
705            signaling,
706            request_timeout,
707            debug,
708            Default::default(),
709        )
710    }
711
712    /// Create a new routed mesh store core with explicit routing configuration.
713    pub fn new_with_routing(
714        local_store: Arc<S>,
715        signaling: Arc<MeshRouter<R, F>>,
716        request_timeout: Duration,
717        debug: bool,
718        routing: MeshRoutingConfig,
719    ) -> Self {
720        let mut selector = PeerSelector::with_strategy(routing.selection_strategy);
721        selector.set_fairness(routing.fairness_enabled);
722        selector.set_cashu_payment_weight(routing.cashu_payment_weight);
723        Self {
724            local_store,
725            signaling,
726            htl_configs: RwLock::new(HashMap::new()),
727            pending_requests: RwLock::new(HashMap::new()),
728            pending_quotes: RwLock::new(HashMap::new()),
729            pending_forward_requests: RwLock::new(HashMap::new()),
730            recent_forward_misses: Mutex::new(TimedSeenSet::new(
731                RECENT_FORWARD_MISS_CAPACITY,
732                Self::recent_forward_miss_ttl(request_timeout),
733            )),
734            issued_quotes: RwLock::new(HashMap::new()),
735            next_quote_id: RwLock::new(1),
736            read_sources: RwLock::new(HashMap::new()),
737            read_source_stats: RwLock::new(HashMap::new()),
738            inflight_source_fetches: Mutex::new(HashMap::new()),
739            peer_selector: RwLock::new(selector),
740            peer_active_requests: RwLock::new(HashMap::new()),
741            peer_wire_stats: RwLock::new(HashMap::new()),
742            pubsub_local_interests: RwLock::new(HashSet::new()),
743            pubsub_local_interest_versions: RwLock::new(HashMap::new()),
744            pubsub_peer_interests: RwLock::new(HashMap::new()),
745            pubsub_interest_routes: RwLock::new(HashMap::new()),
746            pubsub_interest_versions: RwLock::new(HashMap::new()),
747            pubsub_seen_interests: Mutex::new(TimedSeenSet::new(
748                PUBSUB_SEEN_CAPACITY,
749                PUBSUB_SEEN_TTL,
750            )),
751            pubsub_seen_frames: Mutex::new(TimedSeenSet::new(
752                PUBSUB_SEEN_CAPACITY,
753                PUBSUB_SEEN_TTL,
754            )),
755            pubsub_seen_inventories: Mutex::new(TimedSeenSet::new(
756                PUBSUB_SEEN_CAPACITY,
757                PUBSUB_SEEN_TTL,
758            )),
759            pubsub_seen_wants: Mutex::new(TimedSeenSet::new(PUBSUB_SEEN_CAPACITY, PUBSUB_SEEN_TTL)),
760            pubsub_inventory_routes: RwLock::new(HashMap::new()),
761            pubsub_want_routes: RwLock::new(HashMap::new()),
762            pubsub_upstream_wants: Mutex::new(TimedSeenSet::new(
763                PUBSUB_SEEN_CAPACITY,
764                PUBSUB_SEEN_TTL,
765            )),
766            pubsub_frame_cache: Mutex::new(VecDeque::new()),
767            pubsub_inbox: Mutex::new(VecDeque::new()),
768            pubsub_deferred_counts: RwLock::new(HashMap::new()),
769            next_pubsub_interest_seq: AtomicU64::new(1),
770            pending_response_sends: Mutex::new(Vec::new()),
771            response_scheduler_running: AtomicBool::new(false),
772            next_response_job_id: AtomicU64::new(1),
773            routing,
774            request_timeout,
775            debug,
776            running: RwLock::new(false),
777        }
778    }
779
780    fn recent_forward_miss_ttl(request_timeout: Duration) -> Duration {
781        let ttl_ms = request_timeout
782            .as_millis()
783            .saturating_mul(2)
784            .max(MIN_RECENT_FORWARD_MISS_TTL_MS as u128)
785            .min(u64::MAX as u128) as u64;
786        Duration::from_millis(ttl_ms)
787    }
788
789    /// Start the store (begin listening for messages)
790    pub async fn start(&self) -> Result<(), TransportError> {
791        *self.running.write().await = true;
792
793        // Send initial hello
794        self.signaling.send_hello(vec![]).await?;
795
796        Ok(())
797    }
798
799    /// Stop the store
800    pub async fn stop(&self) {
801        *self.running.write().await = false;
802    }
803
804    /// Process incoming signaling message
805    pub async fn process_signaling(&self, msg: SignalingMessage) -> Result<(), TransportError> {
806        // When a new peer connects, initialize their HTL config
807        let peer_id = msg.peer_id().to_string();
808        {
809            let mut configs = self.htl_configs.write().await;
810            if !configs.contains_key(&peer_id) {
811                configs.insert(peer_id.clone(), PeerHTLConfig::random());
812            }
813        }
814        self.peer_selector.write().await.add_peer(peer_id.clone());
815
816        let result = self.signaling.handle_message(msg).await;
817        if result.is_ok() {
818            self.announce_pubsub_interests_to_peer(&peer_id).await;
819        }
820        result
821    }
822
    /// Get signaling manager reference.
    ///
    /// Returns a borrow of the shared `Arc<MeshRouter<R, F>>` held by this store;
    /// clone the `Arc` to keep it beyond the borrow.
    pub fn signaling(&self) -> &Arc<MeshRouter<R, F>> {
        &self.signaling
    }
827
828    fn response_behavior(&self) -> ResponseBehaviorConfig {
829        self.routing.response_behavior.normalized()
830    }
831
832    async fn record_peer_wire_sent(&self, peer_id: &str, bytes: u64) {
833        if bytes == 0 {
834            return;
835        }
836        let mut stats = self.peer_wire_stats.write().await;
837        let entry = stats.entry(peer_id.to_string()).or_default();
838        entry.bytes_sent = entry.bytes_sent.saturating_add(bytes);
839    }
840
841    async fn record_peer_wire_received(&self, peer_id: &str, bytes: u64) {
842        if bytes == 0 {
843            return;
844        }
845        let mut stats = self.peer_wire_stats.write().await;
846        let entry = stats.entry(peer_id.to_string()).or_default();
847        entry.bytes_received = entry.bytes_received.saturating_add(bytes);
848    }
849
850    /// Record ingress from a peer that matched local or downstream demand.
851    ///
852    /// Raw bytes are tracked separately in `record_peer_wire_received`; this
853    /// counter is the reciprocity signal used by shared outbound scheduling.
854    pub async fn record_useful_bytes_received_from_peer(&self, peer_id: &str, bytes: u64) {
855        if bytes == 0 {
856            return;
857        }
858        let mut stats = self.peer_wire_stats.write().await;
859        let entry = stats.entry(peer_id.to_string()).or_default();
860        entry.useful_bytes_received = entry.useful_bytes_received.saturating_add(bytes);
861    }
862
863    /// Snapshot peer traffic for production pubsub scheduling or diagnostics.
864    pub async fn peer_traffic_snapshot(&self, peer_id: &str) -> PeerTrafficSnapshot {
865        self.peer_wire_stats
866            .read()
867            .await
868            .get(peer_id)
869            .copied()
870            .unwrap_or_default()
871    }
872
873    /// Snapshot all known peer traffic for production pubsub scheduling.
874    pub async fn peer_traffic_snapshots(&self) -> HashMap<String, PeerTrafficSnapshot> {
875        self.peer_wire_stats.read().await.clone()
876    }
877
878    fn pubsub_key(origin_peer_id: &str, stream_id: &str, seq: u64) -> String {
879        format!("{origin_peer_id}:{stream_id}:{seq}")
880    }
881
882    fn pubsub_frame_key(frame: &PubsubFrame) -> String {
883        Self::pubsub_key(&frame.origin_peer_id, &frame.stream_id, frame.seq)
884    }
885
886    fn pubsub_interest_key(interest: &PubsubInterest) -> String {
887        format!(
888            "{}:{}:{}:{}",
889            interest.subscriber_peer_id, interest.stream_id, interest.seq, interest.active
890        )
891    }
892
893    fn next_pubsub_interest_seq(&self) -> u64 {
894        self.next_pubsub_interest_seq
895            .fetch_add(1, Ordering::Relaxed)
896    }
897
898    async fn record_peer_pubsub_wire_sent(&self, peer_id: &str, bytes: u64, bandwidth_debt: f64) {
899        if bytes == 0 {
900            return;
901        }
902        let mut stats = self.peer_wire_stats.write().await;
903        let entry = stats.entry(peer_id.to_string()).or_default();
904        entry.bytes_sent = entry.bytes_sent.saturating_add(bytes);
905        entry.bandwidth_debt = bandwidth_debt;
906    }
907
908    async fn send_pubsub_interest_to_peers(
909        &self,
910        interest: &PubsubInterest,
911        exclude_peer_id: Option<&str>,
912    ) -> PubsubPublishStats {
913        if !should_forward_htl(interest.htl) {
914            return PubsubPublishStats::default();
915        }
916
917        let mut peer_ids = self.signaling.peer_ids().await;
918        peer_ids.sort();
919        peer_ids.retain(|peer_id| exclude_peer_id.is_none_or(|exclude| peer_id != exclude));
920
921        let bytes = encode_pubsub_interest(interest);
922        let mut stats = PubsubPublishStats {
923            selected_peers: peer_ids.len(),
924            ..Default::default()
925        };
926        for peer_id in peer_ids {
927            let Some(channel) = self.signaling.get_channel(&peer_id).await else {
928                continue;
929            };
930            if channel.send(bytes.clone()).await.is_ok() {
931                stats.sent_peers += 1;
932                stats.sent_bytes = stats.sent_bytes.saturating_add(bytes.len() as u64);
933                self.record_peer_wire_sent(&peer_id, bytes.len() as u64)
934                    .await;
935            }
936        }
937        stats
938    }
939
940    async fn announce_pubsub_interests_to_peer(&self, peer_id: &str) {
941        let mut interests = self
942            .pubsub_local_interests
943            .read()
944            .await
945            .iter()
946            .cloned()
947            .collect::<Vec<_>>();
948        interests.sort();
949        if interests.is_empty() {
950            return;
951        }
952
953        let interests = {
954            let versions = self.pubsub_local_interest_versions.read().await;
955            interests
956                .into_iter()
957                .filter_map(|stream_id| {
958                    versions
959                        .get(&stream_id)
960                        .copied()
961                        .map(|seq| (stream_id, seq))
962                })
963                .collect::<Vec<_>>()
964        };
965
966        for (stream_id, seq) in interests {
967            let interest = create_pubsub_interest(
968                stream_id,
969                self.signaling.peer_id().to_string(),
970                seq,
971                true,
972                MAX_HTL,
973            );
974            let Some(channel) = self.signaling.get_channel(peer_id).await else {
975                continue;
976            };
977            let bytes = encode_pubsub_interest(&interest);
978            if channel.send(bytes.clone()).await.is_ok() {
979                self.record_peer_wire_sent(peer_id, bytes.len() as u64)
980                    .await;
981            }
982        }
983    }
984
985    fn remove_pubsub_peer_interest(
986        peer_interests: &mut HashMap<String, HashSet<String>>,
987        routes: &HashMap<(String, String), String>,
988        stream_id: &str,
989        peer_id: &str,
990    ) {
991        let still_has_route = routes
992            .iter()
993            .any(|((stream, _subscriber), peer)| stream == stream_id && peer == peer_id);
994        if still_has_route {
995            return;
996        }
997        if let Some(peers) = peer_interests.get_mut(stream_id) {
998            peers.remove(peer_id);
999            if peers.is_empty() {
1000                peer_interests.remove(stream_id);
1001            }
1002        }
1003    }
1004
    /// Apply an interest announcement received from `from_peer` to the routing tables.
    ///
    /// Returns `true` when the announcement was accepted and the tables were
    /// updated, and `false` when it was rejected: empty stream or subscriber
    /// id, the subscriber is this node, the exact announcement was already
    /// seen, or a version at least as new is already recorded for the
    /// `(stream, subscriber)` pair.
    async fn apply_pubsub_interest_route(
        &self,
        from_peer: &str,
        interest: &PubsubInterest,
    ) -> bool {
        if interest.stream_id.is_empty() || interest.subscriber_peer_id.is_empty() {
            return false;
        }
        // An announcement naming this node as the subscriber is never routed.
        if interest.subscriber_peer_id == self.signaling.peer_id() {
            return false;
        }

        // Dedupe on (subscriber, stream, seq, active); a `false` result from
        // `insert_if_new` is treated as "already seen".
        let interest_key = Self::pubsub_interest_key(interest);
        if !self
            .pubsub_seen_interests
            .lock()
            .await
            .insert_if_new(interest_key)
        {
            return false;
        }

        let route_key = (
            interest.stream_id.clone(),
            interest.subscriber_peer_id.clone(),
        );
        {
            // Version gate: only a strictly newer sequence number may replace
            // what is recorded for this (stream, subscriber) pair.
            let mut versions = self.pubsub_interest_versions.write().await;
            if versions
                .get(&route_key)
                .is_some_and(|latest| *latest >= interest.seq)
            {
                return false;
            }
            versions.insert(route_key.clone(), interest.seq);
        }

        // Both tables are write-locked together (peer interests first, then
        // routes) so they are updated consistently.
        let mut peer_interests = self.pubsub_peer_interests.write().await;
        let mut routes = self.pubsub_interest_routes.write().await;
        if interest.active {
            // Point the subscriber's route at `from_peer`; if it previously
            // went through a different peer, release that peer's interest
            // unless another route still uses it.
            if let Some(previous_peer) = routes.insert(route_key, from_peer.to_string()) {
                if previous_peer != from_peer {
                    Self::remove_pubsub_peer_interest(
                        &mut peer_interests,
                        &routes,
                        &interest.stream_id,
                        &previous_peer,
                    );
                }
            }
            peer_interests
                .entry(interest.stream_id.clone())
                .or_default()
                .insert(from_peer.to_string());
        } else if let Some(previous_peer) = routes.remove(&route_key) {
            // Withdrawal of a known route: release the peer it pointed at.
            Self::remove_pubsub_peer_interest(
                &mut peer_interests,
                &routes,
                &interest.stream_id,
                &previous_peer,
            );
        } else {
            // Withdrawal with no recorded route: release the sender's own
            // interest in the stream if no other route goes through it.
            Self::remove_pubsub_peer_interest(
                &mut peer_interests,
                &routes,
                &interest.stream_id,
                from_peer,
            );
        }

        true
    }
1077
1078    async fn interested_pubsub_peers(
1079        &self,
1080        stream_id: &str,
1081        exclude_peer_id: Option<&str>,
1082    ) -> Vec<String> {
1083        let connected = self
1084            .signaling
1085            .peer_ids()
1086            .await
1087            .into_iter()
1088            .collect::<HashSet<_>>();
1089        let mut peers = self
1090            .pubsub_peer_interests
1091            .read()
1092            .await
1093            .get(stream_id)
1094            .map(|peers| peers.iter().cloned().collect::<Vec<_>>())
1095            .unwrap_or_default();
1096        peers.retain(|peer_id| {
1097            connected.contains(peer_id) && exclude_peer_id.is_none_or(|exclude| peer_id != exclude)
1098        });
1099        peers.sort();
1100        peers
1101    }
1102
1103    async fn decrement_pubsub_htl_for_peer(&self, peer_id: &str, htl: u8) -> u8 {
1104        let htl_config = {
1105            let configs = self.htl_configs.read().await;
1106            configs
1107                .get(peer_id)
1108                .cloned()
1109                .unwrap_or_else(PeerHTLConfig::random)
1110        };
1111        htl_config.decrement_with_policy(htl, &MESH_EVENT_POLICY)
1112    }
1113
1114    async fn send_pubsub_inventory_to_peers(
1115        &self,
1116        inv: &PubsubInventory,
1117        peer_ids: &[String],
1118    ) -> PubsubPublishStats {
1119        if peer_ids.is_empty() || !should_forward_htl(inv.htl) {
1120            return PubsubPublishStats::default();
1121        }
1122
1123        let mut stats = PubsubPublishStats {
1124            selected_peers: peer_ids.len(),
1125            ..Default::default()
1126        };
1127        for peer_id in peer_ids {
1128            let send_htl = self.decrement_pubsub_htl_for_peer(peer_id, inv.htl).await;
1129            if !should_forward_htl(send_htl) {
1130                continue;
1131            }
1132            let Some(channel) = self.signaling.get_channel(peer_id).await else {
1133                continue;
1134            };
1135            let mut outgoing = inv.clone();
1136            outgoing.htl = send_htl;
1137            let bytes = encode_pubsub_inventory(&outgoing);
1138            let message_bytes = bytes.len() as u64;
1139            if channel.send(bytes).await.is_ok() {
1140                stats.sent_peers += 1;
1141                stats.sent_bytes = stats.sent_bytes.saturating_add(message_bytes);
1142                self.record_peer_wire_sent(peer_id, message_bytes).await;
1143            }
1144        }
1145        stats
1146    }
1147
1148    async fn flood_pubsub_inventory(
1149        &self,
1150        inv: &PubsubInventory,
1151        exclude_peer_id: Option<&str>,
1152    ) -> PubsubPublishStats {
1153        let mut peer_ids = self.signaling.peer_ids().await;
1154        peer_ids.sort();
1155        peer_ids.retain(|peer_id| exclude_peer_id.is_none_or(|exclude| peer_id != exclude));
1156        self.send_pubsub_inventory_to_peers(inv, &peer_ids).await
1157    }
1158
1159    async fn send_pubsub_want_to_peer(&self, want: &PubsubWant, peer_id: &str) -> bool {
1160        let Some(channel) = self.signaling.get_channel(peer_id).await else {
1161            return false;
1162        };
1163        let bytes = encode_pubsub_want(want);
1164        let message_bytes = bytes.len() as u64;
1165        match channel.send(bytes).await {
1166            Ok(()) => {
1167                self.record_peer_wire_sent(peer_id, message_bytes).await;
1168                true
1169            }
1170            Err(_) => false,
1171        }
1172    }
1173
1174    async fn send_pubsub_want_upstream(
1175        &self,
1176        key: &str,
1177        want: &PubsubWant,
1178        exclude_peer_id: Option<&str>,
1179    ) -> bool {
1180        let upstream = {
1181            let routes = self.pubsub_inventory_routes.read().await;
1182            routes.get(key).cloned()
1183        };
1184        let Some(upstream) = upstream else {
1185            return false;
1186        };
1187        if exclude_peer_id.is_some_and(|exclude| exclude == upstream) {
1188            return false;
1189        }
1190        let want_key = format!("{key}:{upstream}");
1191        if !self
1192            .pubsub_upstream_wants
1193            .lock()
1194            .await
1195            .insert_if_new(want_key)
1196        {
1197            return false;
1198        }
1199        self.send_pubsub_want_to_peer(want, &upstream).await
1200    }
1201
1202    async fn cache_pubsub_frame(&self, key: String, frame: PubsubFrame) {
1203        let mut cache = self.pubsub_frame_cache.lock().await;
1204        if let Some(index) = cache.iter().position(|(cached_key, _)| cached_key == &key) {
1205            cache.remove(index);
1206        }
1207        cache.push_back((key, frame));
1208        while cache.len() > PUBSUB_FRAME_CACHE_CAPACITY {
1209            cache.pop_front();
1210        }
1211    }
1212
1213    async fn cached_pubsub_frame(&self, key: &str) -> Option<PubsubFrame> {
1214        self.pubsub_frame_cache
1215            .lock()
1216            .await
1217            .iter()
1218            .find_map(|(cached_key, frame)| {
1219                if cached_key == key {
1220                    Some(frame.clone())
1221                } else {
1222                    None
1223                }
1224            })
1225    }
1226
1227    async fn remember_pubsub_want_peer(&self, key: String, from_peer: &str) -> bool {
1228        let mut routes = self.pubsub_want_routes.write().await;
1229        routes.entry(key).or_default().insert(from_peer.to_string())
1230    }
1231
1232    async fn take_pubsub_want_peers(
1233        &self,
1234        key: &str,
1235        exclude_peer_id: Option<&str>,
1236    ) -> Vec<String> {
1237        let connected = self
1238            .signaling
1239            .peer_ids()
1240            .await
1241            .into_iter()
1242            .collect::<HashSet<_>>();
1243        let mut peers = self
1244            .pubsub_want_routes
1245            .write()
1246            .await
1247            .remove(key)
1248            .map(|peers| peers.into_iter().collect::<Vec<_>>())
1249            .unwrap_or_default();
1250        peers.retain(|peer_id| {
1251            connected.contains(peer_id) && exclude_peer_id.is_none_or(|exclude| peer_id != exclude)
1252        });
1253        peers.sort();
1254        peers
1255    }
1256
1257    async fn select_pubsub_peers(
1258        &self,
1259        stream_id: &str,
1260        seq: u64,
1261        message_bytes: u64,
1262        peer_ids: &[String],
1263    ) -> (Vec<String>, Vec<String>) {
1264        let traffic = self.peer_wire_stats.read().await;
1265        let deferred_counts = self.pubsub_deferred_counts.read().await;
1266        let candidates = peer_ids
1267            .iter()
1268            .map(|peer_id| PubsubCandidate {
1269                peer_id: peer_id.clone(),
1270                traffic: traffic.get(peer_id).copied().unwrap_or_default(),
1271                deferred_count: deferred_counts
1272                    .get(&(stream_id.to_string(), peer_id.clone()))
1273                    .copied()
1274                    .unwrap_or_default(),
1275            })
1276            .collect::<Vec<_>>();
1277        drop(deferred_counts);
1278        drop(traffic);
1279
1280        let selection = self.routing.pubsub_scheduler.select(
1281            stream_id,
1282            seq,
1283            self.signaling.peer_id(),
1284            message_bytes,
1285            &candidates,
1286        );
1287
1288        {
1289            let mut deferred_counts = self.pubsub_deferred_counts.write().await;
1290            for peer_id in &selection.deferred {
1291                *deferred_counts
1292                    .entry((stream_id.to_string(), peer_id.clone()))
1293                    .or_insert(0) += 1;
1294            }
1295            for peer_id in &selection.selected {
1296                deferred_counts.remove(&(stream_id.to_string(), peer_id.clone()));
1297            }
1298        }
1299
1300        (selection.selected, selection.deferred)
1301    }
1302
1303    async fn send_pubsub_frame_to_peers(
1304        &self,
1305        frame: &PubsubFrame,
1306        peer_ids: &[String],
1307    ) -> PubsubPublishStats {
1308        if peer_ids.is_empty() || !should_forward_htl(frame.htl) {
1309            return PubsubPublishStats::default();
1310        }
1311
1312        let bytes = encode_pubsub_frame(frame);
1313        let message_bytes = bytes.len() as u64;
1314        let (selected, deferred) = self
1315            .select_pubsub_peers(&frame.stream_id, frame.seq, message_bytes, peer_ids)
1316            .await;
1317        let mut stats = PubsubPublishStats {
1318            selected_peers: selected.len(),
1319            deferred_peers: deferred.len(),
1320            ..Default::default()
1321        };
1322
1323        for peer_id in selected {
1324            let Some(channel) = self.signaling.get_channel(&peer_id).await else {
1325                continue;
1326            };
1327            let snapshot = self.peer_traffic_snapshot(&peer_id).await;
1328            let bandwidth_debt = reciprocal_virtual_finish(snapshot, message_bytes);
1329            if channel.send(bytes.clone()).await.is_ok() {
1330                stats.sent_peers += 1;
1331                stats.sent_bytes = stats.sent_bytes.saturating_add(message_bytes);
1332                self.record_peer_pubsub_wire_sent(&peer_id, message_bytes, bandwidth_debt)
1333                    .await;
1334            }
1335        }
1336
1337        stats
1338    }
1339
1340    async fn enqueue_pubsub_event(&self, event: PubsubEvent) {
1341        let mut inbox = self.pubsub_inbox.lock().await;
1342        inbox.push_back(event);
1343        while inbox.len() > PUBSUB_INBOX_CAPACITY {
1344            inbox.pop_front();
1345        }
1346    }
1347
1348    /// Subscribe this node to a pubsub stream and advertise that interest.
1349    pub async fn subscribe_pubsub(
1350        self: &Arc<Self>,
1351        stream_id: impl Into<String>,
1352    ) -> PubsubPublishStats {
1353        let stream_id = stream_id.into();
1354        if stream_id.is_empty() {
1355            return PubsubPublishStats::default();
1356        }
1357        self.pubsub_local_interests
1358            .write()
1359            .await
1360            .insert(stream_id.clone());
1361        let seq = {
1362            let mut versions = self.pubsub_local_interest_versions.write().await;
1363            match versions.get(&stream_id).copied() {
1364                Some(seq) => seq,
1365                None => {
1366                    let seq = self.next_pubsub_interest_seq();
1367                    versions.insert(stream_id.clone(), seq);
1368                    seq
1369                }
1370            }
1371        };
1372        let interest = create_pubsub_interest(
1373            stream_id,
1374            self.signaling.peer_id().to_string(),
1375            seq,
1376            true,
1377            MAX_HTL,
1378        );
1379        self.send_pubsub_interest_to_peers(&interest, None).await
1380    }
1381
1382    /// Stop local delivery for a pubsub stream and advertise the withdrawn interest.
1383    pub async fn unsubscribe_pubsub(
1384        self: &Arc<Self>,
1385        stream_id: impl Into<String>,
1386    ) -> PubsubPublishStats {
1387        let stream_id = stream_id.into();
1388        if stream_id.is_empty() {
1389            return PubsubPublishStats::default();
1390        }
1391        self.pubsub_local_interests.write().await.remove(&stream_id);
1392        self.pubsub_local_interest_versions
1393            .write()
1394            .await
1395            .remove(&stream_id);
1396        let interest = create_pubsub_interest(
1397            stream_id,
1398            self.signaling.peer_id().to_string(),
1399            self.next_pubsub_interest_seq(),
1400            false,
1401            MAX_HTL,
1402        );
1403        self.send_pubsub_interest_to_peers(&interest, None).await
1404    }
1405
1406    /// Publish bytes on a pubsub stream through the configured mesh delivery mode.
1407    pub async fn publish_pubsub(
1408        self: &Arc<Self>,
1409        stream_id: impl Into<String>,
1410        seq: u64,
1411        payload: Vec<u8>,
1412    ) -> PubsubPublishStats {
1413        let stream_id = stream_id.into();
1414        if stream_id.is_empty() {
1415            return PubsubPublishStats::default();
1416        }
1417        let payload_bytes = payload.len() as u64;
1418        let frame = create_pubsub_frame(
1419            stream_id.clone(),
1420            seq,
1421            self.signaling.peer_id().to_string(),
1422            payload.clone(),
1423            MAX_HTL,
1424        );
1425        let frame_key = Self::pubsub_frame_key(&frame);
1426        self.pubsub_seen_frames
1427            .lock()
1428            .await
1429            .insert_if_new(frame_key.clone());
1430        self.cache_pubsub_frame(frame_key, frame.clone()).await;
1431
1432        if self
1433            .pubsub_local_interests
1434            .read()
1435            .await
1436            .contains(&stream_id)
1437        {
1438            self.enqueue_pubsub_event(PubsubEvent {
1439                stream_id: stream_id.clone(),
1440                seq,
1441                origin_peer_id: self.signaling.peer_id().to_string(),
1442                from_peer_id: self.signaling.peer_id().to_string(),
1443                payload,
1444            })
1445            .await;
1446        }
1447
1448        match self.routing.pubsub_delivery_mode {
1449            PubsubDeliveryMode::InterestPush => {
1450                let peers = self.interested_pubsub_peers(&stream_id, None).await;
1451                self.send_pubsub_frame_to_peers(&frame, &peers).await
1452            }
1453            PubsubDeliveryMode::HtlInvWant => {
1454                let inv = create_pubsub_inventory(
1455                    stream_id,
1456                    seq,
1457                    self.signaling.peer_id().to_string(),
1458                    payload_bytes,
1459                    MESH_EVENT_POLICY.max_htl,
1460                );
1461                self.flood_pubsub_inventory(&inv, None).await
1462            }
1463        }
1464    }
1465
1466    /// Drain locally delivered pubsub events.
1467    pub async fn drain_pubsub_events(&self) -> Vec<PubsubEvent> {
1468        self.pubsub_inbox.lock().await.drain(..).collect()
1469    }
1470
1471    /// Connected peers that currently have local or downstream interest in a stream.
1472    pub async fn pubsub_interest_peers(&self, stream_id: &str) -> Vec<String> {
1473        self.interested_pubsub_peers(stream_id, None).await
1474    }
1475
1476    fn choose_ready_response_job(
1477        ready_jobs: &[(u64, String, usize, Instant, u64)],
1478        stats: &HashMap<String, PeerWireStats>,
1479    ) -> Option<(u64, f64)> {
1480        let jobs = ready_jobs
1481            .iter()
1482            .map(|job| OutboundJobCandidate {
1483                job_id: job.0,
1484                peer_id: job.1.clone(),
1485                message_bytes: job.2 as u64,
1486                queue_sequence: job.4,
1487            })
1488            .collect::<Vec<_>>();
1489        select_reciprocal_outbound_job(&jobs, |peer_id| {
1490            stats.get(peer_id).copied().unwrap_or_default()
1491        })
1492        .map(|choice| (choice.job_id, choice.virtual_finish))
1493    }
1494
1495    async fn enqueue_response_send(
1496        self: &Arc<Self>,
1497        peer_id: String,
1498        bytes: Vec<u8>,
1499        ready_at: Instant,
1500    ) {
1501        let job_id = self.next_response_job_id.fetch_add(1, Ordering::Relaxed);
1502        {
1503            let mut queue = self.pending_response_sends.lock().await;
1504            queue.push(PendingResponseSend {
1505                job_id,
1506                peer_id,
1507                bytes,
1508                ready_at,
1509                queue_sequence: job_id,
1510            });
1511        }
1512
1513        if self
1514            .response_scheduler_running
1515            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
1516            .is_ok()
1517        {
1518            let this = Arc::clone(self);
1519            tokio::spawn(async move {
1520                this.run_response_scheduler().await;
1521            });
1522        }
1523    }
1524
1525    async fn run_response_scheduler(self: Arc<Self>) {
1526        loop {
1527            let snapshot = {
1528                let queue = self.pending_response_sends.lock().await;
1529                if queue.is_empty() {
1530                    self.response_scheduler_running
1531                        .store(false, Ordering::Release);
1532                    return;
1533                }
1534                queue
1535                    .iter()
1536                    .map(|job| {
1537                        (
1538                            job.job_id,
1539                            job.peer_id.clone(),
1540                            job.bytes.len(),
1541                            job.ready_at,
1542                            job.queue_sequence,
1543                        )
1544                    })
1545                    .collect::<Vec<_>>()
1546            };
1547
1548            let now = Instant::now();
1549            let mut earliest_ready_at: Option<Instant> = None;
1550            let mut ready_jobs = Vec::new();
1551            for job in &snapshot {
1552                if job.3 <= now {
1553                    ready_jobs.push(job.clone());
1554                } else {
1555                    earliest_ready_at = Some(match earliest_ready_at {
1556                        Some(current) => current.min(job.3),
1557                        None => job.3,
1558                    });
1559                }
1560            }
1561
1562            if ready_jobs.is_empty() {
1563                if let Some(ready_at) = earliest_ready_at {
1564                    tokio::time::sleep(ready_at.saturating_duration_since(Instant::now())).await;
1565                    continue;
1566                }
1567                self.response_scheduler_running
1568                    .store(false, Ordering::Release);
1569                return;
1570            }
1571
1572            let (selected_job_id, selected_finish) = {
1573                let stats = self.peer_wire_stats.read().await;
1574                Self::choose_ready_response_job(&ready_jobs, &stats).expect("ready response job")
1575            };
1576
1577            let selected = {
1578                let mut queue = self.pending_response_sends.lock().await;
1579                let Some(index) = queue.iter().position(|job| job.job_id == selected_job_id) else {
1580                    continue;
1581                };
1582                queue.swap_remove(index)
1583            };
1584
1585            let sent = if let Some(channel) = self.signaling.get_channel(&selected.peer_id).await {
1586                channel.send(selected.bytes.clone()).await.is_ok()
1587            } else {
1588                false
1589            };
1590
1591            let queued_peers = {
1592                let queue = self.pending_response_sends.lock().await;
1593                queue
1594                    .iter()
1595                    .map(|job| job.peer_id.clone())
1596                    .collect::<HashSet<_>>()
1597            };
1598            let mut stats = self.peer_wire_stats.write().await;
1599            let entry = stats.entry(selected.peer_id.clone()).or_default();
1600            if sent {
1601                entry.bytes_sent = entry.bytes_sent.saturating_add(selected.bytes.len() as u64);
1602                entry.bandwidth_debt = selected_finish;
1603            }
1604            if queued_peers.is_empty() {
1605                for peer_stats in stats.values_mut() {
1606                    peer_stats.bandwidth_debt = 0.0;
1607                }
1608            } else {
1609                let floor = queued_peers
1610                    .iter()
1611                    .filter_map(|peer_id| stats.get(peer_id).map(|peer| peer.bandwidth_debt))
1612                    .fold(f64::INFINITY, f64::min);
1613                if floor.is_finite() && floor > 0.0 {
1614                    for peer_id in queued_peers {
1615                        if let Some(peer_stats) = stats.get_mut(&peer_id) {
1616                            peer_stats.bandwidth_debt =
1617                                (peer_stats.bandwidth_debt - floor).max(0.0);
1618                        }
1619                    }
1620                }
1621            }
1622        }
1623    }
1624
1625    fn deterministic_actor_draw_for(peer_id: &str, hash: &Hash, salt: u64) -> f64 {
1626        let mut hasher = DefaultHasher::new();
1627        peer_id.hash(&mut hasher);
1628        hash.hash(&mut hasher);
1629        salt.hash(&mut hasher);
1630        let v = hasher.finish();
1631        (v as f64) / (u64::MAX as f64)
1632    }
1633
1634    fn deterministic_actor_draw(&self, hash: &Hash, salt: u64) -> f64 {
1635        Self::deterministic_actor_draw_for(self.signaling.peer_id(), hash, salt)
1636    }
1637
1638    fn peer_metadata_pointer_slot_hash() -> Hash {
1639        hashtree_core::sha256(PEER_METADATA_POINTER_SLOT_KEY)
1640    }
1641
1642    fn decode_hash_hex(hash_hex: &str) -> Result<Hash, StoreError> {
1643        let bytes = hex::decode(hash_hex)
1644            .map_err(|e| StoreError::Other(format!("Invalid hash hex: {e}")))?;
1645        if bytes.len() != 32 {
1646            return Err(StoreError::Other(format!(
1647                "Invalid hash length {}, expected 32 bytes",
1648                bytes.len()
1649            )));
1650        }
1651        let mut hash = [0u8; 32];
1652        hash.copy_from_slice(&bytes);
1653        Ok(hash)
1654    }
1655
1656    fn should_drop_response(&self, hash: &Hash) -> bool {
1657        let p = self.response_behavior().drop_response_prob;
1658        if p <= 0.0 {
1659            return false;
1660        }
1661        self.deterministic_actor_draw(hash, 0xD0_D0_D0_D0_D0_D0_D0_D0) < p
1662    }
1663
1664    fn should_corrupt_response(&self, hash: &Hash) -> bool {
1665        let p = self.response_behavior().corrupt_response_prob;
1666        if p <= 0.0 {
1667            return false;
1668        }
1669        self.deterministic_actor_draw(hash, 0xC0_C0_C0_C0_C0_C0_C0_C0) < p
1670    }
1671
1672    fn should_stall_response(&self, hash: &Hash) -> bool {
1673        let p = self.response_behavior().stall_response_prob;
1674        if p <= 0.0 {
1675            return false;
1676        }
1677        self.deterministic_actor_draw(hash, 0x5A_11_5A_11_5A_11_5A_11) < p
1678    }
1679
1680    fn response_send_delay(&self, hash: &Hash, payload_len: usize) -> Duration {
1681        let behavior = self.response_behavior();
1682        let mut total_ms = behavior
1683            .extra_delay_ms
1684            .saturating_add(behavior.first_byte_delay_ms);
1685
1686        if behavior.bytes_per_second > 0 && payload_len > 0 {
1687            let throughput_ms = ((payload_len as u128) * 1000)
1688                .div_ceil(behavior.bytes_per_second as u128)
1689                .min(u64::MAX as u128) as u64;
1690            total_ms = total_ms.saturating_add(throughput_ms);
1691        }
1692
1693        if behavior.stall_delay_ms > 0 && self.should_stall_response(hash) {
1694            total_ms = total_ms.saturating_add(behavior.stall_delay_ms);
1695        }
1696
1697        Duration::from_millis(total_ms)
1698    }
1699
1700    async fn ordered_connected_peers(&self, exclude_peer_id: Option<&str>) -> Vec<String> {
1701        let current_peer_ids = self.signaling.peer_ids().await;
1702        if current_peer_ids.is_empty() {
1703            return Vec::new();
1704        }
1705
1706        sync_selector_peers(&self.peer_selector, &current_peer_ids).await;
1707        let hash_get_peer_ids: HashSet<String> = self
1708            .signaling
1709            .hash_get_peer_ids()
1710            .await
1711            .into_iter()
1712            .collect();
1713        let mut candidate_peer_ids: Vec<String> = current_peer_ids
1714            .into_iter()
1715            .filter(|peer_id| hash_get_peer_ids.contains(peer_id))
1716            .filter(|peer_id| exclude_peer_id.is_none_or(|exclude| peer_id != exclude))
1717            .collect();
1718        if candidate_peer_ids.is_empty() {
1719            return Vec::new();
1720        }
1721
1722        let current_set: HashSet<&str> = candidate_peer_ids.iter().map(String::as_str).collect();
1723        let mut selector = self.peer_selector.write().await;
1724        let mut selector_order = selector.select_peers();
1725        selector_order.retain(|peer_id| current_set.contains(peer_id.as_str()));
1726        if selector_order.is_empty() {
1727            let mut fallback = candidate_peer_ids;
1728            fallback.sort();
1729            return fallback;
1730        }
1731        let backed_off: HashMap<String, bool> = candidate_peer_ids
1732            .iter()
1733            .map(|peer_id| (peer_id.clone(), selector.is_peer_backed_off(peer_id)))
1734            .collect();
1735        drop(selector);
1736
1737        let rank: HashMap<&str, usize> = selector_order
1738            .iter()
1739            .enumerate()
1740            .map(|(idx, peer_id)| (peer_id.as_str(), idx))
1741            .collect();
1742        let active = self.peer_active_requests.read().await;
1743        candidate_peer_ids.sort_by(|left, right| {
1744            let left_backed_off = backed_off.get(left).copied().unwrap_or(false);
1745            let right_backed_off = backed_off.get(right).copied().unwrap_or(false);
1746            if left_backed_off != right_backed_off {
1747                return if left_backed_off {
1748                    std::cmp::Ordering::Greater
1749                } else {
1750                    std::cmp::Ordering::Less
1751                };
1752            }
1753            let left_rank = rank.get(left.as_str()).copied().unwrap_or(usize::MAX / 2);
1754            let right_rank = rank.get(right.as_str()).copied().unwrap_or(usize::MAX / 2);
1755            let left_load = active.get(left).copied().unwrap_or(0);
1756            let right_load = active.get(right).copied().unwrap_or(0);
1757            (left_rank + left_load.saturating_mul(ACTIVE_PEER_REQUEST_RANK_PENALTY))
1758                .cmp(&(right_rank + right_load.saturating_mul(ACTIVE_PEER_REQUEST_RANK_PENALTY)))
1759                .then_with(|| left.cmp(right))
1760        });
1761        candidate_peer_ids
1762    }
1763
1764    async fn reserve_peer_request(&self, peer_id: &str) {
1765        let mut active = self.peer_active_requests.write().await;
1766        *active.entry(peer_id.to_string()).or_insert(0) += 1;
1767    }
1768
1769    async fn release_peer_request(&self, peer_id: &str) {
1770        let mut active = self.peer_active_requests.write().await;
1771        let Some(count) = active.get_mut(peer_id) else {
1772            return;
1773        };
1774        if *count <= 1 {
1775            active.remove(peer_id);
1776        } else {
1777            *count -= 1;
1778        }
1779    }
1780
1781    async fn release_queried_peer_requests(&self, peer_ids: &[String]) {
1782        for peer_id in peer_ids {
1783            self.release_peer_request(peer_id).await;
1784        }
1785    }
1786
1787    fn requested_quote_mint(&self) -> Option<&str> {
1788        if let Some(default_mint) = self.routing.cashu_default_mint.as_deref() {
1789            if self.routing.cashu_accepted_mints.is_empty()
1790                || self
1791                    .routing
1792                    .cashu_accepted_mints
1793                    .iter()
1794                    .any(|mint| mint == default_mint)
1795            {
1796                return Some(default_mint);
1797            }
1798        }
1799
1800        self.routing
1801            .cashu_accepted_mints
1802            .first()
1803            .map(String::as_str)
1804    }
1805
1806    fn choose_quote_mint(&self, requested_mint: Option<&str>) -> Option<String> {
1807        if let Some(requested_mint) = requested_mint {
1808            if self.accepts_quote_mint(Some(requested_mint)) {
1809                return Some(requested_mint.to_string());
1810            }
1811        }
1812        if let Some(default_mint) = self.routing.cashu_default_mint.as_ref() {
1813            return Some(default_mint.clone());
1814        }
1815        if let Some(first_mint) = self.routing.cashu_accepted_mints.first() {
1816            return Some(first_mint.clone());
1817        }
1818        requested_mint.map(str::to_string)
1819    }
1820
1821    fn accepts_quote_mint(&self, mint_url: Option<&str>) -> bool {
1822        if self.routing.cashu_accepted_mints.is_empty() {
1823            return true;
1824        }
1825
1826        let Some(mint_url) = mint_url else {
1827            return false;
1828        };
1829        self.routing
1830            .cashu_accepted_mints
1831            .iter()
1832            .any(|mint| mint == mint_url)
1833    }
1834
1835    fn trusts_quote_mint(&self, mint_url: Option<&str>) -> bool {
1836        let Some(mint_url) = mint_url else {
1837            return self.routing.cashu_default_mint.is_none()
1838                && self.routing.cashu_accepted_mints.is_empty();
1839        };
1840        self.routing.cashu_default_mint.as_deref() == Some(mint_url)
1841            || self
1842                .routing
1843                .cashu_accepted_mints
1844                .iter()
1845                .any(|mint| mint == mint_url)
1846    }
1847
1848    async fn peer_suggested_mint_cap_sat(&self, peer_id: &str) -> u64 {
1849        let base = self.routing.cashu_peer_suggested_mint_base_cap_sat;
1850        if base == 0 {
1851            return 0;
1852        }
1853
1854        let selector = self.peer_selector.read().await;
1855        let Some(stats) = selector.get_stats(peer_id) else {
1856            let max_cap = self.routing.cashu_peer_suggested_mint_max_cap_sat;
1857            return if max_cap > 0 { base.min(max_cap) } else { base };
1858        };
1859
1860        if stats.cashu_payment_defaults > 0
1861            && stats.cashu_payment_defaults >= stats.cashu_payment_receipts
1862        {
1863            return 0;
1864        }
1865
1866        let success_bonus = stats
1867            .successes
1868            .saturating_mul(self.routing.cashu_peer_suggested_mint_success_step_sat);
1869        let receipt_bonus = stats
1870            .cashu_payment_receipts
1871            .saturating_mul(self.routing.cashu_peer_suggested_mint_receipt_step_sat);
1872        let mut cap = base
1873            .saturating_add(success_bonus)
1874            .saturating_add(receipt_bonus);
1875        let max_cap = self.routing.cashu_peer_suggested_mint_max_cap_sat;
1876        if max_cap > 0 {
1877            cap = cap.min(max_cap);
1878        }
1879        cap
1880    }
1881
1882    async fn should_accept_quote_response(
1883        &self,
1884        from_peer: &str,
1885        preferred_mint_url: Option<&str>,
1886        offered_payment_sat: u64,
1887        res: &DataQuoteResponse,
1888    ) -> bool {
1889        let Some(payment_sat) = res.p else {
1890            return false;
1891        };
1892        if payment_sat > offered_payment_sat {
1893            return false;
1894        }
1895
1896        let response_mint = res.m.as_deref();
1897        if response_mint == preferred_mint_url {
1898            return true;
1899        }
1900        if self.trusts_quote_mint(response_mint) {
1901            return true;
1902        }
1903        if response_mint.is_none() {
1904            return false;
1905        }
1906
1907        payment_sat <= self.peer_suggested_mint_cap_sat(from_peer).await
1908    }
1909
1910    async fn issue_quote(
1911        &self,
1912        peer_id: &str,
1913        hash_key: &str,
1914        payment_sat: u64,
1915        ttl_ms: u32,
1916        mint_url: Option<&str>,
1917    ) -> u64 {
1918        let quote_id = {
1919            let mut next = self.next_quote_id.write().await;
1920            let quote_id = *next;
1921            *next = next.saturating_add(1);
1922            quote_id
1923        };
1924
1925        let expires_at = Instant::now() + Duration::from_millis(ttl_ms as u64);
1926        self.issued_quotes.write().await.insert(
1927            (peer_id.to_string(), hash_key.to_string(), quote_id),
1928            IssuedQuote {
1929                expires_at,
1930                payment_sat,
1931                mint_url: mint_url.map(str::to_string),
1932            },
1933        );
1934        quote_id
1935    }
1936
1937    async fn take_valid_quote(&self, peer_id: &str, hash_key: &str, quote_id: u64) -> bool {
1938        let key = (peer_id.to_string(), hash_key.to_string(), quote_id);
1939        let Some(quote) = self.issued_quotes.write().await.remove(&key) else {
1940            return false;
1941        };
1942        quote.expires_at > Instant::now()
1943    }
1944
1945    async fn send_request_to_peer(
1946        &self,
1947        peer_id: &str,
1948        hash: &Hash,
1949        request_htl: u8,
1950        quote_id: Option<u64>,
1951    ) -> bool {
1952        if !should_forward_htl(request_htl) {
1953            return false;
1954        }
1955
1956        let channel = match self.signaling.get_channel(peer_id).await {
1957            Some(c) => c,
1958            None => return false,
1959        };
1960
1961        let htl_config = {
1962            let configs = self.htl_configs.read().await;
1963            configs
1964                .get(peer_id)
1965                .cloned()
1966                .unwrap_or_else(PeerHTLConfig::random)
1967        };
1968
1969        let send_htl = htl_config.decrement(request_htl);
1970        let req = match quote_id {
1971            Some(quote_id) => create_request_with_quote(hash, send_htl, quote_id),
1972            None => create_request(hash, send_htl),
1973        };
1974        let request_bytes = encode_request(&req);
1975        let request_len = request_bytes.len() as u64;
1976
1977        {
1978            let mut selector = self.peer_selector.write().await;
1979            selector.record_request(peer_id, request_len);
1980        }
1981
1982        match channel.send(request_bytes).await {
1983            Ok(()) => {
1984                self.record_peer_wire_sent(peer_id, request_len).await;
1985                true
1986            }
1987            Err(_) => {
1988                self.peer_selector.write().await.record_failure(peer_id);
1989                false
1990            }
1991        }
1992    }
1993
1994    async fn send_quote_request_to_peer(
1995        &self,
1996        peer_id: &str,
1997        hash: &Hash,
1998        payment_sat: u64,
1999        ttl_ms: u32,
2000        mint_url: Option<&str>,
2001    ) -> bool {
2002        let channel = match self.signaling.get_channel(peer_id).await {
2003            Some(c) => c,
2004            None => return false,
2005        };
2006
2007        let req = create_quote_request(hash, ttl_ms, payment_sat, mint_url);
2008        let request_bytes = encode_quote_request(&req);
2009        let request_len = request_bytes.len() as u64;
2010
2011        match channel.send(request_bytes).await {
2012            Ok(()) => {
2013                self.record_peer_wire_sent(peer_id, request_len).await;
2014                true
2015            }
2016            Err(_) => false,
2017        }
2018    }
2019
2020    pub async fn set_read_sources(&self, sources: Vec<Arc<dyn MeshReadSource>>) {
2021        let mut by_id = HashMap::new();
2022        let mut stats = self.read_source_stats.write().await;
2023        for source in sources {
2024            let source_id = source.id().to_string();
2025            by_id.insert(source_id.clone(), source);
2026            stats
2027                .entry(source_id)
2028                .or_insert_with(AdaptiveSourceStats::default);
2029        }
2030        *self.read_sources.write().await = by_id;
2031    }
2032
2033    async fn record_read_source_request(&self, source_id: &str) {
2034        let mut stats = self.read_source_stats.write().await;
2035        stats
2036            .entry(source_id.to_string())
2037            .or_insert_with(AdaptiveSourceStats::default)
2038            .requests += 1;
2039    }
2040
2041    async fn record_read_source_miss(&self, source_id: &str) {
2042        let mut stats = self.read_source_stats.write().await;
2043        stats
2044            .entry(source_id.to_string())
2045            .or_insert_with(AdaptiveSourceStats::default)
2046            .misses += 1;
2047    }
2048
2049    async fn record_read_source_success(&self, source_id: &str, elapsed_ms: u64) {
2050        let now = Instant::now();
2051        let mut stats = self.read_source_stats.write().await;
2052        let stats = stats
2053            .entry(source_id.to_string())
2054            .or_insert_with(AdaptiveSourceStats::default);
2055        stats.successes += 1;
2056        stats.last_success_at = Some(now);
2057        stats.backoff_level = 0;
2058        stats.backed_off_until = None;
2059        if stats.srtt_ms <= 0.0 {
2060            stats.srtt_ms = elapsed_ms as f64;
2061            stats.rttvar_ms = elapsed_ms as f64 / 2.0;
2062            return;
2063        }
2064        let elapsed = elapsed_ms as f64;
2065        stats.rttvar_ms = 0.75 * stats.rttvar_ms + 0.25 * (stats.srtt_ms - elapsed).abs();
2066        stats.srtt_ms = 0.875 * stats.srtt_ms + 0.125 * elapsed;
2067    }
2068
2069    async fn record_read_source_failure(&self, source_id: &str) {
2070        let now = Instant::now();
2071        let mut stats = self.read_source_stats.write().await;
2072        let stats = stats
2073            .entry(source_id.to_string())
2074            .or_insert_with(AdaptiveSourceStats::default);
2075        stats.failures += 1;
2076        stats.last_failure_at = Some(now);
2077        Self::apply_source_backoff(stats, now);
2078    }
2079
2080    async fn record_read_source_timeout(&self, source_id: &str) {
2081        let now = Instant::now();
2082        let mut stats = self.read_source_stats.write().await;
2083        let stats = stats
2084            .entry(source_id.to_string())
2085            .or_insert_with(AdaptiveSourceStats::default);
2086        stats.timeouts += 1;
2087        stats.last_failure_at = Some(now);
2088        Self::apply_source_backoff(stats, now);
2089    }
2090
2091    fn apply_source_backoff(stats: &mut AdaptiveSourceStats, now: Instant) {
2092        stats.backoff_level = stats.backoff_level.saturating_add(1);
2093        let backoff_ms = (INITIAL_SOURCE_BACKOFF_MS
2094            .saturating_mul(2u64.saturating_pow(stats.backoff_level.saturating_sub(1))))
2095        .min(MAX_SOURCE_BACKOFF_MS);
2096        stats.backed_off_until = Some(now + Duration::from_millis(backoff_ms));
2097    }
2098
2099    async fn ordered_read_sources(&self) -> Vec<Arc<dyn MeshReadSource>> {
2100        let sources = self.read_sources.read().await;
2101        if sources.is_empty() {
2102            return Vec::new();
2103        }
2104
2105        let mut available: Vec<Arc<dyn MeshReadSource>> = sources
2106            .values()
2107            .filter(|source| source.is_available())
2108            .cloned()
2109            .collect();
2110        if available.is_empty() {
2111            return Vec::new();
2112        }
2113
2114        let now = Instant::now();
2115        let stats = self.read_source_stats.read().await;
2116        let mut healthy: Vec<Arc<dyn MeshReadSource>> = available
2117            .iter()
2118            .filter(|source| {
2119                stats
2120                    .get(source.id())
2121                    .and_then(|s| s.backed_off_until)
2122                    .is_none_or(|until| until <= now)
2123            })
2124            .cloned()
2125            .collect();
2126        if !healthy.is_empty() {
2127            available = std::mem::take(&mut healthy);
2128        }
2129
2130        available.sort_by(|left, right| {
2131            let left_stats = stats.get(left.id()).cloned().unwrap_or_default();
2132            let right_stats = stats.get(right.id()).cloned().unwrap_or_default();
2133            adaptive_source_score(&right_stats, now)
2134                .partial_cmp(&adaptive_source_score(&left_stats, now))
2135                .unwrap_or(std::cmp::Ordering::Equal)
2136                .then_with(|| left.id().cmp(right.id()))
2137        });
2138        available
2139    }
2140
2141    async fn should_probe_multiple_read_sources(
2142        &self,
2143        ordered_sources: &[Arc<dyn MeshReadSource>],
2144    ) -> bool {
2145        if ordered_sources.len() <= 1 {
2146            return false;
2147        }
2148        let stats = self.read_source_stats.read().await;
2149        let best = stats
2150            .get(ordered_sources[0].id())
2151            .cloned()
2152            .unwrap_or_default();
2153        let second = stats
2154            .get(ordered_sources[1].id())
2155            .cloned()
2156            .unwrap_or_default();
2157        if !source_has_history(&best) || !source_has_history(&second) {
2158            return false;
2159        }
2160        let now = Instant::now();
2161        adaptive_source_score(&best, now) - adaptive_source_score(&second, now)
2162            < SOURCE_SCORE_TIE_DELTA
2163    }
2164
2165    async fn source_dispatch_for(&self, source_count: usize) -> RequestDispatchConfig {
2166        if source_count == 0 {
2167            return self.routing.dispatch;
2168        }
2169        let ordered_sources = self.ordered_read_sources().await;
2170        let probe_multiple = self
2171            .should_probe_multiple_read_sources(&ordered_sources)
2172            .await;
2173        let initial_fanout = if probe_multiple {
2174            source_count.min(2)
2175        } else {
2176            1
2177        };
2178        RequestDispatchConfig {
2179            initial_fanout,
2180            hedge_fanout: self.routing.dispatch.hedge_fanout,
2181            max_fanout: self.routing.dispatch.max_fanout.min(source_count),
2182            hedge_interval_ms: self.routing.dispatch.hedge_interval_ms,
2183        }
2184    }
2185
2186    /// Get peer count
2187    pub async fn peer_count(&self) -> usize {
2188        self.signaling.peer_count().await
2189    }
2190
2191    /// Check if we need more peers
2192    pub async fn needs_peers(&self) -> bool {
2193        self.signaling.needs_peers().await
2194    }
2195
2196    /// Re-broadcast hello to refresh discovery as topology changes.
2197    pub async fn send_hello(&self) -> Result<(), TransportError> {
2198        self.signaling.send_hello(vec![]).await
2199    }
2200
2201    /// Drain all currently available peer-link messages and handle them.
2202    ///
2203    /// This keeps the message pump logic shared between simulation and the
2204    /// default production wrapper instead of duplicating per-channel loops.
2205    pub async fn drain_available_data_messages(self: &Arc<Self>) -> DataPumpStats {
2206        let mut stats = DataPumpStats::default();
2207        let peer_ids = self.signaling.peer_ids().await;
2208        for peer_id in peer_ids {
2209            let Some(channel) = self.signaling.get_channel(&peer_id).await else {
2210                continue;
2211            };
2212
2213            while let Some(data) = channel.try_recv() {
2214                stats.processed += 1;
2215                stats.processed_bytes += data.len() as u64;
2216                if let Some(msg) = parse_message(&data) {
2217                    match msg {
2218                        DataMessage::Request(_) => stats.request_messages += 1,
2219                        DataMessage::Response(_) => stats.response_messages += 1,
2220                        DataMessage::QuoteRequest(_) => stats.quote_request_messages += 1,
2221                        DataMessage::QuoteResponse(_) => stats.quote_response_messages += 1,
2222                        DataMessage::PubsubInterest(_) => stats.pubsub_interest_messages += 1,
2223                        DataMessage::PubsubFrame(_) => stats.pubsub_frame_messages += 1,
2224                        DataMessage::PubsubInventory(_) => stats.pubsub_inventory_messages += 1,
2225                        DataMessage::PubsubWant(_) => stats.pubsub_want_messages += 1,
2226                        DataMessage::Payment(_)
2227                        | DataMessage::PaymentAck(_)
2228                        | DataMessage::Chunk(_)
2229                        | DataMessage::PeerHints(_) => {}
2230                    }
2231                }
2232                self.handle_data_message(&peer_id, &data).await;
2233            }
2234        }
2235        stats
2236    }
2237
2238    /// Apply an out-of-band payment credit to a peer's routing priority.
2239    pub async fn record_cashu_payment_for_peer(&self, peer_id: &str, amount_sat: u64) {
2240        self.peer_selector
2241            .write()
2242            .await
2243            .record_cashu_payment(peer_id, amount_sat);
2244    }
2245
2246    /// Record a post-delivery payment we received from a peer.
2247    pub async fn record_cashu_receipt_from_peer(&self, peer_id: &str, amount_sat: u64) {
2248        self.peer_selector
2249            .write()
2250            .await
2251            .record_cashu_receipt(peer_id, amount_sat);
2252    }
2253
2254    /// Record that a peer failed to pay after we delivered successfully.
2255    pub async fn record_cashu_payment_default_from_peer(&self, peer_id: &str) {
2256        self.peer_selector
2257            .write()
2258            .await
2259            .record_cashu_payment_default(peer_id);
2260    }
2261
2262    /// Snapshot routing/selection summary for inspection/debugging.
2263    pub async fn selector_summary(&self) -> crate::peer_selector::SelectorSummary {
2264        self.peer_selector.read().await.summary()
2265    }
2266
2267    fn should_refuse_requests_from_peer(&self, selector: &PeerSelector, peer_id: &str) -> bool {
2268        selector.is_peer_blocked_for_payment_defaults(
2269            peer_id,
2270            self.routing.cashu_payment_default_block_threshold,
2271        )
2272    }
2273
2274    /// Export live peer metadata for inspection/debugging.
2275    pub async fn peer_metadata_snapshot(&self) -> PeerMetadataSnapshot {
2276        self.peer_selector
2277            .read()
2278            .await
2279            .export_peer_metadata_snapshot()
2280    }
2281
2282    /// Snapshot current peer metadata and persist it into `local_store`.
2283    ///
2284    /// Uses content-addressed storage for the snapshot body and a reserved
2285    /// mutable pointer slot for the "latest snapshot hash".
2286    pub async fn persist_peer_metadata(&self) -> Result<Hash, StoreError> {
2287        let snapshot = self
2288            .peer_selector
2289            .read()
2290            .await
2291            .export_peer_metadata_snapshot();
2292        let bytes = serde_json::to_vec(&snapshot).map_err(|e| {
2293            StoreError::Other(format!("Failed to encode peer metadata snapshot: {e}"))
2294        })?;
2295        let snapshot_hash = hashtree_core::sha256(&bytes);
2296        let _ = self.local_store.put(snapshot_hash, bytes).await?;
2297
2298        let pointer_slot = Self::peer_metadata_pointer_slot_hash();
2299        let pointer_bytes = hex::encode(snapshot_hash).into_bytes();
2300        let _ = self.local_store.delete(&pointer_slot).await?;
2301        let _ = self.local_store.put(pointer_slot, pointer_bytes).await?;
2302
2303        Ok(snapshot_hash)
2304    }
2305
2306    /// Load persisted peer metadata from `local_store` if available.
2307    pub async fn load_peer_metadata(&self) -> Result<bool, StoreError> {
2308        let pointer_slot = Self::peer_metadata_pointer_slot_hash();
2309        let Some(pointer_bytes) = self.local_store.get(&pointer_slot).await? else {
2310            return Ok(false);
2311        };
2312        let pointer_hex = std::str::from_utf8(&pointer_bytes).map_err(|e| {
2313            StoreError::Other(format!("Peer metadata pointer is not valid UTF-8: {e}"))
2314        })?;
2315        let snapshot_hash = Self::decode_hash_hex(pointer_hex.trim())?;
2316
2317        let Some(snapshot_bytes) = self.local_store.get(&snapshot_hash).await? else {
2318            return Ok(false);
2319        };
2320        let snapshot: PeerMetadataSnapshot =
2321            serde_json::from_slice(&snapshot_bytes).map_err(|e| {
2322                StoreError::Other(format!("Failed to decode peer metadata snapshot: {e}"))
2323            })?;
2324        self.peer_selector
2325            .write()
2326            .await
2327            .import_peer_metadata_snapshot(&snapshot);
2328        Ok(true)
2329    }
2330
2331    /// Request data from peers after negotiating a paid quote.
2332    ///
2333    /// If quote negotiation fails or the quoted peer does not deliver, the store
2334    /// falls back to the normal unpaid retrieval path to preserve liveness.
2335    pub async fn get_with_quote(
2336        &self,
2337        hash: &Hash,
2338        payment_sat: u64,
2339        quote_ttl: Duration,
2340    ) -> Result<Option<Vec<u8>>, StoreError> {
2341        if let Some(data) = self.local_store.get(hash).await? {
2342            return Ok(Some(data));
2343        }
2344        Ok(self
2345            .request_from_peers_with_quote(hash, payment_sat, quote_ttl)
2346            .await)
2347    }
2348
2349    async fn request_from_peers_with_quote(
2350        &self,
2351        hash: &Hash,
2352        payment_sat: u64,
2353        quote_ttl: Duration,
2354    ) -> Option<Vec<u8>> {
2355        let ordered_peer_ids = self.ordered_connected_peers(None).await;
2356        if ordered_peer_ids.is_empty() {
2357            return None;
2358        }
2359
2360        if let Some(quote) = self
2361            .request_quote_from_peers(hash, payment_sat, quote_ttl, &ordered_peer_ids)
2362            .await
2363        {
2364            if let Some(data) = self
2365                .request_from_single_peer(hash, &quote.peer_id, MAX_HTL, Some(quote.quote_id))
2366                .await
2367            {
2368                return Some(data);
2369            }
2370        }
2371
2372        self.request_from_mesh(hash).await
2373    }
2374
2375    async fn request_quote_from_peers(
2376        &self,
2377        hash: &Hash,
2378        payment_sat: u64,
2379        quote_ttl: Duration,
2380        ordered_peer_ids: &[String],
2381    ) -> Option<NegotiatedQuote> {
2382        if ordered_peer_ids.is_empty() {
2383            return None;
2384        }
2385        let ttl_ms = quote_ttl.as_millis().min(u32::MAX as u128) as u32;
2386        if ttl_ms == 0 {
2387            return None;
2388        }
2389        let requested_mint = self.requested_quote_mint().map(str::to_string);
2390
2391        let hash_key = hash_to_key(hash);
2392        let (tx, rx) = oneshot::channel();
2393        self.pending_quotes.write().await.insert(
2394            hash_key.clone(),
2395            PendingQuoteRequest {
2396                response_tx: tx,
2397                preferred_mint_url: requested_mint.clone(),
2398                offered_payment_sat: payment_sat,
2399            },
2400        );
2401
2402        let rx = Arc::new(Mutex::new(rx));
2403        let result = run_hedged_waves(
2404            ordered_peer_ids.len(),
2405            self.routing.dispatch,
2406            self.request_timeout,
2407            |range| {
2408                let wave_peer_ids = ordered_peer_ids[range].to_vec();
2409                let requested_mint = requested_mint.clone();
2410                let hash = *hash;
2411                async move {
2412                    let mut sent = 0usize;
2413                    for peer_id in wave_peer_ids {
2414                        if self
2415                            .send_quote_request_to_peer(
2416                                &peer_id,
2417                                &hash,
2418                                payment_sat,
2419                                ttl_ms,
2420                                requested_mint.as_deref(),
2421                            )
2422                            .await
2423                        {
2424                            sent += 1;
2425                        }
2426                    }
2427                    sent
2428                }
2429            },
2430            |wait| {
2431                let rx = rx.clone();
2432                async move {
2433                    let mut rx = rx.lock().await;
2434                    match tokio::time::timeout(wait, &mut *rx).await {
2435                        Ok(Ok(Some(quote))) => HedgedWaveAction::Success(quote),
2436                        Ok(Ok(None)) | Ok(Err(_)) => HedgedWaveAction::Abort,
2437                        Err(_) => HedgedWaveAction::Continue,
2438                    }
2439                }
2440            },
2441        )
2442        .await;
2443        let _ = self.pending_quotes.write().await.remove(&hash_key);
2444        result
2445    }
2446
2447    async fn request_from_single_peer(
2448        &self,
2449        hash: &Hash,
2450        peer_id: &str,
2451        request_htl: u8,
2452        quote_id: Option<u64>,
2453    ) -> Option<Vec<u8>> {
2454        let hash_key = hash_to_key(hash);
2455        let (tx, rx) = oneshot::channel();
2456        self.pending_requests.write().await.insert(
2457            hash_key.clone(),
2458            PendingRequest {
2459                response_tx: tx,
2460                started_at: Instant::now(),
2461                queried_peers: vec![peer_id.to_string()],
2462            },
2463        );
2464
2465        let mut rx = rx;
2466        if !self
2467            .send_request_to_peer(peer_id, hash, request_htl, quote_id)
2468            .await
2469        {
2470            let _ = self.pending_requests.write().await.remove(&hash_key);
2471            return None;
2472        }
2473        self.reserve_peer_request(peer_id).await;
2474
2475        if let Ok(Ok(Some(data))) = tokio::time::timeout(self.request_timeout, &mut rx).await {
2476            if hashtree_core::sha256(&data) == *hash {
2477                let _ = self.local_store.put(*hash, data.clone()).await;
2478                return Some(data);
2479            }
2480        }
2481
2482        if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
2483            self.release_queried_peer_requests(&pending.queried_peers)
2484                .await;
2485            for peer_id in pending.queried_peers {
2486                self.peer_selector.write().await.record_timeout(&peer_id);
2487            }
2488        }
2489        let _ = self.take_forward_requesters(&hash_key).await;
2490        None
2491    }
2492
2493    async fn request_from_ordered_peers(
2494        &self,
2495        hash: &Hash,
2496        ordered_peer_ids: &[String],
2497        request_htl: u8,
2498    ) -> RouteFetchOutcome {
2499        let hash_key = hash_to_key(hash);
2500        let (tx, rx) = oneshot::channel();
2501        self.pending_requests.write().await.insert(
2502            hash_key.clone(),
2503            PendingRequest {
2504                response_tx: tx,
2505                started_at: Instant::now(),
2506                queried_peers: Vec::new(),
2507            },
2508        );
2509
2510        let rx = Arc::new(Mutex::new(rx));
2511        let result = run_hedged_waves(
2512            ordered_peer_ids.len(),
2513            self.routing.dispatch,
2514            self.request_timeout,
2515            |range| {
2516                let wave_peer_ids = ordered_peer_ids[range].to_vec();
2517                let hash = *hash;
2518                let hash_key = hash_key.clone();
2519                async move {
2520                    let mut sent = 0usize;
2521                    for peer_id in wave_peer_ids {
2522                        if self
2523                            .send_request_to_peer(&peer_id, &hash, request_htl, None)
2524                            .await
2525                        {
2526                            sent += 1;
2527                            self.reserve_peer_request(&peer_id).await;
2528                            if let Some(pending) =
2529                                self.pending_requests.write().await.get_mut(&hash_key)
2530                            {
2531                                pending.queried_peers.push(peer_id);
2532                            }
2533                        }
2534                    }
2535                    sent
2536                }
2537            },
2538            |wait| {
2539                let rx = rx.clone();
2540                async move {
2541                    let mut rx = rx.lock().await;
2542                    match tokio::time::timeout(wait, &mut *rx).await {
2543                        Ok(Ok(Some(data))) if hashtree_core::sha256(&data) == *hash => {
2544                            HedgedWaveAction::Success(data)
2545                        }
2546                        Ok(Ok(Some(_))) => HedgedWaveAction::Continue,
2547                        Ok(Ok(None)) | Ok(Err(_)) => HedgedWaveAction::Abort,
2548                        Err(_) => HedgedWaveAction::Continue,
2549                    }
2550                }
2551            },
2552        )
2553        .await;
2554
2555        let Some(data) = result else {
2556            if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
2557                self.release_queried_peer_requests(&pending.queried_peers)
2558                    .await;
2559                for peer_id in pending.queried_peers {
2560                    self.peer_selector.write().await.record_timeout(&peer_id);
2561                }
2562            }
2563            let _ = self.take_forward_requesters(&hash_key).await;
2564            return RouteFetchOutcome::Timeout;
2565        };
2566
2567        let _ = self.local_store.put(*hash, data.clone()).await;
2568        RouteFetchOutcome::Hit(data)
2569    }
2570
2571    async fn request_from_read_sources_inner(&self, hash: &Hash) -> RouteFetchOutcome {
2572        let ordered_sources = self.ordered_read_sources().await;
2573        if ordered_sources.is_empty() {
2574            return RouteFetchOutcome::Miss;
2575        }
2576
2577        let dispatch = normalize_dispatch_config(
2578            self.source_dispatch_for(ordered_sources.len()).await,
2579            ordered_sources.len(),
2580        );
2581        let wave_plan = build_hedged_wave_plan(ordered_sources.len(), dispatch);
2582        if wave_plan.is_empty() {
2583            return RouteFetchOutcome::Miss;
2584        }
2585
2586        let deadline = Instant::now() + self.request_timeout;
2587        let mut pending = FuturesUnordered::new();
2588        let mut pending_source_ids = HashSet::new();
2589        let mut saw_timeout = false;
2590        let mut next_source_idx = 0usize;
2591
2592        for (wave_idx, wave_size) in wave_plan.iter().copied().enumerate() {
2593            let from = next_source_idx;
2594            let to = (next_source_idx + wave_size).min(ordered_sources.len());
2595            next_source_idx = to;
2596
2597            for source in &ordered_sources[from..to] {
2598                let source = Arc::clone(source);
2599                let source_id = source.id().to_string();
2600                self.record_read_source_request(&source_id).await;
2601                pending_source_ids.insert(source_id.clone());
2602                let hash = *hash;
2603                pending.push(tokio::spawn(async move {
2604                    let started_at = Instant::now();
2605                    let result = std::panic::AssertUnwindSafe(source.get(&hash))
2606                        .catch_unwind()
2607                        .await;
2608                    match result {
2609                        Ok(Some(data)) => SourceFetchOutcome::Hit {
2610                            source_id,
2611                            data,
2612                            elapsed_ms: started_at.elapsed().as_millis().max(1) as u64,
2613                        },
2614                        Ok(None) => SourceFetchOutcome::Miss { source_id },
2615                        Err(_) => SourceFetchOutcome::Failure { source_id },
2616                    }
2617                }));
2618            }
2619
2620            let is_last_wave =
2621                wave_idx + 1 == wave_plan.len() || next_source_idx >= ordered_sources.len();
2622            let window_end = if is_last_wave {
2623                deadline
2624            } else {
2625                (Instant::now() + Duration::from_millis(dispatch.hedge_interval_ms)).min(deadline)
2626            };
2627
2628            while Instant::now() < window_end {
2629                let remaining = window_end.saturating_duration_since(Instant::now());
2630                let Some(result) = tokio::time::timeout(remaining, pending.next())
2631                    .await
2632                    .ok()
2633                    .flatten()
2634                else {
2635                    break;
2636                };
2637                let Ok(outcome) = result else {
2638                    continue;
2639                };
2640                match outcome {
2641                    SourceFetchOutcome::Hit {
2642                        source_id,
2643                        data,
2644                        elapsed_ms,
2645                    } => {
2646                        pending_source_ids.remove(&source_id);
2647                        self.record_read_source_success(&source_id, elapsed_ms)
2648                            .await;
2649                        return RouteFetchOutcome::Hit(data);
2650                    }
2651                    SourceFetchOutcome::Miss { source_id } => {
2652                        pending_source_ids.remove(&source_id);
2653                        self.record_read_source_miss(&source_id).await;
2654                    }
2655                    SourceFetchOutcome::Failure { source_id } => {
2656                        pending_source_ids.remove(&source_id);
2657                        self.record_read_source_failure(&source_id).await;
2658                    }
2659                }
2660            }
2661
2662            if Instant::now() >= deadline {
2663                break;
2664            }
2665        }
2666
2667        for source_id in pending_source_ids {
2668            saw_timeout = true;
2669            self.record_read_source_timeout(&source_id).await;
2670        }
2671        if saw_timeout {
2672            RouteFetchOutcome::Timeout
2673        } else {
2674            RouteFetchOutcome::Miss
2675        }
2676    }
2677
2678    async fn request_from_read_sources(&self, hash: &Hash) -> RouteFetchOutcome {
2679        let hash_key = hash_to_key(hash);
2680        let existing_wait = {
2681            let mut inflight = self.inflight_source_fetches.lock().await;
2682            if let Some(existing) = inflight.get_mut(&hash_key) {
2683                let (tx, rx) = oneshot::channel();
2684                existing.waiters.push(tx);
2685                Some(rx)
2686            } else {
2687                inflight.insert(
2688                    hash_key.clone(),
2689                    InflightSourceFetch {
2690                        waiters: Vec::new(),
2691                    },
2692                );
2693                None
2694            }
2695        };
2696
2697        if let Some(wait) = existing_wait {
2698            return wait.await.unwrap_or(RouteFetchOutcome::Timeout);
2699        }
2700
2701        let result = self.request_from_read_sources_inner(hash).await;
2702        if let RouteFetchOutcome::Hit(hit) = &result {
2703            let _ = self.local_store.put(*hash, hit.clone()).await;
2704        }
2705        self.complete_inflight_source_fetch(&hash_key, result.clone())
2706            .await;
2707
2708        result
2709    }
2710
2711    async fn complete_inflight_source_fetch(&self, hash_key: &str, result: RouteFetchOutcome) {
2712        let waiters = self
2713            .inflight_source_fetches
2714            .lock()
2715            .await
2716            .remove(hash_key)
2717            .map(|inflight| inflight.waiters)
2718            .unwrap_or_default();
2719        for waiter in waiters {
2720            let _ = waiter.send(result.clone());
2721        }
2722    }
2723
2724    async fn cancel_pending_peer_route(&self, hash: &Hash) {
2725        let hash_key = hash_to_key(hash);
2726        if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
2727            self.release_queried_peer_requests(&pending.queried_peers)
2728                .await;
2729        }
2730    }
2731
2732    async fn cancel_losing_route(&self, hash: &Hash, route: &ReadRoute, winner_data: &[u8]) {
2733        match route {
2734            ReadRoute::Peers(_) => self.cancel_pending_peer_route(hash).await,
2735            ReadRoute::Sources => {
2736                let hash_key = hash_to_key(hash);
2737                self.complete_inflight_source_fetch(
2738                    &hash_key,
2739                    RouteFetchOutcome::Hit(winner_data.to_vec()),
2740                )
2741                .await;
2742            }
2743        }
2744    }
2745
2746    async fn ranked_read_routes(&self, context: &MeshReadContext) -> Vec<RankedReadRoute> {
2747        let mut routes = Vec::new();
2748        let ordered_peers = if should_forward_htl(context.request_htl) {
2749            self.ordered_connected_peers(context.exclude_peer_id.as_deref())
2750                .await
2751        } else {
2752            Vec::new()
2753        };
2754        if !ordered_peers.is_empty() {
2755            let best_peer_id = ordered_peers[0].clone();
2756            let selector = self.peer_selector.read().await;
2757            let best_peer = selector.get_stats(&best_peer_id).cloned();
2758            let now = Instant::now();
2759            let (score, has_history) = match best_peer.as_ref() {
2760                Some(stats) => (
2761                    peer_endpoint_score(stats, now),
2762                    peer_endpoint_has_history(stats),
2763                ),
2764                None => (0.0, false),
2765            };
2766            routes.push(RankedReadRoute {
2767                route: ReadRoute::Peers(ordered_peers),
2768                best_endpoint_id: format!("peer:{best_peer_id}"),
2769                score,
2770                has_history,
2771            });
2772        }
2773        let ordered_sources = self.ordered_read_sources().await;
2774        if let Some(best_source) = ordered_sources.first() {
2775            let stats = self.read_source_stats.read().await;
2776            let best_source_stats = stats.get(best_source.id()).cloned().unwrap_or_default();
2777            let now = Instant::now();
2778            routes.push(RankedReadRoute {
2779                route: ReadRoute::Sources,
2780                best_endpoint_id: format!("source:{}", best_source.id()),
2781                score: adaptive_source_score(&best_source_stats, now),
2782                has_history: source_has_history(&best_source_stats),
2783            });
2784        }
2785        if routes.len() <= 1 {
2786            return routes;
2787        }
2788
2789        routes.sort_by(|left, right| {
2790            right
2791                .score
2792                .partial_cmp(&left.score)
2793                .unwrap_or(std::cmp::Ordering::Equal)
2794                .then_with(|| ranked_route_kind(&left.route).cmp(&ranked_route_kind(&right.route)))
2795                .then_with(|| left.best_endpoint_id.cmp(&right.best_endpoint_id))
2796                .then_with(|| left.route.id().cmp(right.route.id()))
2797        });
2798        routes
2799    }
2800
2801    fn should_probe_multiple_routes(&self, routes: &[RankedReadRoute]) -> bool {
2802        if routes.len() <= 1 {
2803            return false;
2804        }
2805        if !routes[0].has_history || !routes[1].has_history {
2806            return false;
2807        }
2808        (routes[0].score - routes[1].score) < SOURCE_SCORE_TIE_DELTA
2809    }
2810
2811    async fn run_read_route(
2812        &self,
2813        hash: &Hash,
2814        route: &ReadRoute,
2815        context: &MeshReadContext,
2816    ) -> RouteFetchOutcome {
2817        match route {
2818            ReadRoute::Peers(peer_ids) => {
2819                self.request_from_ordered_peers(hash, peer_ids, context.request_htl)
2820                    .await
2821            }
2822            ReadRoute::Sources => self.request_from_read_sources(hash).await,
2823        }
2824    }
2825
2826    async fn request_from_mesh_with_context(
2827        &self,
2828        hash: &Hash,
2829        context: &MeshReadContext,
2830    ) -> Option<Vec<u8>> {
2831        let routes = self.ranked_read_routes(context).await;
2832        match routes.as_slice() {
2833            [] => None,
2834            [ranked] => match self.run_read_route(hash, &ranked.route, context).await {
2835                RouteFetchOutcome::Hit(data) => Some(data),
2836                RouteFetchOutcome::Miss | RouteFetchOutcome::Timeout => None,
2837            },
2838            [first, second, ..] => {
2839                if self.should_probe_multiple_routes(&routes) {
2840                    let first_fut = self.run_read_route(hash, &first.route, context);
2841                    let second_fut = self.run_read_route(hash, &second.route, context);
2842                    tokio::pin!(first_fut);
2843                    tokio::pin!(second_fut);
2844                    let mut first_done = false;
2845                    let mut second_done = false;
2846                    loop {
2847                        tokio::select! {
2848                            result = &mut first_fut, if !first_done => {
2849                                first_done = true;
2850                                if let RouteFetchOutcome::Hit(data) = result {
2851                                    if !second_done {
2852                                        self.cancel_losing_route(hash, &second.route, &data).await;
2853                                    }
2854                                    return Some(data);
2855                                }
2856                            }
2857                            result = &mut second_fut, if !second_done => {
2858                                second_done = true;
2859                                if let RouteFetchOutcome::Hit(data) = result {
2860                                    if !first_done {
2861                                        self.cancel_losing_route(hash, &first.route, &data).await;
2862                                    }
2863                                    return Some(data);
2864                                }
2865                            }
2866                            else => break,
2867                        }
2868                        if first_done && second_done {
2869                            break;
2870                        }
2871                    }
2872                    None
2873                } else {
2874                    match self.run_read_route(hash, &first.route, context).await {
2875                        RouteFetchOutcome::Hit(data) => return Some(data),
2876                        RouteFetchOutcome::Miss | RouteFetchOutcome::Timeout => {}
2877                    }
2878                    for ranked in routes.iter().skip(1) {
2879                        match self.run_read_route(hash, &ranked.route, context).await {
2880                            RouteFetchOutcome::Hit(data) => return Some(data),
2881                            RouteFetchOutcome::Miss | RouteFetchOutcome::Timeout => {}
2882                        }
2883                    }
2884                    None
2885                }
2886            }
2887        }
2888    }
2889
2890    async fn request_from_mesh(&self, hash: &Hash) -> Option<Vec<u8>> {
2891        self.request_from_mesh_with_context(hash, &MeshReadContext::default())
2892            .await
2893    }
2894
2895    async fn begin_forward_request(&self, hash_key: &str, requester_id: &str) -> bool {
2896        let mut pending = self.pending_forward_requests.write().await;
2897        if let Some(existing) = pending.get_mut(hash_key) {
2898            existing.requester_ids.insert(requester_id.to_string());
2899            return false;
2900        }
2901
2902        let mut requester_ids = HashSet::new();
2903        requester_ids.insert(requester_id.to_string());
2904        pending.insert(
2905            hash_key.to_string(),
2906            PendingForwardRequest { requester_ids },
2907        );
2908        true
2909    }
2910
2911    async fn was_recent_forward_miss(&self, hash_key: &str) -> bool {
2912        self.recent_forward_misses.lock().await.contains(hash_key)
2913    }
2914
2915    async fn mark_recent_forward_miss(&self, hash_key: &str) {
2916        let _ = self
2917            .recent_forward_misses
2918            .lock()
2919            .await
2920            .insert_if_new(hash_key.to_string());
2921    }
2922
2923    async fn take_forward_requesters(&self, hash_key: &str) -> Vec<String> {
2924        self.pending_forward_requests
2925            .write()
2926            .await
2927            .remove(hash_key)
2928            .map(|pending| pending.requester_ids.into_iter().collect())
2929            .unwrap_or_default()
2930    }
2931
2932    async fn complete_pending_response(
2933        self: &Arc<Self>,
2934        from_peer: &str,
2935        hash: &Hash,
2936        hash_key: String,
2937        payload: Vec<u8>,
2938    ) {
2939        if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
2940            self.release_queried_peer_requests(&pending.queried_peers)
2941                .await;
2942            let rtt_ms = pending.started_at.elapsed().as_millis() as u64;
2943            self.peer_selector.write().await.record_success(
2944                from_peer,
2945                rtt_ms,
2946                payload.len() as u64,
2947            );
2948            let forward_requesters = self.take_forward_requesters(&hash_key).await;
2949            let response_bytes = if forward_requesters.is_empty() {
2950                None
2951            } else {
2952                Some(encode_response(&create_response(hash, payload.clone())))
2953            };
2954            let _ = pending.response_tx.send(Some(payload));
2955            if let Some(response_bytes) = response_bytes {
2956                for requester_id in forward_requesters {
2957                    Arc::clone(self)
2958                        .enqueue_response_send(requester_id, response_bytes.clone(), Instant::now())
2959                        .await;
2960                }
2961            }
2962        }
2963    }
2964
2965    async fn handle_quote_response_message(&self, from_peer: &str, res: DataQuoteResponse) {
2966        if !res.a {
2967            return;
2968        }
2969
2970        let Some(quote_id) = res.q else {
2971            return;
2972        };
2973
2974        let hash_key = hash_to_key(&res.h);
2975        let (preferred_mint_url, offered_payment_sat) = {
2976            let pending_quotes = self.pending_quotes.read().await;
2977            let Some(pending) = pending_quotes.get(&hash_key) else {
2978                return;
2979            };
2980            (
2981                pending.preferred_mint_url.clone(),
2982                pending.offered_payment_sat,
2983            )
2984        };
2985        if !self
2986            .should_accept_quote_response(
2987                from_peer,
2988                preferred_mint_url.as_deref(),
2989                offered_payment_sat,
2990                &res,
2991            )
2992            .await
2993        {
2994            return;
2995        }
2996        let mut pending_quotes = self.pending_quotes.write().await;
2997        if let Some(pending) = pending_quotes.remove(&hash_key) {
2998            let _ = pending.response_tx.send(Some(NegotiatedQuote {
2999                peer_id: from_peer.to_string(),
3000                quote_id,
3001                mint_url: res.m,
3002            }));
3003        }
3004    }
3005
3006    async fn handle_response_message(
3007        self: &Arc<Self>,
3008        from_peer: &str,
3009        res: crate::protocol::DataResponse,
3010    ) {
3011        let hash_key = hash_to_key(&res.h);
3012        let hash = match crate::protocol::bytes_to_hash(&res.h) {
3013            Some(h) => h,
3014            None => return,
3015        };
3016
3017        // Ignore malformed/corrupt payload and keep waiting for a valid response.
3018        if hashtree_core::sha256(&res.d) != hash {
3019            self.peer_selector.write().await.record_failure(from_peer);
3020            if self.debug {
3021                println!("[MeshStoreCore] Ignoring invalid response payload for {hash_key}");
3022            }
3023            return;
3024        }
3025
3026        self.record_useful_bytes_received_from_peer(from_peer, res.d.len() as u64)
3027            .await;
3028        self.complete_pending_response(from_peer, &hash, hash_key, res.d)
3029            .await;
3030    }
3031
3032    async fn handle_quote_request_message(&self, from_peer: &str, req: DataQuoteRequest) {
3033        let hash = match crate::protocol::bytes_to_hash(&req.h) {
3034            Some(h) => h,
3035            None => return,
3036        };
3037        let hash_key = hash_to_key(&hash);
3038
3039        {
3040            let selector = self.peer_selector.read().await;
3041            if self.should_refuse_requests_from_peer(&selector, from_peer) {
3042                if self.debug {
3043                    println!(
3044                        "[MeshStoreCore] Refusing quote request from delinquent peer {}",
3045                        from_peer
3046                    );
3047                }
3048                return;
3049            }
3050        }
3051
3052        let chosen_mint = self.choose_quote_mint(req.m.as_deref());
3053        let can_serve = self.local_store.has(&hash).await.ok().unwrap_or(false)
3054            && !self.should_drop_response(&hash)
3055            && !self.should_corrupt_response(&hash);
3056
3057        let res = if can_serve {
3058            let quote_id = self
3059                .issue_quote(from_peer, &hash_key, req.p, req.t, chosen_mint.as_deref())
3060                .await;
3061            create_quote_response_available(&hash, quote_id, req.p, req.t, chosen_mint.as_deref())
3062        } else {
3063            create_quote_response_unavailable(&hash)
3064        };
3065        let response_bytes = encode_quote_response(&res);
3066        if let Some(channel) = self.signaling.get_channel(from_peer).await {
3067            if channel.send(response_bytes.clone()).await.is_ok() {
3068                self.record_peer_wire_sent(from_peer, response_bytes.len() as u64)
3069                    .await;
3070            }
3071        }
3072    }
3073
3074    async fn handle_request_message(
3075        self: &Arc<Self>,
3076        from_peer: &str,
3077        req: crate::protocol::DataRequest,
3078    ) {
3079        let hash = match crate::protocol::bytes_to_hash(&req.h) {
3080            Some(h) => h,
3081            None => return,
3082        };
3083        let hash_key = hash_to_key(&hash);
3084
3085        if let Some(quote_id) = req.q {
3086            if !self.take_valid_quote(from_peer, &hash_key, quote_id).await {
3087                if self.debug {
3088                    println!(
3089                        "[MeshStoreCore] Refusing request with invalid or expired quote {} from {}",
3090                        quote_id, from_peer
3091                    );
3092                }
3093                return;
3094            }
3095        }
3096
3097        let allow_peer_forwarding = {
3098            let selector = self.peer_selector.read().await;
3099            !self.should_refuse_requests_from_peer(&selector, from_peer)
3100        };
3101
3102        // Check local store
3103        if let Ok(Some(mut data)) = self.local_store.get(&hash).await {
3104            if self.should_drop_response(&hash) {
3105                if self.debug {
3106                    println!(
3107                        "[MeshStoreCore] Dropping response for {} due to actor profile",
3108                        hash_to_key(&hash)
3109                    );
3110                }
3111                return;
3112            }
3113
3114            let response_delay = self.response_send_delay(&hash, data.len());
3115            if self.should_corrupt_response(&hash) {
3116                if data.is_empty() {
3117                    data.push(0x80);
3118                } else {
3119                    data[0] ^= 0x80;
3120                }
3121            }
3122
3123            // Send response
3124            let res = create_response(&hash, data);
3125            let response_bytes = encode_response(&res);
3126            let ready_at = Instant::now() + response_delay;
3127            Arc::clone(self)
3128                .enqueue_response_send(from_peer.to_string(), response_bytes, ready_at)
3129                .await;
3130            return;
3131        }
3132
3133        if self.pending_requests.read().await.contains_key(&hash_key) {
3134            let _ = self.begin_forward_request(&hash_key, from_peer).await;
3135            return;
3136        }
3137
3138        if self.was_recent_forward_miss(&hash_key).await {
3139            if self.debug {
3140                println!(
3141                    "[MeshStoreCore] Suppressing recently missed forwarded request for {}",
3142                    hash_key
3143                );
3144            }
3145            return;
3146        }
3147
3148        if !self.begin_forward_request(&hash_key, from_peer).await {
3149            return;
3150        }
3151
3152        let from_peer = from_peer.to_string();
3153        let this = Arc::clone(self);
3154        let request_htl = req.htl;
3155        tokio::spawn(async move {
3156            let result = if allow_peer_forwarding {
3157                let context = MeshReadContext {
3158                    exclude_peer_id: Some(from_peer.clone()),
3159                    request_htl,
3160                };
3161                this.request_from_mesh_with_context(&hash, &context).await
3162            } else {
3163                if this.debug {
3164                    println!(
3165                        "[MeshStoreCore] Serving request from delinquent peer {} via read sources only",
3166                        from_peer
3167                    );
3168                }
3169                match this.request_from_read_sources(&hash).await {
3170                    RouteFetchOutcome::Hit(data) => Some(data),
3171                    RouteFetchOutcome::Miss | RouteFetchOutcome::Timeout => None,
3172                }
3173            };
3174            let requester_ids = this.take_forward_requesters(&hash_key).await;
3175            if let Some(data) = result {
3176                let ready_at = Instant::now() + this.response_send_delay(&hash, data.len());
3177                let res = create_response(&hash, data);
3178                let response_bytes = encode_response(&res);
3179                for requester_id in requester_ids {
3180                    Arc::clone(&this)
3181                        .enqueue_response_send(requester_id, response_bytes.clone(), ready_at)
3182                        .await;
3183                }
3184            } else {
3185                this.mark_recent_forward_miss(&hash_key).await;
3186            }
3187        });
3188    }
3189
3190    async fn handle_pubsub_interest_message(
3191        self: &Arc<Self>,
3192        from_peer: &str,
3193        mut interest: PubsubInterest,
3194    ) {
3195        if !self.apply_pubsub_interest_route(from_peer, &interest).await {
3196            return;
3197        }
3198
3199        if interest.htl <= 1 {
3200            return;
3201        }
3202        interest.htl = interest.htl.saturating_sub(1);
3203        let _ = self
3204            .send_pubsub_interest_to_peers(&interest, Some(from_peer))
3205            .await;
3206    }
3207
3208    async fn handle_pubsub_frame_message(
3209        self: &Arc<Self>,
3210        from_peer: &str,
3211        mut frame: PubsubFrame,
3212        wire_bytes: usize,
3213    ) {
3214        if frame.stream_id.is_empty() || frame.origin_peer_id.is_empty() {
3215            return;
3216        }
3217        if frame.origin_peer_id == self.signaling.peer_id() {
3218            return;
3219        }
3220
3221        let frame_key = Self::pubsub_frame_key(&frame);
3222        if !self
3223            .pubsub_seen_frames
3224            .lock()
3225            .await
3226            .insert_if_new(frame_key.clone())
3227        {
3228            return;
3229        }
3230        self.cache_pubsub_frame(frame_key.clone(), frame.clone())
3231            .await;
3232
3233        let local_interested = self
3234            .pubsub_local_interests
3235            .read()
3236            .await
3237            .contains(&frame.stream_id);
3238        let mut downstream_peers = if frame.htl > 1 {
3239            match self.routing.pubsub_delivery_mode {
3240                PubsubDeliveryMode::InterestPush => {
3241                    let mut peers = self
3242                        .interested_pubsub_peers(&frame.stream_id, Some(from_peer))
3243                        .await;
3244                    peers.extend(
3245                        self.take_pubsub_want_peers(&frame_key, Some(from_peer))
3246                            .await,
3247                    );
3248                    peers.sort();
3249                    peers.dedup();
3250                    peers
3251                }
3252                PubsubDeliveryMode::HtlInvWant => {
3253                    self.take_pubsub_want_peers(&frame_key, Some(from_peer))
3254                        .await
3255                }
3256            }
3257        } else {
3258            Vec::new()
3259        };
3260        downstream_peers.retain(|peer_id| peer_id != from_peer);
3261
3262        if local_interested || !downstream_peers.is_empty() {
3263            self.record_useful_bytes_received_from_peer(from_peer, wire_bytes as u64)
3264                .await;
3265        }
3266
3267        if local_interested {
3268            self.enqueue_pubsub_event(PubsubEvent {
3269                stream_id: frame.stream_id.clone(),
3270                seq: frame.seq,
3271                origin_peer_id: frame.origin_peer_id.clone(),
3272                from_peer_id: from_peer.to_string(),
3273                payload: frame.payload.clone(),
3274            })
3275            .await;
3276        }
3277
3278        if downstream_peers.is_empty() {
3279            return;
3280        }
3281
3282        frame.htl = frame.htl.saturating_sub(1);
3283        let _ = self
3284            .send_pubsub_frame_to_peers(&frame, &downstream_peers)
3285            .await;
3286    }
3287
3288    async fn handle_pubsub_inventory_message(
3289        self: &Arc<Self>,
3290        from_peer: &str,
3291        inv: PubsubInventory,
3292        wire_bytes: usize,
3293    ) {
3294        if inv.stream_id.is_empty() || inv.origin_peer_id.is_empty() {
3295            return;
3296        }
3297        if inv.origin_peer_id == self.signaling.peer_id() {
3298            return;
3299        }
3300
3301        let key = Self::pubsub_key(&inv.origin_peer_id, &inv.stream_id, inv.seq);
3302        if !self
3303            .pubsub_seen_inventories
3304            .lock()
3305            .await
3306            .insert_if_new(key.clone())
3307        {
3308            return;
3309        }
3310        {
3311            let mut routes = self.pubsub_inventory_routes.write().await;
3312            routes
3313                .entry(key.clone())
3314                .or_insert_with(|| from_peer.to_string());
3315        }
3316
3317        let local_interested = self
3318            .pubsub_local_interests
3319            .read()
3320            .await
3321            .contains(&inv.stream_id);
3322        let downstream_peers = self
3323            .interested_pubsub_peers(&inv.stream_id, Some(from_peer))
3324            .await;
3325        if local_interested || !downstream_peers.is_empty() {
3326            self.record_useful_bytes_received_from_peer(from_peer, wire_bytes as u64)
3327                .await;
3328            let want =
3329                create_pubsub_want(inv.stream_id.clone(), inv.seq, inv.origin_peer_id.clone());
3330            let _ = self.send_pubsub_want_upstream(&key, &want, None).await;
3331        }
3332
3333        if !should_forward_htl(inv.htl) {
3334            return;
3335        }
3336        let _ = self.flood_pubsub_inventory(&inv, Some(from_peer)).await;
3337    }
3338
3339    async fn handle_pubsub_want_message(
3340        self: &Arc<Self>,
3341        from_peer: &str,
3342        want: PubsubWant,
3343        wire_bytes: usize,
3344    ) {
3345        if want.stream_id.is_empty() || want.origin_peer_id.is_empty() {
3346            return;
3347        }
3348        if want.origin_peer_id == from_peer {
3349            return;
3350        }
3351
3352        let key = Self::pubsub_key(&want.origin_peer_id, &want.stream_id, want.seq);
3353        let want_key = format!("{from_peer}:{key}");
3354        if !self.pubsub_seen_wants.lock().await.insert_if_new(want_key) {
3355            return;
3356        }
3357
3358        if let Some(frame) = self.cached_pubsub_frame(&key).await {
3359            self.record_useful_bytes_received_from_peer(from_peer, wire_bytes as u64)
3360                .await;
3361            let peers = vec![from_peer.to_string()];
3362            let _ = self.send_pubsub_frame_to_peers(&frame, &peers).await;
3363            return;
3364        }
3365
3366        let has_upstream_route = self.pubsub_inventory_routes.read().await.contains_key(&key);
3367        if !has_upstream_route {
3368            return;
3369        }
3370
3371        if self.remember_pubsub_want_peer(key.clone(), from_peer).await {
3372            self.record_useful_bytes_received_from_peer(from_peer, wire_bytes as u64)
3373                .await;
3374        }
3375        let _ = self
3376            .send_pubsub_want_upstream(&key, &want, Some(from_peer))
3377            .await;
3378    }
3379
3380    /// Handle incoming data message
3381    pub async fn handle_data_message(self: &Arc<Self>, from_peer: &str, data: &[u8]) {
3382        self.record_peer_wire_received(from_peer, data.len() as u64)
3383            .await;
3384        let parsed = match parse_message(data) {
3385            Some(m) => m,
3386            None => return,
3387        };
3388
3389        match parsed {
3390            DataMessage::Request(req) => {
3391                self.handle_request_message(from_peer, req).await;
3392            }
3393            DataMessage::Response(res) => {
3394                self.handle_response_message(from_peer, res).await;
3395            }
3396            DataMessage::QuoteRequest(req) => {
3397                self.handle_quote_request_message(from_peer, req).await;
3398            }
3399            DataMessage::QuoteResponse(res) => {
3400                self.handle_quote_response_message(from_peer, res).await;
3401            }
3402            DataMessage::PubsubInterest(interest) => {
3403                self.handle_pubsub_interest_message(from_peer, interest)
3404                    .await;
3405            }
3406            DataMessage::PubsubFrame(frame) => {
3407                self.handle_pubsub_frame_message(from_peer, frame, data.len())
3408                    .await;
3409            }
3410            DataMessage::PubsubInventory(inv) => {
3411                self.handle_pubsub_inventory_message(from_peer, inv, data.len())
3412                    .await;
3413            }
3414            DataMessage::PubsubWant(want) => {
3415                self.handle_pubsub_want_message(from_peer, want, data.len())
3416                    .await;
3417            }
3418            DataMessage::Payment(_)
3419            | DataMessage::PaymentAck(_)
3420            | DataMessage::Chunk(_)
3421            | DataMessage::PeerHints(_) => {}
3422        }
3423    }
3424}
3425
3426#[async_trait]
3427impl<S, R, F> Store for MeshStoreCore<S, R, F>
3428where
3429    S: Store + Send + Sync + 'static,
3430    R: SignalingTransport + Send + Sync + 'static,
3431    F: PeerLinkFactory + Send + Sync + 'static,
3432{
3433    async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
3434        self.local_store.put(hash, data).await
3435    }
3436
3437    async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
3438        // Try local first
3439        if let Some(data) = self.local_store.get(hash).await? {
3440            return Ok(Some(data));
3441        }
3442
3443        // Try peers
3444        Ok(self.request_from_mesh(hash).await)
3445    }
3446
3447    async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
3448        self.local_store.has(hash).await
3449    }
3450
3451    async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
3452        self.local_store.delete(hash).await
3453    }
3454}
3455
3456#[cfg(test)]
3457mod tests;
3458
/// Type alias for the simulation store: a [`MeshStoreCore`] over any local
/// store `S`, using the mock relay transport and mock connection factory from
/// `crate::mock`.
pub type SimMeshStore<S> =
    MeshStoreCore<S, crate::mock::MockRelayTransport, crate::mock::MockConnectionFactory>;
3462
/// Type alias for the default production core (Nostr signaling + WebRTC links):
/// a [`MeshStoreCore`] over any local store `S`, using
/// `crate::nostr::NostrRelayTransport` for signaling and
/// `crate::real_factory::WebRtcPeerLinkFactory` for peer links.
pub type ProductionMeshStore<S> =
    MeshStoreCore<S, crate::nostr::NostrRelayTransport, crate::real_factory::WebRtcPeerLinkFactory>;