Skip to main content

hashtree_network/
mesh_store_core.rs

//! Shared routed mesh store core.
//!
//! This module provides a concrete store wrapper that works with any local storage
//! backend plus any signaling transport and peer-link factory. Both production
//! (Nostr websockets + WebRTC) and simulation (mocks) use this same code.
6
7use async_trait::async_trait;
8use std::collections::hash_map::DefaultHasher;
9use std::collections::{HashMap, HashSet};
10use std::future::Future;
11use std::hash::{Hash as _, Hasher};
12use std::ops::Range;
13use std::sync::Arc;
14use std::time::{Duration, Instant};
15use tokio::sync::{oneshot, Mutex, RwLock};
16
17use hashtree_core::{Hash, Store, StoreError};
18
19use crate::peer_selector::{PeerMetadataSnapshot, PeerSelector, SelectionStrategy};
20use crate::protocol::{
21    create_quote_request, create_quote_response_available, create_quote_response_unavailable,
22    create_request, create_request_with_quote, create_response, encode_quote_request,
23    encode_quote_response, encode_request, encode_response, hash_to_key, parse_message,
24    DataMessage, DataQuoteRequest, DataQuoteResponse,
25};
26use crate::signaling::MeshRouter;
27use crate::transport::{PeerLinkFactory, SignalingTransport, TransportError};
28use crate::types::{PeerHTLConfig, SignalingMessage, MAX_HTL};
29
// The key deliberately retains the legacy `hashtree-webrtc` prefix: the crate
// was renamed, and changing the on-disk namespace would make previously stored
// peer metadata disappear for users upgrading from the old package name.
const PEER_METADATA_POINTER_SLOT_KEY: &[u8] = b"hashtree-webrtc/peer-metadata/latest/v1";
33
34/// Pending request awaiting response
35struct PendingRequest {
36    response_tx: oneshot::Sender<Option<Vec<u8>>>,
37    started_at: Instant,
38    queried_peers: Vec<String>,
39}
40
41struct PendingQuoteRequest {
42    response_tx: oneshot::Sender<Option<NegotiatedQuote>>,
43    preferred_mint_url: Option<String>,
44    offered_payment_sat: u64,
45}
46
/// A quote obtained from a peer during negotiation.
#[derive(Clone, Debug)]
struct NegotiatedQuote {
    /// ID of the peer the quote belongs to.
    peer_id: String,
    /// Identifier of the quote; passed along with the follow-up data request.
    quote_id: u64,
    /// Mint URL attached to the quote, if any.
    #[allow(dead_code)]
    mint_url: Option<String>,
}
54
/// A quote this node has issued and not yet seen redeemed.
struct IssuedQuote {
    /// Instant after which the quote is no longer honored.
    expires_at: Instant,
    /// Payment amount, in sats, recorded for the quote.
    #[allow(dead_code)]
    payment_sat: u64,
    /// Mint URL recorded for the quote, if any.
    #[allow(dead_code)]
    mint_url: Option<String>,
}
62
/// Counters collected while draining the peer-link messages that are currently available.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct DataPumpStats {
    /// Total number of raw messages pulled off the channels.
    pub processed: usize,
    /// Messages that parsed as data requests.
    pub request_messages: usize,
    /// Messages that parsed as data responses.
    pub response_messages: usize,
    /// Messages that parsed as quote requests.
    pub quote_request_messages: u64,
    /// Messages that parsed as quote responses.
    pub quote_response_messages: u64,
    /// Sum of the byte lengths of all pulled messages.
    pub processed_bytes: u64,
}
73
/// How a request is fanned out across peers.
///
/// `MeshStoreCore` supports two practical retrieval modes:
/// - Flood (`usize::MAX` fanout): maximize success/latency at bandwidth cost.
/// - Staged hedging: probe a subset first, then expand.
#[derive(Clone, Copy, Debug)]
pub struct RequestDispatchConfig {
    /// Size of the first wave, i.e. peers queried immediately.
    pub initial_fanout: usize,
    /// Size of each follow-up (hedge) wave.
    pub hedge_fanout: usize,
    /// Upper bound on the total number of peers queried for one request.
    pub max_fanout: usize,
    /// Pause between waves in milliseconds; `0` sends every wave back to back.
    pub hedge_interval_ms: u64,
}

impl Default for RequestDispatchConfig {
    /// Flood mode: unbounded fanout for every wave and no delay between waves.
    fn default() -> Self {
        const UNBOUNDED: usize = usize::MAX;
        Self {
            initial_fanout: UNBOUNDED,
            hedge_fanout: UNBOUNDED,
            max_fanout: UNBOUNDED,
            hedge_interval_ms: 0,
        }
    }
}
101
102/// Normalize fanout config against current peer availability.
103pub fn normalize_dispatch_config(
104    dispatch: RequestDispatchConfig,
105    available_peers: usize,
106) -> RequestDispatchConfig {
107    let mut cfg = dispatch;
108    let cap = if cfg.max_fanout == 0 {
109        available_peers
110    } else {
111        cfg.max_fanout.min(available_peers)
112    };
113    cfg.max_fanout = cap;
114    cfg.initial_fanout = if cfg.initial_fanout == 0 {
115        1
116    } else {
117        cfg.initial_fanout.min(cap.max(1))
118    };
119    cfg.hedge_fanout = if cfg.hedge_fanout == 0 {
120        1
121    } else {
122        cfg.hedge_fanout.min(cap.max(1))
123    };
124    cfg
125}
126
127/// Build wave sizes for staged hedged dispatch.
128pub fn build_hedged_wave_plan(peer_count: usize, dispatch: RequestDispatchConfig) -> Vec<usize> {
129    if peer_count == 0 {
130        return Vec::new();
131    }
132    let cap = dispatch.max_fanout.min(peer_count);
133    if cap == 0 {
134        return Vec::new();
135    }
136
137    let mut plan = Vec::new();
138    let mut sent = 0usize;
139    let first = dispatch.initial_fanout.min(cap).max(1);
140    plan.push(first);
141    sent += first;
142
143    while sent < cap {
144        let next = dispatch.hedge_fanout.min(cap - sent).max(1);
145        plan.push(next);
146        sent += next;
147    }
148    plan
149}
150
/// What the hedged-wave scheduler should do after waiting on a wave.
#[derive(Debug)]
pub enum HedgedWaveAction<T> {
    /// Nothing conclusive yet; move on to the next wave.
    Continue,
    /// A result arrived; stop and hand this value back to the caller.
    Success(T),
    /// Stop dispatching further waves without a result.
    Abort,
}
158
159/// Run a staged hedged dispatch over peer index ranges.
160///
161/// This scheduler is shared by the reusable `MeshStoreCore` and the native
162/// `hashtree-cli` mesh path so tests and production use the same wave timing.
163pub async fn run_hedged_waves<T, SendWave, SendWaveFut, WaitWave, WaitWaveFut>(
164    peer_count: usize,
165    dispatch: RequestDispatchConfig,
166    request_timeout: Duration,
167    mut send_wave: SendWave,
168    mut wait_wave: WaitWave,
169) -> Option<T>
170where
171    SendWave: FnMut(Range<usize>) -> SendWaveFut,
172    SendWaveFut: Future<Output = usize>,
173    WaitWave: FnMut(Duration) -> WaitWaveFut,
174    WaitWaveFut: Future<Output = HedgedWaveAction<T>>,
175{
176    let dispatch = normalize_dispatch_config(dispatch, peer_count);
177    let wave_plan = build_hedged_wave_plan(peer_count, dispatch);
178    if wave_plan.is_empty() {
179        return None;
180    }
181
182    let deadline = Instant::now() + request_timeout;
183    let mut sent_total = 0usize;
184    let mut next_peer_idx = 0usize;
185
186    for (wave_idx, wave_size) in wave_plan.iter().copied().enumerate() {
187        let from = next_peer_idx;
188        let to = (next_peer_idx + wave_size).min(peer_count);
189        next_peer_idx = to;
190
191        if from == to {
192            continue;
193        }
194
195        sent_total += send_wave(from..to).await;
196        if sent_total == 0 {
197            if next_peer_idx >= peer_count {
198                break;
199            }
200            continue;
201        }
202
203        let now = Instant::now();
204        if now >= deadline {
205            break;
206        }
207        let remaining = deadline.saturating_duration_since(now);
208        let is_last_wave = wave_idx + 1 == wave_plan.len() || next_peer_idx >= peer_count;
209        let wait = if is_last_wave {
210            remaining
211        } else if dispatch.hedge_interval_ms == 0 {
212            Duration::ZERO
213        } else {
214            Duration::from_millis(dispatch.hedge_interval_ms).min(remaining)
215        };
216
217        if wait.is_zero() {
218            continue;
219        }
220
221        match wait_wave(wait).await {
222            HedgedWaveAction::Continue => {}
223            HedgedWaveAction::Success(value) => return Some(value),
224            HedgedWaveAction::Abort => break,
225        }
226    }
227
228    None
229}
230
231/// Keep selector membership aligned with currently connected peer IDs.
232pub async fn sync_selector_peers(selector: &RwLock<PeerSelector>, current_peer_ids: &[String]) {
233    let mut selector = selector.write().await;
234    let current: HashSet<&str> = current_peer_ids.iter().map(String::as_str).collect();
235    let known: Vec<String> = selector.all_stats().map(|s| s.peer_id.clone()).collect();
236    for peer_id in known {
237        if !current.contains(peer_id.as_str()) {
238            selector.remove_peer(&peer_id);
239        }
240    }
241    for peer_id in current_peer_ids {
242        selector.add_peer(peer_id.clone());
243    }
244}
245
/// Response behavior profile for simulation/game-theory actors.
///
/// Defaults to honest behavior (always respond correctly, no extra delay).
#[derive(Debug, Clone, Copy)]
pub struct ResponseBehaviorConfig {
    /// Probability that a node drops a response even when it has data.
    pub drop_response_prob: f64,
    /// Probability that a node responds with corrupted payload.
    pub corrupt_response_prob: f64,
    /// Optional response delay to model slow/incompetent peers.
    pub extra_delay_ms: u64,
}

