Skip to main content

hashtree_network/
generic_store.rs

1//! Generic P2P store using abstract transports.
2//!
3//! This module provides a store wrapper that works with any local storage
4//! backend plus any signaling transport and peer-link factory. Both production
5//! (Nostr websockets + WebRTC) and simulation (mocks) use this same code.
6
7use async_trait::async_trait;
8use std::collections::hash_map::DefaultHasher;
9use std::collections::{HashMap, HashSet};
10use std::future::Future;
11use std::hash::{Hash as _, Hasher};
12use std::ops::Range;
13use std::sync::Arc;
14use std::time::{Duration, Instant};
15use tokio::sync::{oneshot, Mutex, RwLock};
16
17use hashtree_core::{Hash, Store, StoreError};
18
19use crate::peer_selector::{PeerMetadataSnapshot, PeerSelector, SelectionStrategy};
20use crate::protocol::{
21    create_quote_request, create_quote_response_available, create_quote_response_unavailable,
22    create_request, create_request_with_quote, create_response, encode_quote_request,
23    encode_quote_response, encode_request, encode_response, hash_to_key, parse_message,
24    DataMessage, DataQuoteRequest, DataQuoteResponse,
25};
26use crate::signaling::MeshRouter;
27use crate::transport::{PeerLinkFactory, SignalingTransport, TransportError};
28use crate::types::{PeerHTLConfig, SignalingMessage, MAX_HTL};
29
// Keep the on-disk namespace stable across the crate rename so existing peer
// metadata does not disappear for users upgrading from the old package name.
//
// The SHA-256 of this key is used as the store slot that holds the hex-encoded
// hash of the most recently persisted peer-metadata snapshot.
const PEER_METADATA_POINTER_SLOT_KEY: &[u8] = b"hashtree-webrtc/peer-metadata/latest/v1";
33
/// Pending request awaiting response
///
/// Stored as the values of `GenericStore::pending_requests`.
struct PendingRequest {
    /// One-shot sender for the request outcome (`Some(bytes)` or `None`).
    response_tx: oneshot::Sender<Option<Vec<u8>>>,
    /// Timestamp tied to the request — presumably when it was started; confirm
    /// against the code that creates entries.
    started_at: Instant,
    /// Peer IDs associated with the request — presumably the peers already
    /// queried for it; confirm against the code that creates entries.
    queried_peers: Vec<String>,
}
40
/// Pending quote negotiation awaiting an acceptable quote response.
///
/// Stored as the values of `GenericStore::pending_quotes`.
struct PendingQuoteRequest {
    /// One-shot sender for the negotiation outcome.
    response_tx: oneshot::Sender<Option<NegotiatedQuote>>,
    /// Mint URL that was put into the outgoing quote request, if any.
    preferred_mint_url: Option<String>,
    /// Payment (in sats) that was offered in the outgoing quote request.
    offered_payment_sat: u64,
}
46
/// Result of a successful quote negotiation with one peer.
#[derive(Debug, Clone)]
struct NegotiatedQuote {
    /// Peer the follow-up data request is sent to.
    peer_id: String,
    /// Quote identifier attached to the follow-up data request.
    quote_id: u64,
    // Not read anywhere in the visible code, hence the lint allowance.
    #[allow(dead_code)]
    mint_url: Option<String>,
}
54
/// Quote this node issued to a peer, stored in `GenericStore::issued_quotes`.
struct IssuedQuote {
    /// Instant after which the quote is no longer honored.
    expires_at: Instant,
    // Recorded at issue time but not read in the visible code.
    #[allow(dead_code)]
    payment_sat: u64,
    // Recorded at issue time but not read in the visible code.
    #[allow(dead_code)]
    mint_url: Option<String>,
}
62
/// Request dispatch strategy for peer queries.
///
/// `GenericStore` supports two practical retrieval modes:
/// - Flood (`usize::MAX` fanout): maximize success/latency at bandwidth cost.
/// - Staged hedging: probe a subset first, then expand.
///
/// Values are normalized against the number of available peers by
/// `normalize_dispatch_config` before a wave plan is built.
#[derive(Debug, Clone, Copy)]
pub struct RequestDispatchConfig {
    /// Number of peers queried immediately.
    /// `0` is normalized to `1`.
    pub initial_fanout: usize,
    /// Number of additional peers to query on each hedge step.
    /// `0` is normalized to `1`.
    pub hedge_fanout: usize,
    /// Total peers allowed for this request.
    /// `0` is normalized to "all available peers".
    pub max_fanout: usize,
    /// Delay between hedge waves (ms). `0` means send all waves immediately.
    pub hedge_interval_ms: u64,
}
79
80impl Default for RequestDispatchConfig {
81    fn default() -> Self {
82        Self {
83            initial_fanout: usize::MAX,
84            hedge_fanout: usize::MAX,
85            max_fanout: usize::MAX,
86            hedge_interval_ms: 0,
87        }
88    }
89}
90
91/// Normalize fanout config against current peer availability.
92pub fn normalize_dispatch_config(
93    dispatch: RequestDispatchConfig,
94    available_peers: usize,
95) -> RequestDispatchConfig {
96    let mut cfg = dispatch;
97    let cap = if cfg.max_fanout == 0 {
98        available_peers
99    } else {
100        cfg.max_fanout.min(available_peers)
101    };
102    cfg.max_fanout = cap;
103    cfg.initial_fanout = if cfg.initial_fanout == 0 {
104        1
105    } else {
106        cfg.initial_fanout.min(cap.max(1))
107    };
108    cfg.hedge_fanout = if cfg.hedge_fanout == 0 {
109        1
110    } else {
111        cfg.hedge_fanout.min(cap.max(1))
112    };
113    cfg
114}
115
116/// Build wave sizes for staged hedged dispatch.
117pub fn build_hedged_wave_plan(peer_count: usize, dispatch: RequestDispatchConfig) -> Vec<usize> {
118    if peer_count == 0 {
119        return Vec::new();
120    }
121    let cap = dispatch.max_fanout.min(peer_count);
122    if cap == 0 {
123        return Vec::new();
124    }
125
126    let mut plan = Vec::new();
127    let mut sent = 0usize;
128    let first = dispatch.initial_fanout.min(cap).max(1);
129    plan.push(first);
130    sent += first;
131
132    while sent < cap {
133        let next = dispatch.hedge_fanout.min(cap - sent).max(1);
134        plan.push(next);
135        sent += next;
136    }
137    plan
138}
139
/// Outcome returned after waiting on a hedged dispatch wave.
#[derive(Debug)]
pub enum HedgedWaveAction<T> {
    /// Nothing conclusive yet; `run_hedged_waves` moves on to the next wave.
    Continue,
    /// A result is available; `run_hedged_waves` returns it immediately.
    Success(T),
    /// Stop dispatching; `run_hedged_waves` returns `None`.
    Abort,
}
147
/// Run a staged hedged dispatch over peer index ranges.
///
/// This scheduler is shared by the reusable `GenericStore` and the native
/// `hashtree-cli` mesh path so tests and production use the same wave timing.
///
/// # Parameters
/// - `peer_count`: number of candidate peers; waves address them by index
///   range within `0..peer_count`.
/// - `dispatch`: fanout policy; it is normalized against `peer_count` first.
/// - `request_timeout`: overall time budget, measured from this call.
/// - `send_wave`: called with the index range of a wave; its returned count is
///   added to a running total of sent requests.
/// - `wait_wave`: called with the time to wait after a wave; its result
///   decides whether to continue, succeed or abort.
///
/// # Returns
/// `Some(value)` as soon as `wait_wave` yields `HedgedWaveAction::Success`.
/// `None` when the wave plan is empty, the deadline has passed, all waves are
/// exhausted, or `wait_wave` yields `HedgedWaveAction::Abort`.
pub async fn run_hedged_waves<T, SendWave, SendWaveFut, WaitWave, WaitWaveFut>(
    peer_count: usize,
    dispatch: RequestDispatchConfig,
    request_timeout: Duration,
    mut send_wave: SendWave,
    mut wait_wave: WaitWave,
) -> Option<T>
where
    SendWave: FnMut(Range<usize>) -> SendWaveFut,
    SendWaveFut: Future<Output = usize>,
    WaitWave: FnMut(Duration) -> WaitWaveFut,
    WaitWaveFut: Future<Output = HedgedWaveAction<T>>,
{
    let dispatch = normalize_dispatch_config(dispatch, peer_count);
    let wave_plan = build_hedged_wave_plan(peer_count, dispatch);
    if wave_plan.is_empty() {
        return None;
    }

    let deadline = Instant::now() + request_timeout;
    let mut sent_total = 0usize;
    let mut next_peer_idx = 0usize;

    for (wave_idx, wave_size) in wave_plan.iter().copied().enumerate() {
        // Index range of this wave, clipped to the peer list.
        let from = next_peer_idx;
        let to = (next_peer_idx + wave_size).min(peer_count);
        next_peer_idx = to;

        if from == to {
            continue;
        }

        sent_total += send_wave(from..to).await;
        // Nothing has been sent so far, so there is nothing to wait for:
        // move straight to the next wave, or give up if no peers remain.
        if sent_total == 0 {
            if next_peer_idx >= peer_count {
                break;
            }
            continue;
        }

        let now = Instant::now();
        if now >= deadline {
            break;
        }
        let remaining = deadline.saturating_duration_since(now);
        let is_last_wave = wave_idx + 1 == wave_plan.len() || next_peer_idx >= peer_count;
        // The last wave waits out the remaining budget; earlier waves wait one
        // hedge interval (capped by the budget), or not at all when it is 0.
        let wait = if is_last_wave {
            remaining
        } else if dispatch.hedge_interval_ms == 0 {
            Duration::ZERO
        } else {
            Duration::from_millis(dispatch.hedge_interval_ms).min(remaining)
        };

        if wait.is_zero() {
            continue;
        }

        match wait_wave(wait).await {
            HedgedWaveAction::Continue => {}
            HedgedWaveAction::Success(value) => return Some(value),
            HedgedWaveAction::Abort => break,
        }
    }

    None
}
219
220/// Keep selector membership aligned with currently connected peer IDs.
221pub async fn sync_selector_peers(selector: &RwLock<PeerSelector>, current_peer_ids: &[String]) {
222    let mut selector = selector.write().await;
223    let current: HashSet<&str> = current_peer_ids.iter().map(String::as_str).collect();
224    let known: Vec<String> = selector.all_stats().map(|s| s.peer_id.clone()).collect();
225    for peer_id in known {
226        if !current.contains(peer_id.as_str()) {
227            selector.remove_peer(&peer_id);
228        }
229    }
230    for peer_id in current_peer_ids {
231        selector.add_peer(peer_id.clone());
232    }
233}
234
/// Response behavior profile for simulation/game-theory actors.
///
/// Defaults to honest behavior (always respond correctly, no extra delay).
/// Probabilities are clamped to `[0.0, 1.0]` before use.
#[derive(Debug, Clone, Copy)]
pub struct ResponseBehaviorConfig {
    /// Probability that a node drops a response even when it has data.
    pub drop_response_prob: f64,
    /// Probability that a node responds with corrupted payload.
    pub corrupt_response_prob: f64,
    /// Optional response delay to model slow/incompetent peers.
    /// Expressed in milliseconds; `0` adds no delay.
    pub extra_delay_ms: u64,
}
247
248impl Default for ResponseBehaviorConfig {
249    fn default() -> Self {
250        Self {
251            drop_response_prob: 0.0,
252            corrupt_response_prob: 0.0,
253            extra_delay_ms: 0,
254        }
255    }
256}
257
258impl ResponseBehaviorConfig {
259    fn normalized(self) -> Self {
260        Self {
261            drop_response_prob: self.drop_response_prob.clamp(0.0, 1.0),
262            corrupt_response_prob: self.corrupt_response_prob.clamp(0.0, 1.0),
263            extra_delay_ms: self.extra_delay_ms,
264        }
265    }
266}
267
/// Routing policy for request ordering + dispatch fanout.
#[derive(Debug, Clone)]
pub struct GenericStoreRoutingConfig {
    /// Strategy handed to the `PeerSelector` when the store is constructed.
    pub selection_strategy: SelectionStrategy,
    /// Fairness flag handed to the `PeerSelector` when the store is constructed.
    pub fairness_enabled: bool,
    /// Blend weight for payment-priority ranking in selector (`0.0` disables).
    pub cashu_payment_weight: f64,
    /// Refuse serving peers that have reached this many unpaid post-delivery settlements.
    /// `0` disables refusal and only keeps metadata/downranking.
    pub cashu_payment_default_block_threshold: u64,
    /// Cashu mint URLs this node is willing to use for settlement.
    /// An empty list means any mint is accepted.
    pub cashu_accepted_mints: Vec<String>,
    /// Preferred Cashu mint URL when initiating paid retrieval.
    pub cashu_default_mint: Option<String>,
    /// Baseline cap for accepting a peer-suggested mint outside the trusted set.
    /// `0` means quotes on untrusted peer-suggested mints are never accepted.
    pub cashu_peer_suggested_mint_base_cap_sat: u64,
    /// Additional sats allowed per successful delivery from that peer.
    pub cashu_peer_suggested_mint_success_step_sat: u64,
    /// Additional sats allowed per successful post-delivery payment received from that peer.
    pub cashu_peer_suggested_mint_receipt_step_sat: u64,
    /// Hard upper bound for any single peer-suggested mint quote we accept.
    /// `0` means no upper bound is applied.
    pub cashu_peer_suggested_mint_max_cap_sat: u64,
    /// Fanout and hedging policy used when dispatching requests to peers.
    pub dispatch: RequestDispatchConfig,
    /// Simulated response behavior of this node (honest by default).
    pub response_behavior: ResponseBehaviorConfig,
}
293
294impl Default for GenericStoreRoutingConfig {
295    fn default() -> Self {
296        Self {
297            selection_strategy: SelectionStrategy::Weighted,
298            fairness_enabled: true,
299            cashu_payment_weight: 0.0,
300            cashu_payment_default_block_threshold: 0,
301            cashu_accepted_mints: Vec::new(),
302            cashu_default_mint: None,
303            cashu_peer_suggested_mint_base_cap_sat: 0,
304            cashu_peer_suggested_mint_success_step_sat: 0,
305            cashu_peer_suggested_mint_receipt_step_sat: 0,
306            cashu_peer_suggested_mint_max_cap_sat: 0,
307            dispatch: RequestDispatchConfig::default(),
308            response_behavior: ResponseBehaviorConfig::default(),
309        }
310    }
311}
312
/// Generic mesh store that works with any storage backend and transport
/// implementation.
///
/// This is the shared code between production and simulation.
/// - Production: `GenericStore<LmdbStore, NostrSignalingTransport, WebRtcPeerLinkFactory>`
/// - Simulation: `GenericStore<MemoryStore, MockSignalingTransport, MockConnectionFactory>`
///
/// All mutable state lives behind async `RwLock`s, so the public methods take
/// `&self`.
pub struct GenericStore<S, R, F>
where
    S: Store + Send + Sync + 'static,
    R: SignalingTransport + Send + Sync + 'static,
    F: PeerLinkFactory + Send + Sync + 'static,
{
    /// Local backing store
    local_store: Arc<S>,
    /// Mesh router (handles peer discovery and connection)
    signaling: Arc<MeshRouter<R, F>>,
    /// Per-peer HTL config, keyed by peer ID
    htl_configs: RwLock<HashMap<String, PeerHTLConfig>>,
    /// Pending requests we sent
    pending_requests: RwLock<HashMap<String, PendingRequest>>,
    /// Pending quote negotiations keyed by requested hash.
    pending_quotes: RwLock<HashMap<String, PendingQuoteRequest>>,
    /// Quotes we issued to peers and will accept exactly once until expiry.
    /// Keyed by `(peer_id, hash_key, quote_id)`.
    issued_quotes: RwLock<HashMap<(String, String, u64), IssuedQuote>>,
    /// Monotonic quote identifier generator (starts at 1).
    next_quote_id: RwLock<u64>,
    /// Adaptive selector for peer ordering.
    peer_selector: RwLock<PeerSelector>,
    /// Routing/dispatch configuration.
    routing: GenericStoreRoutingConfig,
    /// Request timeout
    request_timeout: Duration,
    /// Debug mode
    debug: bool,
    /// Running flag (set by `start`, cleared by `stop`)
    running: RwLock<bool>,
}
350
351impl<S, R, F> GenericStore<S, R, F>
352where
353    S: Store + Send + Sync + 'static,
354    R: SignalingTransport + Send + Sync + 'static,
355    F: PeerLinkFactory + Send + Sync + 'static,
356{
357    /// Create a new generic store
358    pub fn new(
359        local_store: Arc<S>,
360        signaling: Arc<MeshRouter<R, F>>,
361        request_timeout: Duration,
362        debug: bool,
363    ) -> Self {
364        Self::new_with_routing(
365            local_store,
366            signaling,
367            request_timeout,
368            debug,
369            Default::default(),
370        )
371    }
372
373    /// Create a new generic store with explicit routing configuration.
374    pub fn new_with_routing(
375        local_store: Arc<S>,
376        signaling: Arc<MeshRouter<R, F>>,
377        request_timeout: Duration,
378        debug: bool,
379        routing: GenericStoreRoutingConfig,
380    ) -> Self {
381        let mut selector = PeerSelector::with_strategy(routing.selection_strategy);
382        selector.set_fairness(routing.fairness_enabled);
383        selector.set_cashu_payment_weight(routing.cashu_payment_weight);
384        Self {
385            local_store,
386            signaling,
387            htl_configs: RwLock::new(HashMap::new()),
388            pending_requests: RwLock::new(HashMap::new()),
389            pending_quotes: RwLock::new(HashMap::new()),
390            issued_quotes: RwLock::new(HashMap::new()),
391            next_quote_id: RwLock::new(1),
392            peer_selector: RwLock::new(selector),
393            routing,
394            request_timeout,
395            debug,
396            running: RwLock::new(false),
397        }
398    }
399
400    /// Start the store (begin listening for messages)
401    pub async fn start(&self) -> Result<(), TransportError> {
402        *self.running.write().await = true;
403
404        // Send initial hello
405        self.signaling.send_hello(vec![]).await?;
406
407        Ok(())
408    }
409
    /// Stop the store
    ///
    /// Only clears the running flag; nothing else is torn down here.
    pub async fn stop(&self) {
        *self.running.write().await = false;
    }
414
415    /// Process incoming signaling message
416    pub async fn process_signaling(&self, msg: SignalingMessage) -> Result<(), TransportError> {
417        // When a new peer connects, initialize their HTL config
418        let peer_id = msg.peer_id().to_string();
419        {
420            let mut configs = self.htl_configs.write().await;
421            if !configs.contains_key(&peer_id) {
422                configs.insert(peer_id.clone(), PeerHTLConfig::random());
423            }
424        }
425        self.peer_selector.write().await.add_peer(peer_id);
426
427        self.signaling.handle_message(msg).await
428    }
429
    /// Get signaling manager reference
    ///
    /// Returns a borrow of the shared `Arc<MeshRouter>`; clone it to keep a
    /// handle beyond the store borrow.
    pub fn signaling(&self) -> &Arc<MeshRouter<R, F>> {
        &self.signaling
    }
434
    /// Configured response behavior, passed through
    /// `ResponseBehaviorConfig::normalized`.
    fn response_behavior(&self) -> ResponseBehaviorConfig {
        self.routing.response_behavior.normalized()
    }
438
439    fn deterministic_actor_draw_for(peer_id: &str, hash: &Hash, salt: u64) -> f64 {
440        let mut hasher = DefaultHasher::new();
441        peer_id.hash(&mut hasher);
442        hash.hash(&mut hasher);
443        salt.hash(&mut hasher);
444        let v = hasher.finish();
445        (v as f64) / (u64::MAX as f64)
446    }
447
    /// Deterministic draw for this node: uses the local peer ID reported by
    /// the mesh router together with `hash` and `salt`.
    fn deterministic_actor_draw(&self, hash: &Hash, salt: u64) -> f64 {
        Self::deterministic_actor_draw_for(self.signaling.peer_id(), hash, salt)
    }
451
    /// Store key of the "latest peer metadata snapshot" pointer: the SHA-256
    /// of `PEER_METADATA_POINTER_SLOT_KEY`.
    fn peer_metadata_pointer_slot_hash() -> Hash {
        hashtree_core::sha256(PEER_METADATA_POINTER_SLOT_KEY)
    }
455
456    fn decode_hash_hex(hash_hex: &str) -> Result<Hash, StoreError> {
457        let bytes = hex::decode(hash_hex)
458            .map_err(|e| StoreError::Other(format!("Invalid hash hex: {e}")))?;
459        if bytes.len() != 32 {
460            return Err(StoreError::Other(format!(
461                "Invalid hash length {}, expected 32 bytes",
462                bytes.len()
463            )));
464        }
465        let mut hash = [0u8; 32];
466        hash.copy_from_slice(&bytes);
467        Ok(hash)
468    }
469
470    fn should_drop_response(&self, hash: &Hash) -> bool {
471        let p = self.response_behavior().drop_response_prob;
472        if p <= 0.0 {
473            return false;
474        }
475        self.deterministic_actor_draw(hash, 0xD0_D0_D0_D0_D0_D0_D0_D0) < p
476    }
477
478    fn should_corrupt_response(&self, hash: &Hash) -> bool {
479        let p = self.response_behavior().corrupt_response_prob;
480        if p <= 0.0 {
481            return false;
482        }
483        self.deterministic_actor_draw(hash, 0xC0_C0_C0_C0_C0_C0_C0_C0) < p
484    }
485
486    async fn ordered_connected_peers(&self) -> Vec<String> {
487        let current_peer_ids = self.signaling.peer_ids().await;
488        if current_peer_ids.is_empty() {
489            return Vec::new();
490        }
491
492        sync_selector_peers(&self.peer_selector, &current_peer_ids).await;
493        let current_set: HashSet<&str> = current_peer_ids.iter().map(String::as_str).collect();
494        let mut ordered_peer_ids = self.peer_selector.write().await.select_peers();
495        ordered_peer_ids.retain(|peer_id| current_set.contains(peer_id.as_str()));
496        if ordered_peer_ids.is_empty() {
497            let mut fallback = current_peer_ids;
498            fallback.sort();
499            return fallback;
500        }
501        ordered_peer_ids
502    }
503
504    fn requested_quote_mint(&self) -> Option<&str> {
505        if let Some(default_mint) = self.routing.cashu_default_mint.as_deref() {
506            if self.routing.cashu_accepted_mints.is_empty()
507                || self
508                    .routing
509                    .cashu_accepted_mints
510                    .iter()
511                    .any(|mint| mint == default_mint)
512            {
513                return Some(default_mint);
514            }
515        }
516
517        self.routing
518            .cashu_accepted_mints
519            .first()
520            .map(String::as_str)
521    }
522
523    fn choose_quote_mint(&self, requested_mint: Option<&str>) -> Option<String> {
524        if let Some(requested_mint) = requested_mint {
525            if self.accepts_quote_mint(Some(requested_mint)) {
526                return Some(requested_mint.to_string());
527            }
528        }
529        if let Some(default_mint) = self.routing.cashu_default_mint.as_ref() {
530            return Some(default_mint.clone());
531        }
532        if let Some(first_mint) = self.routing.cashu_accepted_mints.first() {
533            return Some(first_mint.clone());
534        }
535        requested_mint.map(str::to_string)
536    }
537
538    fn accepts_quote_mint(&self, mint_url: Option<&str>) -> bool {
539        if self.routing.cashu_accepted_mints.is_empty() {
540            return true;
541        }
542
543        let Some(mint_url) = mint_url else {
544            return false;
545        };
546        self.routing
547            .cashu_accepted_mints
548            .iter()
549            .any(|mint| mint == mint_url)
550    }
551
552    fn trusts_quote_mint(&self, mint_url: Option<&str>) -> bool {
553        let Some(mint_url) = mint_url else {
554            return self.routing.cashu_default_mint.is_none()
555                && self.routing.cashu_accepted_mints.is_empty();
556        };
557        self.routing.cashu_default_mint.as_deref() == Some(mint_url)
558            || self
559                .routing
560                .cashu_accepted_mints
561                .iter()
562                .any(|mint| mint == mint_url)
563    }
564
565    async fn peer_suggested_mint_cap_sat(&self, peer_id: &str) -> u64 {
566        let base = self.routing.cashu_peer_suggested_mint_base_cap_sat;
567        if base == 0 {
568            return 0;
569        }
570
571        let selector = self.peer_selector.read().await;
572        let Some(stats) = selector.get_stats(peer_id) else {
573            let max_cap = self.routing.cashu_peer_suggested_mint_max_cap_sat;
574            return if max_cap > 0 { base.min(max_cap) } else { base };
575        };
576
577        if stats.cashu_payment_defaults > 0
578            && stats.cashu_payment_defaults >= stats.cashu_payment_receipts
579        {
580            return 0;
581        }
582
583        let success_bonus = stats
584            .successes
585            .saturating_mul(self.routing.cashu_peer_suggested_mint_success_step_sat);
586        let receipt_bonus = stats
587            .cashu_payment_receipts
588            .saturating_mul(self.routing.cashu_peer_suggested_mint_receipt_step_sat);
589        let mut cap = base
590            .saturating_add(success_bonus)
591            .saturating_add(receipt_bonus);
592        let max_cap = self.routing.cashu_peer_suggested_mint_max_cap_sat;
593        if max_cap > 0 {
594            cap = cap.min(max_cap);
595        }
596        cap
597    }
598
599    async fn should_accept_quote_response(
600        &self,
601        from_peer: &str,
602        preferred_mint_url: Option<&str>,
603        offered_payment_sat: u64,
604        res: &DataQuoteResponse,
605    ) -> bool {
606        let Some(payment_sat) = res.p else {
607            return false;
608        };
609        if payment_sat > offered_payment_sat {
610            return false;
611        }
612
613        let response_mint = res.m.as_deref();
614        if response_mint == preferred_mint_url {
615            return true;
616        }
617        if self.trusts_quote_mint(response_mint) {
618            return true;
619        }
620        if response_mint.is_none() {
621            return false;
622        }
623
624        payment_sat <= self.peer_suggested_mint_cap_sat(from_peer).await
625    }
626
627    async fn issue_quote(
628        &self,
629        peer_id: &str,
630        hash_key: &str,
631        payment_sat: u64,
632        ttl_ms: u32,
633        mint_url: Option<&str>,
634    ) -> u64 {
635        let quote_id = {
636            let mut next = self.next_quote_id.write().await;
637            let quote_id = *next;
638            *next = next.saturating_add(1);
639            quote_id
640        };
641
642        let expires_at = Instant::now() + Duration::from_millis(ttl_ms as u64);
643        self.issued_quotes.write().await.insert(
644            (peer_id.to_string(), hash_key.to_string(), quote_id),
645            IssuedQuote {
646                expires_at,
647                payment_sat,
648                mint_url: mint_url.map(str::to_string),
649            },
650        );
651        quote_id
652    }
653
654    async fn take_valid_quote(&self, peer_id: &str, hash_key: &str, quote_id: u64) -> bool {
655        let key = (peer_id.to_string(), hash_key.to_string(), quote_id);
656        let Some(quote) = self.issued_quotes.write().await.remove(&key) else {
657            return false;
658        };
659        quote.expires_at > Instant::now()
660    }
661
662    async fn send_request_to_peer(
663        &self,
664        peer_id: &str,
665        hash: &Hash,
666        quote_id: Option<u64>,
667    ) -> bool {
668        let channel = match self.signaling.get_channel(peer_id).await {
669            Some(c) => c,
670            None => return false,
671        };
672
673        let htl_config = {
674            let configs = self.htl_configs.read().await;
675            configs
676                .get(peer_id)
677                .cloned()
678                .unwrap_or_else(PeerHTLConfig::random)
679        };
680
681        let send_htl = htl_config.decrement(MAX_HTL);
682        let req = match quote_id {
683            Some(quote_id) => create_request_with_quote(hash, send_htl, quote_id),
684            None => create_request(hash, send_htl),
685        };
686        let request_bytes = encode_request(&req);
687
688        {
689            let mut selector = self.peer_selector.write().await;
690            selector.record_request(peer_id, request_bytes.len() as u64);
691        }
692
693        match channel.send(request_bytes).await {
694            Ok(()) => true,
695            Err(_) => {
696                self.peer_selector.write().await.record_failure(peer_id);
697                false
698            }
699        }
700    }
701
702    async fn send_quote_request_to_peer(
703        &self,
704        peer_id: &str,
705        hash: &Hash,
706        payment_sat: u64,
707        ttl_ms: u32,
708        mint_url: Option<&str>,
709    ) -> bool {
710        let channel = match self.signaling.get_channel(peer_id).await {
711            Some(c) => c,
712            None => return false,
713        };
714
715        let req = create_quote_request(hash, ttl_ms, payment_sat, mint_url);
716        let request_bytes = encode_quote_request(&req);
717
718        match channel.send(request_bytes).await {
719            Ok(()) => true,
720            Err(_) => false,
721        }
722    }
723
    /// Get peer count
    ///
    /// Delegates to the mesh router.
    pub async fn peer_count(&self) -> usize {
        self.signaling.peer_count().await
    }
728
    /// Check if we need more peers
    ///
    /// Delegates to the mesh router, which owns the decision.
    pub async fn needs_peers(&self) -> bool {
        self.signaling.needs_peers().await
    }
733
    /// Re-broadcast hello to refresh discovery as topology changes.
    ///
    /// The hello is sent with an empty list, like the initial one in `start`.
    ///
    /// # Errors
    /// Returns the transport error reported by the mesh router.
    pub async fn send_hello(&self) -> Result<(), TransportError> {
        self.signaling.send_hello(vec![]).await
    }
738
    /// Apply an out-of-band payment credit to a peer's routing priority.
    ///
    /// Forwards `amount_sat` to the peer selector under its write lock.
    pub async fn record_cashu_payment_for_peer(&self, peer_id: &str, amount_sat: u64) {
        self.peer_selector
            .write()
            .await
            .record_cashu_payment(peer_id, amount_sat);
    }
746
    /// Record a post-delivery payment we received from a peer.
    ///
    /// Forwards `amount_sat` to the peer selector under its write lock.
    pub async fn record_cashu_receipt_from_peer(&self, peer_id: &str, amount_sat: u64) {
        self.peer_selector
            .write()
            .await
            .record_cashu_receipt(peer_id, amount_sat);
    }
754
    /// Record that a peer failed to pay after we delivered successfully.
    ///
    /// Forwards the event to the peer selector under its write lock.
    pub async fn record_cashu_payment_default_from_peer(&self, peer_id: &str) {
        self.peer_selector
            .write()
            .await
            .record_cashu_payment_default(peer_id);
    }
762
    /// Whether requests from `peer_id` should be refused because of payment
    /// defaults.
    ///
    /// Asks `selector` with the configured
    /// `cashu_payment_default_block_threshold`. The caller supplies the
    /// selector, so no lock is taken here.
    fn should_refuse_requests_from_peer(&self, selector: &PeerSelector, peer_id: &str) -> bool {
        selector.is_peer_blocked_for_payment_defaults(
            peer_id,
            self.routing.cashu_payment_default_block_threshold,
        )
    }
769
    /// Export live peer metadata for inspection/debugging.
    ///
    /// Takes the peer selector's read lock for the duration of the export.
    pub async fn peer_metadata_snapshot(&self) -> PeerMetadataSnapshot {
        self.peer_selector
            .read()
            .await
            .export_peer_metadata_snapshot()
    }
777
778    /// Snapshot current peer metadata and persist it into `local_store`.
779    ///
780    /// Uses content-addressed storage for the snapshot body and a reserved
781    /// mutable pointer slot for the "latest snapshot hash".
782    pub async fn persist_peer_metadata(&self) -> Result<Hash, StoreError> {
783        let snapshot = self
784            .peer_selector
785            .read()
786            .await
787            .export_peer_metadata_snapshot();
788        let bytes = serde_json::to_vec(&snapshot).map_err(|e| {
789            StoreError::Other(format!("Failed to encode peer metadata snapshot: {e}"))
790        })?;
791        let snapshot_hash = hashtree_core::sha256(&bytes);
792        let _ = self.local_store.put(snapshot_hash, bytes).await?;
793
794        let pointer_slot = Self::peer_metadata_pointer_slot_hash();
795        let pointer_bytes = hex::encode(snapshot_hash).into_bytes();
796        let _ = self.local_store.delete(&pointer_slot).await?;
797        let _ = self.local_store.put(pointer_slot, pointer_bytes).await?;
798
799        Ok(snapshot_hash)
800    }
801
802    /// Load persisted peer metadata from `local_store` if available.
803    pub async fn load_peer_metadata(&self) -> Result<bool, StoreError> {
804        let pointer_slot = Self::peer_metadata_pointer_slot_hash();
805        let Some(pointer_bytes) = self.local_store.get(&pointer_slot).await? else {
806            return Ok(false);
807        };
808        let pointer_hex = std::str::from_utf8(&pointer_bytes).map_err(|e| {
809            StoreError::Other(format!("Peer metadata pointer is not valid UTF-8: {e}"))
810        })?;
811        let snapshot_hash = Self::decode_hash_hex(pointer_hex.trim())?;
812
813        let Some(snapshot_bytes) = self.local_store.get(&snapshot_hash).await? else {
814            return Ok(false);
815        };
816        let snapshot: PeerMetadataSnapshot =
817            serde_json::from_slice(&snapshot_bytes).map_err(|e| {
818                StoreError::Other(format!("Failed to decode peer metadata snapshot: {e}"))
819            })?;
820        self.peer_selector
821            .write()
822            .await
823            .import_peer_metadata_snapshot(&snapshot);
824        Ok(true)
825    }
826
827    /// Request data from peers after negotiating a paid quote.
828    ///
829    /// If quote negotiation fails or the quoted peer does not deliver, the store
830    /// falls back to the normal unpaid retrieval path to preserve liveness.
831    pub async fn get_with_quote(
832        &self,
833        hash: &Hash,
834        payment_sat: u64,
835        quote_ttl: Duration,
836    ) -> Result<Option<Vec<u8>>, StoreError> {
837        if let Some(data) = self.local_store.get(hash).await? {
838            return Ok(Some(data));
839        }
840        Ok(self
841            .request_from_peers_with_quote(hash, payment_sat, quote_ttl)
842            .await)
843    }
844
845    async fn request_from_peers_with_quote(
846        &self,
847        hash: &Hash,
848        payment_sat: u64,
849        quote_ttl: Duration,
850    ) -> Option<Vec<u8>> {
851        let ordered_peer_ids = self.ordered_connected_peers().await;
852        if ordered_peer_ids.is_empty() {
853            return None;
854        }
855
856        if let Some(quote) = self
857            .request_quote_from_peers(hash, payment_sat, quote_ttl, &ordered_peer_ids)
858            .await
859        {
860            if let Some(data) = self
861                .request_from_single_peer(hash, &quote.peer_id, Some(quote.quote_id))
862                .await
863            {
864                return Some(data);
865            }
866        }
867
868        self.request_from_ordered_peers(hash, &ordered_peer_ids)
869            .await
870    }
871
872    async fn request_quote_from_peers(
873        &self,
874        hash: &Hash,
875        payment_sat: u64,
876        quote_ttl: Duration,
877        ordered_peer_ids: &[String],
878    ) -> Option<NegotiatedQuote> {
879        if ordered_peer_ids.is_empty() {
880            return None;
881        }
882        let ttl_ms = quote_ttl.as_millis().min(u32::MAX as u128) as u32;
883        if ttl_ms == 0 {
884            return None;
885        }
886        let requested_mint = self.requested_quote_mint().map(str::to_string);
887
888        let hash_key = hash_to_key(hash);
889        let (tx, rx) = oneshot::channel();
890        self.pending_quotes.write().await.insert(
891            hash_key.clone(),
892            PendingQuoteRequest {
893                response_tx: tx,
894                preferred_mint_url: requested_mint.clone(),
895                offered_payment_sat: payment_sat,
896            },
897        );
898
899        let rx = Arc::new(Mutex::new(rx));
900        let result = run_hedged_waves(
901            ordered_peer_ids.len(),
902            self.routing.dispatch,
903            self.request_timeout,
904            |range| {
905                let wave_peer_ids = ordered_peer_ids[range].to_vec();
906                let requested_mint = requested_mint.clone();
907                let hash = *hash;
908                async move {
909                    let mut sent = 0usize;
910                    for peer_id in wave_peer_ids {
911                        if self
912                            .send_quote_request_to_peer(
913                                &peer_id,
914                                &hash,
915                                payment_sat,
916                                ttl_ms,
917                                requested_mint.as_deref(),
918                            )
919                            .await
920                        {
921                            sent += 1;
922                        }
923                    }
924                    sent
925                }
926            },
927            |wait| {
928                let rx = rx.clone();
929                async move {
930                    let mut rx = rx.lock().await;
931                    match tokio::time::timeout(wait, &mut *rx).await {
932                        Ok(Ok(Some(quote))) => HedgedWaveAction::Success(quote),
933                        Ok(Ok(None)) | Ok(Err(_)) => HedgedWaveAction::Abort,
934                        Err(_) => HedgedWaveAction::Continue,
935                    }
936                }
937            },
938        )
939        .await;
940        let _ = self.pending_quotes.write().await.remove(&hash_key);
941        result
942    }
943
944    async fn request_from_single_peer(
945        &self,
946        hash: &Hash,
947        peer_id: &str,
948        quote_id: Option<u64>,
949    ) -> Option<Vec<u8>> {
950        let hash_key = hash_to_key(hash);
951        let (tx, rx) = oneshot::channel();
952        self.pending_requests.write().await.insert(
953            hash_key.clone(),
954            PendingRequest {
955                response_tx: tx,
956                started_at: Instant::now(),
957                queried_peers: vec![peer_id.to_string()],
958            },
959        );
960
961        let mut rx = rx;
962        if !self.send_request_to_peer(peer_id, hash, quote_id).await {
963            let _ = self.pending_requests.write().await.remove(&hash_key);
964            return None;
965        }
966
967        if let Ok(Ok(Some(data))) = tokio::time::timeout(self.request_timeout, &mut rx).await {
968            if hashtree_core::sha256(&data) == *hash {
969                let _ = self.local_store.put(*hash, data.clone()).await;
970                return Some(data);
971            }
972        }
973
974        if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
975            for peer_id in pending.queried_peers {
976                self.peer_selector.write().await.record_timeout(&peer_id);
977            }
978        }
979        None
980    }
981
982    async fn request_from_ordered_peers(
983        &self,
984        hash: &Hash,
985        ordered_peer_ids: &[String],
986    ) -> Option<Vec<u8>> {
987        let hash_key = hash_to_key(hash);
988        let (tx, rx) = oneshot::channel();
989        self.pending_requests.write().await.insert(
990            hash_key.clone(),
991            PendingRequest {
992                response_tx: tx,
993                started_at: Instant::now(),
994                queried_peers: Vec::new(),
995            },
996        );
997
998        let rx = Arc::new(Mutex::new(rx));
999        let result = run_hedged_waves(
1000            ordered_peer_ids.len(),
1001            self.routing.dispatch,
1002            self.request_timeout,
1003            |range| {
1004                let wave_peer_ids = ordered_peer_ids[range].to_vec();
1005                let hash = *hash;
1006                let hash_key = hash_key.clone();
1007                async move {
1008                    let mut sent = 0usize;
1009                    for peer_id in wave_peer_ids {
1010                        if self.send_request_to_peer(&peer_id, &hash, None).await {
1011                            sent += 1;
1012                            if let Some(pending) =
1013                                self.pending_requests.write().await.get_mut(&hash_key)
1014                            {
1015                                pending.queried_peers.push(peer_id);
1016                            }
1017                        }
1018                    }
1019                    sent
1020                }
1021            },
1022            |wait| {
1023                let rx = rx.clone();
1024                async move {
1025                    let mut rx = rx.lock().await;
1026                    match tokio::time::timeout(wait, &mut *rx).await {
1027                        Ok(Ok(Some(data))) if hashtree_core::sha256(&data) == *hash => {
1028                            HedgedWaveAction::Success(data)
1029                        }
1030                        Ok(Ok(Some(_))) => HedgedWaveAction::Continue,
1031                        Ok(Ok(None)) | Ok(Err(_)) => HedgedWaveAction::Abort,
1032                        Err(_) => HedgedWaveAction::Continue,
1033                    }
1034                }
1035            },
1036        )
1037        .await;
1038
1039        let Some(data) = result else {
1040            if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
1041                for peer_id in pending.queried_peers {
1042                    self.peer_selector.write().await.record_timeout(&peer_id);
1043                }
1044            }
1045            return None;
1046        };
1047
1048        let _ = self.local_store.put(*hash, data.clone()).await;
1049        Some(data)
1050    }
1051
1052    /// Request data from peers
1053    async fn request_from_peers(&self, hash: &Hash) -> Option<Vec<u8>> {
1054        let ordered_peer_ids = self.ordered_connected_peers().await;
1055        if ordered_peer_ids.is_empty() {
1056            return None;
1057        }
1058        self.request_from_ordered_peers(hash, &ordered_peer_ids)
1059            .await
1060    }
1061
1062    async fn complete_pending_response(&self, from_peer: &str, hash_key: String, payload: Vec<u8>) {
1063        if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
1064            let rtt_ms = pending.started_at.elapsed().as_millis() as u64;
1065            self.peer_selector.write().await.record_success(
1066                from_peer,
1067                rtt_ms,
1068                payload.len() as u64,
1069            );
1070            let _ = pending.response_tx.send(Some(payload));
1071        }
1072    }
1073
1074    async fn handle_quote_response_message(&self, from_peer: &str, res: DataQuoteResponse) {
1075        if !res.a {
1076            return;
1077        }
1078
1079        let Some(quote_id) = res.q else {
1080            return;
1081        };
1082
1083        let hash_key = hash_to_key(&res.h);
1084        let (preferred_mint_url, offered_payment_sat) = {
1085            let pending_quotes = self.pending_quotes.read().await;
1086            let Some(pending) = pending_quotes.get(&hash_key) else {
1087                return;
1088            };
1089            (
1090                pending.preferred_mint_url.clone(),
1091                pending.offered_payment_sat,
1092            )
1093        };
1094        if !self
1095            .should_accept_quote_response(
1096                from_peer,
1097                preferred_mint_url.as_deref(),
1098                offered_payment_sat,
1099                &res,
1100            )
1101            .await
1102        {
1103            return;
1104        }
1105        let mut pending_quotes = self.pending_quotes.write().await;
1106        if let Some(pending) = pending_quotes.remove(&hash_key) {
1107            let _ = pending.response_tx.send(Some(NegotiatedQuote {
1108                peer_id: from_peer.to_string(),
1109                quote_id,
1110                mint_url: res.m,
1111            }));
1112        }
1113    }
1114
1115    async fn handle_response_message(&self, from_peer: &str, res: crate::protocol::DataResponse) {
1116        let hash_key = hash_to_key(&res.h);
1117        let hash = match crate::protocol::bytes_to_hash(&res.h) {
1118            Some(h) => h,
1119            None => return,
1120        };
1121
1122        // Ignore malformed/corrupt payload and keep waiting for a valid response.
1123        if hashtree_core::sha256(&res.d) != hash {
1124            self.peer_selector.write().await.record_failure(from_peer);
1125            if self.debug {
1126                println!("[GenericStore] Ignoring invalid response payload for {hash_key}");
1127            }
1128            return;
1129        }
1130
1131        self.complete_pending_response(from_peer, hash_key, res.d)
1132            .await;
1133    }
1134
1135    async fn handle_quote_request_message(&self, from_peer: &str, req: DataQuoteRequest) {
1136        let hash = match crate::protocol::bytes_to_hash(&req.h) {
1137            Some(h) => h,
1138            None => return,
1139        };
1140        let hash_key = hash_to_key(&hash);
1141
1142        {
1143            let selector = self.peer_selector.read().await;
1144            if self.should_refuse_requests_from_peer(&selector, from_peer) {
1145                if self.debug {
1146                    println!(
1147                        "[GenericStore] Refusing quote request from delinquent peer {}",
1148                        from_peer
1149                    );
1150                }
1151                return;
1152            }
1153        }
1154
1155        let chosen_mint = self.choose_quote_mint(req.m.as_deref());
1156        let can_serve = self.local_store.has(&hash).await.ok().unwrap_or(false)
1157            && !self.should_drop_response(&hash)
1158            && !self.should_corrupt_response(&hash);
1159
1160        let res = if can_serve {
1161            let quote_id = self
1162                .issue_quote(from_peer, &hash_key, req.p, req.t, chosen_mint.as_deref())
1163                .await;
1164            create_quote_response_available(&hash, quote_id, req.p, req.t, chosen_mint.as_deref())
1165        } else {
1166            create_quote_response_unavailable(&hash)
1167        };
1168        let response_bytes = encode_quote_response(&res);
1169        if let Some(channel) = self.signaling.get_channel(from_peer).await {
1170            let _ = channel.send(response_bytes).await;
1171        }
1172    }
1173
1174    async fn handle_request_message(&self, from_peer: &str, req: crate::protocol::DataRequest) {
1175        let hash = match crate::protocol::bytes_to_hash(&req.h) {
1176            Some(h) => h,
1177            None => return,
1178        };
1179        let hash_key = hash_to_key(&hash);
1180
1181        {
1182            let selector = self.peer_selector.read().await;
1183            if self.should_refuse_requests_from_peer(&selector, from_peer) {
1184                if self.debug {
1185                    println!(
1186                        "[GenericStore] Refusing request from delinquent peer {}",
1187                        from_peer
1188                    );
1189                }
1190                return;
1191            }
1192        }
1193
1194        if let Some(quote_id) = req.q {
1195            if !self.take_valid_quote(from_peer, &hash_key, quote_id).await {
1196                if self.debug {
1197                    println!(
1198                        "[GenericStore] Refusing request with invalid or expired quote {} from {}",
1199                        quote_id, from_peer
1200                    );
1201                }
1202                return;
1203            }
1204        }
1205
1206        // Check local store
1207        if let Ok(Some(mut data)) = self.local_store.get(&hash).await {
1208            if self.should_drop_response(&hash) {
1209                if self.debug {
1210                    println!(
1211                        "[GenericStore] Dropping response for {} due to actor profile",
1212                        hash_to_key(&hash)
1213                    );
1214                }
1215                return;
1216            }
1217
1218            let behavior = self.response_behavior();
1219            if behavior.extra_delay_ms > 0 {
1220                tokio::time::sleep(Duration::from_millis(behavior.extra_delay_ms)).await;
1221            }
1222
1223            if self.should_corrupt_response(&hash) {
1224                if data.is_empty() {
1225                    data.push(0x80);
1226                } else {
1227                    data[0] ^= 0x80;
1228                }
1229            }
1230
1231            // Send response
1232            let res = create_response(&hash, data);
1233            let response_bytes = encode_response(&res);
1234            if let Some(channel) = self.signaling.get_channel(from_peer).await {
1235                let _ = channel.send(response_bytes).await;
1236            }
1237        }
1238        // For now, don't forward - keep it simple
1239    }
1240
1241    /// Handle incoming data message
1242    pub async fn handle_data_message(&self, from_peer: &str, data: &[u8]) {
1243        let parsed = match parse_message(data) {
1244            Some(m) => m,
1245            None => return,
1246        };
1247
1248        match parsed {
1249            DataMessage::Request(req) => {
1250                self.handle_request_message(from_peer, req).await;
1251            }
1252            DataMessage::Response(res) => {
1253                self.handle_response_message(from_peer, res).await;
1254            }
1255            DataMessage::QuoteRequest(req) => {
1256                self.handle_quote_request_message(from_peer, req).await;
1257            }
1258            DataMessage::QuoteResponse(res) => {
1259                self.handle_quote_response_message(from_peer, res).await;
1260            }
1261        }
1262    }
1263}
1264
1265#[async_trait]
1266impl<S, R, F> Store for GenericStore<S, R, F>
1267where
1268    S: Store + Send + Sync + 'static,
1269    R: SignalingTransport + Send + Sync + 'static,
1270    F: PeerLinkFactory + Send + Sync + 'static,
1271{
1272    async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
1273        self.local_store.put(hash, data).await
1274    }
1275
1276    async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
1277        // Try local first
1278        if let Some(data) = self.local_store.get(hash).await? {
1279            return Ok(Some(data));
1280        }
1281
1282        // Try peers
1283        Ok(self.request_from_peers(hash).await)
1284    }
1285
1286    async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
1287        self.local_store.has(hash).await
1288    }
1289
1290    async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
1291        self.local_store.delete(hash).await
1292    }
1293}
1294
1295#[cfg(test)]
1296mod tests {
1297    use super::*;
1298    use hashtree_core::MemoryStore;
1299    use std::sync::Arc;
1300    use std::sync::OnceLock;
1301    use std::time::Duration;
1302
    /// `GenericStore` wired to an in-memory local store and the mock relay
    /// transport / mock connection factory from `crate::mock`.
    type TestStore = GenericStore<
        MemoryStore,
        crate::mock::MockRelayTransport,
        crate::mock::MockConnectionFactory,
    >;
1308
    /// One simulated participant, as assembled by `make_shared_test_node`.
    struct TestNode {
        /// The store under test.
        store: Arc<TestStore>,
        /// The same in-memory store that `store` was constructed with, kept so
        /// tests can seed data directly.
        local_store: Arc<MemoryStore>,
        /// The mock transport handed to the node's signaling router, kept so
        /// tests can connect it and drain its inbound messages by hand.
        transport: Arc<crate::mock::MockRelayTransport>,
    }
1314
1315    fn mock_network_lock() -> &'static tokio::sync::Mutex<()> {
1316        static LOCK: OnceLock<tokio::sync::Mutex<()>> = OnceLock::new();
1317        LOCK.get_or_init(|| tokio::sync::Mutex::new(()))
1318    }
1319
1320    fn make_test_store(local_store: Arc<MemoryStore>, node_id: &str) -> TestStore {
1321        make_test_store_with_routing(local_store, node_id, GenericStoreRoutingConfig::default())
1322    }
1323
1324    fn make_test_store_with_routing(
1325        local_store: Arc<MemoryStore>,
1326        node_id: &str,
1327        routing: GenericStoreRoutingConfig,
1328    ) -> TestStore {
1329        let relay = crate::mock::MockRelay::new();
1330        let transport = Arc::new(relay.create_transport(node_id.to_string(), node_id.to_string()));
1331        let conn_factory = Arc::new(crate::mock::MockConnectionFactory::new(
1332            node_id.to_string(),
1333            0,
1334        ));
1335        let signaling = Arc::new(crate::signaling::MeshRouter::new(
1336            node_id.to_string(),
1337            node_id.to_string(),
1338            transport,
1339            conn_factory,
1340            crate::types::PoolSettings::default(),
1341            false,
1342        ));
1343
1344        TestStore::new_with_routing(
1345            local_store,
1346            signaling,
1347            Duration::from_millis(200),
1348            false,
1349            routing,
1350        )
1351    }
1352
1353    fn make_shared_test_node(
1354        relay: Arc<crate::mock::MockRelay>,
1355        node_id: &str,
1356        routing: GenericStoreRoutingConfig,
1357    ) -> TestNode {
1358        let transport = Arc::new(relay.create_transport(node_id.to_string(), node_id.to_string()));
1359        let conn_factory = Arc::new(crate::mock::MockConnectionFactory::new(
1360            node_id.to_string(),
1361            0,
1362        ));
1363        let signaling = Arc::new(crate::signaling::MeshRouter::new(
1364            node_id.to_string(),
1365            node_id.to_string(),
1366            transport.clone(),
1367            conn_factory,
1368            crate::types::PoolSettings::default(),
1369            false,
1370        ));
1371        let local_store = Arc::new(MemoryStore::new());
1372        let store = Arc::new(TestStore::new_with_routing(
1373            local_store.clone(),
1374            signaling,
1375            Duration::from_millis(120),
1376            false,
1377            routing,
1378        ));
1379
1380        TestNode {
1381            store,
1382            local_store,
1383            transport,
1384        }
1385    }
1386
1387    async fn pump_test_signaling(nodes: &[&TestNode]) -> usize {
1388        let mut processed = 0usize;
1389        for node in nodes {
1390            while let Some(msg) = node.transport.try_recv() {
1391                node.store
1392                    .process_signaling(msg)
1393                    .await
1394                    .expect("process signaling");
1395                processed += 1;
1396            }
1397        }
1398        processed
1399    }
1400
1401    async fn pump_test_data(nodes: &[&TestNode]) -> usize {
1402        let mut processed = 0usize;
1403        for node in nodes {
1404            let peer_ids = node.store.signaling().peer_ids().await;
1405            for peer_id in peer_ids {
1406                let Some(channel) = node.store.signaling().get_channel(&peer_id).await else {
1407                    continue;
1408                };
1409                while let Some(data) = channel.try_recv() {
1410                    node.store.handle_data_message(&peer_id, &data).await;
1411                    processed += 1;
1412                }
1413            }
1414        }
1415        processed
1416    }
1417
1418    async fn pump_test_network(nodes: &[&TestNode], max_steps: usize) {
1419        for _ in 0..max_steps {
1420            let signaling = pump_test_signaling(nodes).await;
1421            let data = pump_test_data(nodes).await;
1422            if signaling + data == 0 {
1423                tokio::task::yield_now().await;
1424            }
1425        }
1426    }
1427
1428    async fn run_get_with_pumps(
1429        requester: Arc<TestStore>,
1430        hash: Hash,
1431        nodes: &[&TestNode],
1432    ) -> Option<Vec<u8>> {
1433        let task = tokio::spawn(async move { requester.get(&hash).await.ok().flatten() });
1434        let started = Instant::now();
1435
1436        loop {
1437            if task.is_finished() {
1438                return task.await.expect("request task join");
1439            }
1440
1441            if started.elapsed() > Duration::from_secs(1) {
1442                task.abort();
1443                return None;
1444            }
1445
1446            pump_test_network(nodes, 4).await;
1447        }
1448    }
1449
1450    async fn run_bad_peer_series(strategy: SelectionStrategy) -> usize {
1451        let _guard = mock_network_lock().lock().await;
1452        crate::mock::clear_channel_registry().await;
1453
1454        let relay = crate::mock::MockRelay::new();
1455        let requester = make_shared_test_node(
1456            relay.clone(),
1457            "requester-reject",
1458            GenericStoreRoutingConfig {
1459                selection_strategy: strategy,
1460                fairness_enabled: false,
1461                dispatch: RequestDispatchConfig {
1462                    initial_fanout: 1,
1463                    hedge_fanout: 1,
1464                    max_fanout: 1,
1465                    hedge_interval_ms: 5,
1466                },
1467                ..Default::default()
1468            },
1469        );
1470        let bad = make_shared_test_node(
1471            relay.clone(),
1472            "a-bad",
1473            GenericStoreRoutingConfig {
1474                response_behavior: ResponseBehaviorConfig {
1475                    drop_response_prob: 1.0,
1476                    ..Default::default()
1477                },
1478                ..Default::default()
1479            },
1480        );
1481        let honest = make_shared_test_node(relay, "b-honest", GenericStoreRoutingConfig::default());
1482        let nodes = [&requester, &bad, &honest];
1483
1484        for node in &nodes {
1485            node.transport
1486                .connect(&[])
1487                .await
1488                .expect("connect transport");
1489            node.store.start().await.expect("start store");
1490        }
1491        pump_test_network(&nodes, 24).await;
1492
1493        let mut successes = 0usize;
1494        for round in 0..6 {
1495            let payload = format!("payload-{round}").into_bytes();
1496            let hash = hashtree_core::sha256(&payload);
1497            let _ = bad.local_store.put(hash, payload.clone()).await;
1498            let _ = honest.local_store.put(hash, payload.clone()).await;
1499
1500            let result = run_get_with_pumps(requester.store.clone(), hash, &nodes).await;
1501            if result.as_ref() == Some(&payload) {
1502                successes += 1;
1503            }
1504        }
1505
1506        crate::mock::clear_channel_registry().await;
1507        successes
1508    }
1509
1510    #[test]
1511    fn test_hedged_wave_plan_flood_all() {
1512        let plan = build_hedged_wave_plan(7, RequestDispatchConfig::default());
1513        assert_eq!(plan, vec![7]);
1514    }
1515
1516    #[test]
1517    fn test_hedged_wave_plan_staged() {
1518        let plan = build_hedged_wave_plan(
1519            10,
1520            RequestDispatchConfig {
1521                initial_fanout: 2,
1522                hedge_fanout: 3,
1523                max_fanout: 8,
1524                hedge_interval_ms: 25,
1525            },
1526        );
1527        assert_eq!(plan, vec![2, 3, 3]);
1528    }
1529
1530    #[tokio::test]
1531    async fn test_run_hedged_waves_uses_zero_interval_as_immediate_hedge() {
1532        let waits = Arc::new(std::sync::Mutex::new(Vec::new()));
1533        let waits_clone = waits.clone();
1534
1535        let result = run_hedged_waves(
1536            3,
1537            RequestDispatchConfig {
1538                initial_fanout: 1,
1539                hedge_fanout: 1,
1540                max_fanout: 3,
1541                hedge_interval_ms: 0,
1542            },
1543            Duration::from_millis(25),
1544            |_range| async { 1 },
1545            move |wait| {
1546                let waits = waits_clone.clone();
1547                async move {
1548                    waits.lock().expect("wait lock").push(wait);
1549                    HedgedWaveAction::<()>::Continue
1550                }
1551            },
1552        )
1553        .await;
1554
1555        assert!(result.is_none());
1556        let waits = waits.lock().expect("wait lock");
1557        assert_eq!(waits.len(), 1);
1558        assert!(waits[0] <= Duration::from_millis(25));
1559    }
1560
1561    #[test]
1562    fn test_response_behavior_normalization_clamps_probs() {
1563        let raw = ResponseBehaviorConfig {
1564            drop_response_prob: -1.5,
1565            corrupt_response_prob: 9.0,
1566            extra_delay_ms: 12,
1567        };
1568        let normalized = raw.normalized();
1569        assert_eq!(normalized.drop_response_prob, 0.0);
1570        assert_eq!(normalized.corrupt_response_prob, 1.0);
1571        assert_eq!(normalized.extra_delay_ms, 12);
1572    }
1573
1574    #[test]
1575    fn test_actor_draw_is_deterministic_per_peer_hash_and_salt() {
1576        let hash = hashtree_core::sha256(b"deterministic");
1577        let a = TestStore::deterministic_actor_draw_for("peer-a", &hash, 7);
1578        let b = TestStore::deterministic_actor_draw_for("peer-a", &hash, 7);
1579        assert!((a - b).abs() < f64::EPSILON);
1580    }
1581
1582    #[tokio::test]
1583    async fn test_load_peer_metadata_returns_false_when_missing() {
1584        let local_store = Arc::new(MemoryStore::new());
1585        let store = make_test_store(local_store, "0");
1586        assert!(!store.load_peer_metadata().await.expect("load result"));
1587    }
1588
1589    #[tokio::test]
1590    async fn test_persist_and_load_peer_metadata_with_existing_store_adapter() {
1591        let local_store = Arc::new(MemoryStore::new());
1592        let writer = make_test_store(local_store.clone(), "0");
1593        {
1594            let mut selector = writer.peer_selector.write().await;
1595            selector.add_peer("npub1stable:session-a");
1596            selector.record_request("npub1stable:session-a", 64);
1597            selector.record_success("npub1stable:session-a", 35, 1024);
1598            selector.record_cashu_payment("npub1stable:session-a", 120);
1599            selector.record_cashu_receipt("npub1stable:session-a", 40);
1600            selector.record_cashu_payment_default("npub1stable:session-a");
1601        }
1602
1603        let snapshot_hash = writer
1604            .persist_peer_metadata()
1605            .await
1606            .expect("persist peer metadata");
1607        assert!(local_store
1608            .get(&snapshot_hash)
1609            .await
1610            .expect("snapshot lookup")
1611            .is_some());
1612
1613        let reader = make_test_store(local_store, "1");
1614        assert!(reader
1615            .load_peer_metadata()
1616            .await
1617            .expect("load peer metadata snapshot"));
1618
1619        let mut selector = reader.peer_selector.write().await;
1620        selector.add_peer("npub1stable:session-b");
1621        let stats = selector
1622            .get_stats("npub1stable:session-b")
1623            .expect("restored peer stats");
1624        assert_eq!(stats.requests_sent, 1);
1625        assert_eq!(stats.successes, 1);
1626        assert_eq!(stats.cashu_paid_sat, 120);
1627        assert_eq!(stats.cashu_received_sat, 40);
1628        assert_eq!(stats.cashu_payment_receipts, 1);
1629        assert_eq!(stats.cashu_payment_defaults, 1);
1630    }
1631
1632    #[tokio::test]
1633    async fn test_should_refuse_requests_from_peer_after_payment_defaults() {
1634        let local_store = Arc::new(MemoryStore::new());
1635        let store = make_test_store_with_routing(
1636            local_store,
1637            "0",
1638            GenericStoreRoutingConfig {
1639                cashu_payment_default_block_threshold: 1,
1640                ..Default::default()
1641            },
1642        );
1643        store.record_cashu_payment_default_from_peer("peer-a").await;
1644
1645        let selector = store.peer_selector.read().await;
1646        assert!(store.should_refuse_requests_from_peer(&selector, "peer-a"));
1647        assert!(!store.should_refuse_requests_from_peer(&selector, "peer-b"));
1648    }
1649
1650    #[tokio::test]
1651    async fn test_take_valid_quote_consumes_once_and_rejects_expired_quotes() {
1652        let local_store = Arc::new(MemoryStore::new());
1653        let store = make_test_store(local_store, "0");
1654        let hash = hashtree_core::sha256(b"quote-test");
1655        let hash_key = hash_to_key(&hash);
1656
1657        {
1658            let mut issued = store.issued_quotes.write().await;
1659            issued.insert(
1660                ("peer-a".to_string(), hash_key.clone(), 11),
1661                IssuedQuote {
1662                    expires_at: Instant::now() + Duration::from_secs(1),
1663                    payment_sat: 5,
1664                    mint_url: Some("https://mint-a.example".to_string()),
1665                },
1666            );
1667            issued.insert(
1668                ("peer-a".to_string(), hash_key.clone(), 12),
1669                IssuedQuote {
1670                    expires_at: Instant::now() - Duration::from_millis(1),
1671                    payment_sat: 5,
1672                    mint_url: Some("https://mint-a.example".to_string()),
1673                },
1674            );
1675        }
1676
1677        assert!(store.take_valid_quote("peer-a", &hash_key, 11).await);
1678        assert!(!store.take_valid_quote("peer-a", &hash_key, 11).await);
1679        assert!(!store.take_valid_quote("peer-a", &hash_key, 12).await);
1680    }
1681
1682    async fn run_quote_with_pumps(
1683        requester: Arc<TestStore>,
1684        hash: Hash,
1685        payment_sat: u64,
1686        quote_ttl: Duration,
1687        peer_ids: Vec<String>,
1688        nodes: &[&TestNode],
1689    ) -> Option<NegotiatedQuote> {
1690        let task = tokio::spawn(async move {
1691            requester
1692                .request_quote_from_peers(&hash, payment_sat, quote_ttl, &peer_ids)
1693                .await
1694        });
1695        let started = Instant::now();
1696
1697        loop {
1698            if task.is_finished() {
1699                return task.await.expect("quote task join");
1700            }
1701            if started.elapsed() > Duration::from_secs(1) {
1702                task.abort();
1703                return None;
1704            }
1705            pump_test_network(nodes, 4).await;
1706        }
1707    }
1708
1709    #[tokio::test]
1710    async fn test_request_quote_from_peers_rejects_unaccepted_mint() {
1711        let _guard = mock_network_lock().lock().await;
1712        crate::mock::clear_channel_registry().await;
1713
1714        let relay = crate::mock::MockRelay::new();
1715        let requester = make_shared_test_node(
1716            relay.clone(),
1717            "requester-reject",
1718            GenericStoreRoutingConfig {
1719                cashu_accepted_mints: vec!["https://mint-a.example".to_string()],
1720                cashu_default_mint: Some("https://mint-a.example".to_string()),
1721                dispatch: RequestDispatchConfig {
1722                    initial_fanout: 1,
1723                    hedge_fanout: 1,
1724                    max_fanout: 1,
1725                    hedge_interval_ms: 5,
1726                },
1727                ..Default::default()
1728            },
1729        );
1730        let provider = make_shared_test_node(
1731            relay,
1732            "provider-reject",
1733            GenericStoreRoutingConfig {
1734                cashu_accepted_mints: vec!["https://mint-b.example".to_string()],
1735                ..Default::default()
1736            },
1737        );
1738        let nodes = [&requester, &provider];
1739
1740        requester.transport.connect(&[]).await.expect("connect");
1741        provider.transport.connect(&[]).await.expect("connect");
1742        requester.store.start().await.expect("start");
1743        provider.store.start().await.expect("start");
1744        pump_test_network(&nodes, 24).await;
1745
1746        let payload = b"quoted-data".to_vec();
1747        let hash = hashtree_core::sha256(&payload);
1748        provider.local_store.put(hash, payload).await.expect("put");
1749
1750        let quote = run_quote_with_pumps(
1751            requester.store.clone(),
1752            hash,
1753            9,
1754            Duration::from_millis(80),
1755            vec!["provider-reject".to_string()],
1756            &nodes,
1757        )
1758        .await;
1759        assert!(
1760            quote.is_none(),
1761            "expected quote to be rejected on mint mismatch"
1762        );
1763    }
1764
1765    #[tokio::test]
1766    async fn test_request_quote_from_peers_accepts_small_peer_suggested_mint_under_cap() {
1767        let _guard = mock_network_lock().lock().await;
1768        crate::mock::clear_channel_registry().await;
1769
1770        let relay = crate::mock::MockRelay::new();
1771        let requester = make_shared_test_node(
1772            relay.clone(),
1773            "requester-suggested",
1774            GenericStoreRoutingConfig {
1775                cashu_accepted_mints: vec!["https://mint-a.example".to_string()],
1776                cashu_default_mint: Some("https://mint-a.example".to_string()),
1777                cashu_peer_suggested_mint_base_cap_sat: 3,
1778                cashu_peer_suggested_mint_max_cap_sat: 3,
1779                dispatch: RequestDispatchConfig {
1780                    initial_fanout: 1,
1781                    hedge_fanout: 1,
1782                    max_fanout: 1,
1783                    hedge_interval_ms: 5,
1784                },
1785                ..Default::default()
1786            },
1787        );
1788        let provider = make_shared_test_node(
1789            relay,
1790            "provider-suggested",
1791            GenericStoreRoutingConfig {
1792                cashu_accepted_mints: vec!["https://mint-b.example".to_string()],
1793                cashu_default_mint: Some("https://mint-b.example".to_string()),
1794                ..Default::default()
1795            },
1796        );
1797        let nodes = [&requester, &provider];
1798
1799        requester.transport.connect(&[]).await.expect("connect");
1800        provider.transport.connect(&[]).await.expect("connect");
1801        requester.store.start().await.expect("start");
1802        provider.store.start().await.expect("start");
1803        pump_test_network(&nodes, 24).await;
1804
1805        let payload = b"quoted-data".to_vec();
1806        let hash = hashtree_core::sha256(&payload);
1807        provider.local_store.put(hash, payload).await.expect("put");
1808
1809        let quote = run_quote_with_pumps(
1810            requester.store.clone(),
1811            hash,
1812            3,
1813            Duration::from_millis(80),
1814            vec!["provider-suggested".to_string()],
1815            &nodes,
1816        )
1817        .await
1818        .expect("expected bounded peer-suggested mint quote");
1819        assert_eq!(quote.mint_url.as_deref(), Some("https://mint-b.example"));
1820    }
1821
1822    #[tokio::test]
1823    async fn test_request_quote_from_peers_scales_peer_suggested_mint_cap_with_reputation() {
1824        let _guard = mock_network_lock().lock().await;
1825        crate::mock::clear_channel_registry().await;
1826
1827        let relay = crate::mock::MockRelay::new();
1828        let requester = make_shared_test_node(
1829            relay.clone(),
1830            "requester-reputation",
1831            GenericStoreRoutingConfig {
1832                cashu_accepted_mints: vec!["https://mint-a.example".to_string()],
1833                cashu_default_mint: Some("https://mint-a.example".to_string()),
1834                cashu_peer_suggested_mint_base_cap_sat: 1,
1835                cashu_peer_suggested_mint_success_step_sat: 1,
1836                cashu_peer_suggested_mint_receipt_step_sat: 2,
1837                cashu_peer_suggested_mint_max_cap_sat: 5,
1838                dispatch: RequestDispatchConfig {
1839                    initial_fanout: 1,
1840                    hedge_fanout: 1,
1841                    max_fanout: 1,
1842                    hedge_interval_ms: 5,
1843                },
1844                ..Default::default()
1845            },
1846        );
1847        let provider = make_shared_test_node(
1848            relay,
1849            "provider-reputation",
1850            GenericStoreRoutingConfig {
1851                cashu_accepted_mints: vec!["https://mint-b.example".to_string()],
1852                cashu_default_mint: Some("https://mint-b.example".to_string()),
1853                ..Default::default()
1854            },
1855        );
1856        let nodes = [&requester, &provider];
1857
1858        requester.transport.connect(&[]).await.expect("connect");
1859        provider.transport.connect(&[]).await.expect("connect");
1860        requester.store.start().await.expect("start");
1861        provider.store.start().await.expect("start");
1862        pump_test_network(&nodes, 24).await;
1863
1864        let payload = b"quoted-data".to_vec();
1865        let hash = hashtree_core::sha256(&payload);
1866        provider.local_store.put(hash, payload).await.expect("put");
1867
1868        let quote = run_quote_with_pumps(
1869            requester.store.clone(),
1870            hash,
1871            4,
1872            Duration::from_millis(80),
1873            vec!["provider-reputation".to_string()],
1874            &nodes,
1875        )
1876        .await;
1877        assert!(
1878            quote.is_none(),
1879            "new peer should not get a 4 sat untrusted-mint quote"
1880        );
1881
1882        {
1883            let mut selector = requester.store.peer_selector.write().await;
1884            selector.add_peer("provider-reputation");
1885            selector.record_success("provider-reputation", 20, 1024);
1886            selector.record_cashu_receipt("provider-reputation", 2);
1887        }
1888
1889        let quote = run_quote_with_pumps(
1890            requester.store.clone(),
1891            hash,
1892            4,
1893            Duration::from_millis(80),
1894            vec!["provider-reputation".to_string()],
1895            &nodes,
1896        )
1897        .await
1898        .expect("reputable peer should get larger bounded quote");
1899        assert_eq!(quote.mint_url.as_deref(), Some("https://mint-b.example"));
1900
1901        requester
1902            .store
1903            .record_cashu_payment_default_from_peer("provider-reputation")
1904            .await;
1905        let quote = run_quote_with_pumps(
1906            requester.store.clone(),
1907            hash,
1908            4,
1909            Duration::from_millis(80),
1910            vec!["provider-reputation".to_string()],
1911            &nodes,
1912        )
1913        .await;
1914        assert!(
1915            quote.is_none(),
1916            "peer-suggested mint exposure should drop to zero after defaults exceed receipts"
1917        );
1918    }
1919
1920    #[tokio::test]
1921    async fn test_request_quote_from_peers_returns_matching_mint() {
1922        let _guard = mock_network_lock().lock().await;
1923        crate::mock::clear_channel_registry().await;
1924
1925        let relay = crate::mock::MockRelay::new();
1926        let requester = make_shared_test_node(
1927            relay.clone(),
1928            "requester-match",
1929            GenericStoreRoutingConfig {
1930                cashu_accepted_mints: vec!["https://mint-a.example".to_string()],
1931                cashu_default_mint: Some("https://mint-a.example".to_string()),
1932                dispatch: RequestDispatchConfig {
1933                    initial_fanout: 1,
1934                    hedge_fanout: 1,
1935                    max_fanout: 1,
1936                    hedge_interval_ms: 5,
1937                },
1938                ..Default::default()
1939            },
1940        );
1941        let provider = make_shared_test_node(
1942            relay,
1943            "provider-match",
1944            GenericStoreRoutingConfig {
1945                cashu_accepted_mints: vec!["https://mint-a.example".to_string()],
1946                ..Default::default()
1947            },
1948        );
1949        let nodes = [&requester, &provider];
1950
1951        requester.transport.connect(&[]).await.expect("connect");
1952        provider.transport.connect(&[]).await.expect("connect");
1953        requester.store.start().await.expect("start");
1954        provider.store.start().await.expect("start");
1955        pump_test_network(&nodes, 24).await;
1956
1957        let payload = b"quoted-data".to_vec();
1958        let hash = hashtree_core::sha256(&payload);
1959        provider.local_store.put(hash, payload).await.expect("put");
1960
1961        let quote = run_quote_with_pumps(
1962            requester.store.clone(),
1963            hash,
1964            9,
1965            Duration::from_millis(80),
1966            vec!["provider-match".to_string()],
1967            &nodes,
1968        )
1969        .await
1970        .expect("expected quote");
1971        assert_eq!(quote.mint_url.as_deref(), Some("https://mint-a.example"));
1972    }
1973
1974    #[tokio::test]
1975    async fn test_tit_for_tat_store_path_recovers_after_bad_peer_observation() {
1976        let tit_for_tat_successes = run_bad_peer_series(SelectionStrategy::TitForTat).await;
1977
1978        assert!(
1979            tit_for_tat_successes >= 5,
1980            "expected tit-for-tat path to recover after the first consistently bad peer observation (successes={tit_for_tat_successes})"
1981        );
1982    }
1983}
1984
1985/// Type alias for simulation store
1986pub type SimStore<S> =
1987    GenericStore<S, crate::mock::MockRelayTransport, crate::mock::MockConnectionFactory>;
1988
1989/// Type alias for production store (using real WebRTC)
1990pub type ProductionStore<S> = GenericStore<
1991    S,
1992    crate::nostr::NostrRelayTransport,
1993    crate::real_factory::RealPeerConnectionFactory,
1994>;