Skip to main content

hashtree_webrtc/
generic_store.rs

1//! Generic P2P store using abstract transports.
2//!
3//! This module provides a store wrapper that works with any local storage
4//! backend plus any signaling transport and peer-link factory. Both production
5//! (Nostr websockets + WebRTC) and simulation (mocks) use this same code.
6
7use async_trait::async_trait;
8use std::collections::hash_map::DefaultHasher;
9use std::collections::{HashMap, HashSet};
10use std::hash::{Hash as _, Hasher};
11use std::sync::Arc;
12use std::time::{Duration, Instant};
13use tokio::sync::{oneshot, RwLock};
14
15use hashtree_core::{Hash, Store, StoreError};
16
17use crate::peer_selector::{PeerMetadataSnapshot, PeerSelector, SelectionStrategy};
18use crate::protocol::{
19    create_quote_request, create_quote_response_available, create_quote_response_unavailable,
20    create_request, create_request_with_quote, create_response, encode_quote_request,
21    encode_quote_response, encode_request, encode_response, hash_to_key, parse_message,
22    DataMessage, DataQuoteRequest, DataQuoteResponse,
23};
24use crate::signaling::MeshRouter;
25use crate::transport::{PeerLinkFactory, SignalingTransport, TransportError};
26use crate::types::{PeerHTLConfig, SignalingMessage, MAX_HTL};
27
/// Fixed key whose SHA-256 digest is used as the store slot holding the hex
/// hash of the most recently persisted peer metadata snapshot.
const PEER_METADATA_POINTER_SLOT_KEY: &[u8] = b"hashtree-webrtc/peer-metadata/latest/v1";
29
/// Bookkeeping for a data request that has been sent and is awaiting a response.
struct PendingRequest {
    /// One-shot sender that delivers the outcome (`Some(bytes)` or `None`) to the waiting requester.
    response_tx: oneshot::Sender<Option<Vec<u8>>>,
    /// Instant at which the request was registered.
    started_at: Instant,
    /// IDs of the peers this request was sent to.
    queried_peers: Vec<String>,
}
36
/// Bookkeeping for a quote negotiation that is awaiting a peer's quote response.
struct PendingQuoteRequest {
    /// One-shot sender that delivers the negotiated quote (or `None`) to the waiting requester.
    response_tx: oneshot::Sender<Option<NegotiatedQuote>>,
    /// Mint URL that was sent with the quote request, if any.
    preferred_mint_url: Option<String>,
    /// Payment (in sats) that was offered in the quote request.
    offered_payment_sat: u64,
}
42
/// Result of a successful quote negotiation with a single peer.
#[derive(Debug, Clone)]
struct NegotiatedQuote {
    /// Peer that issued the quote; the paid request is sent to this peer.
    peer_id: String,
    /// Identifier of the quote, echoed back in the follow-up data request.
    quote_id: u64,
    /// Mint URL associated with the quote, if any (not read in the visible code).
    #[allow(dead_code)]
    mint_url: Option<String>,
}
50
/// A quote this node issued to a peer, stored in `issued_quotes` until it is
/// redeemed.
struct IssuedQuote {
    /// Instant after which the quote is no longer honoured.
    expires_at: Instant,
    /// Quoted payment in sats (not read in the visible code).
    #[allow(dead_code)]
    payment_sat: u64,
    /// Mint URL the quote was issued for, if any (not read in the visible code).
    #[allow(dead_code)]
    mint_url: Option<String>,
}
58
/// Request dispatch strategy for peer queries.
///
/// `GenericStore` supports two practical retrieval modes:
/// - Flood (`usize::MAX` fanout): maximize success/latency at bandwidth cost.
/// - Staged hedging: probe a subset first, then expand.
///
/// Values are expected to be passed through [`normalize_dispatch_config`]
/// before use, which clamps them against the number of available peers.
#[derive(Debug, Clone, Copy)]
pub struct RequestDispatchConfig {
    /// Number of peers queried immediately.
    pub initial_fanout: usize,
    /// Number of additional peers to query on each hedge step.
    pub hedge_fanout: usize,
    /// Total peers allowed for this request. `0` is treated by
    /// `normalize_dispatch_config` as "all available peers".
    pub max_fanout: usize,
    /// Delay between hedge waves (ms). `0` means send all waves immediately.
    pub hedge_interval_ms: u64,
}
75
76impl Default for RequestDispatchConfig {
77    fn default() -> Self {
78        Self {
79            initial_fanout: usize::MAX,
80            hedge_fanout: usize::MAX,
81            max_fanout: usize::MAX,
82            hedge_interval_ms: 0,
83        }
84    }
85}
86
87/// Normalize fanout config against current peer availability.
88pub fn normalize_dispatch_config(
89    dispatch: RequestDispatchConfig,
90    available_peers: usize,
91) -> RequestDispatchConfig {
92    let mut cfg = dispatch;
93    let cap = if cfg.max_fanout == 0 {
94        available_peers
95    } else {
96        cfg.max_fanout.min(available_peers)
97    };
98    cfg.max_fanout = cap;
99    cfg.initial_fanout = if cfg.initial_fanout == 0 {
100        1
101    } else {
102        cfg.initial_fanout.min(cap.max(1))
103    };
104    cfg.hedge_fanout = if cfg.hedge_fanout == 0 {
105        1
106    } else {
107        cfg.hedge_fanout.min(cap.max(1))
108    };
109    cfg
110}
111
112/// Build wave sizes for staged hedged dispatch.
113pub fn build_hedged_wave_plan(peer_count: usize, dispatch: RequestDispatchConfig) -> Vec<usize> {
114    if peer_count == 0 {
115        return Vec::new();
116    }
117    let cap = dispatch.max_fanout.min(peer_count);
118    if cap == 0 {
119        return Vec::new();
120    }
121
122    let mut plan = Vec::new();
123    let mut sent = 0usize;
124    let first = dispatch.initial_fanout.min(cap).max(1);
125    plan.push(first);
126    sent += first;
127
128    while sent < cap {
129        let next = dispatch.hedge_fanout.min(cap - sent).max(1);
130        plan.push(next);
131        sent += next;
132    }
133    plan
134}
135
136/// Keep selector membership aligned with currently connected peer IDs.
137pub async fn sync_selector_peers(selector: &RwLock<PeerSelector>, current_peer_ids: &[String]) {
138    let mut selector = selector.write().await;
139    let current: HashSet<&str> = current_peer_ids.iter().map(String::as_str).collect();
140    let known: Vec<String> = selector.all_stats().map(|s| s.peer_id.clone()).collect();
141    for peer_id in known {
142        if !current.contains(peer_id.as_str()) {
143            selector.remove_peer(&peer_id);
144        }
145    }
146    for peer_id in current_peer_ids {
147        selector.add_peer(peer_id.clone());
148    }
149}
150
/// Response behavior profile for simulation/game-theory actors.
///
/// Defaults to honest behavior (always respond correctly, no extra delay).
/// Probabilities outside `[0.0, 1.0]` are clamped by `normalized` before use.
#[derive(Debug, Clone, Copy)]
pub struct ResponseBehaviorConfig {
    /// Probability that a node drops a response even when it has data.
    pub drop_response_prob: f64,
    /// Probability that a node responds with corrupted payload.
    pub corrupt_response_prob: f64,
    /// Optional response delay (ms) to model slow/incompetent peers.
    pub extra_delay_ms: u64,
}
163
164impl Default for ResponseBehaviorConfig {
165    fn default() -> Self {
166        Self {
167            drop_response_prob: 0.0,
168            corrupt_response_prob: 0.0,
169            extra_delay_ms: 0,
170        }
171    }
172}
173
174impl ResponseBehaviorConfig {
175    fn normalized(self) -> Self {
176        Self {
177            drop_response_prob: self.drop_response_prob.clamp(0.0, 1.0),
178            corrupt_response_prob: self.corrupt_response_prob.clamp(0.0, 1.0),
179            extra_delay_ms: self.extra_delay_ms,
180        }
181    }
182}
183
/// Routing policy for request ordering + dispatch fanout, plus the Cashu
/// settlement preferences used during quote negotiation.
#[derive(Debug, Clone)]
pub struct GenericStoreRoutingConfig {
    /// Strategy handed to the `PeerSelector` at construction.
    pub selection_strategy: SelectionStrategy,
    /// Whether the selector's fairness mode is enabled.
    pub fairness_enabled: bool,
    /// Blend weight for payment-priority ranking in selector (`0.0` disables).
    pub cashu_payment_weight: f64,
    /// Refuse serving peers that have reached this many unpaid post-delivery settlements.
    /// `0` disables refusal and only keeps metadata/downranking.
    pub cashu_payment_default_block_threshold: u64,
    /// Cashu mint URLs this node is willing to use for settlement.
    /// An empty list means any mint is accepted.
    pub cashu_accepted_mints: Vec<String>,
    /// Preferred Cashu mint URL when initiating paid retrieval.
    pub cashu_default_mint: Option<String>,
    /// Baseline cap for accepting a peer-suggested mint outside the trusted set.
    /// `0` means peer-suggested mints are never accepted.
    pub cashu_peer_suggested_mint_base_cap_sat: u64,
    /// Additional sats allowed per successful delivery from that peer.
    pub cashu_peer_suggested_mint_success_step_sat: u64,
    /// Additional sats allowed per successful post-delivery payment received from that peer.
    pub cashu_peer_suggested_mint_receipt_step_sat: u64,
    /// Hard upper bound for any single peer-suggested mint quote we accept.
    /// `0` means no upper bound is applied.
    pub cashu_peer_suggested_mint_max_cap_sat: u64,
    /// Fanout/hedging settings for outgoing requests.
    pub dispatch: RequestDispatchConfig,
    /// Simulated response behavior of this node.
    pub response_behavior: ResponseBehaviorConfig,
}
209
210impl Default for GenericStoreRoutingConfig {
211    fn default() -> Self {
212        Self {
213            selection_strategy: SelectionStrategy::Weighted,
214            fairness_enabled: true,
215            cashu_payment_weight: 0.0,
216            cashu_payment_default_block_threshold: 0,
217            cashu_accepted_mints: Vec::new(),
218            cashu_default_mint: None,
219            cashu_peer_suggested_mint_base_cap_sat: 0,
220            cashu_peer_suggested_mint_success_step_sat: 0,
221            cashu_peer_suggested_mint_receipt_step_sat: 0,
222            cashu_peer_suggested_mint_max_cap_sat: 0,
223            dispatch: RequestDispatchConfig::default(),
224            response_behavior: ResponseBehaviorConfig::default(),
225        }
226    }
227}
228
/// Generic mesh store that works with any storage backend and transport
/// implementation.
///
/// This is the shared code between production and simulation.
/// - Production: `GenericStore<LmdbStore, NostrSignalingTransport, WebRtcPeerLinkFactory>`
/// - Simulation: `GenericStore<MemoryStore, MockSignalingTransport, MockConnectionFactory>`
///
/// All mutable state is kept behind `tokio::sync::RwLock`s so methods take `&self`.
pub struct GenericStore<S, R, F>
where
    S: Store + Send + Sync + 'static,
    R: SignalingTransport + Send + Sync + 'static,
    F: PeerLinkFactory + Send + Sync + 'static,
{
    /// Local backing store
    local_store: Arc<S>,
    /// Mesh router (handles peer discovery and connection)
    signaling: Arc<MeshRouter<R, F>>,
    /// Per-peer HTL config, keyed by peer ID
    htl_configs: RwLock<HashMap<String, PeerHTLConfig>>,
    /// Pending requests we sent, keyed by the requested hash's key string
    pending_requests: RwLock<HashMap<String, PendingRequest>>,
    /// Pending quote negotiations keyed by requested hash.
    pending_quotes: RwLock<HashMap<String, PendingQuoteRequest>>,
    /// Quotes we issued to peers and will accept exactly once until expiry.
    /// Keyed by `(peer_id, hash_key, quote_id)`.
    issued_quotes: RwLock<HashMap<(String, String, u64), IssuedQuote>>,
    /// Monotonic quote identifier generator.
    next_quote_id: RwLock<u64>,
    /// Adaptive selector for peer ordering.
    peer_selector: RwLock<PeerSelector>,
    /// Routing/dispatch configuration.
    routing: GenericStoreRoutingConfig,
    /// Request timeout
    request_timeout: Duration,
    /// Debug mode
    debug: bool,
    /// Running flag
    running: RwLock<bool>,
}
266
267impl<S, R, F> GenericStore<S, R, F>
268where
269    S: Store + Send + Sync + 'static,
270    R: SignalingTransport + Send + Sync + 'static,
271    F: PeerLinkFactory + Send + Sync + 'static,
272{
273    /// Create a new generic store
274    pub fn new(
275        local_store: Arc<S>,
276        signaling: Arc<MeshRouter<R, F>>,
277        request_timeout: Duration,
278        debug: bool,
279    ) -> Self {
280        Self::new_with_routing(
281            local_store,
282            signaling,
283            request_timeout,
284            debug,
285            Default::default(),
286        )
287    }
288
289    /// Create a new generic store with explicit routing configuration.
290    pub fn new_with_routing(
291        local_store: Arc<S>,
292        signaling: Arc<MeshRouter<R, F>>,
293        request_timeout: Duration,
294        debug: bool,
295        routing: GenericStoreRoutingConfig,
296    ) -> Self {
297        let mut selector = PeerSelector::with_strategy(routing.selection_strategy);
298        selector.set_fairness(routing.fairness_enabled);
299        selector.set_cashu_payment_weight(routing.cashu_payment_weight);
300        Self {
301            local_store,
302            signaling,
303            htl_configs: RwLock::new(HashMap::new()),
304            pending_requests: RwLock::new(HashMap::new()),
305            pending_quotes: RwLock::new(HashMap::new()),
306            issued_quotes: RwLock::new(HashMap::new()),
307            next_quote_id: RwLock::new(1),
308            peer_selector: RwLock::new(selector),
309            routing,
310            request_timeout,
311            debug,
312            running: RwLock::new(false),
313        }
314    }
315
316    /// Start the store (begin listening for messages)
317    pub async fn start(&self) -> Result<(), TransportError> {
318        *self.running.write().await = true;
319
320        // Send initial hello
321        self.signaling.send_hello(vec![]).await?;
322
323        Ok(())
324    }
325
326    /// Stop the store
327    pub async fn stop(&self) {
328        *self.running.write().await = false;
329    }
330
331    /// Process incoming signaling message
332    pub async fn process_signaling(&self, msg: SignalingMessage) -> Result<(), TransportError> {
333        // When a new peer connects, initialize their HTL config
334        let peer_id = msg.peer_id().to_string();
335        {
336            let mut configs = self.htl_configs.write().await;
337            if !configs.contains_key(&peer_id) {
338                configs.insert(peer_id.clone(), PeerHTLConfig::random());
339            }
340        }
341        self.peer_selector.write().await.add_peer(peer_id);
342
343        self.signaling.handle_message(msg).await
344    }
345
    /// Borrow the shared mesh router used by this store.
    pub fn signaling(&self) -> &Arc<MeshRouter<R, F>> {
        &self.signaling
    }
350
    /// Configured response behavior with probabilities clamped to `[0.0, 1.0]`.
    fn response_behavior(&self) -> ResponseBehaviorConfig {
        self.routing.response_behavior.normalized()
    }
354
355    fn deterministic_actor_draw_for(peer_id: &str, hash: &Hash, salt: u64) -> f64 {
356        let mut hasher = DefaultHasher::new();
357        peer_id.hash(&mut hasher);
358        hash.hash(&mut hasher);
359        salt.hash(&mut hasher);
360        let v = hasher.finish();
361        (v as f64) / (u64::MAX as f64)
362    }
363
    /// Deterministic draw for this node: uses the router's own peer ID as the
    /// actor identity.
    fn deterministic_actor_draw(&self, hash: &Hash, salt: u64) -> f64 {
        Self::deterministic_actor_draw_for(self.signaling.peer_id(), hash, salt)
    }
367
    /// Hash of the reserved slot that stores the pointer to the latest peer
    /// metadata snapshot (SHA-256 of `PEER_METADATA_POINTER_SLOT_KEY`).
    fn peer_metadata_pointer_slot_hash() -> Hash {
        hashtree_core::sha256(PEER_METADATA_POINTER_SLOT_KEY)
    }
371
372    fn decode_hash_hex(hash_hex: &str) -> Result<Hash, StoreError> {
373        let bytes = hex::decode(hash_hex)
374            .map_err(|e| StoreError::Other(format!("Invalid hash hex: {e}")))?;
375        if bytes.len() != 32 {
376            return Err(StoreError::Other(format!(
377                "Invalid hash length {}, expected 32 bytes",
378                bytes.len()
379            )));
380        }
381        let mut hash = [0u8; 32];
382        hash.copy_from_slice(&bytes);
383        Ok(hash)
384    }
385
386    fn should_drop_response(&self, hash: &Hash) -> bool {
387        let p = self.response_behavior().drop_response_prob;
388        if p <= 0.0 {
389            return false;
390        }
391        self.deterministic_actor_draw(hash, 0xD0_D0_D0_D0_D0_D0_D0_D0) < p
392    }
393
394    fn should_corrupt_response(&self, hash: &Hash) -> bool {
395        let p = self.response_behavior().corrupt_response_prob;
396        if p <= 0.0 {
397            return false;
398        }
399        self.deterministic_actor_draw(hash, 0xC0_C0_C0_C0_C0_C0_C0_C0) < p
400    }
401
402    async fn ordered_connected_peers(&self) -> Vec<String> {
403        let current_peer_ids = self.signaling.peer_ids().await;
404        if current_peer_ids.is_empty() {
405            return Vec::new();
406        }
407
408        sync_selector_peers(&self.peer_selector, &current_peer_ids).await;
409        let current_set: HashSet<&str> = current_peer_ids.iter().map(String::as_str).collect();
410        let mut ordered_peer_ids = self.peer_selector.write().await.select_peers();
411        ordered_peer_ids.retain(|peer_id| current_set.contains(peer_id.as_str()));
412        if ordered_peer_ids.is_empty() {
413            let mut fallback = current_peer_ids;
414            fallback.sort();
415            return fallback;
416        }
417        ordered_peer_ids
418    }
419
420    fn requested_quote_mint(&self) -> Option<&str> {
421        if let Some(default_mint) = self.routing.cashu_default_mint.as_deref() {
422            if self.routing.cashu_accepted_mints.is_empty()
423                || self
424                    .routing
425                    .cashu_accepted_mints
426                    .iter()
427                    .any(|mint| mint == default_mint)
428            {
429                return Some(default_mint);
430            }
431        }
432
433        self.routing
434            .cashu_accepted_mints
435            .first()
436            .map(String::as_str)
437    }
438
439    fn choose_quote_mint(&self, requested_mint: Option<&str>) -> Option<String> {
440        if let Some(requested_mint) = requested_mint {
441            if self.accepts_quote_mint(Some(requested_mint)) {
442                return Some(requested_mint.to_string());
443            }
444        }
445        if let Some(default_mint) = self.routing.cashu_default_mint.as_ref() {
446            return Some(default_mint.clone());
447        }
448        if let Some(first_mint) = self.routing.cashu_accepted_mints.first() {
449            return Some(first_mint.clone());
450        }
451        requested_mint.map(str::to_string)
452    }
453
454    fn accepts_quote_mint(&self, mint_url: Option<&str>) -> bool {
455        if self.routing.cashu_accepted_mints.is_empty() {
456            return true;
457        }
458
459        let Some(mint_url) = mint_url else {
460            return false;
461        };
462        self.routing
463            .cashu_accepted_mints
464            .iter()
465            .any(|mint| mint == mint_url)
466    }
467
468    fn trusts_quote_mint(&self, mint_url: Option<&str>) -> bool {
469        let Some(mint_url) = mint_url else {
470            return self.routing.cashu_default_mint.is_none()
471                && self.routing.cashu_accepted_mints.is_empty();
472        };
473        self.routing.cashu_default_mint.as_deref() == Some(mint_url)
474            || self
475                .routing
476                .cashu_accepted_mints
477                .iter()
478                .any(|mint| mint == mint_url)
479    }
480
481    async fn peer_suggested_mint_cap_sat(&self, peer_id: &str) -> u64 {
482        let base = self.routing.cashu_peer_suggested_mint_base_cap_sat;
483        if base == 0 {
484            return 0;
485        }
486
487        let selector = self.peer_selector.read().await;
488        let Some(stats) = selector.get_stats(peer_id) else {
489            let max_cap = self.routing.cashu_peer_suggested_mint_max_cap_sat;
490            return if max_cap > 0 { base.min(max_cap) } else { base };
491        };
492
493        if stats.cashu_payment_defaults > 0
494            && stats.cashu_payment_defaults >= stats.cashu_payment_receipts
495        {
496            return 0;
497        }
498
499        let success_bonus = stats
500            .successes
501            .saturating_mul(self.routing.cashu_peer_suggested_mint_success_step_sat);
502        let receipt_bonus = stats
503            .cashu_payment_receipts
504            .saturating_mul(self.routing.cashu_peer_suggested_mint_receipt_step_sat);
505        let mut cap = base
506            .saturating_add(success_bonus)
507            .saturating_add(receipt_bonus);
508        let max_cap = self.routing.cashu_peer_suggested_mint_max_cap_sat;
509        if max_cap > 0 {
510            cap = cap.min(max_cap);
511        }
512        cap
513    }
514
515    async fn should_accept_quote_response(
516        &self,
517        from_peer: &str,
518        preferred_mint_url: Option<&str>,
519        offered_payment_sat: u64,
520        res: &DataQuoteResponse,
521    ) -> bool {
522        let Some(payment_sat) = res.p else {
523            return false;
524        };
525        if payment_sat > offered_payment_sat {
526            return false;
527        }
528
529        let response_mint = res.m.as_deref();
530        if response_mint == preferred_mint_url {
531            return true;
532        }
533        if self.trusts_quote_mint(response_mint) {
534            return true;
535        }
536        if response_mint.is_none() {
537            return false;
538        }
539
540        payment_sat <= self.peer_suggested_mint_cap_sat(from_peer).await
541    }
542
543    async fn issue_quote(
544        &self,
545        peer_id: &str,
546        hash_key: &str,
547        payment_sat: u64,
548        ttl_ms: u32,
549        mint_url: Option<&str>,
550    ) -> u64 {
551        let quote_id = {
552            let mut next = self.next_quote_id.write().await;
553            let quote_id = *next;
554            *next = next.saturating_add(1);
555            quote_id
556        };
557
558        let expires_at = Instant::now() + Duration::from_millis(ttl_ms as u64);
559        self.issued_quotes.write().await.insert(
560            (peer_id.to_string(), hash_key.to_string(), quote_id),
561            IssuedQuote {
562                expires_at,
563                payment_sat,
564                mint_url: mint_url.map(str::to_string),
565            },
566        );
567        quote_id
568    }
569
570    async fn take_valid_quote(&self, peer_id: &str, hash_key: &str, quote_id: u64) -> bool {
571        let key = (peer_id.to_string(), hash_key.to_string(), quote_id);
572        let Some(quote) = self.issued_quotes.write().await.remove(&key) else {
573            return false;
574        };
575        quote.expires_at > Instant::now()
576    }
577
578    async fn send_request_to_peer(
579        &self,
580        peer_id: &str,
581        hash: &Hash,
582        quote_id: Option<u64>,
583    ) -> bool {
584        let channel = match self.signaling.get_channel(peer_id).await {
585            Some(c) => c,
586            None => return false,
587        };
588
589        let htl_config = {
590            let configs = self.htl_configs.read().await;
591            configs
592                .get(peer_id)
593                .cloned()
594                .unwrap_or_else(PeerHTLConfig::random)
595        };
596
597        let send_htl = htl_config.decrement(MAX_HTL);
598        let req = match quote_id {
599            Some(quote_id) => create_request_with_quote(hash, send_htl, quote_id),
600            None => create_request(hash, send_htl),
601        };
602        let request_bytes = encode_request(&req);
603
604        {
605            let mut selector = self.peer_selector.write().await;
606            selector.record_request(peer_id, request_bytes.len() as u64);
607        }
608
609        match channel.send(request_bytes).await {
610            Ok(()) => true,
611            Err(_) => {
612                self.peer_selector.write().await.record_failure(peer_id);
613                false
614            }
615        }
616    }
617
618    async fn send_quote_request_to_peer(
619        &self,
620        peer_id: &str,
621        hash: &Hash,
622        payment_sat: u64,
623        ttl_ms: u32,
624        mint_url: Option<&str>,
625    ) -> bool {
626        let channel = match self.signaling.get_channel(peer_id).await {
627            Some(c) => c,
628            None => return false,
629        };
630
631        let req = create_quote_request(hash, ttl_ms, payment_sat, mint_url);
632        let request_bytes = encode_quote_request(&req);
633
634        match channel.send(request_bytes).await {
635            Ok(()) => true,
636            Err(_) => false,
637        }
638    }
639
    /// Number of peers as reported by the mesh router.
    pub async fn peer_count(&self) -> usize {
        self.signaling.peer_count().await
    }
644
    /// Whether the mesh router reports that it wants more peers.
    pub async fn needs_peers(&self) -> bool {
        self.signaling.needs_peers().await
    }
649
    /// Re-broadcast hello to refresh discovery as topology changes.
    ///
    /// # Errors
    /// Returns the transport error reported by the mesh router.
    pub async fn send_hello(&self) -> Result<(), TransportError> {
        self.signaling.send_hello(vec![]).await
    }
654
    /// Apply an out-of-band payment credit to a peer's routing priority.
    ///
    /// Forwards `amount_sat` to the selector's `record_cashu_payment`.
    pub async fn record_cashu_payment_for_peer(&self, peer_id: &str, amount_sat: u64) {
        self.peer_selector
            .write()
            .await
            .record_cashu_payment(peer_id, amount_sat);
    }
662
    /// Record a post-delivery payment we received from a peer.
    ///
    /// Forwards `amount_sat` to the selector's `record_cashu_receipt`.
    pub async fn record_cashu_receipt_from_peer(&self, peer_id: &str, amount_sat: u64) {
        self.peer_selector
            .write()
            .await
            .record_cashu_receipt(peer_id, amount_sat);
    }
670
    /// Record that a peer failed to pay after we delivered successfully.
    ///
    /// Forwards to the selector's `record_cashu_payment_default`.
    pub async fn record_cashu_payment_default_from_peer(&self, peer_id: &str) {
        self.peer_selector
            .write()
            .await
            .record_cashu_payment_default(peer_id);
    }
678
    /// Whether requests from `peer_id` should be refused, as decided by the
    /// selector using the configured payment-default block threshold.
    fn should_refuse_requests_from_peer(&self, selector: &PeerSelector, peer_id: &str) -> bool {
        selector.is_peer_blocked_for_payment_defaults(
            peer_id,
            self.routing.cashu_payment_default_block_threshold,
        )
    }
685
    /// Export live peer metadata for inspection/debugging.
    ///
    /// Takes the selector's read lock for the duration of the export.
    pub async fn peer_metadata_snapshot(&self) -> PeerMetadataSnapshot {
        self.peer_selector
            .read()
            .await
            .export_peer_metadata_snapshot()
    }
693
694    /// Snapshot current peer metadata and persist it into `local_store`.
695    ///
696    /// Uses content-addressed storage for the snapshot body and a reserved
697    /// mutable pointer slot for the "latest snapshot hash".
698    pub async fn persist_peer_metadata(&self) -> Result<Hash, StoreError> {
699        let snapshot = self
700            .peer_selector
701            .read()
702            .await
703            .export_peer_metadata_snapshot();
704        let bytes = serde_json::to_vec(&snapshot).map_err(|e| {
705            StoreError::Other(format!("Failed to encode peer metadata snapshot: {e}"))
706        })?;
707        let snapshot_hash = hashtree_core::sha256(&bytes);
708        let _ = self.local_store.put(snapshot_hash, bytes).await?;
709
710        let pointer_slot = Self::peer_metadata_pointer_slot_hash();
711        let pointer_bytes = hex::encode(snapshot_hash).into_bytes();
712        let _ = self.local_store.delete(&pointer_slot).await?;
713        let _ = self.local_store.put(pointer_slot, pointer_bytes).await?;
714
715        Ok(snapshot_hash)
716    }
717
718    /// Load persisted peer metadata from `local_store` if available.
719    pub async fn load_peer_metadata(&self) -> Result<bool, StoreError> {
720        let pointer_slot = Self::peer_metadata_pointer_slot_hash();
721        let Some(pointer_bytes) = self.local_store.get(&pointer_slot).await? else {
722            return Ok(false);
723        };
724        let pointer_hex = std::str::from_utf8(&pointer_bytes).map_err(|e| {
725            StoreError::Other(format!("Peer metadata pointer is not valid UTF-8: {e}"))
726        })?;
727        let snapshot_hash = Self::decode_hash_hex(pointer_hex.trim())?;
728
729        let Some(snapshot_bytes) = self.local_store.get(&snapshot_hash).await? else {
730            return Ok(false);
731        };
732        let snapshot: PeerMetadataSnapshot =
733            serde_json::from_slice(&snapshot_bytes).map_err(|e| {
734                StoreError::Other(format!("Failed to decode peer metadata snapshot: {e}"))
735            })?;
736        self.peer_selector
737            .write()
738            .await
739            .import_peer_metadata_snapshot(&snapshot);
740        Ok(true)
741    }
742
743    /// Request data from peers after negotiating a paid quote.
744    ///
745    /// If quote negotiation fails or the quoted peer does not deliver, the store
746    /// falls back to the normal unpaid retrieval path to preserve liveness.
747    pub async fn get_with_quote(
748        &self,
749        hash: &Hash,
750        payment_sat: u64,
751        quote_ttl: Duration,
752    ) -> Result<Option<Vec<u8>>, StoreError> {
753        if let Some(data) = self.local_store.get(hash).await? {
754            return Ok(Some(data));
755        }
756        Ok(self
757            .request_from_peers_with_quote(hash, payment_sat, quote_ttl)
758            .await)
759    }
760
761    async fn request_from_peers_with_quote(
762        &self,
763        hash: &Hash,
764        payment_sat: u64,
765        quote_ttl: Duration,
766    ) -> Option<Vec<u8>> {
767        let ordered_peer_ids = self.ordered_connected_peers().await;
768        if ordered_peer_ids.is_empty() {
769            return None;
770        }
771
772        if let Some(quote) = self
773            .request_quote_from_peers(hash, payment_sat, quote_ttl, &ordered_peer_ids)
774            .await
775        {
776            if let Some(data) = self
777                .request_from_single_peer(hash, &quote.peer_id, Some(quote.quote_id))
778                .await
779            {
780                return Some(data);
781            }
782        }
783
784        self.request_from_ordered_peers(hash, &ordered_peer_ids)
785            .await
786    }
787
    /// Ask peers for a quote on `hash` using staged (hedged) dispatch.
    ///
    /// Quote requests are sent to `ordered_peer_ids` in waves sized by the
    /// normalized dispatch config. Between waves the function waits up to the
    /// hedge interval for a quote to arrive on the pending-quote channel; after
    /// the last wave it waits for the rest of `request_timeout`.
    ///
    /// Returns the first quote delivered on the channel, or `None` when there
    /// are no peers, the TTL rounds down to zero milliseconds, nothing could be
    /// sent, the channel yields `None`/is closed, or the deadline passes. The
    /// `pending_quotes` entry for this hash is always removed before returning.
    async fn request_quote_from_peers(
        &self,
        hash: &Hash,
        payment_sat: u64,
        quote_ttl: Duration,
        ordered_peer_ids: &[String],
    ) -> Option<NegotiatedQuote> {
        if ordered_peer_ids.is_empty() {
            return None;
        }
        // The TTL is carried as u32 milliseconds; larger values saturate.
        let ttl_ms = quote_ttl.as_millis().min(u32::MAX as u128) as u32;
        if ttl_ms == 0 {
            return None;
        }
        let requested_mint = self.requested_quote_mint().map(str::to_string);

        let dispatch = normalize_dispatch_config(self.routing.dispatch, ordered_peer_ids.len());
        let wave_plan = build_hedged_wave_plan(ordered_peer_ids.len(), dispatch);
        if wave_plan.is_empty() {
            return None;
        }

        // Register the pending negotiation before sending anything so a fast
        // response finds its entry.
        let hash_key = hash_to_key(hash);
        let (tx, rx) = oneshot::channel();
        self.pending_quotes.write().await.insert(
            hash_key.clone(),
            PendingQuoteRequest {
                response_tx: tx,
                preferred_mint_url: requested_mint.clone(),
                offered_payment_sat: payment_sat,
            },
        );

        let mut sent_total = 0usize;
        let mut next_peer_idx = 0usize;
        let mut rx = rx;
        // One overall deadline shared by all waves.
        let deadline = Instant::now() + self.request_timeout;

        for (wave_idx, wave_size) in wave_plan.iter().copied().enumerate() {
            let from = next_peer_idx;
            let to = (next_peer_idx + wave_size).min(ordered_peer_ids.len());
            for peer_id in &ordered_peer_ids[from..to] {
                if self
                    .send_quote_request_to_peer(
                        peer_id,
                        hash,
                        payment_sat,
                        ttl_ms,
                        requested_mint.as_deref(),
                    )
                    .await
                {
                    sent_total += 1;
                }
            }
            next_peer_idx = to;

            // Nothing has gone out yet: there is nothing to wait for, so move
            // straight to the next wave (or give up if peers are exhausted).
            if sent_total == 0 {
                if next_peer_idx >= ordered_peer_ids.len() {
                    break;
                }
                continue;
            }

            let now = Instant::now();
            if now >= deadline {
                break;
            }
            let remaining = deadline.saturating_duration_since(now);
            let is_last_wave =
                wave_idx + 1 == wave_plan.len() || next_peer_idx >= ordered_peer_ids.len();
            // Last wave: wait out the deadline. Otherwise wait one hedge
            // interval (zero means fire the next wave immediately).
            let wait = if is_last_wave {
                remaining
            } else if dispatch.hedge_interval_ms == 0 {
                Duration::ZERO
            } else {
                Duration::from_millis(dispatch.hedge_interval_ms).min(remaining)
            };

            if wait.is_zero() {
                continue;
            }

            match tokio::time::timeout(wait, &mut rx).await {
                Ok(Ok(Some(quote))) => {
                    let _ = self.pending_quotes.write().await.remove(&hash_key);
                    return Some(quote);
                }
                // Explicit "no quote" or a dropped sender: stop negotiating.
                Ok(Ok(None)) => break,
                Ok(Err(_)) => break,
                // Hedge interval elapsed without a response: send the next wave.
                Err(_) => {}
            }
        }

        let _ = self.pending_quotes.write().await.remove(&hash_key);
        None
    }
885
886    async fn request_from_single_peer(
887        &self,
888        hash: &Hash,
889        peer_id: &str,
890        quote_id: Option<u64>,
891    ) -> Option<Vec<u8>> {
892        let hash_key = hash_to_key(hash);
893        let (tx, rx) = oneshot::channel();
894        self.pending_requests.write().await.insert(
895            hash_key.clone(),
896            PendingRequest {
897                response_tx: tx,
898                started_at: Instant::now(),
899                queried_peers: vec![peer_id.to_string()],
900            },
901        );
902
903        let mut rx = rx;
904        if !self.send_request_to_peer(peer_id, hash, quote_id).await {
905            let _ = self.pending_requests.write().await.remove(&hash_key);
906            return None;
907        }
908
909        if let Ok(Ok(Some(data))) = tokio::time::timeout(self.request_timeout, &mut rx).await {
910            if hashtree_core::sha256(&data) == *hash {
911                let _ = self.local_store.put(*hash, data.clone()).await;
912                return Some(data);
913            }
914        }
915
916        if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
917            for peer_id in pending.queried_peers {
918                self.peer_selector.write().await.record_timeout(&peer_id);
919            }
920        }
921        None
922    }
923
924    async fn request_from_ordered_peers(
925        &self,
926        hash: &Hash,
927        ordered_peer_ids: &[String],
928    ) -> Option<Vec<u8>> {
929        let dispatch = normalize_dispatch_config(self.routing.dispatch, ordered_peer_ids.len());
930        let wave_plan = build_hedged_wave_plan(ordered_peer_ids.len(), dispatch);
931        if wave_plan.is_empty() {
932            return None;
933        }
934
935        let hash_key = hash_to_key(hash);
936        let (tx, rx) = oneshot::channel();
937        self.pending_requests.write().await.insert(
938            hash_key.clone(),
939            PendingRequest {
940                response_tx: tx,
941                started_at: Instant::now(),
942                queried_peers: Vec::new(),
943            },
944        );
945
946        let mut sent_total = 0usize;
947        let mut next_peer_idx = 0usize;
948        let mut rx = rx;
949        let deadline = Instant::now() + self.request_timeout;
950
951        for (wave_idx, wave_size) in wave_plan.iter().copied().enumerate() {
952            let from = next_peer_idx;
953            let to = (next_peer_idx + wave_size).min(ordered_peer_ids.len());
954            for peer_id in &ordered_peer_ids[from..to] {
955                if self.send_request_to_peer(peer_id, hash, None).await {
956                    sent_total += 1;
957                    if let Some(pending) = self.pending_requests.write().await.get_mut(&hash_key) {
958                        pending.queried_peers.push(peer_id.clone());
959                    }
960                }
961            }
962            next_peer_idx = to;
963
964            if sent_total == 0 {
965                if next_peer_idx >= ordered_peer_ids.len() {
966                    break;
967                }
968                continue;
969            }
970
971            let now = Instant::now();
972            if now >= deadline {
973                break;
974            }
975            let remaining = deadline.saturating_duration_since(now);
976            let is_last_wave =
977                wave_idx + 1 == wave_plan.len() || next_peer_idx >= ordered_peer_ids.len();
978            let wait = if is_last_wave {
979                remaining
980            } else if dispatch.hedge_interval_ms == 0 {
981                Duration::ZERO
982            } else {
983                Duration::from_millis(dispatch.hedge_interval_ms).min(remaining)
984            };
985
986            if wait.is_zero() {
987                continue;
988            }
989
990            match tokio::time::timeout(wait, &mut rx).await {
991                Ok(Ok(Some(data))) => {
992                    if hashtree_core::sha256(&data) == *hash {
993                        let _ = self.local_store.put(*hash, data.clone()).await;
994                        return Some(data);
995                    }
996                }
997                Ok(Ok(None)) => break,
998                Ok(Err(_)) => break,
999                Err(_) => {
1000                    // Timed wait window expired; send next hedge wave if any.
1001                }
1002            }
1003        }
1004
1005        if sent_total == 0 {
1006            let _ = self.pending_requests.write().await.remove(&hash_key);
1007            return None;
1008        }
1009
1010        if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
1011            for peer_id in pending.queried_peers {
1012                self.peer_selector.write().await.record_timeout(&peer_id);
1013            }
1014        }
1015        None
1016    }
1017
1018    /// Request data from peers
1019    async fn request_from_peers(&self, hash: &Hash) -> Option<Vec<u8>> {
1020        let ordered_peer_ids = self.ordered_connected_peers().await;
1021        if ordered_peer_ids.is_empty() {
1022            return None;
1023        }
1024        self.request_from_ordered_peers(hash, &ordered_peer_ids)
1025            .await
1026    }
1027
1028    async fn complete_pending_response(&self, from_peer: &str, hash_key: String, payload: Vec<u8>) {
1029        if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
1030            let rtt_ms = pending.started_at.elapsed().as_millis() as u64;
1031            self.peer_selector.write().await.record_success(
1032                from_peer,
1033                rtt_ms,
1034                payload.len() as u64,
1035            );
1036            let _ = pending.response_tx.send(Some(payload));
1037        }
1038    }
1039
1040    async fn handle_quote_response_message(&self, from_peer: &str, res: DataQuoteResponse) {
1041        if !res.a {
1042            return;
1043        }
1044
1045        let Some(quote_id) = res.q else {
1046            return;
1047        };
1048
1049        let hash_key = hash_to_key(&res.h);
1050        let (preferred_mint_url, offered_payment_sat) = {
1051            let pending_quotes = self.pending_quotes.read().await;
1052            let Some(pending) = pending_quotes.get(&hash_key) else {
1053                return;
1054            };
1055            (
1056                pending.preferred_mint_url.clone(),
1057                pending.offered_payment_sat,
1058            )
1059        };
1060        if !self
1061            .should_accept_quote_response(
1062                from_peer,
1063                preferred_mint_url.as_deref(),
1064                offered_payment_sat,
1065                &res,
1066            )
1067            .await
1068        {
1069            return;
1070        }
1071        let mut pending_quotes = self.pending_quotes.write().await;
1072        if let Some(pending) = pending_quotes.remove(&hash_key) {
1073            let _ = pending.response_tx.send(Some(NegotiatedQuote {
1074                peer_id: from_peer.to_string(),
1075                quote_id,
1076                mint_url: res.m,
1077            }));
1078        }
1079    }
1080
1081    async fn handle_response_message(&self, from_peer: &str, res: crate::protocol::DataResponse) {
1082        let hash_key = hash_to_key(&res.h);
1083        let hash = match crate::protocol::bytes_to_hash(&res.h) {
1084            Some(h) => h,
1085            None => return,
1086        };
1087
1088        // Ignore malformed/corrupt payload and keep waiting for a valid response.
1089        if hashtree_core::sha256(&res.d) != hash {
1090            self.peer_selector.write().await.record_failure(from_peer);
1091            if self.debug {
1092                println!("[GenericStore] Ignoring invalid response payload for {hash_key}");
1093            }
1094            return;
1095        }
1096
1097        self.complete_pending_response(from_peer, hash_key, res.d)
1098            .await;
1099    }
1100
1101    async fn handle_quote_request_message(&self, from_peer: &str, req: DataQuoteRequest) {
1102        let hash = match crate::protocol::bytes_to_hash(&req.h) {
1103            Some(h) => h,
1104            None => return,
1105        };
1106        let hash_key = hash_to_key(&hash);
1107
1108        {
1109            let selector = self.peer_selector.read().await;
1110            if self.should_refuse_requests_from_peer(&selector, from_peer) {
1111                if self.debug {
1112                    println!(
1113                        "[GenericStore] Refusing quote request from delinquent peer {}",
1114                        from_peer
1115                    );
1116                }
1117                return;
1118            }
1119        }
1120
1121        let chosen_mint = self.choose_quote_mint(req.m.as_deref());
1122        let can_serve = self.local_store.has(&hash).await.ok().unwrap_or(false)
1123            && !self.should_drop_response(&hash)
1124            && !self.should_corrupt_response(&hash);
1125
1126        let res = if can_serve {
1127            let quote_id = self
1128                .issue_quote(from_peer, &hash_key, req.p, req.t, chosen_mint.as_deref())
1129                .await;
1130            create_quote_response_available(&hash, quote_id, req.p, req.t, chosen_mint.as_deref())
1131        } else {
1132            create_quote_response_unavailable(&hash)
1133        };
1134        let response_bytes = encode_quote_response(&res);
1135        if let Some(channel) = self.signaling.get_channel(from_peer).await {
1136            let _ = channel.send(response_bytes).await;
1137        }
1138    }
1139
1140    async fn handle_request_message(&self, from_peer: &str, req: crate::protocol::DataRequest) {
1141        let hash = match crate::protocol::bytes_to_hash(&req.h) {
1142            Some(h) => h,
1143            None => return,
1144        };
1145        let hash_key = hash_to_key(&hash);
1146
1147        {
1148            let selector = self.peer_selector.read().await;
1149            if self.should_refuse_requests_from_peer(&selector, from_peer) {
1150                if self.debug {
1151                    println!(
1152                        "[GenericStore] Refusing request from delinquent peer {}",
1153                        from_peer
1154                    );
1155                }
1156                return;
1157            }
1158        }
1159
1160        if let Some(quote_id) = req.q {
1161            if !self.take_valid_quote(from_peer, &hash_key, quote_id).await {
1162                if self.debug {
1163                    println!(
1164                        "[GenericStore] Refusing request with invalid or expired quote {} from {}",
1165                        quote_id, from_peer
1166                    );
1167                }
1168                return;
1169            }
1170        }
1171
1172        // Check local store
1173        if let Ok(Some(mut data)) = self.local_store.get(&hash).await {
1174            if self.should_drop_response(&hash) {
1175                if self.debug {
1176                    println!(
1177                        "[GenericStore] Dropping response for {} due to actor profile",
1178                        hash_to_key(&hash)
1179                    );
1180                }
1181                return;
1182            }
1183
1184            let behavior = self.response_behavior();
1185            if behavior.extra_delay_ms > 0 {
1186                tokio::time::sleep(Duration::from_millis(behavior.extra_delay_ms)).await;
1187            }
1188
1189            if self.should_corrupt_response(&hash) {
1190                if data.is_empty() {
1191                    data.push(0x80);
1192                } else {
1193                    data[0] ^= 0x80;
1194                }
1195            }
1196
1197            // Send response
1198            let res = create_response(&hash, data);
1199            let response_bytes = encode_response(&res);
1200            if let Some(channel) = self.signaling.get_channel(from_peer).await {
1201                let _ = channel.send(response_bytes).await;
1202            }
1203        }
1204        // For now, don't forward - keep it simple
1205    }
1206
1207    /// Handle incoming data message
1208    pub async fn handle_data_message(&self, from_peer: &str, data: &[u8]) {
1209        let parsed = match parse_message(data) {
1210            Some(m) => m,
1211            None => return,
1212        };
1213
1214        match parsed {
1215            DataMessage::Request(req) => {
1216                self.handle_request_message(from_peer, req).await;
1217            }
1218            DataMessage::Response(res) => {
1219                self.handle_response_message(from_peer, res).await;
1220            }
1221            DataMessage::QuoteRequest(req) => {
1222                self.handle_quote_request_message(from_peer, req).await;
1223            }
1224            DataMessage::QuoteResponse(res) => {
1225                self.handle_quote_response_message(from_peer, res).await;
1226            }
1227        }
1228    }
1229}
1230
1231#[async_trait]
1232impl<S, R, F> Store for GenericStore<S, R, F>
1233where
1234    S: Store + Send + Sync + 'static,
1235    R: SignalingTransport + Send + Sync + 'static,
1236    F: PeerLinkFactory + Send + Sync + 'static,
1237{
1238    async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
1239        self.local_store.put(hash, data).await
1240    }
1241
1242    async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
1243        // Try local first
1244        if let Some(data) = self.local_store.get(hash).await? {
1245            return Ok(Some(data));
1246        }
1247
1248        // Try peers
1249        Ok(self.request_from_peers(hash).await)
1250    }
1251
1252    async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
1253        self.local_store.has(hash).await
1254    }
1255
1256    async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
1257        self.local_store.delete(hash).await
1258    }
1259}
1260
1261#[cfg(test)]
1262mod tests {
1263    use super::*;
1264    use hashtree_core::MemoryStore;
1265    use std::sync::Arc;
1266    use std::sync::OnceLock;
1267    use std::time::Duration;
1268
    /// `GenericStore` specialised for tests: an in-memory backend combined with the mock
    /// relay transport and mock connection factory from `crate::mock`.
    type TestStore = GenericStore<
        MemoryStore,
        crate::mock::MockRelayTransport,
        crate::mock::MockConnectionFactory,
    >;
1274
    /// One simulated node, bundling the store under test with the handles tests need to
    /// drive it by hand.
    struct TestNode {
        // The store under test.
        store: Arc<TestStore>,
        // The same in-memory backend the store was constructed with, for seeding data.
        local_store: Arc<MemoryStore>,
        // The mock transport that was also handed to the node's signaling router.
        transport: Arc<crate::mock::MockRelayTransport>,
    }
1280
1281    fn mock_network_lock() -> &'static tokio::sync::Mutex<()> {
1282        static LOCK: OnceLock<tokio::sync::Mutex<()>> = OnceLock::new();
1283        LOCK.get_or_init(|| tokio::sync::Mutex::new(()))
1284    }
1285
1286    fn make_test_store(local_store: Arc<MemoryStore>, node_id: &str) -> TestStore {
1287        make_test_store_with_routing(local_store, node_id, GenericStoreRoutingConfig::default())
1288    }
1289
1290    fn make_test_store_with_routing(
1291        local_store: Arc<MemoryStore>,
1292        node_id: &str,
1293        routing: GenericStoreRoutingConfig,
1294    ) -> TestStore {
1295        let relay = crate::mock::MockRelay::new();
1296        let transport = Arc::new(relay.create_transport(node_id.to_string(), node_id.to_string()));
1297        let conn_factory = Arc::new(crate::mock::MockConnectionFactory::new(
1298            node_id.to_string(),
1299            0,
1300        ));
1301        let signaling = Arc::new(crate::signaling::MeshRouter::new(
1302            node_id.to_string(),
1303            node_id.to_string(),
1304            transport,
1305            conn_factory,
1306            crate::types::PoolSettings::default(),
1307            false,
1308        ));
1309
1310        TestStore::new_with_routing(
1311            local_store,
1312            signaling,
1313            Duration::from_millis(200),
1314            false,
1315            routing,
1316        )
1317    }
1318
1319    fn make_shared_test_node(
1320        relay: Arc<crate::mock::MockRelay>,
1321        node_id: &str,
1322        routing: GenericStoreRoutingConfig,
1323    ) -> TestNode {
1324        let transport = Arc::new(relay.create_transport(node_id.to_string(), node_id.to_string()));
1325        let conn_factory = Arc::new(crate::mock::MockConnectionFactory::new(
1326            node_id.to_string(),
1327            0,
1328        ));
1329        let signaling = Arc::new(crate::signaling::MeshRouter::new(
1330            node_id.to_string(),
1331            node_id.to_string(),
1332            transport.clone(),
1333            conn_factory,
1334            crate::types::PoolSettings::default(),
1335            false,
1336        ));
1337        let local_store = Arc::new(MemoryStore::new());
1338        let store = Arc::new(TestStore::new_with_routing(
1339            local_store.clone(),
1340            signaling,
1341            Duration::from_millis(120),
1342            false,
1343            routing,
1344        ));
1345
1346        TestNode {
1347            store,
1348            local_store,
1349            transport,
1350        }
1351    }
1352
1353    async fn pump_test_signaling(nodes: &[&TestNode]) -> usize {
1354        let mut processed = 0usize;
1355        for node in nodes {
1356            while let Some(msg) = node.transport.try_recv() {
1357                node.store
1358                    .process_signaling(msg)
1359                    .await
1360                    .expect("process signaling");
1361                processed += 1;
1362            }
1363        }
1364        processed
1365    }
1366
1367    async fn pump_test_data(nodes: &[&TestNode]) -> usize {
1368        let mut processed = 0usize;
1369        for node in nodes {
1370            let peer_ids = node.store.signaling().peer_ids().await;
1371            for peer_id in peer_ids {
1372                let Some(channel) = node.store.signaling().get_channel(&peer_id).await else {
1373                    continue;
1374                };
1375                while let Some(data) = channel.try_recv() {
1376                    node.store.handle_data_message(&peer_id, &data).await;
1377                    processed += 1;
1378                }
1379            }
1380        }
1381        processed
1382    }
1383
1384    async fn pump_test_network(nodes: &[&TestNode], max_steps: usize) {
1385        for _ in 0..max_steps {
1386            let signaling = pump_test_signaling(nodes).await;
1387            let data = pump_test_data(nodes).await;
1388            if signaling + data == 0 {
1389                tokio::task::yield_now().await;
1390            }
1391        }
1392    }
1393
1394    async fn run_get_with_pumps(
1395        requester: Arc<TestStore>,
1396        hash: Hash,
1397        nodes: &[&TestNode],
1398    ) -> Option<Vec<u8>> {
1399        let task = tokio::spawn(async move { requester.get(&hash).await.ok().flatten() });
1400        let started = Instant::now();
1401
1402        loop {
1403            if task.is_finished() {
1404                return task.await.expect("request task join");
1405            }
1406
1407            if started.elapsed() > Duration::from_secs(1) {
1408                task.abort();
1409                return None;
1410            }
1411
1412            pump_test_network(nodes, 4).await;
1413        }
1414    }
1415
1416    async fn run_bad_peer_series(strategy: SelectionStrategy) -> usize {
1417        let _guard = mock_network_lock().lock().await;
1418        crate::mock::clear_channel_registry().await;
1419
1420        let relay = crate::mock::MockRelay::new();
1421        let requester = make_shared_test_node(
1422            relay.clone(),
1423            "requester-reject",
1424            GenericStoreRoutingConfig {
1425                selection_strategy: strategy,
1426                fairness_enabled: false,
1427                dispatch: RequestDispatchConfig {
1428                    initial_fanout: 1,
1429                    hedge_fanout: 1,
1430                    max_fanout: 1,
1431                    hedge_interval_ms: 5,
1432                },
1433                ..Default::default()
1434            },
1435        );
1436        let bad = make_shared_test_node(
1437            relay.clone(),
1438            "a-bad",
1439            GenericStoreRoutingConfig {
1440                response_behavior: ResponseBehaviorConfig {
1441                    drop_response_prob: 1.0,
1442                    ..Default::default()
1443                },
1444                ..Default::default()
1445            },
1446        );
1447        let honest = make_shared_test_node(relay, "b-honest", GenericStoreRoutingConfig::default());
1448        let nodes = [&requester, &bad, &honest];
1449
1450        for node in &nodes {
1451            node.transport
1452                .connect(&[])
1453                .await
1454                .expect("connect transport");
1455            node.store.start().await.expect("start store");
1456        }
1457        pump_test_network(&nodes, 24).await;
1458
1459        let mut successes = 0usize;
1460        for round in 0..6 {
1461            let payload = format!("payload-{round}").into_bytes();
1462            let hash = hashtree_core::sha256(&payload);
1463            let _ = bad.local_store.put(hash, payload.clone()).await;
1464            let _ = honest.local_store.put(hash, payload.clone()).await;
1465
1466            let result = run_get_with_pumps(requester.store.clone(), hash, &nodes).await;
1467            if result.as_ref() == Some(&payload) {
1468                successes += 1;
1469            }
1470        }
1471
1472        crate::mock::clear_channel_registry().await;
1473        successes
1474    }
1475
1476    #[test]
1477    fn test_hedged_wave_plan_flood_all() {
1478        let plan = build_hedged_wave_plan(7, RequestDispatchConfig::default());
1479        assert_eq!(plan, vec![7]);
1480    }
1481
1482    #[test]
1483    fn test_hedged_wave_plan_staged() {
1484        let plan = build_hedged_wave_plan(
1485            10,
1486            RequestDispatchConfig {
1487                initial_fanout: 2,
1488                hedge_fanout: 3,
1489                max_fanout: 8,
1490                hedge_interval_ms: 25,
1491            },
1492        );
1493        assert_eq!(plan, vec![2, 3, 3]);
1494    }
1495
1496    #[test]
1497    fn test_response_behavior_normalization_clamps_probs() {
1498        let raw = ResponseBehaviorConfig {
1499            drop_response_prob: -1.5,
1500            corrupt_response_prob: 9.0,
1501            extra_delay_ms: 12,
1502        };
1503        let normalized = raw.normalized();
1504        assert_eq!(normalized.drop_response_prob, 0.0);
1505        assert_eq!(normalized.corrupt_response_prob, 1.0);
1506        assert_eq!(normalized.extra_delay_ms, 12);
1507    }
1508
1509    #[test]
1510    fn test_actor_draw_is_deterministic_per_peer_hash_and_salt() {
1511        let hash = hashtree_core::sha256(b"deterministic");
1512        let a = TestStore::deterministic_actor_draw_for("peer-a", &hash, 7);
1513        let b = TestStore::deterministic_actor_draw_for("peer-a", &hash, 7);
1514        assert!((a - b).abs() < f64::EPSILON);
1515    }
1516
1517    #[tokio::test]
1518    async fn test_load_peer_metadata_returns_false_when_missing() {
1519        let local_store = Arc::new(MemoryStore::new());
1520        let store = make_test_store(local_store, "0");
1521        assert!(!store.load_peer_metadata().await.expect("load result"));
1522    }
1523
1524    #[tokio::test]
1525    async fn test_persist_and_load_peer_metadata_with_existing_store_adapter() {
1526        let local_store = Arc::new(MemoryStore::new());
1527        let writer = make_test_store(local_store.clone(), "0");
1528        {
1529            let mut selector = writer.peer_selector.write().await;
1530            selector.add_peer("npub1stable:session-a");
1531            selector.record_request("npub1stable:session-a", 64);
1532            selector.record_success("npub1stable:session-a", 35, 1024);
1533            selector.record_cashu_payment("npub1stable:session-a", 120);
1534            selector.record_cashu_receipt("npub1stable:session-a", 40);
1535            selector.record_cashu_payment_default("npub1stable:session-a");
1536        }
1537
1538        let snapshot_hash = writer
1539            .persist_peer_metadata()
1540            .await
1541            .expect("persist peer metadata");
1542        assert!(local_store
1543            .get(&snapshot_hash)
1544            .await
1545            .expect("snapshot lookup")
1546            .is_some());
1547
1548        let reader = make_test_store(local_store, "1");
1549        assert!(reader
1550            .load_peer_metadata()
1551            .await
1552            .expect("load peer metadata snapshot"));
1553
1554        let mut selector = reader.peer_selector.write().await;
1555        selector.add_peer("npub1stable:session-b");
1556        let stats = selector
1557            .get_stats("npub1stable:session-b")
1558            .expect("restored peer stats");
1559        assert_eq!(stats.requests_sent, 1);
1560        assert_eq!(stats.successes, 1);
1561        assert_eq!(stats.cashu_paid_sat, 120);
1562        assert_eq!(stats.cashu_received_sat, 40);
1563        assert_eq!(stats.cashu_payment_receipts, 1);
1564        assert_eq!(stats.cashu_payment_defaults, 1);
1565    }
1566
1567    #[tokio::test]
1568    async fn test_should_refuse_requests_from_peer_after_payment_defaults() {
1569        let local_store = Arc::new(MemoryStore::new());
1570        let store = make_test_store_with_routing(
1571            local_store,
1572            "0",
1573            GenericStoreRoutingConfig {
1574                cashu_payment_default_block_threshold: 1,
1575                ..Default::default()
1576            },
1577        );
1578        store.record_cashu_payment_default_from_peer("peer-a").await;
1579
1580        let selector = store.peer_selector.read().await;
1581        assert!(store.should_refuse_requests_from_peer(&selector, "peer-a"));
1582        assert!(!store.should_refuse_requests_from_peer(&selector, "peer-b"));
1583    }
1584
1585    #[tokio::test]
1586    async fn test_take_valid_quote_consumes_once_and_rejects_expired_quotes() {
1587        let local_store = Arc::new(MemoryStore::new());
1588        let store = make_test_store(local_store, "0");
1589        let hash = hashtree_core::sha256(b"quote-test");
1590        let hash_key = hash_to_key(&hash);
1591
1592        {
1593            let mut issued = store.issued_quotes.write().await;
1594            issued.insert(
1595                ("peer-a".to_string(), hash_key.clone(), 11),
1596                IssuedQuote {
1597                    expires_at: Instant::now() + Duration::from_secs(1),
1598                    payment_sat: 5,
1599                    mint_url: Some("https://mint-a.example".to_string()),
1600                },
1601            );
1602            issued.insert(
1603                ("peer-a".to_string(), hash_key.clone(), 12),
1604                IssuedQuote {
1605                    expires_at: Instant::now() - Duration::from_millis(1),
1606                    payment_sat: 5,
1607                    mint_url: Some("https://mint-a.example".to_string()),
1608                },
1609            );
1610        }
1611
1612        assert!(store.take_valid_quote("peer-a", &hash_key, 11).await);
1613        assert!(!store.take_valid_quote("peer-a", &hash_key, 11).await);
1614        assert!(!store.take_valid_quote("peer-a", &hash_key, 12).await);
1615    }
1616
1617    async fn run_quote_with_pumps(
1618        requester: Arc<TestStore>,
1619        hash: Hash,
1620        payment_sat: u64,
1621        quote_ttl: Duration,
1622        peer_ids: Vec<String>,
1623        nodes: &[&TestNode],
1624    ) -> Option<NegotiatedQuote> {
1625        let task = tokio::spawn(async move {
1626            requester
1627                .request_quote_from_peers(&hash, payment_sat, quote_ttl, &peer_ids)
1628                .await
1629        });
1630        let started = Instant::now();
1631
1632        loop {
1633            if task.is_finished() {
1634                return task.await.expect("quote task join");
1635            }
1636            if started.elapsed() > Duration::from_secs(1) {
1637                task.abort();
1638                return None;
1639            }
1640            pump_test_network(nodes, 4).await;
1641        }
1642    }
1643
1644    #[tokio::test]
1645    async fn test_request_quote_from_peers_rejects_unaccepted_mint() {
1646        let _guard = mock_network_lock().lock().await;
1647        crate::mock::clear_channel_registry().await;
1648
1649        let relay = crate::mock::MockRelay::new();
1650        let requester = make_shared_test_node(
1651            relay.clone(),
1652            "requester-reject",
1653            GenericStoreRoutingConfig {
1654                cashu_accepted_mints: vec!["https://mint-a.example".to_string()],
1655                cashu_default_mint: Some("https://mint-a.example".to_string()),
1656                dispatch: RequestDispatchConfig {
1657                    initial_fanout: 1,
1658                    hedge_fanout: 1,
1659                    max_fanout: 1,
1660                    hedge_interval_ms: 5,
1661                },
1662                ..Default::default()
1663            },
1664        );
1665        let provider = make_shared_test_node(
1666            relay,
1667            "provider-reject",
1668            GenericStoreRoutingConfig {
1669                cashu_accepted_mints: vec!["https://mint-b.example".to_string()],
1670                ..Default::default()
1671            },
1672        );
1673        let nodes = [&requester, &provider];
1674
1675        requester.transport.connect(&[]).await.expect("connect");
1676        provider.transport.connect(&[]).await.expect("connect");
1677        requester.store.start().await.expect("start");
1678        provider.store.start().await.expect("start");
1679        pump_test_network(&nodes, 24).await;
1680
1681        let payload = b"quoted-data".to_vec();
1682        let hash = hashtree_core::sha256(&payload);
1683        provider.local_store.put(hash, payload).await.expect("put");
1684
1685        let quote = run_quote_with_pumps(
1686            requester.store.clone(),
1687            hash,
1688            9,
1689            Duration::from_millis(80),
1690            vec!["provider-reject".to_string()],
1691            &nodes,
1692        )
1693        .await;
1694        assert!(
1695            quote.is_none(),
1696            "expected quote to be rejected on mint mismatch"
1697        );
1698    }
1699
1700    #[tokio::test]
1701    async fn test_request_quote_from_peers_accepts_small_peer_suggested_mint_under_cap() {
1702        let _guard = mock_network_lock().lock().await;
1703        crate::mock::clear_channel_registry().await;
1704
1705        let relay = crate::mock::MockRelay::new();
1706        let requester = make_shared_test_node(
1707            relay.clone(),
1708            "requester-suggested",
1709            GenericStoreRoutingConfig {
1710                cashu_accepted_mints: vec!["https://mint-a.example".to_string()],
1711                cashu_default_mint: Some("https://mint-a.example".to_string()),
1712                cashu_peer_suggested_mint_base_cap_sat: 3,
1713                cashu_peer_suggested_mint_max_cap_sat: 3,
1714                dispatch: RequestDispatchConfig {
1715                    initial_fanout: 1,
1716                    hedge_fanout: 1,
1717                    max_fanout: 1,
1718                    hedge_interval_ms: 5,
1719                },
1720                ..Default::default()
1721            },
1722        );
1723        let provider = make_shared_test_node(
1724            relay,
1725            "provider-suggested",
1726            GenericStoreRoutingConfig {
1727                cashu_accepted_mints: vec!["https://mint-b.example".to_string()],
1728                cashu_default_mint: Some("https://mint-b.example".to_string()),
1729                ..Default::default()
1730            },
1731        );
1732        let nodes = [&requester, &provider];
1733
1734        requester.transport.connect(&[]).await.expect("connect");
1735        provider.transport.connect(&[]).await.expect("connect");
1736        requester.store.start().await.expect("start");
1737        provider.store.start().await.expect("start");
1738        pump_test_network(&nodes, 24).await;
1739
1740        let payload = b"quoted-data".to_vec();
1741        let hash = hashtree_core::sha256(&payload);
1742        provider.local_store.put(hash, payload).await.expect("put");
1743
1744        let quote = run_quote_with_pumps(
1745            requester.store.clone(),
1746            hash,
1747            3,
1748            Duration::from_millis(80),
1749            vec!["provider-suggested".to_string()],
1750            &nodes,
1751        )
1752        .await
1753        .expect("expected bounded peer-suggested mint quote");
1754        assert_eq!(quote.mint_url.as_deref(), Some("https://mint-b.example"));
1755    }
1756
1757    #[tokio::test]
1758    async fn test_request_quote_from_peers_scales_peer_suggested_mint_cap_with_reputation() {
1759        let _guard = mock_network_lock().lock().await;
1760        crate::mock::clear_channel_registry().await;
1761
1762        let relay = crate::mock::MockRelay::new();
1763        let requester = make_shared_test_node(
1764            relay.clone(),
1765            "requester-reputation",
1766            GenericStoreRoutingConfig {
1767                cashu_accepted_mints: vec!["https://mint-a.example".to_string()],
1768                cashu_default_mint: Some("https://mint-a.example".to_string()),
1769                cashu_peer_suggested_mint_base_cap_sat: 1,
1770                cashu_peer_suggested_mint_success_step_sat: 1,
1771                cashu_peer_suggested_mint_receipt_step_sat: 2,
1772                cashu_peer_suggested_mint_max_cap_sat: 5,
1773                dispatch: RequestDispatchConfig {
1774                    initial_fanout: 1,
1775                    hedge_fanout: 1,
1776                    max_fanout: 1,
1777                    hedge_interval_ms: 5,
1778                },
1779                ..Default::default()
1780            },
1781        );
1782        let provider = make_shared_test_node(
1783            relay,
1784            "provider-reputation",
1785            GenericStoreRoutingConfig {
1786                cashu_accepted_mints: vec!["https://mint-b.example".to_string()],
1787                cashu_default_mint: Some("https://mint-b.example".to_string()),
1788                ..Default::default()
1789            },
1790        );
1791        let nodes = [&requester, &provider];
1792
1793        requester.transport.connect(&[]).await.expect("connect");
1794        provider.transport.connect(&[]).await.expect("connect");
1795        requester.store.start().await.expect("start");
1796        provider.store.start().await.expect("start");
1797        pump_test_network(&nodes, 24).await;
1798
1799        let payload = b"quoted-data".to_vec();
1800        let hash = hashtree_core::sha256(&payload);
1801        provider.local_store.put(hash, payload).await.expect("put");
1802
1803        let quote = run_quote_with_pumps(
1804            requester.store.clone(),
1805            hash,
1806            4,
1807            Duration::from_millis(80),
1808            vec!["provider-reputation".to_string()],
1809            &nodes,
1810        )
1811        .await;
1812        assert!(
1813            quote.is_none(),
1814            "new peer should not get a 4 sat untrusted-mint quote"
1815        );
1816
1817        {
1818            let mut selector = requester.store.peer_selector.write().await;
1819            selector.add_peer("provider-reputation");
1820            selector.record_success("provider-reputation", 20, 1024);
1821            selector.record_cashu_receipt("provider-reputation", 2);
1822        }
1823
1824        let quote = run_quote_with_pumps(
1825            requester.store.clone(),
1826            hash,
1827            4,
1828            Duration::from_millis(80),
1829            vec!["provider-reputation".to_string()],
1830            &nodes,
1831        )
1832        .await
1833        .expect("reputable peer should get larger bounded quote");
1834        assert_eq!(quote.mint_url.as_deref(), Some("https://mint-b.example"));
1835
1836        requester
1837            .store
1838            .record_cashu_payment_default_from_peer("provider-reputation")
1839            .await;
1840        let quote = run_quote_with_pumps(
1841            requester.store.clone(),
1842            hash,
1843            4,
1844            Duration::from_millis(80),
1845            vec!["provider-reputation".to_string()],
1846            &nodes,
1847        )
1848        .await;
1849        assert!(
1850            quote.is_none(),
1851            "peer-suggested mint exposure should drop to zero after defaults exceed receipts"
1852        );
1853    }
1854
1855    #[tokio::test]
1856    async fn test_request_quote_from_peers_returns_matching_mint() {
1857        let _guard = mock_network_lock().lock().await;
1858        crate::mock::clear_channel_registry().await;
1859
1860        let relay = crate::mock::MockRelay::new();
1861        let requester = make_shared_test_node(
1862            relay.clone(),
1863            "requester-match",
1864            GenericStoreRoutingConfig {
1865                cashu_accepted_mints: vec!["https://mint-a.example".to_string()],
1866                cashu_default_mint: Some("https://mint-a.example".to_string()),
1867                dispatch: RequestDispatchConfig {
1868                    initial_fanout: 1,
1869                    hedge_fanout: 1,
1870                    max_fanout: 1,
1871                    hedge_interval_ms: 5,
1872                },
1873                ..Default::default()
1874            },
1875        );
1876        let provider = make_shared_test_node(
1877            relay,
1878            "provider-match",
1879            GenericStoreRoutingConfig {
1880                cashu_accepted_mints: vec!["https://mint-a.example".to_string()],
1881                ..Default::default()
1882            },
1883        );
1884        let nodes = [&requester, &provider];
1885
1886        requester.transport.connect(&[]).await.expect("connect");
1887        provider.transport.connect(&[]).await.expect("connect");
1888        requester.store.start().await.expect("start");
1889        provider.store.start().await.expect("start");
1890        pump_test_network(&nodes, 24).await;
1891
1892        let payload = b"quoted-data".to_vec();
1893        let hash = hashtree_core::sha256(&payload);
1894        provider.local_store.put(hash, payload).await.expect("put");
1895
1896        let quote = run_quote_with_pumps(
1897            requester.store.clone(),
1898            hash,
1899            9,
1900            Duration::from_millis(80),
1901            vec!["provider-match".to_string()],
1902            &nodes,
1903        )
1904        .await
1905        .expect("expected quote");
1906        assert_eq!(quote.mint_url.as_deref(), Some("https://mint-a.example"));
1907    }
1908
1909    #[tokio::test]
1910    async fn test_tit_for_tat_store_path_recovers_after_bad_peer_observation() {
1911        let tit_for_tat_successes = run_bad_peer_series(SelectionStrategy::TitForTat).await;
1912
1913        assert!(
1914            tit_for_tat_successes >= 5,
1915            "expected tit-for-tat path to recover after the first consistently bad peer observation (successes={tit_for_tat_successes})"
1916        );
1917    }
1918}
1919
/// Type alias for the simulation store.
///
/// This is [`GenericStore`] with its second and third type parameters fixed to
/// the mock implementations in [`crate::mock`]: `MockRelayTransport` and
/// `MockConnectionFactory`. The first parameter `S` is left to the caller.
pub type SimStore<S> =
    GenericStore<S, crate::mock::MockRelayTransport, crate::mock::MockConnectionFactory>;
1923
/// Type alias for the production store (using real WebRTC).
///
/// This is [`GenericStore`] with its second and third type parameters fixed to
/// `crate::nostr::NostrRelayTransport` and
/// `crate::real_factory::RealPeerConnectionFactory`. The first parameter `S`
/// is left to the caller.
pub type ProductionStore<S> = GenericStore<
    S,
    crate::nostr::NostrRelayTransport,
    crate::real_factory::RealPeerConnectionFactory,
>;