impl Default for ResponseBehaviorConfig {
    /// Honest profile: never drop, never corrupt, no added delay.
    fn default() -> Self {
        Self {
            drop_response_prob: 0.0,
            corrupt_response_prob: 0.0,
            extra_delay_ms: 0,
        }
    }
}

impl ResponseBehaviorConfig {
    /// Return a copy whose probabilities are guaranteed to lie in `[0.0, 1.0]`.
    ///
    /// Out-of-range values are clamped. `NaN` is mapped to `0.0` ("never"): `f64::clamp`
    /// would otherwise hand `NaN` straight back, leaving a non-probability in the
    /// normalized config. `extra_delay_ms` is passed through unchanged.
    fn normalized(self) -> Self {
        fn probability(raw: f64) -> f64 {
            if raw.is_nan() {
                0.0
            } else {
                raw.clamp(0.0, 1.0)
            }
        }

        Self {
            drop_response_prob: probability(self.drop_response_prob),
            corrupt_response_prob: probability(self.corrupt_response_prob),
            extra_delay_ms: self.extra_delay_ms,
        }
    }
}
278
279/// Routing policy for request ordering + dispatch fanout.
280#[derive(Debug, Clone)]
281pub struct MeshRoutingConfig {
282    pub selection_strategy: SelectionStrategy,
283    pub fairness_enabled: bool,
284    /// Blend weight for payment-priority ranking in selector (`0.0` disables).
285    pub cashu_payment_weight: f64,
286    /// Refuse serving peers that have reached this many unpaid post-delivery settlements.
287    /// `0` disables refusal and only keeps metadata/downranking.
288    pub cashu_payment_default_block_threshold: u64,
289    /// Cashu mint URLs this node is willing to use for settlement.
290    pub cashu_accepted_mints: Vec<String>,
291    /// Preferred Cashu mint URL when initiating paid retrieval.
292    pub cashu_default_mint: Option<String>,
293    /// Baseline cap for accepting a peer-suggested mint outside the trusted set.
294    pub cashu_peer_suggested_mint_base_cap_sat: u64,
295    /// Additional sats allowed per successful delivery from that peer.
296    pub cashu_peer_suggested_mint_success_step_sat: u64,
297    /// Additional sats allowed per successful post-delivery payment received from that peer.
298    pub cashu_peer_suggested_mint_receipt_step_sat: u64,
299    /// Hard upper bound for any single peer-suggested mint quote we accept.
300    pub cashu_peer_suggested_mint_max_cap_sat: u64,
301    pub dispatch: RequestDispatchConfig,
302    pub response_behavior: ResponseBehaviorConfig,
303}
304
305impl Default for MeshRoutingConfig {
306    fn default() -> Self {
307        Self {
308            selection_strategy: SelectionStrategy::Weighted,
309            fairness_enabled: true,
310            cashu_payment_weight: 0.0,
311            cashu_payment_default_block_threshold: 0,
312            cashu_accepted_mints: Vec::new(),
313            cashu_default_mint: None,
314            cashu_peer_suggested_mint_base_cap_sat: 0,
315            cashu_peer_suggested_mint_success_step_sat: 0,
316            cashu_peer_suggested_mint_receipt_step_sat: 0,
317            cashu_peer_suggested_mint_max_cap_sat: 0,
318            dispatch: RequestDispatchConfig::default(),
319            response_behavior: ResponseBehaviorConfig::default(),
320        }
321    }
322}
323
324/// Routed mesh store core that works with any storage backend and transport
325/// implementation.
326///
327/// This is the shared code between production and simulation.
328/// - Production: `MeshStoreCore<LmdbStore, NostrRelayTransport, WebRtcPeerLinkFactory>`
329/// - Simulation: `MeshStoreCore<MemoryStore, MockRelayTransport, MockConnectionFactory>`
330pub struct MeshStoreCore<S, R, F>
331where
332    S: Store + Send + Sync + 'static,
333    R: SignalingTransport + Send + Sync + 'static,
334    F: PeerLinkFactory + Send + Sync + 'static,
335{
336    /// Local backing store
337    local_store: Arc<S>,
338    /// Mesh router (handles peer discovery and connection)
339    signaling: Arc<MeshRouter<R, F>>,
340    /// Per-peer HTL config
341    htl_configs: RwLock<HashMap<String, PeerHTLConfig>>,
342    /// Pending requests we sent
343    pending_requests: RwLock<HashMap<String, PendingRequest>>,
344    /// Pending quote negotiations keyed by requested hash.
345    pending_quotes: RwLock<HashMap<String, PendingQuoteRequest>>,
346    /// Quotes we issued to peers and will accept exactly once until expiry.
347    issued_quotes: RwLock<HashMap<(String, String, u64), IssuedQuote>>,
348    /// Monotonic quote identifier generator.
349    next_quote_id: RwLock<u64>,
350    /// Adaptive selector for peer ordering.
351    peer_selector: RwLock<PeerSelector>,
352    /// Routing/dispatch configuration.
353    routing: MeshRoutingConfig,
354    /// Request timeout
355    request_timeout: Duration,
356    /// Debug mode
357    debug: bool,
358    /// Running flag
359    running: RwLock<bool>,
360}
361
362impl<S, R, F> MeshStoreCore<S, R, F>
363where
364    S: Store + Send + Sync + 'static,
365    R: SignalingTransport + Send + Sync + 'static,
366    F: PeerLinkFactory + Send + Sync + 'static,
367{
368    /// Create a new routed mesh store core.
369    pub fn new(
370        local_store: Arc<S>,
371        signaling: Arc<MeshRouter<R, F>>,
372        request_timeout: Duration,
373        debug: bool,
374    ) -> Self {
375        Self::new_with_routing(
376            local_store,
377            signaling,
378            request_timeout,
379            debug,
380            Default::default(),
381        )
382    }
383
384    /// Create a new routed mesh store core with explicit routing configuration.
385    pub fn new_with_routing(
386        local_store: Arc<S>,
387        signaling: Arc<MeshRouter<R, F>>,
388        request_timeout: Duration,
389        debug: bool,
390        routing: MeshRoutingConfig,
391    ) -> Self {
392        let mut selector = PeerSelector::with_strategy(routing.selection_strategy);
393        selector.set_fairness(routing.fairness_enabled);
394        selector.set_cashu_payment_weight(routing.cashu_payment_weight);
395        Self {
396            local_store,
397            signaling,
398            htl_configs: RwLock::new(HashMap::new()),
399            pending_requests: RwLock::new(HashMap::new()),
400            pending_quotes: RwLock::new(HashMap::new()),
401            issued_quotes: RwLock::new(HashMap::new()),
402            next_quote_id: RwLock::new(1),
403            peer_selector: RwLock::new(selector),
404            routing,
405            request_timeout,
406            debug,
407            running: RwLock::new(false),
408        }
409    }
410
411    /// Start the store (begin listening for messages)
412    pub async fn start(&self) -> Result<(), TransportError> {
413        *self.running.write().await = true;
414
415        // Send initial hello
416        self.signaling.send_hello(vec![]).await?;
417
418        Ok(())
419    }
420
421    /// Stop the store
422    pub async fn stop(&self) {
423        *self.running.write().await = false;
424    }
425
426    /// Process incoming signaling message
427    pub async fn process_signaling(&self, msg: SignalingMessage) -> Result<(), TransportError> {
428        // When a new peer connects, initialize their HTL config
429        let peer_id = msg.peer_id().to_string();
430        {
431            let mut configs = self.htl_configs.write().await;
432            if !configs.contains_key(&peer_id) {
433                configs.insert(peer_id.clone(), PeerHTLConfig::random());
434            }
435        }
436        self.peer_selector.write().await.add_peer(peer_id);
437
438        self.signaling.handle_message(msg).await
439    }
440
441    /// Get signaling manager reference
442    pub fn signaling(&self) -> &Arc<MeshRouter<R, F>> {
443        &self.signaling
444    }
445
446    fn response_behavior(&self) -> ResponseBehaviorConfig {
447        self.routing.response_behavior.normalized()
448    }
449
450    fn deterministic_actor_draw_for(peer_id: &str, hash: &Hash, salt: u64) -> f64 {
451        let mut hasher = DefaultHasher::new();
452        peer_id.hash(&mut hasher);
453        hash.hash(&mut hasher);
454        salt.hash(&mut hasher);
455        let v = hasher.finish();
456        (v as f64) / (u64::MAX as f64)
457    }
458
459    fn deterministic_actor_draw(&self, hash: &Hash, salt: u64) -> f64 {
460        Self::deterministic_actor_draw_for(self.signaling.peer_id(), hash, salt)
461    }
462
463    fn peer_metadata_pointer_slot_hash() -> Hash {
464        hashtree_core::sha256(PEER_METADATA_POINTER_SLOT_KEY)
465    }
466
467    fn decode_hash_hex(hash_hex: &str) -> Result<Hash, StoreError> {
468        let bytes = hex::decode(hash_hex)
469            .map_err(|e| StoreError::Other(format!("Invalid hash hex: {e}")))?;
470        if bytes.len() != 32 {
471            return Err(StoreError::Other(format!(
472                "Invalid hash length {}, expected 32 bytes",
473                bytes.len()
474            )));
475        }
476        let mut hash = [0u8; 32];
477        hash.copy_from_slice(&bytes);
478        Ok(hash)
479    }
480
481    fn should_drop_response(&self, hash: &Hash) -> bool {
482        let p = self.response_behavior().drop_response_prob;
483        if p <= 0.0 {
484            return false;
485        }
486        self.deterministic_actor_draw(hash, 0xD0_D0_D0_D0_D0_D0_D0_D0) < p
487    }
488
489    fn should_corrupt_response(&self, hash: &Hash) -> bool {
490        let p = self.response_behavior().corrupt_response_prob;
491        if p <= 0.0 {
492            return false;
493        }
494        self.deterministic_actor_draw(hash, 0xC0_C0_C0_C0_C0_C0_C0_C0) < p
495    }
496
497    async fn ordered_connected_peers(&self) -> Vec<String> {
498        let current_peer_ids = self.signaling.peer_ids().await;
499        if current_peer_ids.is_empty() {
500            return Vec::new();
501        }
502
503        sync_selector_peers(&self.peer_selector, &current_peer_ids).await;
504        let current_set: HashSet<&str> = current_peer_ids.iter().map(String::as_str).collect();
505        let mut ordered_peer_ids = self.peer_selector.write().await.select_peers();
506        ordered_peer_ids.retain(|peer_id| current_set.contains(peer_id.as_str()));
507        if ordered_peer_ids.is_empty() {
508            let mut fallback = current_peer_ids;
509            fallback.sort();
510            return fallback;
511        }
512        ordered_peer_ids
513    }
514
515    fn requested_quote_mint(&self) -> Option<&str> {
516        if let Some(default_mint) = self.routing.cashu_default_mint.as_deref() {
517            if self.routing.cashu_accepted_mints.is_empty()
518                || self
519                    .routing
520                    .cashu_accepted_mints
521                    .iter()
522                    .any(|mint| mint == default_mint)
523            {
524                return Some(default_mint);
525            }
526        }
527
528        self.routing
529            .cashu_accepted_mints
530            .first()
531            .map(String::as_str)
532    }
533
534    fn choose_quote_mint(&self, requested_mint: Option<&str>) -> Option<String> {
535        if let Some(requested_mint) = requested_mint {
536            if self.accepts_quote_mint(Some(requested_mint)) {
537                return Some(requested_mint.to_string());
538            }
539        }
540        if let Some(default_mint) = self.routing.cashu_default_mint.as_ref() {
541            return Some(default_mint.clone());
542        }
543        if let Some(first_mint) = self.routing.cashu_accepted_mints.first() {
544            return Some(first_mint.clone());
545        }
546        requested_mint.map(str::to_string)
547    }
548
549    fn accepts_quote_mint(&self, mint_url: Option<&str>) -> bool {
550        if self.routing.cashu_accepted_mints.is_empty() {
551            return true;
552        }
553
554        let Some(mint_url) = mint_url else {
555            return false;
556        };
557        self.routing
558            .cashu_accepted_mints
559            .iter()
560            .any(|mint| mint == mint_url)
561    }
562
563    fn trusts_quote_mint(&self, mint_url: Option<&str>) -> bool {
564        let Some(mint_url) = mint_url else {
565            return self.routing.cashu_default_mint.is_none()
566                && self.routing.cashu_accepted_mints.is_empty();
567        };
568        self.routing.cashu_default_mint.as_deref() == Some(mint_url)
569            || self
570                .routing
571                .cashu_accepted_mints
572                .iter()
573                .any(|mint| mint == mint_url)
574    }
575
576    async fn peer_suggested_mint_cap_sat(&self, peer_id: &str) -> u64 {
577        let base = self.routing.cashu_peer_suggested_mint_base_cap_sat;
578        if base == 0 {
579            return 0;
580        }
581
582        let selector = self.peer_selector.read().await;
583        let Some(stats) = selector.get_stats(peer_id) else {
584            let max_cap = self.routing.cashu_peer_suggested_mint_max_cap_sat;
585            return if max_cap > 0 { base.min(max_cap) } else { base };
586        };
587
588        if stats.cashu_payment_defaults > 0
589            && stats.cashu_payment_defaults >= stats.cashu_payment_receipts
590        {
591            return 0;
592        }
593
594        let success_bonus = stats
595            .successes
596            .saturating_mul(self.routing.cashu_peer_suggested_mint_success_step_sat);
597        let receipt_bonus = stats
598            .cashu_payment_receipts
599            .saturating_mul(self.routing.cashu_peer_suggested_mint_receipt_step_sat);
600        let mut cap = base
601            .saturating_add(success_bonus)
602            .saturating_add(receipt_bonus);
603        let max_cap = self.routing.cashu_peer_suggested_mint_max_cap_sat;
604        if max_cap > 0 {
605            cap = cap.min(max_cap);
606        }
607        cap
608    }
609
610    async fn should_accept_quote_response(
611        &self,
612        from_peer: &str,
613        preferred_mint_url: Option<&str>,
614        offered_payment_sat: u64,
615        res: &DataQuoteResponse,
616    ) -> bool {
617        let Some(payment_sat) = res.p else {
618            return false;
619        };
620        if payment_sat > offered_payment_sat {
621            return false;
622        }
623
624        let response_mint = res.m.as_deref();
625        if response_mint == preferred_mint_url {
626            return true;
627        }
628        if self.trusts_quote_mint(response_mint) {
629            return true;
630        }
631        if response_mint.is_none() {
632            return false;
633        }
634
635        payment_sat <= self.peer_suggested_mint_cap_sat(from_peer).await
636    }
637
638    async fn issue_quote(
639        &self,
640        peer_id: &str,
641        hash_key: &str,
642        payment_sat: u64,
643        ttl_ms: u32,
644        mint_url: Option<&str>,
645    ) -> u64 {
646        let quote_id = {
647            let mut next = self.next_quote_id.write().await;
648            let quote_id = *next;
649            *next = next.saturating_add(1);
650            quote_id
651        };
652
653        let expires_at = Instant::now() + Duration::from_millis(ttl_ms as u64);
654        self.issued_quotes.write().await.insert(
655            (peer_id.to_string(), hash_key.to_string(), quote_id),
656            IssuedQuote {
657                expires_at,
658                payment_sat,
659                mint_url: mint_url.map(str::to_string),
660            },
661        );
662        quote_id
663    }
664
665    async fn take_valid_quote(&self, peer_id: &str, hash_key: &str, quote_id: u64) -> bool {
666        let key = (peer_id.to_string(), hash_key.to_string(), quote_id);
667        let Some(quote) = self.issued_quotes.write().await.remove(&key) else {
668            return false;
669        };
670        quote.expires_at > Instant::now()
671    }
672
673    async fn send_request_to_peer(
674        &self,
675        peer_id: &str,
676        hash: &Hash,
677        quote_id: Option<u64>,
678    ) -> bool {
679        let channel = match self.signaling.get_channel(peer_id).await {
680            Some(c) => c,
681            None => return false,
682        };
683
684        let htl_config = {
685            let configs = self.htl_configs.read().await;
686            configs
687                .get(peer_id)
688                .cloned()
689                .unwrap_or_else(PeerHTLConfig::random)
690        };
691
692        let send_htl = htl_config.decrement(MAX_HTL);
693        let req = match quote_id {
694            Some(quote_id) => create_request_with_quote(hash, send_htl, quote_id),
695            None => create_request(hash, send_htl),
696        };
697        let request_bytes = encode_request(&req);
698
699        {
700            let mut selector = self.peer_selector.write().await;
701            selector.record_request(peer_id, request_bytes.len() as u64);
702        }
703
704        match channel.send(request_bytes).await {
705            Ok(()) => true,
706            Err(_) => {
707                self.peer_selector.write().await.record_failure(peer_id);
708                false
709            }
710        }
711    }
712
713    async fn send_quote_request_to_peer(
714        &self,
715        peer_id: &str,
716        hash: &Hash,
717        payment_sat: u64,
718        ttl_ms: u32,
719        mint_url: Option<&str>,
720    ) -> bool {
721        let channel = match self.signaling.get_channel(peer_id).await {
722            Some(c) => c,
723            None => return false,
724        };
725
726        let req = create_quote_request(hash, ttl_ms, payment_sat, mint_url);
727        let request_bytes = encode_quote_request(&req);
728
729        match channel.send(request_bytes).await {
730            Ok(()) => true,
731            Err(_) => false,
732        }
733    }
734
735    /// Get peer count
736    pub async fn peer_count(&self) -> usize {
737        self.signaling.peer_count().await
738    }
739
740    /// Check if we need more peers
741    pub async fn needs_peers(&self) -> bool {
742        self.signaling.needs_peers().await
743    }
744
745    /// Re-broadcast hello to refresh discovery as topology changes.
746    pub async fn send_hello(&self) -> Result<(), TransportError> {
747        self.signaling.send_hello(vec![]).await
748    }
749
750    /// Drain all currently available peer-link messages and handle them.
751    ///
752    /// This keeps the message pump logic shared between simulation and the
753    /// default production wrapper instead of duplicating per-channel loops.
754    pub async fn drain_available_data_messages(&self) -> DataPumpStats {
755        let mut stats = DataPumpStats::default();
756        let peer_ids = self.signaling.peer_ids().await;
757        for peer_id in peer_ids {
758            let Some(channel) = self.signaling.get_channel(&peer_id).await else {
759                continue;
760            };
761
762            while let Some(data) = channel.try_recv() {
763                stats.processed += 1;
764                stats.processed_bytes += data.len() as u64;
765                if let Some(msg) = parse_message(&data) {
766                    match msg {
767                        DataMessage::Request(_) => stats.request_messages += 1,
768                        DataMessage::Response(_) => stats.response_messages += 1,
769                        DataMessage::QuoteRequest(_) => stats.quote_request_messages += 1,
770                        DataMessage::QuoteResponse(_) => stats.quote_response_messages += 1,
771                    }
772                }
773                self.handle_data_message(&peer_id, &data).await;
774            }
775        }
776        stats
777    }
778
779    /// Apply an out-of-band payment credit to a peer's routing priority.
780    pub async fn record_cashu_payment_for_peer(&self, peer_id: &str, amount_sat: u64) {
781        self.peer_selector
782            .write()
783            .await
784            .record_cashu_payment(peer_id, amount_sat);
785    }
786
787    /// Record a post-delivery payment we received from a peer.
788    pub async fn record_cashu_receipt_from_peer(&self, peer_id: &str, amount_sat: u64) {
789        self.peer_selector
790            .write()
791            .await
792            .record_cashu_receipt(peer_id, amount_sat);
793    }
794
795    /// Record that a peer failed to pay after we delivered successfully.
796    pub async fn record_cashu_payment_default_from_peer(&self, peer_id: &str) {
797        self.peer_selector
798            .write()
799            .await
800            .record_cashu_payment_default(peer_id);
801    }
802
803    /// Snapshot routing/selection summary for inspection/debugging.
804    pub async fn selector_summary(&self) -> crate::peer_selector::SelectorSummary {
805        self.peer_selector.read().await.summary()
806    }
807
808    fn should_refuse_requests_from_peer(&self, selector: &PeerSelector, peer_id: &str) -> bool {
809        selector.is_peer_blocked_for_payment_defaults(
810            peer_id,
811            self.routing.cashu_payment_default_block_threshold,
812        )
813    }
814
815    /// Export live peer metadata for inspection/debugging.
816    pub async fn peer_metadata_snapshot(&self) -> PeerMetadataSnapshot {
817        self.peer_selector
818            .read()
819            .await
820            .export_peer_metadata_snapshot()
821    }
822
823    /// Snapshot current peer metadata and persist it into `local_store`.
824    ///
825    /// Uses content-addressed storage for the snapshot body and a reserved
826    /// mutable pointer slot for the "latest snapshot hash".
827    pub async fn persist_peer_metadata(&self) -> Result<Hash, StoreError> {
828        let snapshot = self
829            .peer_selector
830            .read()
831            .await
832            .export_peer_metadata_snapshot();
833        let bytes = serde_json::to_vec(&snapshot).map_err(|e| {
834            StoreError::Other(format!("Failed to encode peer metadata snapshot: {e}"))
835        })?;
836        let snapshot_hash = hashtree_core::sha256(&bytes);
837        let _ = self.local_store.put(snapshot_hash, bytes).await?;
838
839        let pointer_slot = Self::peer_metadata_pointer_slot_hash();
840        let pointer_bytes = hex::encode(snapshot_hash).into_bytes();
841        let _ = self.local_store.delete(&pointer_slot).await?;
842        let _ = self.local_store.put(pointer_slot, pointer_bytes).await?;
843
844        Ok(snapshot_hash)
845    }
846
847    /// Load persisted peer metadata from `local_store` if available.
848    pub async fn load_peer_metadata(&self) -> Result<bool, StoreError> {
849        let pointer_slot = Self::peer_metadata_pointer_slot_hash();
850        let Some(pointer_bytes) = self.local_store.get(&pointer_slot).await? else {
851            return Ok(false);
852        };
853        let pointer_hex = std::str::from_utf8(&pointer_bytes).map_err(|e| {
854            StoreError::Other(format!("Peer metadata pointer is not valid UTF-8: {e}"))
855        })?;
856        let snapshot_hash = Self::decode_hash_hex(pointer_hex.trim())?;
857
858        let Some(snapshot_bytes) = self.local_store.get(&snapshot_hash).await? else {
859            return Ok(false);
860        };
861        let snapshot: PeerMetadataSnapshot =
862            serde_json::from_slice(&snapshot_bytes).map_err(|e| {
863                StoreError::Other(format!("Failed to decode peer metadata snapshot: {e}"))
864            })?;
865        self.peer_selector
866            .write()
867            .await
868            .import_peer_metadata_snapshot(&snapshot);
869        Ok(true)
870    }
871
872    /// Request data from peers after negotiating a paid quote.
873    ///
874    /// If quote negotiation fails or the quoted peer does not deliver, the store
875    /// falls back to the normal unpaid retrieval path to preserve liveness.
876    pub async fn get_with_quote(
877        &self,
878        hash: &Hash,
879        payment_sat: u64,
880        quote_ttl: Duration,
881    ) -> Result<Option<Vec<u8>>, StoreError> {
882        if let Some(data) = self.local_store.get(hash).await? {
883            return Ok(Some(data));
884        }
885        Ok(self
886            .request_from_peers_with_quote(hash, payment_sat, quote_ttl)
887            .await)
888    }
889
890    async fn request_from_peers_with_quote(
891        &self,
892        hash: &Hash,
893        payment_sat: u64,
894        quote_ttl: Duration,
895    ) -> Option<Vec<u8>> {
896        let ordered_peer_ids = self.ordered_connected_peers().await;
897        if ordered_peer_ids.is_empty() {
898            return None;
899        }
900
901        if let Some(quote) = self
902            .request_quote_from_peers(hash, payment_sat, quote_ttl, &ordered_peer_ids)
903            .await
904        {
905            if let Some(data) = self
906                .request_from_single_peer(hash, &quote.peer_id, Some(quote.quote_id))
907                .await
908            {
909                return Some(data);
910            }
911        }
912
913        self.request_from_ordered_peers(hash, &ordered_peer_ids)
914            .await
915    }
916
917    async fn request_quote_from_peers(
918        &self,
919        hash: &Hash,
920        payment_sat: u64,
921        quote_ttl: Duration,
922        ordered_peer_ids: &[String],
923    ) -> Option<NegotiatedQuote> {
924        if ordered_peer_ids.is_empty() {
925            return None;
926        }
927        let ttl_ms = quote_ttl.as_millis().min(u32::MAX as u128) as u32;
928        if ttl_ms == 0 {
929            return None;
930        }
931        let requested_mint = self.requested_quote_mint().map(str::to_string);
932
933        let hash_key = hash_to_key(hash);
934        let (tx, rx) = oneshot::channel();
935        self.pending_quotes.write().await.insert(
936            hash_key.clone(),
937            PendingQuoteRequest {
938                response_tx: tx,
939                preferred_mint_url: requested_mint.clone(),
940                offered_payment_sat: payment_sat,
941            },
942        );
943
944        let rx = Arc::new(Mutex::new(rx));
945        let result = run_hedged_waves(
946            ordered_peer_ids.len(),
947            self.routing.dispatch,
948            self.request_timeout,
949            |range| {
950                let wave_peer_ids = ordered_peer_ids[range].to_vec();
951                let requested_mint = requested_mint.clone();
952                let hash = *hash;
953                async move {
954                    let mut sent = 0usize;
955                    for peer_id in wave_peer_ids {
956                        if self
957                            .send_quote_request_to_peer(
958                                &peer_id,
959                                &hash,
960                                payment_sat,
961                                ttl_ms,
962                                requested_mint.as_deref(),
963                            )
964                            .await
965                        {
966                            sent += 1;
967                        }
968                    }
969                    sent
970                }
971            },
972            |wait| {
973                let rx = rx.clone();
974                async move {
975                    let mut rx = rx.lock().await;
976                    match tokio::time::timeout(wait, &mut *rx).await {
977                        Ok(Ok(Some(quote))) => HedgedWaveAction::Success(quote),
978                        Ok(Ok(None)) | Ok(Err(_)) => HedgedWaveAction::Abort,
979                        Err(_) => HedgedWaveAction::Continue,
980                    }
981                }
982            },
983        )
984        .await;
985        let _ = self.pending_quotes.write().await.remove(&hash_key);
986        result
987    }
988
989    async fn request_from_single_peer(
990        &self,
991        hash: &Hash,
992        peer_id: &str,
993        quote_id: Option<u64>,
994    ) -> Option<Vec<u8>> {
995        let hash_key = hash_to_key(hash);
996        let (tx, rx) = oneshot::channel();
997        self.pending_requests.write().await.insert(
998            hash_key.clone(),
999            PendingRequest {
1000                response_tx: tx,
1001                started_at: Instant::now(),
1002                queried_peers: vec![peer_id.to_string()],
1003            },
1004        );
1005
1006        let mut rx = rx;
1007        if !self.send_request_to_peer(peer_id, hash, quote_id).await {
1008            let _ = self.pending_requests.write().await.remove(&hash_key);
1009            return None;
1010        }
1011
1012        if let Ok(Ok(Some(data))) = tokio::time::timeout(self.request_timeout, &mut rx).await {
1013            if hashtree_core::sha256(&data) == *hash {
1014                let _ = self.local_store.put(*hash, data.clone()).await;
1015                return Some(data);
1016            }
1017        }
1018
1019        if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
1020            for peer_id in pending.queried_peers {
1021                self.peer_selector.write().await.record_timeout(&peer_id);
1022            }
1023        }
1024        None
1025    }
1026
1027    async fn request_from_ordered_peers(
1028        &self,
1029        hash: &Hash,
1030        ordered_peer_ids: &[String],
1031    ) -> Option<Vec<u8>> {
1032        let hash_key = hash_to_key(hash);
1033        let (tx, rx) = oneshot::channel();
1034        self.pending_requests.write().await.insert(
1035            hash_key.clone(),
1036            PendingRequest {
1037                response_tx: tx,
1038                started_at: Instant::now(),
1039                queried_peers: Vec::new(),
1040            },
1041        );
1042
1043        let rx = Arc::new(Mutex::new(rx));
1044        let result = run_hedged_waves(
1045            ordered_peer_ids.len(),
1046            self.routing.dispatch,
1047            self.request_timeout,
1048            |range| {
1049                let wave_peer_ids = ordered_peer_ids[range].to_vec();
1050                let hash = *hash;
1051                let hash_key = hash_key.clone();
1052                async move {
1053                    let mut sent = 0usize;
1054                    for peer_id in wave_peer_ids {
1055                        if self.send_request_to_peer(&peer_id, &hash, None).await {
1056                            sent += 1;
1057                            if let Some(pending) =
1058                                self.pending_requests.write().await.get_mut(&hash_key)
1059                            {
1060                                pending.queried_peers.push(peer_id);
1061                            }
1062                        }
1063                    }
1064                    sent
1065                }
1066            },
1067            |wait| {
1068                let rx = rx.clone();
1069                async move {
1070                    let mut rx = rx.lock().await;
1071                    match tokio::time::timeout(wait, &mut *rx).await {
1072                        Ok(Ok(Some(data))) if hashtree_core::sha256(&data) == *hash => {
1073                            HedgedWaveAction::Success(data)
1074                        }
1075                        Ok(Ok(Some(_))) => HedgedWaveAction::Continue,
1076                        Ok(Ok(None)) | Ok(Err(_)) => HedgedWaveAction::Abort,
1077                        Err(_) => HedgedWaveAction::Continue,
1078                    }
1079                }
1080            },
1081        )
1082        .await;
1083
1084        let Some(data) = result else {
1085            if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
1086                for peer_id in pending.queried_peers {
1087                    self.peer_selector.write().await.record_timeout(&peer_id);
1088                }
1089            }
1090            return None;
1091        };
1092
1093        let _ = self.local_store.put(*hash, data.clone()).await;
1094        Some(data)
1095    }
1096
1097    /// Request data from peers
1098    async fn request_from_peers(&self, hash: &Hash) -> Option<Vec<u8>> {
1099        let ordered_peer_ids = self.ordered_connected_peers().await;
1100        if ordered_peer_ids.is_empty() {
1101            return None;
1102        }
1103        self.request_from_ordered_peers(hash, &ordered_peer_ids)
1104            .await
1105    }
1106
1107    async fn complete_pending_response(&self, from_peer: &str, hash_key: String, payload: Vec<u8>) {
1108        if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
1109            let rtt_ms = pending.started_at.elapsed().as_millis() as u64;
1110            self.peer_selector.write().await.record_success(
1111                from_peer,
1112                rtt_ms,
1113                payload.len() as u64,
1114            );
1115            let _ = pending.response_tx.send(Some(payload));
1116        }
1117    }
1118
1119    async fn handle_quote_response_message(&self, from_peer: &str, res: DataQuoteResponse) {
1120        if !res.a {
1121            return;
1122        }
1123
1124        let Some(quote_id) = res.q else {
1125            return;
1126        };
1127
1128        let hash_key = hash_to_key(&res.h);
1129        let (preferred_mint_url, offered_payment_sat) = {
1130            let pending_quotes = self.pending_quotes.read().await;
1131            let Some(pending) = pending_quotes.get(&hash_key) else {
1132                return;
1133            };
1134            (
1135                pending.preferred_mint_url.clone(),
1136                pending.offered_payment_sat,
1137            )
1138        };
1139        if !self
1140            .should_accept_quote_response(
1141                from_peer,
1142                preferred_mint_url.as_deref(),
1143                offered_payment_sat,
1144                &res,
1145            )
1146            .await
1147        {
1148            return;
1149        }
1150        let mut pending_quotes = self.pending_quotes.write().await;
1151        if let Some(pending) = pending_quotes.remove(&hash_key) {
1152            let _ = pending.response_tx.send(Some(NegotiatedQuote {
1153                peer_id: from_peer.to_string(),
1154                quote_id,
1155                mint_url: res.m,
1156            }));
1157        }
1158    }
1159
1160    async fn handle_response_message(&self, from_peer: &str, res: crate::protocol::DataResponse) {
1161        let hash_key = hash_to_key(&res.h);
1162        let hash = match crate::protocol::bytes_to_hash(&res.h) {
1163            Some(h) => h,
1164            None => return,
1165        };
1166
1167        // Ignore malformed/corrupt payload and keep waiting for a valid response.
1168        if hashtree_core::sha256(&res.d) != hash {
1169            self.peer_selector.write().await.record_failure(from_peer);
1170            if self.debug {
1171                println!("[MeshStoreCore] Ignoring invalid response payload for {hash_key}");
1172            }
1173            return;
1174        }
1175
1176        self.complete_pending_response(from_peer, hash_key, res.d)
1177            .await;
1178    }
1179
1180    async fn handle_quote_request_message(&self, from_peer: &str, req: DataQuoteRequest) {
1181        let hash = match crate::protocol::bytes_to_hash(&req.h) {
1182            Some(h) => h,
1183            None => return,
1184        };
1185        let hash_key = hash_to_key(&hash);
1186
1187        {
1188            let selector = self.peer_selector.read().await;
1189            if self.should_refuse_requests_from_peer(&selector, from_peer) {
1190                if self.debug {
1191                    println!(
1192                        "[MeshStoreCore] Refusing quote request from delinquent peer {}",
1193                        from_peer
1194                    );
1195                }
1196                return;
1197            }
1198        }
1199
1200        let chosen_mint = self.choose_quote_mint(req.m.as_deref());
1201        let can_serve = self.local_store.has(&hash).await.ok().unwrap_or(false)
1202            && !self.should_drop_response(&hash)
1203            && !self.should_corrupt_response(&hash);
1204
1205        let res = if can_serve {
1206            let quote_id = self
1207                .issue_quote(from_peer, &hash_key, req.p, req.t, chosen_mint.as_deref())
1208                .await;
1209            create_quote_response_available(&hash, quote_id, req.p, req.t, chosen_mint.as_deref())
1210        } else {
1211            create_quote_response_unavailable(&hash)
1212        };
1213        let response_bytes = encode_quote_response(&res);
1214        if let Some(channel) = self.signaling.get_channel(from_peer).await {
1215            let _ = channel.send(response_bytes).await;
1216        }
1217    }
1218
1219    async fn handle_request_message(&self, from_peer: &str, req: crate::protocol::DataRequest) {
1220        let hash = match crate::protocol::bytes_to_hash(&req.h) {
1221            Some(h) => h,
1222            None => return,
1223        };
1224        let hash_key = hash_to_key(&hash);
1225
1226        {
1227            let selector = self.peer_selector.read().await;
1228            if self.should_refuse_requests_from_peer(&selector, from_peer) {
1229                if self.debug {
1230                    println!(
1231                        "[MeshStoreCore] Refusing request from delinquent peer {}",
1232                        from_peer
1233                    );
1234                }
1235                return;
1236            }
1237        }
1238
1239        if let Some(quote_id) = req.q {
1240            if !self.take_valid_quote(from_peer, &hash_key, quote_id).await {
1241                if self.debug {
1242                    println!(
1243                        "[MeshStoreCore] Refusing request with invalid or expired quote {} from {}",
1244                        quote_id, from_peer
1245                    );
1246                }
1247                return;
1248            }
1249        }
1250
1251        // Check local store
1252        if let Ok(Some(mut data)) = self.local_store.get(&hash).await {
1253            if self.should_drop_response(&hash) {
1254                if self.debug {
1255                    println!(
1256                        "[MeshStoreCore] Dropping response for {} due to actor profile",
1257                        hash_to_key(&hash)
1258                    );
1259                }
1260                return;
1261            }
1262
1263            let behavior = self.response_behavior();
1264            if behavior.extra_delay_ms > 0 {
1265                tokio::time::sleep(Duration::from_millis(behavior.extra_delay_ms)).await;
1266            }
1267
1268            if self.should_corrupt_response(&hash) {
1269                if data.is_empty() {
1270                    data.push(0x80);
1271                } else {
1272                    data[0] ^= 0x80;
1273                }
1274            }
1275
1276            // Send response
1277            let res = create_response(&hash, data);
1278            let response_bytes = encode_response(&res);
1279            if let Some(channel) = self.signaling.get_channel(from_peer).await {
1280                let _ = channel.send(response_bytes).await;
1281            }
1282        }
1283        // For now, don't forward - keep it simple
1284    }
1285
1286    /// Handle incoming data message
1287    pub async fn handle_data_message(&self, from_peer: &str, data: &[u8]) {
1288        let parsed = match parse_message(data) {
1289            Some(m) => m,
1290            None => return,
1291        };
1292
1293        match parsed {
1294            DataMessage::Request(req) => {
1295                self.handle_request_message(from_peer, req).await;
1296            }
1297            DataMessage::Response(res) => {
1298                self.handle_response_message(from_peer, res).await;
1299            }
1300            DataMessage::QuoteRequest(req) => {
1301                self.handle_quote_request_message(from_peer, req).await;
1302            }
1303            DataMessage::QuoteResponse(res) => {
1304                self.handle_quote_response_message(from_peer, res).await;
1305            }
1306        }
1307    }
1308}
1309
1310#[async_trait]
1311impl<S, R, F> Store for MeshStoreCore<S, R, F>
1312where
1313    S: Store + Send + Sync + 'static,
1314    R: SignalingTransport + Send + Sync + 'static,
1315    F: PeerLinkFactory + Send + Sync + 'static,
1316{
1317    async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
1318        self.local_store.put(hash, data).await
1319    }
1320
1321    async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
1322        // Try local first
1323        if let Some(data) = self.local_store.get(hash).await? {
1324            return Ok(Some(data));
1325        }
1326
1327        // Try peers
1328        Ok(self.request_from_peers(hash).await)
1329    }
1330
1331    async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
1332        self.local_store.has(hash).await
1333    }
1334
1335    async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
1336        self.local_store.delete(hash).await
1337    }
1338}
1339
1340#[cfg(test)]
1341mod tests {
1342    use super::*;
1343    use hashtree_core::MemoryStore;
1344    use std::sync::Arc;
1345    use std::sync::OnceLock;
1346    use std::time::Duration;
1347
    /// The store type exercised by these tests: an in-memory local store
    /// combined with the mock relay transport and mock connection factory.
    type TestStore = MeshStoreCore<
        MemoryStore,
        crate::mock::MockRelayTransport,
        crate::mock::MockConnectionFactory,
    >;
1353
    /// One simulated node: the store under test plus direct handles to the
    /// local storage and transport it was built from.
    struct TestNode {
        // Store under test.
        store: Arc<TestStore>,
        // The local store that was handed to `store` (see `make_shared_test_node`).
        local_store: Arc<MemoryStore>,
        // The transport that was handed to the node's `MeshRouter`.
        transport: Arc<crate::mock::MockRelayTransport>,
    }
1359
1360    fn mock_network_lock() -> &'static tokio::sync::Mutex<()> {
1361        static LOCK: OnceLock<tokio::sync::Mutex<()>> = OnceLock::new();
1362        LOCK.get_or_init(|| tokio::sync::Mutex::new(()))
1363    }
1364
1365    fn make_test_store(local_store: Arc<MemoryStore>, node_id: &str) -> TestStore {
1366        make_test_store_with_routing(local_store, node_id, MeshRoutingConfig::default())
1367    }
1368
1369    fn make_test_store_with_routing(
1370        local_store: Arc<MemoryStore>,
1371        node_id: &str,
1372        routing: MeshRoutingConfig,
1373    ) -> TestStore {
1374        let relay = crate::mock::MockRelay::new();
1375        let transport = Arc::new(relay.create_transport(node_id.to_string()));
1376        let conn_factory = Arc::new(crate::mock::MockConnectionFactory::new(
1377            node_id.to_string(),
1378            0,
1379        ));
1380        let signaling = Arc::new(crate::signaling::MeshRouter::new(
1381            node_id.to_string(),
1382            transport,
1383            conn_factory,
1384            crate::types::PoolSettings::default(),
1385            false,
1386        ));
1387
1388        TestStore::new_with_routing(
1389            local_store,
1390            signaling,
1391            Duration::from_millis(200),
1392            false,
1393            routing,
1394        )
1395    }
1396
1397    fn make_shared_test_node(
1398        relay: Arc<crate::mock::MockRelay>,
1399        node_id: &str,
1400        routing: MeshRoutingConfig,
1401    ) -> TestNode {
1402        let transport = Arc::new(relay.create_transport(node_id.to_string()));
1403        let conn_factory = Arc::new(crate::mock::MockConnectionFactory::new(
1404            node_id.to_string(),
1405            0,
1406        ));
1407        let signaling = Arc::new(crate::signaling::MeshRouter::new(
1408            node_id.to_string(),
1409            transport.clone(),
1410            conn_factory,
1411            crate::types::PoolSettings::default(),
1412            false,
1413        ));
1414        let local_store = Arc::new(MemoryStore::new());
1415        let store = Arc::new(TestStore::new_with_routing(
1416            local_store.clone(),
1417            signaling,
1418            Duration::from_millis(120),
1419            false,
1420            routing,
1421        ));
1422
1423        TestNode {
1424            store,
1425            local_store,
1426            transport,
1427        }
1428    }
1429
1430    async fn pump_test_signaling(nodes: &[&TestNode]) -> usize {
1431        let mut processed = 0usize;
1432        for node in nodes {
1433            while let Some(msg) = node.transport.try_recv() {
1434                node.store
1435                    .process_signaling(msg)
1436                    .await
1437                    .expect("process signaling");
1438                processed += 1;
1439            }
1440        }
1441        processed
1442    }
1443
1444    async fn pump_test_data(nodes: &[&TestNode]) -> usize {
1445        let mut processed = 0usize;
1446        for node in nodes {
1447            let peer_ids = node.store.signaling().peer_ids().await;
1448            for peer_id in peer_ids {
1449                let Some(channel) = node.store.signaling().get_channel(&peer_id).await else {
1450                    continue;
1451                };
1452                while let Some(data) = channel.try_recv() {
1453                    node.store.handle_data_message(&peer_id, &data).await;
1454                    processed += 1;
1455                }
1456            }
1457        }
1458        processed
1459    }
1460
1461    async fn pump_test_network(nodes: &[&TestNode], max_steps: usize) {
1462        for _ in 0..max_steps {
1463            let signaling = pump_test_signaling(nodes).await;
1464            let data = pump_test_data(nodes).await;
1465            if signaling + data == 0 {
1466                tokio::task::yield_now().await;
1467            }
1468        }
1469    }
1470
1471    async fn run_get_with_pumps(
1472        requester: Arc<TestStore>,
1473        hash: Hash,
1474        nodes: &[&TestNode],
1475    ) -> Option<Vec<u8>> {
1476        let task = tokio::spawn(async move { requester.get(&hash).await.ok().flatten() });
1477        let started = Instant::now();
1478
1479        loop {
1480            if task.is_finished() {
1481                return task.await.expect("request task join");
1482            }
1483
1484            if started.elapsed() > Duration::from_secs(1) {
1485                task.abort();
1486                return None;
1487            }
1488
1489            pump_test_network(nodes, 4).await;
1490        }
1491    }
1492
1493    async fn run_bad_peer_series(strategy: SelectionStrategy) -> usize {
1494        let _guard = mock_network_lock().lock().await;
1495        crate::mock::clear_channel_registry().await;
1496
1497        let relay = crate::mock::MockRelay::new();
1498        let requester = make_shared_test_node(
1499            relay.clone(),
1500            "requester-reject",
1501            MeshRoutingConfig {
1502                selection_strategy: strategy,
1503                fairness_enabled: false,
1504                dispatch: RequestDispatchConfig {
1505                    initial_fanout: 1,
1506                    hedge_fanout: 1,
1507                    max_fanout: 1,
1508                    hedge_interval_ms: 5,
1509                },
1510                ..Default::default()
1511            },
1512        );
1513        let bad = make_shared_test_node(
1514            relay.clone(),
1515            "a-bad",
1516            MeshRoutingConfig {
1517                response_behavior: ResponseBehaviorConfig {
1518                    drop_response_prob: 1.0,
1519                    ..Default::default()
1520                },
1521                ..Default::default()
1522            },
1523        );
1524        let honest = make_shared_test_node(relay, "b-honest", MeshRoutingConfig::default());
1525        let nodes = [&requester, &bad, &honest];
1526
1527        for node in &nodes {
1528            node.transport
1529                .connect(&[])
1530                .await
1531                .expect("connect transport");
1532            node.store.start().await.expect("start store");
1533        }
1534        pump_test_network(&nodes, 24).await;
1535
1536        let mut successes = 0usize;
1537        for round in 0..6 {
1538            let payload = format!("payload-{round}").into_bytes();
1539            let hash = hashtree_core::sha256(&payload);
1540            let _ = bad.local_store.put(hash, payload.clone()).await;
1541            let _ = honest.local_store.put(hash, payload.clone()).await;
1542
1543            let result = run_get_with_pumps(requester.store.clone(), hash, &nodes).await;
1544            if result.as_ref() == Some(&payload) {
1545                successes += 1;
1546            }
1547        }
1548
1549        crate::mock::clear_channel_registry().await;
1550        successes
1551    }
1552
1553    #[test]
1554    fn test_hedged_wave_plan_flood_all() {
1555        let plan = build_hedged_wave_plan(7, RequestDispatchConfig::default());
1556        assert_eq!(plan, vec![7]);
1557    }
1558
1559    #[test]
1560    fn test_hedged_wave_plan_staged() {
1561        let plan = build_hedged_wave_plan(
1562            10,
1563            RequestDispatchConfig {
1564                initial_fanout: 2,
1565                hedge_fanout: 3,
1566                max_fanout: 8,
1567                hedge_interval_ms: 25,
1568            },
1569        );
1570        assert_eq!(plan, vec![2, 3, 3]);
1571    }
1572
1573    #[tokio::test]
1574    async fn test_run_hedged_waves_uses_zero_interval_as_immediate_hedge() {
1575        let waits = Arc::new(std::sync::Mutex::new(Vec::new()));
1576        let waits_clone = waits.clone();
1577
1578        let result = run_hedged_waves(
1579            3,
1580            RequestDispatchConfig {
1581                initial_fanout: 1,
1582                hedge_fanout: 1,
1583                max_fanout: 3,
1584                hedge_interval_ms: 0,
1585            },
1586            Duration::from_millis(25),
1587            |_range| async { 1 },
1588            move |wait| {
1589                let waits = waits_clone.clone();
1590                async move {
1591                    waits.lock().expect("wait lock").push(wait);
1592                    HedgedWaveAction::<()>::Continue
1593                }
1594            },
1595        )
1596        .await;
1597
1598        assert!(result.is_none());
1599        let waits = waits.lock().expect("wait lock");
1600        assert_eq!(waits.len(), 1);
1601        assert!(waits[0] <= Duration::from_millis(25));
1602    }
1603
1604    #[test]
1605    fn test_response_behavior_normalization_clamps_probs() {
1606        let raw = ResponseBehaviorConfig {
1607            drop_response_prob: -1.5,
1608            corrupt_response_prob: 9.0,
1609            extra_delay_ms: 12,
1610        };
1611        let normalized = raw.normalized();
1612        assert_eq!(normalized.drop_response_prob, 0.0);
1613        assert_eq!(normalized.corrupt_response_prob, 1.0);
1614        assert_eq!(normalized.extra_delay_ms, 12);
1615    }
1616
1617    #[test]
1618    fn test_actor_draw_is_deterministic_per_peer_hash_and_salt() {
1619        let hash = hashtree_core::sha256(b"deterministic");
1620        let a = TestStore::deterministic_actor_draw_for("peer-a", &hash, 7);
1621        let b = TestStore::deterministic_actor_draw_for("peer-a", &hash, 7);
1622        assert!((a - b).abs() < f64::EPSILON);
1623    }
1624
1625    #[tokio::test]
1626    async fn test_load_peer_metadata_returns_false_when_missing() {
1627        let local_store = Arc::new(MemoryStore::new());
1628        let store = make_test_store(local_store, "0");
1629        assert!(!store.load_peer_metadata().await.expect("load result"));
1630    }
1631
1632    #[tokio::test]
1633    async fn test_persist_and_load_peer_metadata_with_existing_store_adapter() {
1634        let local_store = Arc::new(MemoryStore::new());
1635        let writer = make_test_store(local_store.clone(), "0");
1636        {
1637            let mut selector = writer.peer_selector.write().await;
1638            selector.add_peer("npub1stable");
1639            selector.record_request("npub1stable", 64);
1640            selector.record_success("npub1stable", 35, 1024);
1641            selector.record_cashu_payment("npub1stable", 120);
1642            selector.record_cashu_receipt("npub1stable", 40);
1643            selector.record_cashu_payment_default("npub1stable");
1644        }
1645
1646        let snapshot_hash = writer
1647            .persist_peer_metadata()
1648            .await
1649            .expect("persist peer metadata");
1650        assert!(local_store
1651            .get(&snapshot_hash)
1652            .await
1653            .expect("snapshot lookup")
1654            .is_some());
1655
1656        let reader = make_test_store(local_store, "1");
1657        assert!(reader
1658            .load_peer_metadata()
1659            .await
1660            .expect("load peer metadata snapshot"));
1661
1662        let mut selector = reader.peer_selector.write().await;
1663        selector.add_peer("npub1stable");
1664        let stats = selector
1665            .get_stats("npub1stable")
1666            .expect("restored peer stats");
1667        assert_eq!(stats.requests_sent, 1);
1668        assert_eq!(stats.successes, 1);
1669        assert_eq!(stats.cashu_paid_sat, 120);
1670        assert_eq!(stats.cashu_received_sat, 40);
1671        assert_eq!(stats.cashu_payment_receipts, 1);
1672        assert_eq!(stats.cashu_payment_defaults, 1);
1673    }
1674
1675    #[tokio::test]
1676    async fn test_should_refuse_requests_from_peer_after_payment_defaults() {
1677        let local_store = Arc::new(MemoryStore::new());
1678        let store = make_test_store_with_routing(
1679            local_store,
1680            "0",
1681            MeshRoutingConfig {
1682                cashu_payment_default_block_threshold: 1,
1683                ..Default::default()
1684            },
1685        );
1686        store.record_cashu_payment_default_from_peer("peer-a").await;
1687
1688        let selector = store.peer_selector.read().await;
1689        assert!(store.should_refuse_requests_from_peer(&selector, "peer-a"));
1690        assert!(!store.should_refuse_requests_from_peer(&selector, "peer-b"));
1691    }
1692
1693    #[tokio::test]
1694    async fn test_take_valid_quote_consumes_once_and_rejects_expired_quotes() {
1695        let local_store = Arc::new(MemoryStore::new());
1696        let store = make_test_store(local_store, "0");
1697        let hash = hashtree_core::sha256(b"quote-test");
1698        let hash_key = hash_to_key(&hash);
1699
1700        {
1701            let mut issued = store.issued_quotes.write().await;
1702            issued.insert(
1703                ("peer-a".to_string(), hash_key.clone(), 11),
1704                IssuedQuote {
1705                    expires_at: Instant::now() + Duration::from_secs(1),
1706                    payment_sat: 5,
1707                    mint_url: Some("https://mint-a.example".to_string()),
1708                },
1709            );
1710            issued.insert(
1711                ("peer-a".to_string(), hash_key.clone(), 12),
1712                IssuedQuote {
1713                    expires_at: Instant::now() - Duration::from_millis(1),
1714                    payment_sat: 5,
1715                    mint_url: Some("https://mint-a.example".to_string()),
1716                },
1717            );
1718        }
1719
1720        assert!(store.take_valid_quote("peer-a", &hash_key, 11).await);
1721        assert!(!store.take_valid_quote("peer-a", &hash_key, 11).await);
1722        assert!(!store.take_valid_quote("peer-a", &hash_key, 12).await);
1723    }
1724
1725    async fn run_quote_with_pumps(
1726        requester: Arc<TestStore>,
1727        hash: Hash,
1728        payment_sat: u64,
1729        quote_ttl: Duration,
1730        peer_ids: Vec<String>,
1731        nodes: &[&TestNode],
1732    ) -> Option<NegotiatedQuote> {
1733        let task = tokio::spawn(async move {
1734            requester
1735                .request_quote_from_peers(&hash, payment_sat, quote_ttl, &peer_ids)
1736                .await
1737        });
1738        let started = Instant::now();
1739
1740        loop {
1741            if task.is_finished() {
1742                return task.await.expect("quote task join");
1743            }
1744            if started.elapsed() > Duration::from_secs(1) {
1745                task.abort();
1746                return None;
1747            }
1748            pump_test_network(nodes, 4).await;
1749        }
1750    }
1751
1752    #[tokio::test]
1753    async fn test_request_quote_from_peers_rejects_unaccepted_mint() {
1754        let _guard = mock_network_lock().lock().await;
1755        crate::mock::clear_channel_registry().await;
1756
1757        let relay = crate::mock::MockRelay::new();
1758        let requester = make_shared_test_node(
1759            relay.clone(),
1760            "requester-reject",
1761            MeshRoutingConfig {
1762                cashu_accepted_mints: vec!["https://mint-a.example".to_string()],
1763                cashu_default_mint: Some("https://mint-a.example".to_string()),
1764                dispatch: RequestDispatchConfig {
1765                    initial_fanout: 1,
1766                    hedge_fanout: 1,
1767                    max_fanout: 1,
1768                    hedge_interval_ms: 5,
1769                },
1770                ..Default::default()
1771            },
1772        );
1773        let provider = make_shared_test_node(
1774            relay,
1775            "provider-reject",
1776            MeshRoutingConfig {
1777                cashu_accepted_mints: vec!["https://mint-b.example".to_string()],
1778                ..Default::default()
1779            },
1780        );
1781        let nodes = [&requester, &provider];
1782
1783        requester.transport.connect(&[]).await.expect("connect");
1784        provider.transport.connect(&[]).await.expect("connect");
1785        requester.store.start().await.expect("start");
1786        provider.store.start().await.expect("start");
1787        pump_test_network(&nodes, 24).await;
1788
1789        let payload = b"quoted-data".to_vec();
1790        let hash = hashtree_core::sha256(&payload);
1791        provider.local_store.put(hash, payload).await.expect("put");
1792
1793        let quote = run_quote_with_pumps(
1794            requester.store.clone(),
1795            hash,
1796            9,
1797            Duration::from_millis(80),
1798            vec!["provider-reject".to_string()],
1799            &nodes,
1800        )
1801        .await;
1802        assert!(
1803            quote.is_none(),
1804            "expected quote to be rejected on mint mismatch"
1805        );
1806    }
1807
1808    #[tokio::test]
1809    async fn test_request_quote_from_peers_accepts_small_peer_suggested_mint_under_cap() {
1810        let _guard = mock_network_lock().lock().await;
1811        crate::mock::clear_channel_registry().await;
1812
1813        let relay = crate::mock::MockRelay::new();
1814        let requester = make_shared_test_node(
1815            relay.clone(),
1816            "requester-suggested",
1817            MeshRoutingConfig {
1818                cashu_accepted_mints: vec!["https://mint-a.example".to_string()],
1819                cashu_default_mint: Some("https://mint-a.example".to_string()),
1820                cashu_peer_suggested_mint_base_cap_sat: 3,
1821                cashu_peer_suggested_mint_max_cap_sat: 3,
1822                dispatch: RequestDispatchConfig {
1823                    initial_fanout: 1,
1824                    hedge_fanout: 1,
1825                    max_fanout: 1,
1826                    hedge_interval_ms: 5,
1827                },
1828                ..Default::default()
1829            },
1830        );
1831        let provider = make_shared_test_node(
1832            relay,
1833            "provider-suggested",
1834            MeshRoutingConfig {
1835                cashu_accepted_mints: vec!["https://mint-b.example".to_string()],
1836                cashu_default_mint: Some("https://mint-b.example".to_string()),
1837                ..Default::default()
1838            },
1839        );
1840        let nodes = [&requester, &provider];
1841
1842        requester.transport.connect(&[]).await.expect("connect");
1843        provider.transport.connect(&[]).await.expect("connect");
1844        requester.store.start().await.expect("start");
1845        provider.store.start().await.expect("start");
1846        pump_test_network(&nodes, 24).await;
1847
1848        let payload = b"quoted-data".to_vec();
1849        let hash = hashtree_core::sha256(&payload);
1850        provider.local_store.put(hash, payload).await.expect("put");
1851
1852        let quote = run_quote_with_pumps(
1853            requester.store.clone(),
1854            hash,
1855            3,
1856            Duration::from_millis(80),
1857            vec!["provider-suggested".to_string()],
1858            &nodes,
1859        )
1860        .await
1861        .expect("expected bounded peer-suggested mint quote");
1862        assert_eq!(quote.mint_url.as_deref(), Some("https://mint-b.example"));
1863    }
1864
1865    #[tokio::test]
1866    async fn test_request_quote_from_peers_scales_peer_suggested_mint_cap_with_reputation() {
1867        let _guard = mock_network_lock().lock().await;
1868        crate::mock::clear_channel_registry().await;
1869
1870        let relay = crate::mock::MockRelay::new();
1871        let requester = make_shared_test_node(
1872            relay.clone(),
1873            "requester-reputation",
1874            MeshRoutingConfig {
1875                cashu_accepted_mints: vec!["https://mint-a.example".to_string()],
1876                cashu_default_mint: Some("https://mint-a.example".to_string()),
1877                cashu_peer_suggested_mint_base_cap_sat: 1,
1878                cashu_peer_suggested_mint_success_step_sat: 1,
1879                cashu_peer_suggested_mint_receipt_step_sat: 2,
1880                cashu_peer_suggested_mint_max_cap_sat: 5,
1881                dispatch: RequestDispatchConfig {
1882                    initial_fanout: 1,
1883                    hedge_fanout: 1,
1884                    max_fanout: 1,
1885                    hedge_interval_ms: 5,
1886                },
1887                ..Default::default()
1888            },
1889        );
1890        let provider = make_shared_test_node(
1891            relay,
1892            "provider-reputation",
1893            MeshRoutingConfig {
1894                cashu_accepted_mints: vec!["https://mint-b.example".to_string()],
1895                cashu_default_mint: Some("https://mint-b.example".to_string()),
1896                ..Default::default()
1897            },
1898        );
1899        let nodes = [&requester, &provider];
1900
1901        requester.transport.connect(&[]).await.expect("connect");
1902        provider.transport.connect(&[]).await.expect("connect");
1903        requester.store.start().await.expect("start");
1904        provider.store.start().await.expect("start");
1905        pump_test_network(&nodes, 24).await;
1906
1907        let payload = b"quoted-data".to_vec();
1908        let hash = hashtree_core::sha256(&payload);
1909        provider.local_store.put(hash, payload).await.expect("put");
1910
1911        let quote = run_quote_with_pumps(
1912            requester.store.clone(),
1913            hash,
1914            4,
1915            Duration::from_millis(80),
1916            vec!["provider-reputation".to_string()],
1917            &nodes,
1918        )
1919        .await;
1920        assert!(
1921            quote.is_none(),
1922            "new peer should not get a 4 sat untrusted-mint quote"
1923        );
1924
1925        {
1926            let mut selector = requester.store.peer_selector.write().await;
1927            selector.add_peer("provider-reputation");
1928            selector.record_success("provider-reputation", 20, 1024);
1929            selector.record_cashu_receipt("provider-reputation", 2);
1930        }
1931
1932        let quote = run_quote_with_pumps(
1933            requester.store.clone(),
1934            hash,
1935            4,
1936            Duration::from_millis(80),
1937            vec!["provider-reputation".to_string()],
1938            &nodes,
1939        )
1940        .await
1941        .expect("reputable peer should get larger bounded quote");
1942        assert_eq!(quote.mint_url.as_deref(), Some("https://mint-b.example"));
1943
1944        requester
1945            .store
1946            .record_cashu_payment_default_from_peer("provider-reputation")
1947            .await;
1948        let quote = run_quote_with_pumps(
1949            requester.store.clone(),
1950            hash,
1951            4,
1952            Duration::from_millis(80),
1953            vec!["provider-reputation".to_string()],
1954            &nodes,
1955        )
1956        .await;
1957        assert!(
1958            quote.is_none(),
1959            "peer-suggested mint exposure should drop to zero after defaults exceed receipts"
1960        );
1961    }
1962
1963    #[tokio::test]
1964    async fn test_request_quote_from_peers_returns_matching_mint() {
1965        let _guard = mock_network_lock().lock().await;
1966        crate::mock::clear_channel_registry().await;
1967
1968        let relay = crate::mock::MockRelay::new();
1969        let requester = make_shared_test_node(
1970            relay.clone(),
1971            "requester-match",
1972            MeshRoutingConfig {
1973                cashu_accepted_mints: vec!["https://mint-a.example".to_string()],
1974                cashu_default_mint: Some("https://mint-a.example".to_string()),
1975                dispatch: RequestDispatchConfig {
1976                    initial_fanout: 1,
1977                    hedge_fanout: 1,
1978                    max_fanout: 1,
1979                    hedge_interval_ms: 5,
1980                },
1981                ..Default::default()
1982            },
1983        );
1984        let provider = make_shared_test_node(
1985            relay,
1986            "provider-match",
1987            MeshRoutingConfig {
1988                cashu_accepted_mints: vec!["https://mint-a.example".to_string()],
1989                ..Default::default()
1990            },
1991        );
1992        let nodes = [&requester, &provider];
1993
1994        requester.transport.connect(&[]).await.expect("connect");
1995        provider.transport.connect(&[]).await.expect("connect");
1996        requester.store.start().await.expect("start");
1997        provider.store.start().await.expect("start");
1998        pump_test_network(&nodes, 24).await;
1999
2000        let payload = b"quoted-data".to_vec();
2001        let hash = hashtree_core::sha256(&payload);
2002        provider.local_store.put(hash, payload).await.expect("put");
2003
2004        let quote = run_quote_with_pumps(
2005            requester.store.clone(),
2006            hash,
2007            9,
2008            Duration::from_millis(80),
2009            vec!["provider-match".to_string()],
2010            &nodes,
2011        )
2012        .await
2013        .expect("expected quote");
2014        assert_eq!(quote.mint_url.as_deref(), Some("https://mint-a.example"));
2015    }
2016
2017    #[tokio::test]
2018    async fn test_tit_for_tat_store_path_recovers_after_bad_peer_observation() {
2019        let tit_for_tat_successes = run_bad_peer_series(SelectionStrategy::TitForTat).await;
2020
2021        assert!(
2022            tit_for_tat_successes >= 5,
2023            "expected tit-for-tat path to recover after the first consistently bad peer observation (successes={tit_for_tat_successes})"
2024        );
2025    }
2026}
2027
/// Type alias for the simulation store: [`MeshStoreCore`] wired to the crate's mock relay
/// transport and mock connection factory.
///
/// `S` is left generic; presumably the local storage backend — confirm against `MeshStoreCore`.
pub type SimMeshStore<S> =
    MeshStoreCore<S, crate::mock::MockRelayTransport, crate::mock::MockConnectionFactory>;
2031
/// Type alias for the default production core: [`MeshStoreCore`] wired to Nostr relay signaling
/// and WebRTC peer links.
///
/// `S` is left generic; presumably the local storage backend — confirm against `MeshStoreCore`.
pub type ProductionMeshStore<S> =
    MeshStoreCore<S, crate::nostr::NostrRelayTransport, crate::real_factory::WebRtcPeerLinkFactory>;