Skip to main content

hashtree_network/
mesh_store_core.rs

1//! Shared routed mesh store core.
2//!
3//! This module provides a concrete store wrapper that works with any local storage
4//! backend plus any signaling transport and peer-link factory. Both production
5//! (Nostr websockets + WebRTC) and simulation (mocks) use this same code.
6
7use async_trait::async_trait;
8use std::collections::hash_map::DefaultHasher;
9use std::collections::{HashMap, HashSet};
10use std::future::Future;
11use std::hash::{Hash as _, Hasher};
12use std::ops::Range;
13use std::sync::Arc;
14use std::time::{Duration, Instant};
15use tokio::sync::{oneshot, Mutex, RwLock};
16
17use hashtree_core::{Hash, Store, StoreError};
18
19use crate::peer_selector::{PeerMetadataSnapshot, PeerSelector, SelectionStrategy};
20use crate::protocol::{
21    create_quote_request, create_quote_response_available, create_quote_response_unavailable,
22    create_request, create_request_with_quote, create_response, encode_quote_request,
23    encode_quote_response, encode_request, encode_response, hash_to_key, parse_message,
24    DataMessage, DataQuoteRequest, DataQuoteResponse,
25};
26use crate::signaling::MeshRouter;
27use crate::transport::{PeerLinkFactory, SignalingTransport, TransportError};
28use crate::types::{PeerHTLConfig, SignalingMessage, MAX_HTL};
29
30// Keep the on-disk namespace stable across the crate rename so existing peer
31// metadata does not disappear for users upgrading from the old package name.
32const PEER_METADATA_POINTER_SLOT_KEY: &[u8] = b"hashtree-webrtc/peer-metadata/latest/v1";
33
34/// Pending request awaiting response
35struct PendingRequest {
36    response_tx: oneshot::Sender<Option<Vec<u8>>>,
37    started_at: Instant,
38    queried_peers: Vec<String>,
39}
40
41struct PendingQuoteRequest {
42    response_tx: oneshot::Sender<Option<NegotiatedQuote>>,
43    preferred_mint_url: Option<String>,
44    offered_payment_sat: u64,
45}
46
/// A quote obtained from a peer: who issued it and the identifier to present
/// when requesting the data.
#[derive(Debug, Clone)]
struct NegotiatedQuote {
    /// ID of the peer the quote belongs to.
    peer_id: String,
    /// Quote identifier passed along with the follow-up data request.
    quote_id: u64,
    /// Mint URL attached to the quote, if any (kept for completeness; not read here).
    #[allow(dead_code)]
    mint_url: Option<String>,
}
54
/// A quote this node handed out to a peer.
struct IssuedQuote {
    /// Instant after which the quote is no longer honoured.
    expires_at: Instant,
    /// Quoted payment amount in sats (stored, not read in this module).
    #[allow(dead_code)]
    payment_sat: u64,
    /// Mint URL attached to the quote, if any (stored, not read in this module).
    #[allow(dead_code)]
    mint_url: Option<String>,
}
62
/// Counters collected while draining the peer-link messages that are
/// currently available.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct DataPumpStats {
    /// Number of raw messages pulled off peer channels.
    pub processed: usize,
    /// Messages that parsed as data requests.
    pub request_messages: usize,
    /// Messages that parsed as data responses.
    pub response_messages: usize,
    /// Messages that parsed as quote requests.
    pub quote_request_messages: u64,
    /// Messages that parsed as quote responses.
    pub quote_response_messages: u64,
    /// Sum of the byte lengths of all pulled messages.
    pub processed_bytes: u64,
}
73
/// How a request is fanned out across peers.
///
/// Two retrieval modes are practical with `MeshStoreCore`:
/// - Flood (`usize::MAX` fanout): best success rate/latency, highest bandwidth.
/// - Staged hedging: query a subset first and widen in later waves.
#[derive(Debug, Clone, Copy)]
pub struct RequestDispatchConfig {
    /// Peers contacted in the first wave.
    pub initial_fanout: usize,
    /// Extra peers contacted by each subsequent hedge wave.
    pub hedge_fanout: usize,
    /// Upper bound on the total number of peers contacted for one request.
    pub max_fanout: usize,
    /// Pause between hedge waves in milliseconds; `0` sends every wave
    /// back-to-back.
    pub hedge_interval_ms: u64,
}
90
91impl Default for RequestDispatchConfig {
92    fn default() -> Self {
93        Self {
94            initial_fanout: usize::MAX,
95            hedge_fanout: usize::MAX,
96            max_fanout: usize::MAX,
97            hedge_interval_ms: 0,
98        }
99    }
100}
101
102/// Normalize fanout config against current peer availability.
103pub fn normalize_dispatch_config(
104    dispatch: RequestDispatchConfig,
105    available_peers: usize,
106) -> RequestDispatchConfig {
107    let mut cfg = dispatch;
108    let cap = if cfg.max_fanout == 0 {
109        available_peers
110    } else {
111        cfg.max_fanout.min(available_peers)
112    };
113    cfg.max_fanout = cap;
114    cfg.initial_fanout = if cfg.initial_fanout == 0 {
115        1
116    } else {
117        cfg.initial_fanout.min(cap.max(1))
118    };
119    cfg.hedge_fanout = if cfg.hedge_fanout == 0 {
120        1
121    } else {
122        cfg.hedge_fanout.min(cap.max(1))
123    };
124    cfg
125}
126
127/// Build wave sizes for staged hedged dispatch.
128pub fn build_hedged_wave_plan(peer_count: usize, dispatch: RequestDispatchConfig) -> Vec<usize> {
129    if peer_count == 0 {
130        return Vec::new();
131    }
132    let cap = dispatch.max_fanout.min(peer_count);
133    if cap == 0 {
134        return Vec::new();
135    }
136
137    let mut plan = Vec::new();
138    let mut sent = 0usize;
139    let first = dispatch.initial_fanout.min(cap).max(1);
140    plan.push(first);
141    sent += first;
142
143    while sent < cap {
144        let next = dispatch.hedge_fanout.min(cap - sent).max(1);
145        plan.push(next);
146        sent += next;
147    }
148    plan
149}
150
/// What the wave scheduler should do after waiting on a dispatched wave.
#[derive(Debug)]
pub enum HedgedWaveAction<T> {
    /// Nothing conclusive yet; move on to the next wave (if any).
    Continue,
    /// A result was obtained; the scheduler returns it immediately.
    Success(T),
    /// Stop dispatching; the scheduler returns `None`.
    Abort,
}
158
159/// Run a staged hedged dispatch over peer index ranges.
160///
161/// This scheduler is shared by the reusable `MeshStoreCore` and the native
162/// `hashtree-cli` mesh path so tests and production use the same wave timing.
163pub async fn run_hedged_waves<T, SendWave, SendWaveFut, WaitWave, WaitWaveFut>(
164    peer_count: usize,
165    dispatch: RequestDispatchConfig,
166    request_timeout: Duration,
167    mut send_wave: SendWave,
168    mut wait_wave: WaitWave,
169) -> Option<T>
170where
171    SendWave: FnMut(Range<usize>) -> SendWaveFut,
172    SendWaveFut: Future<Output = usize>,
173    WaitWave: FnMut(Duration) -> WaitWaveFut,
174    WaitWaveFut: Future<Output = HedgedWaveAction<T>>,
175{
176    let dispatch = normalize_dispatch_config(dispatch, peer_count);
177    let wave_plan = build_hedged_wave_plan(peer_count, dispatch);
178    if wave_plan.is_empty() {
179        return None;
180    }
181
182    let deadline = Instant::now() + request_timeout;
183    let mut sent_total = 0usize;
184    let mut next_peer_idx = 0usize;
185
186    for (wave_idx, wave_size) in wave_plan.iter().copied().enumerate() {
187        let from = next_peer_idx;
188        let to = (next_peer_idx + wave_size).min(peer_count);
189        next_peer_idx = to;
190
191        if from == to {
192            continue;
193        }
194
195        sent_total += send_wave(from..to).await;
196        if sent_total == 0 {
197            if next_peer_idx >= peer_count {
198                break;
199            }
200            continue;
201        }
202
203        let now = Instant::now();
204        if now >= deadline {
205            break;
206        }
207        let remaining = deadline.saturating_duration_since(now);
208        let is_last_wave = wave_idx + 1 == wave_plan.len() || next_peer_idx >= peer_count;
209        let wait = if is_last_wave {
210            remaining
211        } else if dispatch.hedge_interval_ms == 0 {
212            Duration::ZERO
213        } else {
214            Duration::from_millis(dispatch.hedge_interval_ms).min(remaining)
215        };
216
217        if wait.is_zero() {
218            continue;
219        }
220
221        match wait_wave(wait).await {
222            HedgedWaveAction::Continue => {}
223            HedgedWaveAction::Success(value) => return Some(value),
224            HedgedWaveAction::Abort => break,
225        }
226    }
227
228    None
229}
230
231/// Keep selector membership aligned with currently connected peer IDs.
232pub async fn sync_selector_peers(selector: &RwLock<PeerSelector>, current_peer_ids: &[String]) {
233    let mut selector = selector.write().await;
234    let current: HashSet<&str> = current_peer_ids.iter().map(String::as_str).collect();
235    let known: Vec<String> = selector.all_stats().map(|s| s.peer_id.clone()).collect();
236    for peer_id in known {
237        if !current.contains(peer_id.as_str()) {
238            selector.remove_peer(&peer_id);
239        }
240    }
241    for peer_id in current_peer_ids {
242        selector.add_peer(peer_id.clone());
243    }
244}
245
/// How a simulated/game-theory actor answers requests.
///
/// The default is an honest node: it never drops or corrupts a response and
/// adds no delay.
#[derive(Debug, Clone, Copy)]
pub struct ResponseBehaviorConfig {
    /// Chance of withholding a response even though the data is available.
    pub drop_response_prob: f64,
    /// Chance of answering with a corrupted payload.
    pub corrupt_response_prob: f64,
    /// Extra response latency in milliseconds, modelling slow/incompetent peers.
    pub extra_delay_ms: u64,
}
258
259impl Default for ResponseBehaviorConfig {
260    fn default() -> Self {
261        Self {
262            drop_response_prob: 0.0,
263            corrupt_response_prob: 0.0,
264            extra_delay_ms: 0,
265        }
266    }
267}
268
269impl ResponseBehaviorConfig {
270    fn normalized(self) -> Self {
271        Self {
272            drop_response_prob: self.drop_response_prob.clamp(0.0, 1.0),
273            corrupt_response_prob: self.corrupt_response_prob.clamp(0.0, 1.0),
274            extra_delay_ms: self.extra_delay_ms,
275        }
276    }
277}
278
279/// Routing policy for request ordering + dispatch fanout.
280#[derive(Debug, Clone)]
281pub struct MeshRoutingConfig {
282    pub selection_strategy: SelectionStrategy,
283    pub fairness_enabled: bool,
284    /// Blend weight for payment-priority ranking in selector (`0.0` disables).
285    pub cashu_payment_weight: f64,
286    /// Refuse serving peers that have reached this many unpaid post-delivery settlements.
287    /// `0` disables refusal and only keeps metadata/downranking.
288    pub cashu_payment_default_block_threshold: u64,
289    /// Cashu mint URLs this node is willing to use for settlement.
290    pub cashu_accepted_mints: Vec<String>,
291    /// Preferred Cashu mint URL when initiating paid retrieval.
292    pub cashu_default_mint: Option<String>,
293    /// Baseline cap for accepting a peer-suggested mint outside the trusted set.
294    pub cashu_peer_suggested_mint_base_cap_sat: u64,
295    /// Additional sats allowed per successful delivery from that peer.
296    pub cashu_peer_suggested_mint_success_step_sat: u64,
297    /// Additional sats allowed per successful post-delivery payment received from that peer.
298    pub cashu_peer_suggested_mint_receipt_step_sat: u64,
299    /// Hard upper bound for any single peer-suggested mint quote we accept.
300    pub cashu_peer_suggested_mint_max_cap_sat: u64,
301    pub dispatch: RequestDispatchConfig,
302    pub response_behavior: ResponseBehaviorConfig,
303}
304
305impl Default for MeshRoutingConfig {
306    fn default() -> Self {
307        Self {
308            selection_strategy: SelectionStrategy::Weighted,
309            fairness_enabled: true,
310            cashu_payment_weight: 0.0,
311            cashu_payment_default_block_threshold: 0,
312            cashu_accepted_mints: Vec::new(),
313            cashu_default_mint: None,
314            cashu_peer_suggested_mint_base_cap_sat: 0,
315            cashu_peer_suggested_mint_success_step_sat: 0,
316            cashu_peer_suggested_mint_receipt_step_sat: 0,
317            cashu_peer_suggested_mint_max_cap_sat: 0,
318            dispatch: RequestDispatchConfig::default(),
319            response_behavior: ResponseBehaviorConfig::default(),
320        }
321    }
322}
323
324/// Routed mesh store core that works with any storage backend and transport
325/// implementation.
326///
327/// This is the shared code between production and simulation.
328/// - Production: `MeshStoreCore<LmdbStore, NostrRelayTransport, WebRtcPeerLinkFactory>`
329/// - Simulation: `MeshStoreCore<MemoryStore, MockRelayTransport, MockConnectionFactory>`
330pub struct MeshStoreCore<S, R, F>
331where
332    S: Store + Send + Sync + 'static,
333    R: SignalingTransport + Send + Sync + 'static,
334    F: PeerLinkFactory + Send + Sync + 'static,
335{
336    /// Local backing store
337    local_store: Arc<S>,
338    /// Mesh router (handles peer discovery and connection)
339    signaling: Arc<MeshRouter<R, F>>,
340    /// Per-peer HTL config
341    htl_configs: RwLock<HashMap<String, PeerHTLConfig>>,
342    /// Pending requests we sent
343    pending_requests: RwLock<HashMap<String, PendingRequest>>,
344    /// Pending quote negotiations keyed by requested hash.
345    pending_quotes: RwLock<HashMap<String, PendingQuoteRequest>>,
346    /// Quotes we issued to peers and will accept exactly once until expiry.
347    issued_quotes: RwLock<HashMap<(String, String, u64), IssuedQuote>>,
348    /// Monotonic quote identifier generator.
349    next_quote_id: RwLock<u64>,
350    /// Adaptive selector for peer ordering.
351    peer_selector: RwLock<PeerSelector>,
352    /// Routing/dispatch configuration.
353    routing: MeshRoutingConfig,
354    /// Request timeout
355    request_timeout: Duration,
356    /// Debug mode
357    debug: bool,
358    /// Running flag
359    running: RwLock<bool>,
360}
361
362impl<S, R, F> MeshStoreCore<S, R, F>
363where
364    S: Store + Send + Sync + 'static,
365    R: SignalingTransport + Send + Sync + 'static,
366    F: PeerLinkFactory + Send + Sync + 'static,
367{
368    /// Create a new routed mesh store core.
369    pub fn new(
370        local_store: Arc<S>,
371        signaling: Arc<MeshRouter<R, F>>,
372        request_timeout: Duration,
373        debug: bool,
374    ) -> Self {
375        Self::new_with_routing(
376            local_store,
377            signaling,
378            request_timeout,
379            debug,
380            Default::default(),
381        )
382    }
383
384    /// Create a new routed mesh store core with explicit routing configuration.
385    pub fn new_with_routing(
386        local_store: Arc<S>,
387        signaling: Arc<MeshRouter<R, F>>,
388        request_timeout: Duration,
389        debug: bool,
390        routing: MeshRoutingConfig,
391    ) -> Self {
392        let mut selector = PeerSelector::with_strategy(routing.selection_strategy);
393        selector.set_fairness(routing.fairness_enabled);
394        selector.set_cashu_payment_weight(routing.cashu_payment_weight);
395        Self {
396            local_store,
397            signaling,
398            htl_configs: RwLock::new(HashMap::new()),
399            pending_requests: RwLock::new(HashMap::new()),
400            pending_quotes: RwLock::new(HashMap::new()),
401            issued_quotes: RwLock::new(HashMap::new()),
402            next_quote_id: RwLock::new(1),
403            peer_selector: RwLock::new(selector),
404            routing,
405            request_timeout,
406            debug,
407            running: RwLock::new(false),
408        }
409    }
410
411    /// Start the store (begin listening for messages)
412    pub async fn start(&self) -> Result<(), TransportError> {
413        *self.running.write().await = true;
414
415        // Send initial hello
416        self.signaling.send_hello(vec![]).await?;
417
418        Ok(())
419    }
420
421    /// Stop the store
422    pub async fn stop(&self) {
423        *self.running.write().await = false;
424    }
425
426    /// Process incoming signaling message
427    pub async fn process_signaling(&self, msg: SignalingMessage) -> Result<(), TransportError> {
428        // When a new peer connects, initialize their HTL config
429        let peer_id = msg.peer_id().to_string();
430        {
431            let mut configs = self.htl_configs.write().await;
432            if !configs.contains_key(&peer_id) {
433                configs.insert(peer_id.clone(), PeerHTLConfig::random());
434            }
435        }
436        self.peer_selector.write().await.add_peer(peer_id);
437
438        self.signaling.handle_message(msg).await
439    }
440
441    /// Get signaling manager reference
442    pub fn signaling(&self) -> &Arc<MeshRouter<R, F>> {
443        &self.signaling
444    }
445
446    fn response_behavior(&self) -> ResponseBehaviorConfig {
447        self.routing.response_behavior.normalized()
448    }
449
450    fn deterministic_actor_draw_for(peer_id: &str, hash: &Hash, salt: u64) -> f64 {
451        let mut hasher = DefaultHasher::new();
452        peer_id.hash(&mut hasher);
453        hash.hash(&mut hasher);
454        salt.hash(&mut hasher);
455        let v = hasher.finish();
456        (v as f64) / (u64::MAX as f64)
457    }
458
459    fn deterministic_actor_draw(&self, hash: &Hash, salt: u64) -> f64 {
460        Self::deterministic_actor_draw_for(self.signaling.peer_id(), hash, salt)
461    }
462
463    fn peer_metadata_pointer_slot_hash() -> Hash {
464        hashtree_core::sha256(PEER_METADATA_POINTER_SLOT_KEY)
465    }
466
467    fn decode_hash_hex(hash_hex: &str) -> Result<Hash, StoreError> {
468        let bytes = hex::decode(hash_hex)
469            .map_err(|e| StoreError::Other(format!("Invalid hash hex: {e}")))?;
470        if bytes.len() != 32 {
471            return Err(StoreError::Other(format!(
472                "Invalid hash length {}, expected 32 bytes",
473                bytes.len()
474            )));
475        }
476        let mut hash = [0u8; 32];
477        hash.copy_from_slice(&bytes);
478        Ok(hash)
479    }
480
481    fn should_drop_response(&self, hash: &Hash) -> bool {
482        let p = self.response_behavior().drop_response_prob;
483        if p <= 0.0 {
484            return false;
485        }
486        self.deterministic_actor_draw(hash, 0xD0_D0_D0_D0_D0_D0_D0_D0) < p
487    }
488
489    fn should_corrupt_response(&self, hash: &Hash) -> bool {
490        let p = self.response_behavior().corrupt_response_prob;
491        if p <= 0.0 {
492            return false;
493        }
494        self.deterministic_actor_draw(hash, 0xC0_C0_C0_C0_C0_C0_C0_C0) < p
495    }
496
497    async fn ordered_connected_peers(&self) -> Vec<String> {
498        let current_peer_ids = self.signaling.peer_ids().await;
499        if current_peer_ids.is_empty() {
500            return Vec::new();
501        }
502
503        sync_selector_peers(&self.peer_selector, &current_peer_ids).await;
504        let current_set: HashSet<&str> = current_peer_ids.iter().map(String::as_str).collect();
505        let mut ordered_peer_ids = self.peer_selector.write().await.select_peers();
506        ordered_peer_ids.retain(|peer_id| current_set.contains(peer_id.as_str()));
507        if ordered_peer_ids.is_empty() {
508            let mut fallback = current_peer_ids;
509            fallback.sort();
510            return fallback;
511        }
512        ordered_peer_ids
513    }
514
515    fn requested_quote_mint(&self) -> Option<&str> {
516        if let Some(default_mint) = self.routing.cashu_default_mint.as_deref() {
517            if self.routing.cashu_accepted_mints.is_empty()
518                || self
519                    .routing
520                    .cashu_accepted_mints
521                    .iter()
522                    .any(|mint| mint == default_mint)
523            {
524                return Some(default_mint);
525            }
526        }
527
528        self.routing
529            .cashu_accepted_mints
530            .first()
531            .map(String::as_str)
532    }
533
534    fn choose_quote_mint(&self, requested_mint: Option<&str>) -> Option<String> {
535        if let Some(requested_mint) = requested_mint {
536            if self.accepts_quote_mint(Some(requested_mint)) {
537                return Some(requested_mint.to_string());
538            }
539        }
540        if let Some(default_mint) = self.routing.cashu_default_mint.as_ref() {
541            return Some(default_mint.clone());
542        }
543        if let Some(first_mint) = self.routing.cashu_accepted_mints.first() {
544            return Some(first_mint.clone());
545        }
546        requested_mint.map(str::to_string)
547    }
548
549    fn accepts_quote_mint(&self, mint_url: Option<&str>) -> bool {
550        if self.routing.cashu_accepted_mints.is_empty() {
551            return true;
552        }
553
554        let Some(mint_url) = mint_url else {
555            return false;
556        };
557        self.routing
558            .cashu_accepted_mints
559            .iter()
560            .any(|mint| mint == mint_url)
561    }
562
563    fn trusts_quote_mint(&self, mint_url: Option<&str>) -> bool {
564        let Some(mint_url) = mint_url else {
565            return self.routing.cashu_default_mint.is_none()
566                && self.routing.cashu_accepted_mints.is_empty();
567        };
568        self.routing.cashu_default_mint.as_deref() == Some(mint_url)
569            || self
570                .routing
571                .cashu_accepted_mints
572                .iter()
573                .any(|mint| mint == mint_url)
574    }
575
576    async fn peer_suggested_mint_cap_sat(&self, peer_id: &str) -> u64 {
577        let base = self.routing.cashu_peer_suggested_mint_base_cap_sat;
578        if base == 0 {
579            return 0;
580        }
581
582        let selector = self.peer_selector.read().await;
583        let Some(stats) = selector.get_stats(peer_id) else {
584            let max_cap = self.routing.cashu_peer_suggested_mint_max_cap_sat;
585            return if max_cap > 0 { base.min(max_cap) } else { base };
586        };
587
588        if stats.cashu_payment_defaults > 0
589            && stats.cashu_payment_defaults >= stats.cashu_payment_receipts
590        {
591            return 0;
592        }
593
594        let success_bonus = stats
595            .successes
596            .saturating_mul(self.routing.cashu_peer_suggested_mint_success_step_sat);
597        let receipt_bonus = stats
598            .cashu_payment_receipts
599            .saturating_mul(self.routing.cashu_peer_suggested_mint_receipt_step_sat);
600        let mut cap = base
601            .saturating_add(success_bonus)
602            .saturating_add(receipt_bonus);
603        let max_cap = self.routing.cashu_peer_suggested_mint_max_cap_sat;
604        if max_cap > 0 {
605            cap = cap.min(max_cap);
606        }
607        cap
608    }
609
610    async fn should_accept_quote_response(
611        &self,
612        from_peer: &str,
613        preferred_mint_url: Option<&str>,
614        offered_payment_sat: u64,
615        res: &DataQuoteResponse,
616    ) -> bool {
617        let Some(payment_sat) = res.p else {
618            return false;
619        };
620        if payment_sat > offered_payment_sat {
621            return false;
622        }
623
624        let response_mint = res.m.as_deref();
625        if response_mint == preferred_mint_url {
626            return true;
627        }
628        if self.trusts_quote_mint(response_mint) {
629            return true;
630        }
631        if response_mint.is_none() {
632            return false;
633        }
634
635        payment_sat <= self.peer_suggested_mint_cap_sat(from_peer).await
636    }
637
638    async fn issue_quote(
639        &self,
640        peer_id: &str,
641        hash_key: &str,
642        payment_sat: u64,
643        ttl_ms: u32,
644        mint_url: Option<&str>,
645    ) -> u64 {
646        let quote_id = {
647            let mut next = self.next_quote_id.write().await;
648            let quote_id = *next;
649            *next = next.saturating_add(1);
650            quote_id
651        };
652
653        let expires_at = Instant::now() + Duration::from_millis(ttl_ms as u64);
654        self.issued_quotes.write().await.insert(
655            (peer_id.to_string(), hash_key.to_string(), quote_id),
656            IssuedQuote {
657                expires_at,
658                payment_sat,
659                mint_url: mint_url.map(str::to_string),
660            },
661        );
662        quote_id
663    }
664
665    async fn take_valid_quote(&self, peer_id: &str, hash_key: &str, quote_id: u64) -> bool {
666        let key = (peer_id.to_string(), hash_key.to_string(), quote_id);
667        let Some(quote) = self.issued_quotes.write().await.remove(&key) else {
668            return false;
669        };
670        quote.expires_at > Instant::now()
671    }
672
673    async fn send_request_to_peer(
674        &self,
675        peer_id: &str,
676        hash: &Hash,
677        quote_id: Option<u64>,
678    ) -> bool {
679        let channel = match self.signaling.get_channel(peer_id).await {
680            Some(c) => c,
681            None => return false,
682        };
683
684        let htl_config = {
685            let configs = self.htl_configs.read().await;
686            configs
687                .get(peer_id)
688                .cloned()
689                .unwrap_or_else(PeerHTLConfig::random)
690        };
691
692        let send_htl = htl_config.decrement(MAX_HTL);
693        let req = match quote_id {
694            Some(quote_id) => create_request_with_quote(hash, send_htl, quote_id),
695            None => create_request(hash, send_htl),
696        };
697        let request_bytes = encode_request(&req);
698
699        {
700            let mut selector = self.peer_selector.write().await;
701            selector.record_request(peer_id, request_bytes.len() as u64);
702        }
703
704        match channel.send(request_bytes).await {
705            Ok(()) => true,
706            Err(_) => {
707                self.peer_selector.write().await.record_failure(peer_id);
708                false
709            }
710        }
711    }
712
713    async fn send_quote_request_to_peer(
714        &self,
715        peer_id: &str,
716        hash: &Hash,
717        payment_sat: u64,
718        ttl_ms: u32,
719        mint_url: Option<&str>,
720    ) -> bool {
721        let channel = match self.signaling.get_channel(peer_id).await {
722            Some(c) => c,
723            None => return false,
724        };
725
726        let req = create_quote_request(hash, ttl_ms, payment_sat, mint_url);
727        let request_bytes = encode_quote_request(&req);
728
729        match channel.send(request_bytes).await {
730            Ok(()) => true,
731            Err(_) => false,
732        }
733    }
734
735    /// Get peer count
736    pub async fn peer_count(&self) -> usize {
737        self.signaling.peer_count().await
738    }
739
740    /// Check if we need more peers
741    pub async fn needs_peers(&self) -> bool {
742        self.signaling.needs_peers().await
743    }
744
745    /// Re-broadcast hello to refresh discovery as topology changes.
746    pub async fn send_hello(&self) -> Result<(), TransportError> {
747        self.signaling.send_hello(vec![]).await
748    }
749
750    /// Drain all currently available peer-link messages and handle them.
751    ///
752    /// This keeps the message pump logic shared between simulation and the
753    /// default production wrapper instead of duplicating per-channel loops.
754    pub async fn drain_available_data_messages(&self) -> DataPumpStats {
755        let mut stats = DataPumpStats::default();
756        let peer_ids = self.signaling.peer_ids().await;
757        for peer_id in peer_ids {
758            let Some(channel) = self.signaling.get_channel(&peer_id).await else {
759                continue;
760            };
761
762            while let Some(data) = channel.try_recv() {
763                stats.processed += 1;
764                stats.processed_bytes += data.len() as u64;
765                if let Some(msg) = parse_message(&data) {
766                    match msg {
767                        DataMessage::Request(_) => stats.request_messages += 1,
768                        DataMessage::Response(_) => stats.response_messages += 1,
769                        DataMessage::QuoteRequest(_) => stats.quote_request_messages += 1,
770                        DataMessage::QuoteResponse(_) => stats.quote_response_messages += 1,
771                        DataMessage::Payment(_)
772                        | DataMessage::PaymentAck(_)
773                        | DataMessage::Chunk(_) => {}
774                    }
775                }
776                self.handle_data_message(&peer_id, &data).await;
777            }
778        }
779        stats
780    }
781
782    /// Apply an out-of-band payment credit to a peer's routing priority.
783    pub async fn record_cashu_payment_for_peer(&self, peer_id: &str, amount_sat: u64) {
784        self.peer_selector
785            .write()
786            .await
787            .record_cashu_payment(peer_id, amount_sat);
788    }
789
790    /// Record a post-delivery payment we received from a peer.
791    pub async fn record_cashu_receipt_from_peer(&self, peer_id: &str, amount_sat: u64) {
792        self.peer_selector
793            .write()
794            .await
795            .record_cashu_receipt(peer_id, amount_sat);
796    }
797
798    /// Record that a peer failed to pay after we delivered successfully.
799    pub async fn record_cashu_payment_default_from_peer(&self, peer_id: &str) {
800        self.peer_selector
801            .write()
802            .await
803            .record_cashu_payment_default(peer_id);
804    }
805
806    /// Snapshot routing/selection summary for inspection/debugging.
807    pub async fn selector_summary(&self) -> crate::peer_selector::SelectorSummary {
808        self.peer_selector.read().await.summary()
809    }
810
811    fn should_refuse_requests_from_peer(&self, selector: &PeerSelector, peer_id: &str) -> bool {
812        selector.is_peer_blocked_for_payment_defaults(
813            peer_id,
814            self.routing.cashu_payment_default_block_threshold,
815        )
816    }
817
818    /// Export live peer metadata for inspection/debugging.
819    pub async fn peer_metadata_snapshot(&self) -> PeerMetadataSnapshot {
820        self.peer_selector
821            .read()
822            .await
823            .export_peer_metadata_snapshot()
824    }
825
826    /// Snapshot current peer metadata and persist it into `local_store`.
827    ///
828    /// Uses content-addressed storage for the snapshot body and a reserved
829    /// mutable pointer slot for the "latest snapshot hash".
830    pub async fn persist_peer_metadata(&self) -> Result<Hash, StoreError> {
831        let snapshot = self
832            .peer_selector
833            .read()
834            .await
835            .export_peer_metadata_snapshot();
836        let bytes = serde_json::to_vec(&snapshot).map_err(|e| {
837            StoreError::Other(format!("Failed to encode peer metadata snapshot: {e}"))
838        })?;
839        let snapshot_hash = hashtree_core::sha256(&bytes);
840        let _ = self.local_store.put(snapshot_hash, bytes).await?;
841
842        let pointer_slot = Self::peer_metadata_pointer_slot_hash();
843        let pointer_bytes = hex::encode(snapshot_hash).into_bytes();
844        let _ = self.local_store.delete(&pointer_slot).await?;
845        let _ = self.local_store.put(pointer_slot, pointer_bytes).await?;
846
847        Ok(snapshot_hash)
848    }
849
850    /// Load persisted peer metadata from `local_store` if available.
851    pub async fn load_peer_metadata(&self) -> Result<bool, StoreError> {
852        let pointer_slot = Self::peer_metadata_pointer_slot_hash();
853        let Some(pointer_bytes) = self.local_store.get(&pointer_slot).await? else {
854            return Ok(false);
855        };
856        let pointer_hex = std::str::from_utf8(&pointer_bytes).map_err(|e| {
857            StoreError::Other(format!("Peer metadata pointer is not valid UTF-8: {e}"))
858        })?;
859        let snapshot_hash = Self::decode_hash_hex(pointer_hex.trim())?;
860
861        let Some(snapshot_bytes) = self.local_store.get(&snapshot_hash).await? else {
862            return Ok(false);
863        };
864        let snapshot: PeerMetadataSnapshot =
865            serde_json::from_slice(&snapshot_bytes).map_err(|e| {
866                StoreError::Other(format!("Failed to decode peer metadata snapshot: {e}"))
867            })?;
868        self.peer_selector
869            .write()
870            .await
871            .import_peer_metadata_snapshot(&snapshot);
872        Ok(true)
873    }
874
875    /// Request data from peers after negotiating a paid quote.
876    ///
877    /// If quote negotiation fails or the quoted peer does not deliver, the store
878    /// falls back to the normal unpaid retrieval path to preserve liveness.
879    pub async fn get_with_quote(
880        &self,
881        hash: &Hash,
882        payment_sat: u64,
883        quote_ttl: Duration,
884    ) -> Result<Option<Vec<u8>>, StoreError> {
885        if let Some(data) = self.local_store.get(hash).await? {
886            return Ok(Some(data));
887        }
888        Ok(self
889            .request_from_peers_with_quote(hash, payment_sat, quote_ttl)
890            .await)
891    }
892
893    async fn request_from_peers_with_quote(
894        &self,
895        hash: &Hash,
896        payment_sat: u64,
897        quote_ttl: Duration,
898    ) -> Option<Vec<u8>> {
899        let ordered_peer_ids = self.ordered_connected_peers().await;
900        if ordered_peer_ids.is_empty() {
901            return None;
902        }
903
904        if let Some(quote) = self
905            .request_quote_from_peers(hash, payment_sat, quote_ttl, &ordered_peer_ids)
906            .await
907        {
908            if let Some(data) = self
909                .request_from_single_peer(hash, &quote.peer_id, Some(quote.quote_id))
910                .await
911            {
912                return Some(data);
913            }
914        }
915
916        self.request_from_ordered_peers(hash, &ordered_peer_ids)
917            .await
918    }
919
920    async fn request_quote_from_peers(
921        &self,
922        hash: &Hash,
923        payment_sat: u64,
924        quote_ttl: Duration,
925        ordered_peer_ids: &[String],
926    ) -> Option<NegotiatedQuote> {
927        if ordered_peer_ids.is_empty() {
928            return None;
929        }
930        let ttl_ms = quote_ttl.as_millis().min(u32::MAX as u128) as u32;
931        if ttl_ms == 0 {
932            return None;
933        }
934        let requested_mint = self.requested_quote_mint().map(str::to_string);
935
936        let hash_key = hash_to_key(hash);
937        let (tx, rx) = oneshot::channel();
938        self.pending_quotes.write().await.insert(
939            hash_key.clone(),
940            PendingQuoteRequest {
941                response_tx: tx,
942                preferred_mint_url: requested_mint.clone(),
943                offered_payment_sat: payment_sat,
944            },
945        );
946
947        let rx = Arc::new(Mutex::new(rx));
948        let result = run_hedged_waves(
949            ordered_peer_ids.len(),
950            self.routing.dispatch,
951            self.request_timeout,
952            |range| {
953                let wave_peer_ids = ordered_peer_ids[range].to_vec();
954                let requested_mint = requested_mint.clone();
955                let hash = *hash;
956                async move {
957                    let mut sent = 0usize;
958                    for peer_id in wave_peer_ids {
959                        if self
960                            .send_quote_request_to_peer(
961                                &peer_id,
962                                &hash,
963                                payment_sat,
964                                ttl_ms,
965                                requested_mint.as_deref(),
966                            )
967                            .await
968                        {
969                            sent += 1;
970                        }
971                    }
972                    sent
973                }
974            },
975            |wait| {
976                let rx = rx.clone();
977                async move {
978                    let mut rx = rx.lock().await;
979                    match tokio::time::timeout(wait, &mut *rx).await {
980                        Ok(Ok(Some(quote))) => HedgedWaveAction::Success(quote),
981                        Ok(Ok(None)) | Ok(Err(_)) => HedgedWaveAction::Abort,
982                        Err(_) => HedgedWaveAction::Continue,
983                    }
984                }
985            },
986        )
987        .await;
988        let _ = self.pending_quotes.write().await.remove(&hash_key);
989        result
990    }
991
992    async fn request_from_single_peer(
993        &self,
994        hash: &Hash,
995        peer_id: &str,
996        quote_id: Option<u64>,
997    ) -> Option<Vec<u8>> {
998        let hash_key = hash_to_key(hash);
999        let (tx, rx) = oneshot::channel();
1000        self.pending_requests.write().await.insert(
1001            hash_key.clone(),
1002            PendingRequest {
1003                response_tx: tx,
1004                started_at: Instant::now(),
1005                queried_peers: vec![peer_id.to_string()],
1006            },
1007        );
1008
1009        let mut rx = rx;
1010        if !self.send_request_to_peer(peer_id, hash, quote_id).await {
1011            let _ = self.pending_requests.write().await.remove(&hash_key);
1012            return None;
1013        }
1014
1015        if let Ok(Ok(Some(data))) = tokio::time::timeout(self.request_timeout, &mut rx).await {
1016            if hashtree_core::sha256(&data) == *hash {
1017                let _ = self.local_store.put(*hash, data.clone()).await;
1018                return Some(data);
1019            }
1020        }
1021
1022        if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
1023            for peer_id in pending.queried_peers {
1024                self.peer_selector.write().await.record_timeout(&peer_id);
1025            }
1026        }
1027        None
1028    }
1029
1030    async fn request_from_ordered_peers(
1031        &self,
1032        hash: &Hash,
1033        ordered_peer_ids: &[String],
1034    ) -> Option<Vec<u8>> {
1035        let hash_key = hash_to_key(hash);
1036        let (tx, rx) = oneshot::channel();
1037        self.pending_requests.write().await.insert(
1038            hash_key.clone(),
1039            PendingRequest {
1040                response_tx: tx,
1041                started_at: Instant::now(),
1042                queried_peers: Vec::new(),
1043            },
1044        );
1045
1046        let rx = Arc::new(Mutex::new(rx));
1047        let result = run_hedged_waves(
1048            ordered_peer_ids.len(),
1049            self.routing.dispatch,
1050            self.request_timeout,
1051            |range| {
1052                let wave_peer_ids = ordered_peer_ids[range].to_vec();
1053                let hash = *hash;
1054                let hash_key = hash_key.clone();
1055                async move {
1056                    let mut sent = 0usize;
1057                    for peer_id in wave_peer_ids {
1058                        if self.send_request_to_peer(&peer_id, &hash, None).await {
1059                            sent += 1;
1060                            if let Some(pending) =
1061                                self.pending_requests.write().await.get_mut(&hash_key)
1062                            {
1063                                pending.queried_peers.push(peer_id);
1064                            }
1065                        }
1066                    }
1067                    sent
1068                }
1069            },
1070            |wait| {
1071                let rx = rx.clone();
1072                async move {
1073                    let mut rx = rx.lock().await;
1074                    match tokio::time::timeout(wait, &mut *rx).await {
1075                        Ok(Ok(Some(data))) if hashtree_core::sha256(&data) == *hash => {
1076                            HedgedWaveAction::Success(data)
1077                        }
1078                        Ok(Ok(Some(_))) => HedgedWaveAction::Continue,
1079                        Ok(Ok(None)) | Ok(Err(_)) => HedgedWaveAction::Abort,
1080                        Err(_) => HedgedWaveAction::Continue,
1081                    }
1082                }
1083            },
1084        )
1085        .await;
1086
1087        let Some(data) = result else {
1088            if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
1089                for peer_id in pending.queried_peers {
1090                    self.peer_selector.write().await.record_timeout(&peer_id);
1091                }
1092            }
1093            return None;
1094        };
1095
1096        let _ = self.local_store.put(*hash, data.clone()).await;
1097        Some(data)
1098    }
1099
1100    /// Request data from peers
1101    async fn request_from_peers(&self, hash: &Hash) -> Option<Vec<u8>> {
1102        let ordered_peer_ids = self.ordered_connected_peers().await;
1103        if ordered_peer_ids.is_empty() {
1104            return None;
1105        }
1106        self.request_from_ordered_peers(hash, &ordered_peer_ids)
1107            .await
1108    }
1109
1110    async fn complete_pending_response(&self, from_peer: &str, hash_key: String, payload: Vec<u8>) {
1111        if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
1112            let rtt_ms = pending.started_at.elapsed().as_millis() as u64;
1113            self.peer_selector.write().await.record_success(
1114                from_peer,
1115                rtt_ms,
1116                payload.len() as u64,
1117            );
1118            let _ = pending.response_tx.send(Some(payload));
1119        }
1120    }
1121
1122    async fn handle_quote_response_message(&self, from_peer: &str, res: DataQuoteResponse) {
1123        if !res.a {
1124            return;
1125        }
1126
1127        let Some(quote_id) = res.q else {
1128            return;
1129        };
1130
1131        let hash_key = hash_to_key(&res.h);
1132        let (preferred_mint_url, offered_payment_sat) = {
1133            let pending_quotes = self.pending_quotes.read().await;
1134            let Some(pending) = pending_quotes.get(&hash_key) else {
1135                return;
1136            };
1137            (
1138                pending.preferred_mint_url.clone(),
1139                pending.offered_payment_sat,
1140            )
1141        };
1142        if !self
1143            .should_accept_quote_response(
1144                from_peer,
1145                preferred_mint_url.as_deref(),
1146                offered_payment_sat,
1147                &res,
1148            )
1149            .await
1150        {
1151            return;
1152        }
1153        let mut pending_quotes = self.pending_quotes.write().await;
1154        if let Some(pending) = pending_quotes.remove(&hash_key) {
1155            let _ = pending.response_tx.send(Some(NegotiatedQuote {
1156                peer_id: from_peer.to_string(),
1157                quote_id,
1158                mint_url: res.m,
1159            }));
1160        }
1161    }
1162
1163    async fn handle_response_message(&self, from_peer: &str, res: crate::protocol::DataResponse) {
1164        let hash_key = hash_to_key(&res.h);
1165        let hash = match crate::protocol::bytes_to_hash(&res.h) {
1166            Some(h) => h,
1167            None => return,
1168        };
1169
1170        // Ignore malformed/corrupt payload and keep waiting for a valid response.
1171        if hashtree_core::sha256(&res.d) != hash {
1172            self.peer_selector.write().await.record_failure(from_peer);
1173            if self.debug {
1174                println!("[MeshStoreCore] Ignoring invalid response payload for {hash_key}");
1175            }
1176            return;
1177        }
1178
1179        self.complete_pending_response(from_peer, hash_key, res.d)
1180            .await;
1181    }
1182
1183    async fn handle_quote_request_message(&self, from_peer: &str, req: DataQuoteRequest) {
1184        let hash = match crate::protocol::bytes_to_hash(&req.h) {
1185            Some(h) => h,
1186            None => return,
1187        };
1188        let hash_key = hash_to_key(&hash);
1189
1190        {
1191            let selector = self.peer_selector.read().await;
1192            if self.should_refuse_requests_from_peer(&selector, from_peer) {
1193                if self.debug {
1194                    println!(
1195                        "[MeshStoreCore] Refusing quote request from delinquent peer {}",
1196                        from_peer
1197                    );
1198                }
1199                return;
1200            }
1201        }
1202
1203        let chosen_mint = self.choose_quote_mint(req.m.as_deref());
1204        let can_serve = self.local_store.has(&hash).await.ok().unwrap_or(false)
1205            && !self.should_drop_response(&hash)
1206            && !self.should_corrupt_response(&hash);
1207
1208        let res = if can_serve {
1209            let quote_id = self
1210                .issue_quote(from_peer, &hash_key, req.p, req.t, chosen_mint.as_deref())
1211                .await;
1212            create_quote_response_available(&hash, quote_id, req.p, req.t, chosen_mint.as_deref())
1213        } else {
1214            create_quote_response_unavailable(&hash)
1215        };
1216        let response_bytes = encode_quote_response(&res);
1217        if let Some(channel) = self.signaling.get_channel(from_peer).await {
1218            let _ = channel.send(response_bytes).await;
1219        }
1220    }
1221
1222    async fn handle_request_message(&self, from_peer: &str, req: crate::protocol::DataRequest) {
1223        let hash = match crate::protocol::bytes_to_hash(&req.h) {
1224            Some(h) => h,
1225            None => return,
1226        };
1227        let hash_key = hash_to_key(&hash);
1228
1229        {
1230            let selector = self.peer_selector.read().await;
1231            if self.should_refuse_requests_from_peer(&selector, from_peer) {
1232                if self.debug {
1233                    println!(
1234                        "[MeshStoreCore] Refusing request from delinquent peer {}",
1235                        from_peer
1236                    );
1237                }
1238                return;
1239            }
1240        }
1241
1242        if let Some(quote_id) = req.q {
1243            if !self.take_valid_quote(from_peer, &hash_key, quote_id).await {
1244                if self.debug {
1245                    println!(
1246                        "[MeshStoreCore] Refusing request with invalid or expired quote {} from {}",
1247                        quote_id, from_peer
1248                    );
1249                }
1250                return;
1251            }
1252        }
1253
1254        // Check local store
1255        if let Ok(Some(mut data)) = self.local_store.get(&hash).await {
1256            if self.should_drop_response(&hash) {
1257                if self.debug {
1258                    println!(
1259                        "[MeshStoreCore] Dropping response for {} due to actor profile",
1260                        hash_to_key(&hash)
1261                    );
1262                }
1263                return;
1264            }
1265
1266            let behavior = self.response_behavior();
1267            if behavior.extra_delay_ms > 0 {
1268                tokio::time::sleep(Duration::from_millis(behavior.extra_delay_ms)).await;
1269            }
1270
1271            if self.should_corrupt_response(&hash) {
1272                if data.is_empty() {
1273                    data.push(0x80);
1274                } else {
1275                    data[0] ^= 0x80;
1276                }
1277            }
1278
1279            // Send response
1280            let res = create_response(&hash, data);
1281            let response_bytes = encode_response(&res);
1282            if let Some(channel) = self.signaling.get_channel(from_peer).await {
1283                let _ = channel.send(response_bytes).await;
1284            }
1285        }
1286        // For now, don't forward - keep it simple
1287    }
1288
1289    /// Handle incoming data message
1290    pub async fn handle_data_message(&self, from_peer: &str, data: &[u8]) {
1291        let parsed = match parse_message(data) {
1292            Some(m) => m,
1293            None => return,
1294        };
1295
1296        match parsed {
1297            DataMessage::Request(req) => {
1298                self.handle_request_message(from_peer, req).await;
1299            }
1300            DataMessage::Response(res) => {
1301                self.handle_response_message(from_peer, res).await;
1302            }
1303            DataMessage::QuoteRequest(req) => {
1304                self.handle_quote_request_message(from_peer, req).await;
1305            }
1306            DataMessage::QuoteResponse(res) => {
1307                self.handle_quote_response_message(from_peer, res).await;
1308            }
1309            DataMessage::Payment(_) | DataMessage::PaymentAck(_) | DataMessage::Chunk(_) => {}
1310        }
1311    }
1312}
1313
1314#[async_trait]
1315impl<S, R, F> Store for MeshStoreCore<S, R, F>
1316where
1317    S: Store + Send + Sync + 'static,
1318    R: SignalingTransport + Send + Sync + 'static,
1319    F: PeerLinkFactory + Send + Sync + 'static,
1320{
1321    async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
1322        self.local_store.put(hash, data).await
1323    }
1324
1325    async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
1326        // Try local first
1327        if let Some(data) = self.local_store.get(hash).await? {
1328            return Ok(Some(data));
1329        }
1330
1331        // Try peers
1332        Ok(self.request_from_peers(hash).await)
1333    }
1334
1335    async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
1336        self.local_store.has(hash).await
1337    }
1338
1339    async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
1340        self.local_store.delete(hash).await
1341    }
1342}
1343
1344#[cfg(test)]
1345mod tests;
1346
/// Mesh store core wired to the mock relay transport and mock connection
/// factory, for use in simulations. `S` is the local storage backend.
pub type SimMeshStore<S> =
    MeshStoreCore<S, crate::mock::MockRelayTransport, crate::mock::MockConnectionFactory>;
1350
/// Default production mesh store core: Nostr relay signaling combined with
/// WebRTC peer links. `S` is the local storage backend.
pub type ProductionMeshStore<S> =
    MeshStoreCore<S, crate::nostr::NostrRelayTransport, crate::real_factory::WebRtcPeerLinkFactory>;