Skip to main content

hashtree_webrtc/
generic_store.rs

1//! Generic P2P store using abstract transports
2//!
3//! This module provides a Store implementation that works with any
4//! RelayTransport and PeerConnectionFactory. Both production (real WebRTC)
5//! and simulation (mocks) use this same code.
6
7use async_trait::async_trait;
8use std::collections::hash_map::DefaultHasher;
9use std::collections::{HashMap, HashSet};
10use std::hash::{Hash as _, Hasher};
11use std::sync::Arc;
12use std::time::{Duration, Instant};
13use tokio::sync::{oneshot, RwLock};
14
15use hashtree_core::{Hash, Store, StoreError};
16
17use crate::peer_selector::{PeerMetadataSnapshot, PeerSelector, SelectionStrategy};
18use crate::protocol::{
19    create_quote_request, create_quote_response_available, create_quote_response_unavailable,
20    create_request, create_request_with_quote, create_response, encode_quote_request,
21    encode_quote_response, encode_request, encode_response, hash_to_key, parse_message,
22    DataMessage, DataQuoteRequest, DataQuoteResponse,
23};
24use crate::signaling::SignalingManager;
25use crate::transport::{PeerConnectionFactory, RelayTransport, TransportError};
26use crate::types::{PeerHTLConfig, SignalingMessage, MAX_HTL};
27
/// Key material for the reserved "latest peer metadata snapshot" pointer slot.
///
/// The slot address is the `hashtree_core::sha256` digest of these bytes; the
/// slot's value is the hex-encoded hash of the most recently persisted snapshot.
const PEER_METADATA_POINTER_SLOT_KEY: &[u8] = b"hashtree-webrtc/peer-metadata/latest/v1";
29
/// Pending request awaiting response.
///
/// Entries live in `GenericStore::pending_requests`, keyed by the hash key of
/// the requested content.
struct PendingRequest {
    /// Completion channel for the task waiting on this request.
    response_tx: oneshot::Sender<Option<Vec<u8>>>,
    /// Moment the entry was created.
    started_at: Instant,
    /// IDs of the peers the request was sent to; each one is reported to the
    /// peer selector as a timeout if no valid response arrives.
    queried_peers: Vec<String>,
}
36
/// In-flight quote negotiation, keyed by requested hash in
/// `GenericStore::pending_quotes`.
struct PendingQuoteRequest {
    /// Completion channel for the task waiting on a negotiated quote.
    response_tx: oneshot::Sender<Option<NegotiatedQuote>>,
    /// Mint URL that was put into the outgoing quote request, if any.
    preferred_mint_url: Option<String>,
    /// Payment (sats) offered in the outgoing quote request.
    offered_payment_sat: u64,
}
42
/// Outcome of a successful quote negotiation: which peer quoted, and the quote
/// ID to attach to the follow-up data request.
#[derive(Debug, Clone)]
struct NegotiatedQuote {
    /// Peer that issued the quote.
    peer_id: String,
    /// Identifier of the quote as issued by that peer.
    quote_id: u64,
    /// Mint URL associated with the quote, if any (not read in this part of the file).
    #[allow(dead_code)]
    mint_url: Option<String>,
}
50
/// Quote this node issued to a peer, stored in `GenericStore::issued_quotes`
/// under `(peer_id, hash_key, quote_id)`.
struct IssuedQuote {
    /// Instant after which the quote is no longer honored.
    expires_at: Instant,
    /// Quoted payment in sats (recorded only; not read in this part of the file).
    #[allow(dead_code)]
    payment_sat: u64,
    /// Mint URL the quote was issued for, if any (recorded only).
    #[allow(dead_code)]
    mint_url: Option<String>,
}
58
/// Request dispatch strategy for peer queries.
///
/// `GenericStore` supports two practical modes:
/// - Flood (`usize::MAX` fanout): maximize success/latency at bandwidth cost.
/// - Staged hedging: probe a subset first, then expand.
///
/// The `Default` value is flood mode. Raw values are clamped against the
/// number of available peers by `normalize_dispatch_config`.
#[derive(Debug, Clone, Copy)]
pub struct RequestDispatchConfig {
    /// Number of peers queried immediately.
    /// `normalize_dispatch_config` turns `0` into `1`.
    pub initial_fanout: usize,
    /// Number of additional peers to query on each hedge step.
    /// `normalize_dispatch_config` turns `0` into `1`.
    pub hedge_fanout: usize,
    /// Total peers allowed for this request.
    /// `normalize_dispatch_config` treats `0` as "all available peers".
    pub max_fanout: usize,
    /// Delay between hedge waves (ms). `0` means send all waves immediately.
    pub hedge_interval_ms: u64,
}
75
76impl Default for RequestDispatchConfig {
77    fn default() -> Self {
78        Self {
79            initial_fanout: usize::MAX,
80            hedge_fanout: usize::MAX,
81            max_fanout: usize::MAX,
82            hedge_interval_ms: 0,
83        }
84    }
85}
86
87/// Normalize fanout config against current peer availability.
88pub fn normalize_dispatch_config(
89    dispatch: RequestDispatchConfig,
90    available_peers: usize,
91) -> RequestDispatchConfig {
92    let mut cfg = dispatch;
93    let cap = if cfg.max_fanout == 0 {
94        available_peers
95    } else {
96        cfg.max_fanout.min(available_peers)
97    };
98    cfg.max_fanout = cap;
99    cfg.initial_fanout = if cfg.initial_fanout == 0 {
100        1
101    } else {
102        cfg.initial_fanout.min(cap.max(1))
103    };
104    cfg.hedge_fanout = if cfg.hedge_fanout == 0 {
105        1
106    } else {
107        cfg.hedge_fanout.min(cap.max(1))
108    };
109    cfg
110}
111
112/// Build wave sizes for staged hedged dispatch.
113pub fn build_hedged_wave_plan(peer_count: usize, dispatch: RequestDispatchConfig) -> Vec<usize> {
114    if peer_count == 0 {
115        return Vec::new();
116    }
117    let cap = dispatch.max_fanout.min(peer_count);
118    if cap == 0 {
119        return Vec::new();
120    }
121
122    let mut plan = Vec::new();
123    let mut sent = 0usize;
124    let first = dispatch.initial_fanout.min(cap).max(1);
125    plan.push(first);
126    sent += first;
127
128    while sent < cap {
129        let next = dispatch.hedge_fanout.min(cap - sent).max(1);
130        plan.push(next);
131        sent += next;
132    }
133    plan
134}
135
136/// Keep selector membership aligned with currently connected peer IDs.
137pub async fn sync_selector_peers(selector: &RwLock<PeerSelector>, current_peer_ids: &[String]) {
138    let mut selector = selector.write().await;
139    let current: HashSet<&str> = current_peer_ids.iter().map(String::as_str).collect();
140    let known: Vec<String> = selector.all_stats().map(|s| s.peer_id.clone()).collect();
141    for peer_id in known {
142        if !current.contains(peer_id.as_str()) {
143            selector.remove_peer(&peer_id);
144        }
145    }
146    for peer_id in current_peer_ids {
147        selector.add_peer(peer_id.clone());
148    }
149}
150
/// Response behavior profile for simulation/game-theory actors.
///
/// Defaults to honest behavior (always respond correctly, no extra delay).
/// Probabilities are clamped to `[0.0, 1.0]` by `normalized` before use.
#[derive(Debug, Clone, Copy)]
pub struct ResponseBehaviorConfig {
    /// Probability that a node drops a response even when it has data.
    pub drop_response_prob: f64,
    /// Probability that a node responds with corrupted payload.
    pub corrupt_response_prob: f64,
    /// Optional response delay (ms) to model slow/incompetent peers.
    pub extra_delay_ms: u64,
}
163
164impl Default for ResponseBehaviorConfig {
165    fn default() -> Self {
166        Self {
167            drop_response_prob: 0.0,
168            corrupt_response_prob: 0.0,
169            extra_delay_ms: 0,
170        }
171    }
172}
173
174impl ResponseBehaviorConfig {
175    fn normalized(self) -> Self {
176        Self {
177            drop_response_prob: self.drop_response_prob.clamp(0.0, 1.0),
178            corrupt_response_prob: self.corrupt_response_prob.clamp(0.0, 1.0),
179            extra_delay_ms: self.extra_delay_ms,
180        }
181    }
182}
183
/// Routing policy for request ordering + dispatch fanout.
#[derive(Debug, Clone)]
pub struct GenericStoreRoutingConfig {
    /// Strategy the store's `PeerSelector` is constructed with.
    pub selection_strategy: SelectionStrategy,
    /// Forwarded to `PeerSelector::set_fairness` when the store is built.
    pub fairness_enabled: bool,
    /// Blend weight for payment-priority ranking in selector (`0.0` disables).
    pub cashu_payment_weight: f64,
    /// Refuse serving peers that have reached this many unpaid post-delivery settlements.
    /// `0` disables refusal and only keeps metadata/downranking.
    pub cashu_payment_default_block_threshold: u64,
    /// Cashu mint URLs this node is willing to use for settlement.
    /// An empty list means any mint is accepted.
    pub cashu_accepted_mints: Vec<String>,
    /// Preferred Cashu mint URL when initiating paid retrieval.
    pub cashu_default_mint: Option<String>,
    /// Baseline cap for accepting a peer-suggested mint outside the trusted set.
    /// `0` means such quotes are never accepted on the basis of this cap.
    pub cashu_peer_suggested_mint_base_cap_sat: u64,
    /// Additional sats allowed per successful delivery from that peer.
    pub cashu_peer_suggested_mint_success_step_sat: u64,
    /// Additional sats allowed per successful post-delivery payment received from that peer.
    pub cashu_peer_suggested_mint_receipt_step_sat: u64,
    /// Hard upper bound for any single peer-suggested mint quote we accept.
    /// `0` means no upper bound is applied.
    pub cashu_peer_suggested_mint_max_cap_sat: u64,
    /// Fanout/hedging settings used when dispatching requests to peers.
    pub dispatch: RequestDispatchConfig,
    /// Response behavior profile for this node (honest by default).
    pub response_behavior: ResponseBehaviorConfig,
}
209
210impl Default for GenericStoreRoutingConfig {
211    fn default() -> Self {
212        Self {
213            selection_strategy: SelectionStrategy::Weighted,
214            fairness_enabled: true,
215            cashu_payment_weight: 0.0,
216            cashu_payment_default_block_threshold: 0,
217            cashu_accepted_mints: Vec::new(),
218            cashu_default_mint: None,
219            cashu_peer_suggested_mint_base_cap_sat: 0,
220            cashu_peer_suggested_mint_success_step_sat: 0,
221            cashu_peer_suggested_mint_receipt_step_sat: 0,
222            cashu_peer_suggested_mint_max_cap_sat: 0,
223            dispatch: RequestDispatchConfig::default(),
224            response_behavior: ResponseBehaviorConfig::default(),
225        }
226    }
227}
228
/// Generic P2P store that works with any transport implementation
///
/// This is the shared code between production and simulation. `S` is the
/// local backing store, `R` the relay transport and `F` the peer connection
/// factory:
/// - Production: GenericStore<S, NostrRelayTransport, RealPeerConnectionFactory>
/// - Simulation: GenericStore<S, MockRelayTransport, MockConnectionFactory>
pub struct GenericStore<S, R, F>
where
    S: Store + Send + Sync + 'static,
    R: RelayTransport + Send + Sync + 'static,
    F: PeerConnectionFactory + Send + Sync + 'static,
{
    /// Local backing store
    local_store: Arc<S>,
    /// Signaling manager (handles peer discovery and connection)
    signaling: Arc<SignalingManager<R, F>>,
    /// Per-peer HTL config, keyed by peer ID
    htl_configs: RwLock<HashMap<String, PeerHTLConfig>>,
    /// Pending requests we sent, keyed by requested hash key
    pending_requests: RwLock<HashMap<String, PendingRequest>>,
    /// Pending quote negotiations keyed by requested hash.
    pending_quotes: RwLock<HashMap<String, PendingQuoteRequest>>,
    /// Quotes we issued to peers and will accept exactly once until expiry.
    /// Keyed by `(peer_id, hash_key, quote_id)`.
    issued_quotes: RwLock<HashMap<(String, String, u64), IssuedQuote>>,
    /// Monotonic quote identifier generator (starts at 1).
    next_quote_id: RwLock<u64>,
    /// Adaptive selector for peer ordering.
    peer_selector: RwLock<PeerSelector>,
    /// Routing/dispatch configuration.
    routing: GenericStoreRoutingConfig,
    /// Request timeout
    request_timeout: Duration,
    /// Debug mode
    debug: bool,
    /// Running flag: set by `start`, cleared by `stop`
    running: RwLock<bool>,
}
265
266impl<S, R, F> GenericStore<S, R, F>
267where
268    S: Store + Send + Sync + 'static,
269    R: RelayTransport + Send + Sync + 'static,
270    F: PeerConnectionFactory + Send + Sync + 'static,
271{
272    /// Create a new generic store
273    pub fn new(
274        local_store: Arc<S>,
275        signaling: Arc<SignalingManager<R, F>>,
276        request_timeout: Duration,
277        debug: bool,
278    ) -> Self {
279        Self::new_with_routing(
280            local_store,
281            signaling,
282            request_timeout,
283            debug,
284            Default::default(),
285        )
286    }
287
288    /// Create a new generic store with explicit routing configuration.
289    pub fn new_with_routing(
290        local_store: Arc<S>,
291        signaling: Arc<SignalingManager<R, F>>,
292        request_timeout: Duration,
293        debug: bool,
294        routing: GenericStoreRoutingConfig,
295    ) -> Self {
296        let mut selector = PeerSelector::with_strategy(routing.selection_strategy);
297        selector.set_fairness(routing.fairness_enabled);
298        selector.set_cashu_payment_weight(routing.cashu_payment_weight);
299        Self {
300            local_store,
301            signaling,
302            htl_configs: RwLock::new(HashMap::new()),
303            pending_requests: RwLock::new(HashMap::new()),
304            pending_quotes: RwLock::new(HashMap::new()),
305            issued_quotes: RwLock::new(HashMap::new()),
306            next_quote_id: RwLock::new(1),
307            peer_selector: RwLock::new(selector),
308            routing,
309            request_timeout,
310            debug,
311            running: RwLock::new(false),
312        }
313    }
314
315    /// Start the store (begin listening for messages)
316    pub async fn start(&self) -> Result<(), TransportError> {
317        *self.running.write().await = true;
318
319        // Send initial hello
320        self.signaling.send_hello(vec![]).await?;
321
322        Ok(())
323    }
324
325    /// Stop the store
326    pub async fn stop(&self) {
327        *self.running.write().await = false;
328    }
329
330    /// Process incoming signaling message
331    pub async fn process_signaling(&self, msg: SignalingMessage) -> Result<(), TransportError> {
332        // When a new peer connects, initialize their HTL config
333        let peer_id = msg.peer_id().to_string();
334        {
335            let mut configs = self.htl_configs.write().await;
336            if !configs.contains_key(&peer_id) {
337                configs.insert(peer_id.clone(), PeerHTLConfig::random());
338            }
339        }
340        self.peer_selector.write().await.add_peer(peer_id);
341
342        self.signaling.handle_message(msg).await
343    }
344
    /// Get signaling manager reference
    ///
    /// Returns a borrow of the shared `Arc`; clone it to keep an owned handle.
    pub fn signaling(&self) -> &Arc<SignalingManager<R, F>> {
        &self.signaling
    }
349
    /// Configured response behavior with its probabilities clamped to `[0.0, 1.0]`.
    fn response_behavior(&self) -> ResponseBehaviorConfig {
        self.routing.response_behavior.normalized()
    }
353
354    fn deterministic_actor_draw_for(peer_id: &str, hash: &Hash, salt: u64) -> f64 {
355        let mut hasher = DefaultHasher::new();
356        peer_id.hash(&mut hasher);
357        hash.hash(&mut hasher);
358        salt.hash(&mut hasher);
359        let v = hasher.finish();
360        (v as f64) / (u64::MAX as f64)
361    }
362
    /// Deterministic draw for this node: same as `deterministic_actor_draw_for`
    /// with the local peer ID reported by the signaling manager.
    fn deterministic_actor_draw(&self, hash: &Hash, salt: u64) -> f64 {
        Self::deterministic_actor_draw_for(self.signaling.peer_id(), hash, salt)
    }
366
    /// Store key of the "latest peer metadata snapshot" pointer slot: the
    /// `hashtree_core::sha256` digest of `PEER_METADATA_POINTER_SLOT_KEY`.
    fn peer_metadata_pointer_slot_hash() -> Hash {
        hashtree_core::sha256(PEER_METADATA_POINTER_SLOT_KEY)
    }
370
371    fn decode_hash_hex(hash_hex: &str) -> Result<Hash, StoreError> {
372        let bytes = hex::decode(hash_hex)
373            .map_err(|e| StoreError::Other(format!("Invalid hash hex: {e}")))?;
374        if bytes.len() != 32 {
375            return Err(StoreError::Other(format!(
376                "Invalid hash length {}, expected 32 bytes",
377                bytes.len()
378            )));
379        }
380        let mut hash = [0u8; 32];
381        hash.copy_from_slice(&bytes);
382        Ok(hash)
383    }
384
385    fn should_drop_response(&self, hash: &Hash) -> bool {
386        let p = self.response_behavior().drop_response_prob;
387        if p <= 0.0 {
388            return false;
389        }
390        self.deterministic_actor_draw(hash, 0xD0_D0_D0_D0_D0_D0_D0_D0) < p
391    }
392
393    fn should_corrupt_response(&self, hash: &Hash) -> bool {
394        let p = self.response_behavior().corrupt_response_prob;
395        if p <= 0.0 {
396            return false;
397        }
398        self.deterministic_actor_draw(hash, 0xC0_C0_C0_C0_C0_C0_C0_C0) < p
399    }
400
401    async fn ordered_connected_peers(&self) -> Vec<String> {
402        let current_peer_ids = self.signaling.peer_ids().await;
403        if current_peer_ids.is_empty() {
404            return Vec::new();
405        }
406
407        sync_selector_peers(&self.peer_selector, &current_peer_ids).await;
408        let current_set: HashSet<&str> = current_peer_ids.iter().map(String::as_str).collect();
409        let mut ordered_peer_ids = self.peer_selector.write().await.select_peers();
410        ordered_peer_ids.retain(|peer_id| current_set.contains(peer_id.as_str()));
411        if ordered_peer_ids.is_empty() {
412            let mut fallback = current_peer_ids;
413            fallback.sort();
414            return fallback;
415        }
416        ordered_peer_ids
417    }
418
419    fn requested_quote_mint(&self) -> Option<&str> {
420        if let Some(default_mint) = self.routing.cashu_default_mint.as_deref() {
421            if self.routing.cashu_accepted_mints.is_empty()
422                || self
423                    .routing
424                    .cashu_accepted_mints
425                    .iter()
426                    .any(|mint| mint == default_mint)
427            {
428                return Some(default_mint);
429            }
430        }
431
432        self.routing
433            .cashu_accepted_mints
434            .first()
435            .map(String::as_str)
436    }
437
438    fn choose_quote_mint(&self, requested_mint: Option<&str>) -> Option<String> {
439        if let Some(requested_mint) = requested_mint {
440            if self.accepts_quote_mint(Some(requested_mint)) {
441                return Some(requested_mint.to_string());
442            }
443        }
444        if let Some(default_mint) = self.routing.cashu_default_mint.as_ref() {
445            return Some(default_mint.clone());
446        }
447        if let Some(first_mint) = self.routing.cashu_accepted_mints.first() {
448            return Some(first_mint.clone());
449        }
450        requested_mint.map(str::to_string)
451    }
452
453    fn accepts_quote_mint(&self, mint_url: Option<&str>) -> bool {
454        if self.routing.cashu_accepted_mints.is_empty() {
455            return true;
456        }
457
458        let Some(mint_url) = mint_url else {
459            return false;
460        };
461        self.routing
462            .cashu_accepted_mints
463            .iter()
464            .any(|mint| mint == mint_url)
465    }
466
467    fn trusts_quote_mint(&self, mint_url: Option<&str>) -> bool {
468        let Some(mint_url) = mint_url else {
469            return self.routing.cashu_default_mint.is_none()
470                && self.routing.cashu_accepted_mints.is_empty();
471        };
472        self.routing.cashu_default_mint.as_deref() == Some(mint_url)
473            || self
474                .routing
475                .cashu_accepted_mints
476                .iter()
477                .any(|mint| mint == mint_url)
478    }
479
480    async fn peer_suggested_mint_cap_sat(&self, peer_id: &str) -> u64 {
481        let base = self.routing.cashu_peer_suggested_mint_base_cap_sat;
482        if base == 0 {
483            return 0;
484        }
485
486        let selector = self.peer_selector.read().await;
487        let Some(stats) = selector.get_stats(peer_id) else {
488            let max_cap = self.routing.cashu_peer_suggested_mint_max_cap_sat;
489            return if max_cap > 0 { base.min(max_cap) } else { base };
490        };
491
492        if stats.cashu_payment_defaults > 0
493            && stats.cashu_payment_defaults >= stats.cashu_payment_receipts
494        {
495            return 0;
496        }
497
498        let success_bonus = stats
499            .successes
500            .saturating_mul(self.routing.cashu_peer_suggested_mint_success_step_sat);
501        let receipt_bonus = stats
502            .cashu_payment_receipts
503            .saturating_mul(self.routing.cashu_peer_suggested_mint_receipt_step_sat);
504        let mut cap = base
505            .saturating_add(success_bonus)
506            .saturating_add(receipt_bonus);
507        let max_cap = self.routing.cashu_peer_suggested_mint_max_cap_sat;
508        if max_cap > 0 {
509            cap = cap.min(max_cap);
510        }
511        cap
512    }
513
514    async fn should_accept_quote_response(
515        &self,
516        from_peer: &str,
517        preferred_mint_url: Option<&str>,
518        offered_payment_sat: u64,
519        res: &DataQuoteResponse,
520    ) -> bool {
521        let Some(payment_sat) = res.p else {
522            return false;
523        };
524        if payment_sat > offered_payment_sat {
525            return false;
526        }
527
528        let response_mint = res.m.as_deref();
529        if response_mint == preferred_mint_url {
530            return true;
531        }
532        if self.trusts_quote_mint(response_mint) {
533            return true;
534        }
535        if response_mint.is_none() {
536            return false;
537        }
538
539        payment_sat <= self.peer_suggested_mint_cap_sat(from_peer).await
540    }
541
542    async fn issue_quote(
543        &self,
544        peer_id: &str,
545        hash_key: &str,
546        payment_sat: u64,
547        ttl_ms: u32,
548        mint_url: Option<&str>,
549    ) -> u64 {
550        let quote_id = {
551            let mut next = self.next_quote_id.write().await;
552            let quote_id = *next;
553            *next = next.saturating_add(1);
554            quote_id
555        };
556
557        let expires_at = Instant::now() + Duration::from_millis(ttl_ms as u64);
558        self.issued_quotes.write().await.insert(
559            (peer_id.to_string(), hash_key.to_string(), quote_id),
560            IssuedQuote {
561                expires_at,
562                payment_sat,
563                mint_url: mint_url.map(str::to_string),
564            },
565        );
566        quote_id
567    }
568
569    async fn take_valid_quote(&self, peer_id: &str, hash_key: &str, quote_id: u64) -> bool {
570        let key = (peer_id.to_string(), hash_key.to_string(), quote_id);
571        let Some(quote) = self.issued_quotes.write().await.remove(&key) else {
572            return false;
573        };
574        quote.expires_at > Instant::now()
575    }
576
577    async fn send_request_to_peer(
578        &self,
579        peer_id: &str,
580        hash: &Hash,
581        quote_id: Option<u64>,
582    ) -> bool {
583        let channel = match self.signaling.get_channel(peer_id).await {
584            Some(c) => c,
585            None => return false,
586        };
587
588        let htl_config = {
589            let configs = self.htl_configs.read().await;
590            configs
591                .get(peer_id)
592                .cloned()
593                .unwrap_or_else(PeerHTLConfig::random)
594        };
595
596        let send_htl = htl_config.decrement(MAX_HTL);
597        let req = match quote_id {
598            Some(quote_id) => create_request_with_quote(hash, send_htl, quote_id),
599            None => create_request(hash, send_htl),
600        };
601        let request_bytes = encode_request(&req);
602
603        {
604            let mut selector = self.peer_selector.write().await;
605            selector.record_request(peer_id, request_bytes.len() as u64);
606        }
607
608        match channel.send(request_bytes).await {
609            Ok(()) => true,
610            Err(_) => {
611                self.peer_selector.write().await.record_failure(peer_id);
612                false
613            }
614        }
615    }
616
617    async fn send_quote_request_to_peer(
618        &self,
619        peer_id: &str,
620        hash: &Hash,
621        payment_sat: u64,
622        ttl_ms: u32,
623        mint_url: Option<&str>,
624    ) -> bool {
625        let channel = match self.signaling.get_channel(peer_id).await {
626            Some(c) => c,
627            None => return false,
628        };
629
630        let req = create_quote_request(hash, ttl_ms, payment_sat, mint_url);
631        let request_bytes = encode_quote_request(&req);
632
633        match channel.send(request_bytes).await {
634            Ok(()) => true,
635            Err(_) => false,
636        }
637    }
638
    /// Get peer count, as reported by the signaling manager.
    pub async fn peer_count(&self) -> usize {
        self.signaling.peer_count().await
    }
643
    /// Check if we need more peers, as decided by the signaling manager.
    pub async fn needs_peers(&self) -> bool {
        self.signaling.needs_peers().await
    }
648
    /// Re-broadcast hello to refresh discovery as topology changes.
    ///
    /// Sends the hello with an empty list argument, exactly as `start` does.
    ///
    /// # Errors
    /// Returns the transport error if the hello cannot be sent.
    pub async fn send_hello(&self) -> Result<(), TransportError> {
        self.signaling.send_hello(vec![]).await
    }
653
    /// Apply an out-of-band payment credit to a peer's routing priority.
    ///
    /// Forwards `amount_sat` to the peer selector under its write lock.
    pub async fn record_cashu_payment_for_peer(&self, peer_id: &str, amount_sat: u64) {
        self.peer_selector
            .write()
            .await
            .record_cashu_payment(peer_id, amount_sat);
    }
661
    /// Record a post-delivery payment we received from a peer.
    ///
    /// Forwards `amount_sat` to the peer selector under its write lock.
    pub async fn record_cashu_receipt_from_peer(&self, peer_id: &str, amount_sat: u64) {
        self.peer_selector
            .write()
            .await
            .record_cashu_receipt(peer_id, amount_sat);
    }
669
    /// Record that a peer failed to pay after we delivered successfully.
    ///
    /// Forwards the event to the peer selector under its write lock.
    pub async fn record_cashu_payment_default_from_peer(&self, peer_id: &str) {
        self.peer_selector
            .write()
            .await
            .record_cashu_payment_default(peer_id);
    }
677
    /// Whether requests from `peer_id` should be refused because of unpaid
    /// settlements.
    ///
    /// Asks `selector` whether the peer is blocked for payment defaults under
    /// the configured `cashu_payment_default_block_threshold`. The caller
    /// passes in the selector it already holds.
    fn should_refuse_requests_from_peer(&self, selector: &PeerSelector, peer_id: &str) -> bool {
        selector.is_peer_blocked_for_payment_defaults(
            peer_id,
            self.routing.cashu_payment_default_block_threshold,
        )
    }
684
    /// Export live peer metadata for inspection/debugging.
    ///
    /// Takes the peer selector's read lock for the duration of the export.
    pub async fn peer_metadata_snapshot(&self) -> PeerMetadataSnapshot {
        self.peer_selector
            .read()
            .await
            .export_peer_metadata_snapshot()
    }
692
693    /// Snapshot current peer metadata and persist it into `local_store`.
694    ///
695    /// Uses content-addressed storage for the snapshot body and a reserved
696    /// mutable pointer slot for the "latest snapshot hash".
697    pub async fn persist_peer_metadata(&self) -> Result<Hash, StoreError> {
698        let snapshot = self
699            .peer_selector
700            .read()
701            .await
702            .export_peer_metadata_snapshot();
703        let bytes = serde_json::to_vec(&snapshot).map_err(|e| {
704            StoreError::Other(format!("Failed to encode peer metadata snapshot: {e}"))
705        })?;
706        let snapshot_hash = hashtree_core::sha256(&bytes);
707        let _ = self.local_store.put(snapshot_hash, bytes).await?;
708
709        let pointer_slot = Self::peer_metadata_pointer_slot_hash();
710        let pointer_bytes = hex::encode(snapshot_hash).into_bytes();
711        let _ = self.local_store.delete(&pointer_slot).await?;
712        let _ = self.local_store.put(pointer_slot, pointer_bytes).await?;
713
714        Ok(snapshot_hash)
715    }
716
717    /// Load persisted peer metadata from `local_store` if available.
718    pub async fn load_peer_metadata(&self) -> Result<bool, StoreError> {
719        let pointer_slot = Self::peer_metadata_pointer_slot_hash();
720        let Some(pointer_bytes) = self.local_store.get(&pointer_slot).await? else {
721            return Ok(false);
722        };
723        let pointer_hex = std::str::from_utf8(&pointer_bytes).map_err(|e| {
724            StoreError::Other(format!("Peer metadata pointer is not valid UTF-8: {e}"))
725        })?;
726        let snapshot_hash = Self::decode_hash_hex(pointer_hex.trim())?;
727
728        let Some(snapshot_bytes) = self.local_store.get(&snapshot_hash).await? else {
729            return Ok(false);
730        };
731        let snapshot: PeerMetadataSnapshot =
732            serde_json::from_slice(&snapshot_bytes).map_err(|e| {
733                StoreError::Other(format!("Failed to decode peer metadata snapshot: {e}"))
734            })?;
735        self.peer_selector
736            .write()
737            .await
738            .import_peer_metadata_snapshot(&snapshot);
739        Ok(true)
740    }
741
742    /// Request data from peers after negotiating a paid quote.
743    ///
744    /// If quote negotiation fails or the quoted peer does not deliver, the store
745    /// falls back to the normal unpaid retrieval path to preserve liveness.
746    pub async fn get_with_quote(
747        &self,
748        hash: &Hash,
749        payment_sat: u64,
750        quote_ttl: Duration,
751    ) -> Result<Option<Vec<u8>>, StoreError> {
752        if let Some(data) = self.local_store.get(hash).await? {
753            return Ok(Some(data));
754        }
755        Ok(self
756            .request_from_peers_with_quote(hash, payment_sat, quote_ttl)
757            .await)
758    }
759
760    async fn request_from_peers_with_quote(
761        &self,
762        hash: &Hash,
763        payment_sat: u64,
764        quote_ttl: Duration,
765    ) -> Option<Vec<u8>> {
766        let ordered_peer_ids = self.ordered_connected_peers().await;
767        if ordered_peer_ids.is_empty() {
768            return None;
769        }
770
771        if let Some(quote) = self
772            .request_quote_from_peers(hash, payment_sat, quote_ttl, &ordered_peer_ids)
773            .await
774        {
775            if let Some(data) = self
776                .request_from_single_peer(hash, &quote.peer_id, Some(quote.quote_id))
777                .await
778            {
779                return Some(data);
780            }
781        }
782
783        self.request_from_ordered_peers(hash, &ordered_peer_ids)
784            .await
785    }
786
    /// Ask peers for a quote on `hash` in hedged waves and wait for the first
    /// quote delivered on the pending-quote channel.
    ///
    /// Returns `None` when there are no peers, when `quote_ttl` is below one
    /// millisecond, when the wave plan is empty, when the channel yields `None`
    /// or is closed, or when `request_timeout` elapses. The pending-quote entry
    /// registered under the hash key is removed before returning, unless one of
    /// the early `None` returns happens before it is registered.
    async fn request_quote_from_peers(
        &self,
        hash: &Hash,
        payment_sat: u64,
        quote_ttl: Duration,
        ordered_peer_ids: &[String],
    ) -> Option<NegotiatedQuote> {
        if ordered_peer_ids.is_empty() {
            return None;
        }
        // The TTL is sent as u32 milliseconds; longer TTLs are clamped.
        let ttl_ms = quote_ttl.as_millis().min(u32::MAX as u128) as u32;
        if ttl_ms == 0 {
            return None;
        }
        let requested_mint = self.requested_quote_mint().map(str::to_string);

        let dispatch = normalize_dispatch_config(self.routing.dispatch, ordered_peer_ids.len());
        let wave_plan = build_hedged_wave_plan(ordered_peer_ids.len(), dispatch);
        if wave_plan.is_empty() {
            return None;
        }

        // Register the negotiation before sending anything, so a response
        // cannot arrive before the entry exists.
        let hash_key = hash_to_key(hash);
        let (tx, rx) = oneshot::channel();
        self.pending_quotes.write().await.insert(
            hash_key.clone(),
            PendingQuoteRequest {
                response_tx: tx,
                preferred_mint_url: requested_mint.clone(),
                offered_payment_sat: payment_sat,
            },
        );

        let mut sent_total = 0usize;
        let mut next_peer_idx = 0usize;
        let mut rx = rx;
        // One deadline shared by all waves.
        let deadline = Instant::now() + self.request_timeout;

        for (wave_idx, wave_size) in wave_plan.iter().copied().enumerate() {
            // Send this wave to the next `wave_size` peers in preference order.
            let from = next_peer_idx;
            let to = (next_peer_idx + wave_size).min(ordered_peer_ids.len());
            for peer_id in &ordered_peer_ids[from..to] {
                if self
                    .send_quote_request_to_peer(
                        peer_id,
                        hash,
                        payment_sat,
                        ttl_ms,
                        requested_mint.as_deref(),
                    )
                    .await
                {
                    sent_total += 1;
                }
            }
            next_peer_idx = to;

            // Nothing has been sent successfully so far: there is nothing to
            // wait for, so move on to the next wave (or stop if peers ran out).
            if sent_total == 0 {
                if next_peer_idx >= ordered_peer_ids.len() {
                    break;
                }
                continue;
            }

            let now = Instant::now();
            if now >= deadline {
                break;
            }
            let remaining = deadline.saturating_duration_since(now);
            let is_last_wave =
                wave_idx + 1 == wave_plan.len() || next_peer_idx >= ordered_peer_ids.len();
            // Last wave waits out the remaining budget; earlier waves wait one
            // hedge interval (or not at all when the interval is zero).
            let wait = if is_last_wave {
                remaining
            } else if dispatch.hedge_interval_ms == 0 {
                Duration::ZERO
            } else {
                Duration::from_millis(dispatch.hedge_interval_ms).min(remaining)
            };

            if wait.is_zero() {
                continue;
            }

            match tokio::time::timeout(wait, &mut rx).await {
                Ok(Ok(Some(quote))) => {
                    let _ = self.pending_quotes.write().await.remove(&hash_key);
                    return Some(quote);
                }
                // The sender delivered `None`: stop negotiating.
                Ok(Ok(None)) => break,
                // The sender was dropped without a value.
                Ok(Err(_)) => break,
                // The wait elapsed: proceed to the next wave, if any.
                Err(_) => {}
            }
        }

        let _ = self.pending_quotes.write().await.remove(&hash_key);
        None
    }
884
885    async fn request_from_single_peer(
886        &self,
887        hash: &Hash,
888        peer_id: &str,
889        quote_id: Option<u64>,
890    ) -> Option<Vec<u8>> {
891        let hash_key = hash_to_key(hash);
892        let (tx, rx) = oneshot::channel();
893        self.pending_requests.write().await.insert(
894            hash_key.clone(),
895            PendingRequest {
896                response_tx: tx,
897                started_at: Instant::now(),
898                queried_peers: vec![peer_id.to_string()],
899            },
900        );
901
902        let mut rx = rx;
903        if !self.send_request_to_peer(peer_id, hash, quote_id).await {
904            let _ = self.pending_requests.write().await.remove(&hash_key);
905            return None;
906        }
907
908        if let Ok(Ok(Some(data))) = tokio::time::timeout(self.request_timeout, &mut rx).await {
909            if hashtree_core::sha256(&data) == *hash {
910                let _ = self.local_store.put(*hash, data.clone()).await;
911                return Some(data);
912            }
913        }
914
915        if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
916            for peer_id in pending.queried_peers {
917                self.peer_selector.write().await.record_timeout(&peer_id);
918            }
919        }
920        None
921    }
922
923    async fn request_from_ordered_peers(
924        &self,
925        hash: &Hash,
926        ordered_peer_ids: &[String],
927    ) -> Option<Vec<u8>> {
928        let dispatch = normalize_dispatch_config(self.routing.dispatch, ordered_peer_ids.len());
929        let wave_plan = build_hedged_wave_plan(ordered_peer_ids.len(), dispatch);
930        if wave_plan.is_empty() {
931            return None;
932        }
933
934        let hash_key = hash_to_key(hash);
935        let (tx, rx) = oneshot::channel();
936        self.pending_requests.write().await.insert(
937            hash_key.clone(),
938            PendingRequest {
939                response_tx: tx,
940                started_at: Instant::now(),
941                queried_peers: Vec::new(),
942            },
943        );
944
945        let mut sent_total = 0usize;
946        let mut next_peer_idx = 0usize;
947        let mut rx = rx;
948        let deadline = Instant::now() + self.request_timeout;
949
950        for (wave_idx, wave_size) in wave_plan.iter().copied().enumerate() {
951            let from = next_peer_idx;
952            let to = (next_peer_idx + wave_size).min(ordered_peer_ids.len());
953            for peer_id in &ordered_peer_ids[from..to] {
954                if self.send_request_to_peer(peer_id, hash, None).await {
955                    sent_total += 1;
956                    if let Some(pending) = self.pending_requests.write().await.get_mut(&hash_key) {
957                        pending.queried_peers.push(peer_id.clone());
958                    }
959                }
960            }
961            next_peer_idx = to;
962
963            if sent_total == 0 {
964                if next_peer_idx >= ordered_peer_ids.len() {
965                    break;
966                }
967                continue;
968            }
969
970            let now = Instant::now();
971            if now >= deadline {
972                break;
973            }
974            let remaining = deadline.saturating_duration_since(now);
975            let is_last_wave =
976                wave_idx + 1 == wave_plan.len() || next_peer_idx >= ordered_peer_ids.len();
977            let wait = if is_last_wave {
978                remaining
979            } else if dispatch.hedge_interval_ms == 0 {
980                Duration::ZERO
981            } else {
982                Duration::from_millis(dispatch.hedge_interval_ms).min(remaining)
983            };
984
985            if wait.is_zero() {
986                continue;
987            }
988
989            match tokio::time::timeout(wait, &mut rx).await {
990                Ok(Ok(Some(data))) => {
991                    if hashtree_core::sha256(&data) == *hash {
992                        let _ = self.local_store.put(*hash, data.clone()).await;
993                        return Some(data);
994                    }
995                }
996                Ok(Ok(None)) => break,
997                Ok(Err(_)) => break,
998                Err(_) => {
999                    // Timed wait window expired; send next hedge wave if any.
1000                }
1001            }
1002        }
1003
1004        if sent_total == 0 {
1005            let _ = self.pending_requests.write().await.remove(&hash_key);
1006            return None;
1007        }
1008
1009        if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
1010            for peer_id in pending.queried_peers {
1011                self.peer_selector.write().await.record_timeout(&peer_id);
1012            }
1013        }
1014        None
1015    }
1016
1017    /// Request data from peers
1018    async fn request_from_peers(&self, hash: &Hash) -> Option<Vec<u8>> {
1019        let ordered_peer_ids = self.ordered_connected_peers().await;
1020        if ordered_peer_ids.is_empty() {
1021            return None;
1022        }
1023        self.request_from_ordered_peers(hash, &ordered_peer_ids)
1024            .await
1025    }
1026
1027    async fn complete_pending_response(&self, from_peer: &str, hash_key: String, payload: Vec<u8>) {
1028        if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
1029            let rtt_ms = pending.started_at.elapsed().as_millis() as u64;
1030            self.peer_selector.write().await.record_success(
1031                from_peer,
1032                rtt_ms,
1033                payload.len() as u64,
1034            );
1035            let _ = pending.response_tx.send(Some(payload));
1036        }
1037    }
1038
1039    async fn handle_quote_response_message(&self, from_peer: &str, res: DataQuoteResponse) {
1040        if !res.a {
1041            return;
1042        }
1043
1044        let Some(quote_id) = res.q else {
1045            return;
1046        };
1047
1048        let hash_key = hash_to_key(&res.h);
1049        let (preferred_mint_url, offered_payment_sat) = {
1050            let pending_quotes = self.pending_quotes.read().await;
1051            let Some(pending) = pending_quotes.get(&hash_key) else {
1052                return;
1053            };
1054            (
1055                pending.preferred_mint_url.clone(),
1056                pending.offered_payment_sat,
1057            )
1058        };
1059        if !self
1060            .should_accept_quote_response(
1061                from_peer,
1062                preferred_mint_url.as_deref(),
1063                offered_payment_sat,
1064                &res,
1065            )
1066            .await
1067        {
1068            return;
1069        }
1070        let mut pending_quotes = self.pending_quotes.write().await;
1071        if let Some(pending) = pending_quotes.remove(&hash_key) {
1072            let _ = pending.response_tx.send(Some(NegotiatedQuote {
1073                peer_id: from_peer.to_string(),
1074                quote_id,
1075                mint_url: res.m,
1076            }));
1077        }
1078    }
1079
1080    async fn handle_response_message(&self, from_peer: &str, res: crate::protocol::DataResponse) {
1081        let hash_key = hash_to_key(&res.h);
1082        let hash = match crate::protocol::bytes_to_hash(&res.h) {
1083            Some(h) => h,
1084            None => return,
1085        };
1086
1087        // Ignore malformed/corrupt payload and keep waiting for a valid response.
1088        if hashtree_core::sha256(&res.d) != hash {
1089            self.peer_selector.write().await.record_failure(from_peer);
1090            if self.debug {
1091                println!("[GenericStore] Ignoring invalid response payload for {hash_key}");
1092            }
1093            return;
1094        }
1095
1096        self.complete_pending_response(from_peer, hash_key, res.d)
1097            .await;
1098    }
1099
1100    async fn handle_quote_request_message(&self, from_peer: &str, req: DataQuoteRequest) {
1101        let hash = match crate::protocol::bytes_to_hash(&req.h) {
1102            Some(h) => h,
1103            None => return,
1104        };
1105        let hash_key = hash_to_key(&hash);
1106
1107        {
1108            let selector = self.peer_selector.read().await;
1109            if self.should_refuse_requests_from_peer(&selector, from_peer) {
1110                if self.debug {
1111                    println!(
1112                        "[GenericStore] Refusing quote request from delinquent peer {}",
1113                        from_peer
1114                    );
1115                }
1116                return;
1117            }
1118        }
1119
1120        let chosen_mint = self.choose_quote_mint(req.m.as_deref());
1121        let can_serve = self.local_store.has(&hash).await.ok().unwrap_or(false)
1122            && !self.should_drop_response(&hash)
1123            && !self.should_corrupt_response(&hash);
1124
1125        let res = if can_serve {
1126            let quote_id = self
1127                .issue_quote(from_peer, &hash_key, req.p, req.t, chosen_mint.as_deref())
1128                .await;
1129            create_quote_response_available(&hash, quote_id, req.p, req.t, chosen_mint.as_deref())
1130        } else {
1131            create_quote_response_unavailable(&hash)
1132        };
1133        let response_bytes = encode_quote_response(&res);
1134        if let Some(channel) = self.signaling.get_channel(from_peer).await {
1135            let _ = channel.send(response_bytes).await;
1136        }
1137    }
1138
1139    async fn handle_request_message(&self, from_peer: &str, req: crate::protocol::DataRequest) {
1140        let hash = match crate::protocol::bytes_to_hash(&req.h) {
1141            Some(h) => h,
1142            None => return,
1143        };
1144        let hash_key = hash_to_key(&hash);
1145
1146        {
1147            let selector = self.peer_selector.read().await;
1148            if self.should_refuse_requests_from_peer(&selector, from_peer) {
1149                if self.debug {
1150                    println!(
1151                        "[GenericStore] Refusing request from delinquent peer {}",
1152                        from_peer
1153                    );
1154                }
1155                return;
1156            }
1157        }
1158
1159        if let Some(quote_id) = req.q {
1160            if !self.take_valid_quote(from_peer, &hash_key, quote_id).await {
1161                if self.debug {
1162                    println!(
1163                        "[GenericStore] Refusing request with invalid or expired quote {} from {}",
1164                        quote_id, from_peer
1165                    );
1166                }
1167                return;
1168            }
1169        }
1170
1171        // Check local store
1172        if let Ok(Some(mut data)) = self.local_store.get(&hash).await {
1173            if self.should_drop_response(&hash) {
1174                if self.debug {
1175                    println!(
1176                        "[GenericStore] Dropping response for {} due to actor profile",
1177                        hash_to_key(&hash)
1178                    );
1179                }
1180                return;
1181            }
1182
1183            let behavior = self.response_behavior();
1184            if behavior.extra_delay_ms > 0 {
1185                tokio::time::sleep(Duration::from_millis(behavior.extra_delay_ms)).await;
1186            }
1187
1188            if self.should_corrupt_response(&hash) {
1189                if data.is_empty() {
1190                    data.push(0x80);
1191                } else {
1192                    data[0] ^= 0x80;
1193                }
1194            }
1195
1196            // Send response
1197            let res = create_response(&hash, data);
1198            let response_bytes = encode_response(&res);
1199            if let Some(channel) = self.signaling.get_channel(from_peer).await {
1200                let _ = channel.send(response_bytes).await;
1201            }
1202        }
1203        // For now, don't forward - keep it simple
1204    }
1205
1206    /// Handle incoming data message
1207    pub async fn handle_data_message(&self, from_peer: &str, data: &[u8]) {
1208        let parsed = match parse_message(data) {
1209            Some(m) => m,
1210            None => return,
1211        };
1212
1213        match parsed {
1214            DataMessage::Request(req) => {
1215                self.handle_request_message(from_peer, req).await;
1216            }
1217            DataMessage::Response(res) => {
1218                self.handle_response_message(from_peer, res).await;
1219            }
1220            DataMessage::QuoteRequest(req) => {
1221                self.handle_quote_request_message(from_peer, req).await;
1222            }
1223            DataMessage::QuoteResponse(res) => {
1224                self.handle_quote_response_message(from_peer, res).await;
1225            }
1226        }
1227    }
1228}
1229
1230#[async_trait]
1231impl<S, R, F> Store for GenericStore<S, R, F>
1232where
1233    S: Store + Send + Sync + 'static,
1234    R: RelayTransport + Send + Sync + 'static,
1235    F: PeerConnectionFactory + Send + Sync + 'static,
1236{
1237    async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
1238        self.local_store.put(hash, data).await
1239    }
1240
1241    async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
1242        // Try local first
1243        if let Some(data) = self.local_store.get(hash).await? {
1244            return Ok(Some(data));
1245        }
1246
1247        // Try peers
1248        Ok(self.request_from_peers(hash).await)
1249    }
1250
1251    async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
1252        self.local_store.has(hash).await
1253    }
1254
1255    async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
1256        self.local_store.delete(hash).await
1257    }
1258}
1259
1260#[cfg(test)]
1261mod tests {
1262    use super::*;
1263    use hashtree_core::MemoryStore;
1264    use std::sync::Arc;
1265    use std::sync::OnceLock;
1266    use std::time::Duration;
1267
    /// `GenericStore` wired to an in-memory local store and the mock relay transport and
    /// connection factory, as used by every test in this module.
    type TestStore = GenericStore<
        MemoryStore,
        crate::mock::MockRelayTransport,
        crate::mock::MockConnectionFactory,
    >;
1273
    /// One simulated participant: the store under test plus direct handles to its local store
    /// and relay transport, so tests can seed data and pump messages by hand.
    struct TestNode {
        // The store under test.
        store: Arc<TestStore>,
        // Backing local store, exposed so tests can seed data directly.
        local_store: Arc<MemoryStore>,
        // Relay transport, exposed so tests can connect it and drain its messages.
        transport: Arc<crate::mock::MockRelayTransport>,
    }
1279
1280    fn mock_network_lock() -> &'static tokio::sync::Mutex<()> {
1281        static LOCK: OnceLock<tokio::sync::Mutex<()>> = OnceLock::new();
1282        LOCK.get_or_init(|| tokio::sync::Mutex::new(()))
1283    }
1284
1285    fn make_test_store(local_store: Arc<MemoryStore>, node_id: &str) -> TestStore {
1286        make_test_store_with_routing(local_store, node_id, GenericStoreRoutingConfig::default())
1287    }
1288
1289    fn make_test_store_with_routing(
1290        local_store: Arc<MemoryStore>,
1291        node_id: &str,
1292        routing: GenericStoreRoutingConfig,
1293    ) -> TestStore {
1294        let relay = crate::mock::MockRelay::new();
1295        let transport = Arc::new(relay.create_transport(node_id.to_string(), node_id.to_string()));
1296        let conn_factory = Arc::new(crate::mock::MockConnectionFactory::new(
1297            node_id.to_string(),
1298            0,
1299        ));
1300        let signaling = Arc::new(crate::signaling::SignalingManager::new(
1301            node_id.to_string(),
1302            node_id.to_string(),
1303            transport,
1304            conn_factory,
1305            crate::types::PoolSettings::default(),
1306            false,
1307        ));
1308
1309        TestStore::new_with_routing(
1310            local_store,
1311            signaling,
1312            Duration::from_millis(200),
1313            false,
1314            routing,
1315        )
1316    }
1317
1318    fn make_shared_test_node(
1319        relay: Arc<crate::mock::MockRelay>,
1320        node_id: &str,
1321        routing: GenericStoreRoutingConfig,
1322    ) -> TestNode {
1323        let transport = Arc::new(relay.create_transport(node_id.to_string(), node_id.to_string()));
1324        let conn_factory = Arc::new(crate::mock::MockConnectionFactory::new(
1325            node_id.to_string(),
1326            0,
1327        ));
1328        let signaling = Arc::new(crate::signaling::SignalingManager::new(
1329            node_id.to_string(),
1330            node_id.to_string(),
1331            transport.clone(),
1332            conn_factory,
1333            crate::types::PoolSettings::default(),
1334            false,
1335        ));
1336        let local_store = Arc::new(MemoryStore::new());
1337        let store = Arc::new(TestStore::new_with_routing(
1338            local_store.clone(),
1339            signaling,
1340            Duration::from_millis(120),
1341            false,
1342            routing,
1343        ));
1344
1345        TestNode {
1346            store,
1347            local_store,
1348            transport,
1349        }
1350    }
1351
1352    async fn pump_test_signaling(nodes: &[&TestNode]) -> usize {
1353        let mut processed = 0usize;
1354        for node in nodes {
1355            while let Some(msg) = node.transport.try_recv() {
1356                node.store
1357                    .process_signaling(msg)
1358                    .await
1359                    .expect("process signaling");
1360                processed += 1;
1361            }
1362        }
1363        processed
1364    }
1365
1366    async fn pump_test_data(nodes: &[&TestNode]) -> usize {
1367        let mut processed = 0usize;
1368        for node in nodes {
1369            let peer_ids = node.store.signaling().peer_ids().await;
1370            for peer_id in peer_ids {
1371                let Some(channel) = node.store.signaling().get_channel(&peer_id).await else {
1372                    continue;
1373                };
1374                while let Some(data) = channel.try_recv() {
1375                    node.store.handle_data_message(&peer_id, &data).await;
1376                    processed += 1;
1377                }
1378            }
1379        }
1380        processed
1381    }
1382
1383    async fn pump_test_network(nodes: &[&TestNode], max_steps: usize) {
1384        for _ in 0..max_steps {
1385            let signaling = pump_test_signaling(nodes).await;
1386            let data = pump_test_data(nodes).await;
1387            if signaling + data == 0 {
1388                tokio::task::yield_now().await;
1389            }
1390        }
1391    }
1392
1393    async fn run_get_with_pumps(
1394        requester: Arc<TestStore>,
1395        hash: Hash,
1396        nodes: &[&TestNode],
1397    ) -> Option<Vec<u8>> {
1398        let task = tokio::spawn(async move { requester.get(&hash).await.ok().flatten() });
1399        let started = Instant::now();
1400
1401        loop {
1402            if task.is_finished() {
1403                return task.await.expect("request task join");
1404            }
1405
1406            if started.elapsed() > Duration::from_secs(1) {
1407                task.abort();
1408                return None;
1409            }
1410
1411            pump_test_network(nodes, 4).await;
1412        }
1413    }
1414
1415    async fn run_bad_peer_series(strategy: SelectionStrategy) -> usize {
1416        let _guard = mock_network_lock().lock().await;
1417        crate::mock::clear_channel_registry().await;
1418
1419        let relay = crate::mock::MockRelay::new();
1420        let requester = make_shared_test_node(
1421            relay.clone(),
1422            "requester-reject",
1423            GenericStoreRoutingConfig {
1424                selection_strategy: strategy,
1425                fairness_enabled: false,
1426                dispatch: RequestDispatchConfig {
1427                    initial_fanout: 1,
1428                    hedge_fanout: 1,
1429                    max_fanout: 1,
1430                    hedge_interval_ms: 5,
1431                },
1432                ..Default::default()
1433            },
1434        );
1435        let bad = make_shared_test_node(
1436            relay.clone(),
1437            "a-bad",
1438            GenericStoreRoutingConfig {
1439                response_behavior: ResponseBehaviorConfig {
1440                    drop_response_prob: 1.0,
1441                    ..Default::default()
1442                },
1443                ..Default::default()
1444            },
1445        );
1446        let honest = make_shared_test_node(relay, "b-honest", GenericStoreRoutingConfig::default());
1447        let nodes = [&requester, &bad, &honest];
1448
1449        for node in &nodes {
1450            node.transport
1451                .connect(&[])
1452                .await
1453                .expect("connect transport");
1454            node.store.start().await.expect("start store");
1455        }
1456        pump_test_network(&nodes, 24).await;
1457
1458        let mut successes = 0usize;
1459        for round in 0..6 {
1460            let payload = format!("payload-{round}").into_bytes();
1461            let hash = hashtree_core::sha256(&payload);
1462            let _ = bad.local_store.put(hash, payload.clone()).await;
1463            let _ = honest.local_store.put(hash, payload.clone()).await;
1464
1465            let result = run_get_with_pumps(requester.store.clone(), hash, &nodes).await;
1466            if result.as_ref() == Some(&payload) {
1467                successes += 1;
1468            }
1469        }
1470
1471        crate::mock::clear_channel_registry().await;
1472        successes
1473    }
1474
1475    #[test]
1476    fn test_hedged_wave_plan_flood_all() {
1477        let plan = build_hedged_wave_plan(7, RequestDispatchConfig::default());
1478        assert_eq!(plan, vec![7]);
1479    }
1480
1481    #[test]
1482    fn test_hedged_wave_plan_staged() {
1483        let plan = build_hedged_wave_plan(
1484            10,
1485            RequestDispatchConfig {
1486                initial_fanout: 2,
1487                hedge_fanout: 3,
1488                max_fanout: 8,
1489                hedge_interval_ms: 25,
1490            },
1491        );
1492        assert_eq!(plan, vec![2, 3, 3]);
1493    }
1494
1495    #[test]
1496    fn test_response_behavior_normalization_clamps_probs() {
1497        let raw = ResponseBehaviorConfig {
1498            drop_response_prob: -1.5,
1499            corrupt_response_prob: 9.0,
1500            extra_delay_ms: 12,
1501        };
1502        let normalized = raw.normalized();
1503        assert_eq!(normalized.drop_response_prob, 0.0);
1504        assert_eq!(normalized.corrupt_response_prob, 1.0);
1505        assert_eq!(normalized.extra_delay_ms, 12);
1506    }
1507
1508    #[test]
1509    fn test_actor_draw_is_deterministic_per_peer_hash_and_salt() {
1510        let hash = hashtree_core::sha256(b"deterministic");
1511        let a = TestStore::deterministic_actor_draw_for("peer-a", &hash, 7);
1512        let b = TestStore::deterministic_actor_draw_for("peer-a", &hash, 7);
1513        assert!((a - b).abs() < f64::EPSILON);
1514    }
1515
1516    #[tokio::test]
1517    async fn test_load_peer_metadata_returns_false_when_missing() {
1518        let local_store = Arc::new(MemoryStore::new());
1519        let store = make_test_store(local_store, "0");
1520        assert!(!store.load_peer_metadata().await.expect("load result"));
1521    }
1522
1523    #[tokio::test]
1524    async fn test_persist_and_load_peer_metadata_with_existing_store_adapter() {
1525        let local_store = Arc::new(MemoryStore::new());
1526        let writer = make_test_store(local_store.clone(), "0");
1527        {
1528            let mut selector = writer.peer_selector.write().await;
1529            selector.add_peer("npub1stable:session-a");
1530            selector.record_request("npub1stable:session-a", 64);
1531            selector.record_success("npub1stable:session-a", 35, 1024);
1532            selector.record_cashu_payment("npub1stable:session-a", 120);
1533            selector.record_cashu_receipt("npub1stable:session-a", 40);
1534            selector.record_cashu_payment_default("npub1stable:session-a");
1535        }
1536
1537        let snapshot_hash = writer
1538            .persist_peer_metadata()
1539            .await
1540            .expect("persist peer metadata");
1541        assert!(local_store
1542            .get(&snapshot_hash)
1543            .await
1544            .expect("snapshot lookup")
1545            .is_some());
1546
1547        let reader = make_test_store(local_store, "1");
1548        assert!(reader
1549            .load_peer_metadata()
1550            .await
1551            .expect("load peer metadata snapshot"));
1552
1553        let mut selector = reader.peer_selector.write().await;
1554        selector.add_peer("npub1stable:session-b");
1555        let stats = selector
1556            .get_stats("npub1stable:session-b")
1557            .expect("restored peer stats");
1558        assert_eq!(stats.requests_sent, 1);
1559        assert_eq!(stats.successes, 1);
1560        assert_eq!(stats.cashu_paid_sat, 120);
1561        assert_eq!(stats.cashu_received_sat, 40);
1562        assert_eq!(stats.cashu_payment_receipts, 1);
1563        assert_eq!(stats.cashu_payment_defaults, 1);
1564    }
1565
1566    #[tokio::test]
1567    async fn test_should_refuse_requests_from_peer_after_payment_defaults() {
1568        let local_store = Arc::new(MemoryStore::new());
1569        let store = make_test_store_with_routing(
1570            local_store,
1571            "0",
1572            GenericStoreRoutingConfig {
1573                cashu_payment_default_block_threshold: 1,
1574                ..Default::default()
1575            },
1576        );
1577        store.record_cashu_payment_default_from_peer("peer-a").await;
1578
1579        let selector = store.peer_selector.read().await;
1580        assert!(store.should_refuse_requests_from_peer(&selector, "peer-a"));
1581        assert!(!store.should_refuse_requests_from_peer(&selector, "peer-b"));
1582    }
1583
1584    #[tokio::test]
1585    async fn test_take_valid_quote_consumes_once_and_rejects_expired_quotes() {
1586        let local_store = Arc::new(MemoryStore::new());
1587        let store = make_test_store(local_store, "0");
1588        let hash = hashtree_core::sha256(b"quote-test");
1589        let hash_key = hash_to_key(&hash);
1590
1591        {
1592            let mut issued = store.issued_quotes.write().await;
1593            issued.insert(
1594                ("peer-a".to_string(), hash_key.clone(), 11),
1595                IssuedQuote {
1596                    expires_at: Instant::now() + Duration::from_secs(1),
1597                    payment_sat: 5,
1598                    mint_url: Some("https://mint-a.example".to_string()),
1599                },
1600            );
1601            issued.insert(
1602                ("peer-a".to_string(), hash_key.clone(), 12),
1603                IssuedQuote {
1604                    expires_at: Instant::now() - Duration::from_millis(1),
1605                    payment_sat: 5,
1606                    mint_url: Some("https://mint-a.example".to_string()),
1607                },
1608            );
1609        }
1610
1611        assert!(store.take_valid_quote("peer-a", &hash_key, 11).await);
1612        assert!(!store.take_valid_quote("peer-a", &hash_key, 11).await);
1613        assert!(!store.take_valid_quote("peer-a", &hash_key, 12).await);
1614    }
1615
1616    async fn run_quote_with_pumps(
1617        requester: Arc<TestStore>,
1618        hash: Hash,
1619        payment_sat: u64,
1620        quote_ttl: Duration,
1621        peer_ids: Vec<String>,
1622        nodes: &[&TestNode],
1623    ) -> Option<NegotiatedQuote> {
1624        let task = tokio::spawn(async move {
1625            requester
1626                .request_quote_from_peers(&hash, payment_sat, quote_ttl, &peer_ids)
1627                .await
1628        });
1629        let started = Instant::now();
1630
1631        loop {
1632            if task.is_finished() {
1633                return task.await.expect("quote task join");
1634            }
1635            if started.elapsed() > Duration::from_secs(1) {
1636                task.abort();
1637                return None;
1638            }
1639            pump_test_network(nodes, 4).await;
1640        }
1641    }
1642
1643    #[tokio::test]
1644    async fn test_request_quote_from_peers_rejects_unaccepted_mint() {
1645        let _guard = mock_network_lock().lock().await;
1646        crate::mock::clear_channel_registry().await;
1647
1648        let relay = crate::mock::MockRelay::new();
1649        let requester = make_shared_test_node(
1650            relay.clone(),
1651            "requester-reject",
1652            GenericStoreRoutingConfig {
1653                cashu_accepted_mints: vec!["https://mint-a.example".to_string()],
1654                cashu_default_mint: Some("https://mint-a.example".to_string()),
1655                dispatch: RequestDispatchConfig {
1656                    initial_fanout: 1,
1657                    hedge_fanout: 1,
1658                    max_fanout: 1,
1659                    hedge_interval_ms: 5,
1660                },
1661                ..Default::default()
1662            },
1663        );
1664        let provider = make_shared_test_node(
1665            relay,
1666            "provider-reject",
1667            GenericStoreRoutingConfig {
1668                cashu_accepted_mints: vec!["https://mint-b.example".to_string()],
1669                ..Default::default()
1670            },
1671        );
1672        let nodes = [&requester, &provider];
1673
1674        requester.transport.connect(&[]).await.expect("connect");
1675        provider.transport.connect(&[]).await.expect("connect");
1676        requester.store.start().await.expect("start");
1677        provider.store.start().await.expect("start");
1678        pump_test_network(&nodes, 24).await;
1679
1680        let payload = b"quoted-data".to_vec();
1681        let hash = hashtree_core::sha256(&payload);
1682        provider.local_store.put(hash, payload).await.expect("put");
1683
1684        let quote = run_quote_with_pumps(
1685            requester.store.clone(),
1686            hash,
1687            9,
1688            Duration::from_millis(80),
1689            vec!["provider-reject".to_string()],
1690            &nodes,
1691        )
1692        .await;
1693        assert!(
1694            quote.is_none(),
1695            "expected quote to be rejected on mint mismatch"
1696        );
1697    }
1698
1699    #[tokio::test]
1700    async fn test_request_quote_from_peers_accepts_small_peer_suggested_mint_under_cap() {
1701        let _guard = mock_network_lock().lock().await;
1702        crate::mock::clear_channel_registry().await;
1703
1704        let relay = crate::mock::MockRelay::new();
1705        let requester = make_shared_test_node(
1706            relay.clone(),
1707            "requester-suggested",
1708            GenericStoreRoutingConfig {
1709                cashu_accepted_mints: vec!["https://mint-a.example".to_string()],
1710                cashu_default_mint: Some("https://mint-a.example".to_string()),
1711                cashu_peer_suggested_mint_base_cap_sat: 3,
1712                cashu_peer_suggested_mint_max_cap_sat: 3,
1713                dispatch: RequestDispatchConfig {
1714                    initial_fanout: 1,
1715                    hedge_fanout: 1,
1716                    max_fanout: 1,
1717                    hedge_interval_ms: 5,
1718                },
1719                ..Default::default()
1720            },
1721        );
1722        let provider = make_shared_test_node(
1723            relay,
1724            "provider-suggested",
1725            GenericStoreRoutingConfig {
1726                cashu_accepted_mints: vec!["https://mint-b.example".to_string()],
1727                cashu_default_mint: Some("https://mint-b.example".to_string()),
1728                ..Default::default()
1729            },
1730        );
1731        let nodes = [&requester, &provider];
1732
1733        requester.transport.connect(&[]).await.expect("connect");
1734        provider.transport.connect(&[]).await.expect("connect");
1735        requester.store.start().await.expect("start");
1736        provider.store.start().await.expect("start");
1737        pump_test_network(&nodes, 24).await;
1738
1739        let payload = b"quoted-data".to_vec();
1740        let hash = hashtree_core::sha256(&payload);
1741        provider.local_store.put(hash, payload).await.expect("put");
1742
1743        let quote = run_quote_with_pumps(
1744            requester.store.clone(),
1745            hash,
1746            3,
1747            Duration::from_millis(80),
1748            vec!["provider-suggested".to_string()],
1749            &nodes,
1750        )
1751        .await
1752        .expect("expected bounded peer-suggested mint quote");
1753        assert_eq!(quote.mint_url.as_deref(), Some("https://mint-b.example"));
1754    }
1755
1756    #[tokio::test]
1757    async fn test_request_quote_from_peers_scales_peer_suggested_mint_cap_with_reputation() {
1758        let _guard = mock_network_lock().lock().await;
1759        crate::mock::clear_channel_registry().await;
1760
1761        let relay = crate::mock::MockRelay::new();
1762        let requester = make_shared_test_node(
1763            relay.clone(),
1764            "requester-reputation",
1765            GenericStoreRoutingConfig {
1766                cashu_accepted_mints: vec!["https://mint-a.example".to_string()],
1767                cashu_default_mint: Some("https://mint-a.example".to_string()),
1768                cashu_peer_suggested_mint_base_cap_sat: 1,
1769                cashu_peer_suggested_mint_success_step_sat: 1,
1770                cashu_peer_suggested_mint_receipt_step_sat: 2,
1771                cashu_peer_suggested_mint_max_cap_sat: 5,
1772                dispatch: RequestDispatchConfig {
1773                    initial_fanout: 1,
1774                    hedge_fanout: 1,
1775                    max_fanout: 1,
1776                    hedge_interval_ms: 5,
1777                },
1778                ..Default::default()
1779            },
1780        );
1781        let provider = make_shared_test_node(
1782            relay,
1783            "provider-reputation",
1784            GenericStoreRoutingConfig {
1785                cashu_accepted_mints: vec!["https://mint-b.example".to_string()],
1786                cashu_default_mint: Some("https://mint-b.example".to_string()),
1787                ..Default::default()
1788            },
1789        );
1790        let nodes = [&requester, &provider];
1791
1792        requester.transport.connect(&[]).await.expect("connect");
1793        provider.transport.connect(&[]).await.expect("connect");
1794        requester.store.start().await.expect("start");
1795        provider.store.start().await.expect("start");
1796        pump_test_network(&nodes, 24).await;
1797
1798        let payload = b"quoted-data".to_vec();
1799        let hash = hashtree_core::sha256(&payload);
1800        provider.local_store.put(hash, payload).await.expect("put");
1801
1802        let quote = run_quote_with_pumps(
1803            requester.store.clone(),
1804            hash,
1805            4,
1806            Duration::from_millis(80),
1807            vec!["provider-reputation".to_string()],
1808            &nodes,
1809        )
1810        .await;
1811        assert!(
1812            quote.is_none(),
1813            "new peer should not get a 4 sat untrusted-mint quote"
1814        );
1815
1816        {
1817            let mut selector = requester.store.peer_selector.write().await;
1818            selector.add_peer("provider-reputation");
1819            selector.record_success("provider-reputation", 20, 1024);
1820            selector.record_cashu_receipt("provider-reputation", 2);
1821        }
1822
1823        let quote = run_quote_with_pumps(
1824            requester.store.clone(),
1825            hash,
1826            4,
1827            Duration::from_millis(80),
1828            vec!["provider-reputation".to_string()],
1829            &nodes,
1830        )
1831        .await
1832        .expect("reputable peer should get larger bounded quote");
1833        assert_eq!(quote.mint_url.as_deref(), Some("https://mint-b.example"));
1834
1835        requester
1836            .store
1837            .record_cashu_payment_default_from_peer("provider-reputation")
1838            .await;
1839        let quote = run_quote_with_pumps(
1840            requester.store.clone(),
1841            hash,
1842            4,
1843            Duration::from_millis(80),
1844            vec!["provider-reputation".to_string()],
1845            &nodes,
1846        )
1847        .await;
1848        assert!(
1849            quote.is_none(),
1850            "peer-suggested mint exposure should drop to zero after defaults exceed receipts"
1851        );
1852    }
1853
1854    #[tokio::test]
1855    async fn test_request_quote_from_peers_returns_matching_mint() {
1856        let _guard = mock_network_lock().lock().await;
1857        crate::mock::clear_channel_registry().await;
1858
1859        let relay = crate::mock::MockRelay::new();
1860        let requester = make_shared_test_node(
1861            relay.clone(),
1862            "requester-match",
1863            GenericStoreRoutingConfig {
1864                cashu_accepted_mints: vec!["https://mint-a.example".to_string()],
1865                cashu_default_mint: Some("https://mint-a.example".to_string()),
1866                dispatch: RequestDispatchConfig {
1867                    initial_fanout: 1,
1868                    hedge_fanout: 1,
1869                    max_fanout: 1,
1870                    hedge_interval_ms: 5,
1871                },
1872                ..Default::default()
1873            },
1874        );
1875        let provider = make_shared_test_node(
1876            relay,
1877            "provider-match",
1878            GenericStoreRoutingConfig {
1879                cashu_accepted_mints: vec!["https://mint-a.example".to_string()],
1880                ..Default::default()
1881            },
1882        );
1883        let nodes = [&requester, &provider];
1884
1885        requester.transport.connect(&[]).await.expect("connect");
1886        provider.transport.connect(&[]).await.expect("connect");
1887        requester.store.start().await.expect("start");
1888        provider.store.start().await.expect("start");
1889        pump_test_network(&nodes, 24).await;
1890
1891        let payload = b"quoted-data".to_vec();
1892        let hash = hashtree_core::sha256(&payload);
1893        provider.local_store.put(hash, payload).await.expect("put");
1894
1895        let quote = run_quote_with_pumps(
1896            requester.store.clone(),
1897            hash,
1898            9,
1899            Duration::from_millis(80),
1900            vec!["provider-match".to_string()],
1901            &nodes,
1902        )
1903        .await
1904        .expect("expected quote");
1905        assert_eq!(quote.mint_url.as_deref(), Some("https://mint-a.example"));
1906    }
1907
1908    #[tokio::test]
1909    async fn test_tit_for_tat_store_path_recovers_after_bad_peer_observation() {
1910        let tit_for_tat_successes = run_bad_peer_series(SelectionStrategy::TitForTat).await;
1911
1912        assert!(
1913            tit_for_tat_successes >= 5,
1914            "expected tit-for-tat path to recover after the first consistently bad peer observation (successes={tit_for_tat_successes})"
1915        );
1916    }
1917}
1918
/// Type alias for the simulation store.
///
/// This is [`GenericStore`] with its transport and connection-factory type
/// parameters fixed to the mock implementations in `crate::mock`
/// (`MockRelayTransport` and `MockConnectionFactory`). `S` is forwarded
/// unchanged as `GenericStore`'s first type parameter.
pub type SimStore<S> =
    GenericStore<S, crate::mock::MockRelayTransport, crate::mock::MockConnectionFactory>;
1922
/// Type alias for the production store (using real WebRTC).
///
/// This is [`GenericStore`] with its transport fixed to
/// `crate::nostr::NostrRelayTransport` and its connection factory fixed to
/// `crate::real_factory::RealPeerConnectionFactory`. `S` is forwarded
/// unchanged as `GenericStore`'s first type parameter.
pub type ProductionStore<S> = GenericStore<
    S,
    crate::nostr::NostrRelayTransport,
    crate::real_factory::RealPeerConnectionFactory,
>;