1use async_trait::async_trait;
8use std::collections::hash_map::DefaultHasher;
9use std::collections::{HashMap, HashSet};
10use std::hash::{Hash as _, Hasher};
11use std::sync::Arc;
12use std::time::{Duration, Instant};
13use tokio::sync::{oneshot, RwLock};
14
15use hashtree_core::{Hash, Store, StoreError};
16
17use crate::peer_selector::{PeerMetadataSnapshot, PeerSelector, SelectionStrategy};
18use crate::protocol::{
19 create_quote_request, create_quote_response_available, create_quote_response_unavailable,
20 create_request, create_request_with_quote, create_response, encode_quote_request,
21 encode_quote_response, encode_request, encode_response, hash_to_key, parse_message,
22 DataMessage, DataQuoteRequest, DataQuoteResponse,
23};
24use crate::signaling::SignalingManager;
25use crate::transport::{PeerConnectionFactory, RelayTransport, TransportError};
26use crate::types::{PeerHTLConfig, SignalingMessage, MAX_HTL};
27
/// Fixed byte string whose SHA-256 digest is the local-store key of the "pointer slot":
/// the entry holding the hex-encoded hash of the most recently persisted peer-metadata snapshot.
const PEER_METADATA_POINTER_SLOT_KEY: &[u8] = b"hashtree-webrtc/peer-metadata/latest/v1";
29
/// Book-keeping for one in-flight data request, stored in `pending_requests` under its hash key.
struct PendingRequest {
    /// One-shot sender used to hand the response payload to the task awaiting it.
    response_tx: oneshot::Sender<Option<Vec<u8>>>,
    /// When the request was registered; used to compute the round-trip time on success.
    started_at: Instant,
    /// Peers the request was sent to; each gets a timeout recorded if no usable answer arrives.
    queried_peers: Vec<String>,
}
36
/// Book-keeping for one in-flight quote negotiation, stored in `pending_quotes` under its hash key.
struct PendingQuoteRequest {
    /// One-shot sender used to deliver the accepted quote to the task awaiting it.
    response_tx: oneshot::Sender<Option<NegotiatedQuote>>,
    /// Mint URL that was put into the outgoing quote request, if any.
    preferred_mint_url: Option<String>,
    /// Payment offered in the quote request; responses asking for more are rejected.
    offered_payment_sat: u64,
}
42
/// A quote response that passed `should_accept_quote_response`.
#[derive(Debug, Clone)]
struct NegotiatedQuote {
    /// Peer that issued the quote; the follow-up data request is sent to this peer.
    peer_id: String,
    /// Identifier issued by the peer, echoed back in the data request.
    quote_id: u64,
    /// Mint URL named in the peer's response, if any (currently only stored).
    #[allow(dead_code)]
    mint_url: Option<String>,
}
50
/// A quote this node handed out to a peer, stored in `issued_quotes` until it is redeemed.
struct IssuedQuote {
    /// Instant after which the quote is no longer honoured.
    expires_at: Instant,
    /// Payment amount recorded for the quote (currently only stored).
    #[allow(dead_code)]
    payment_sat: u64,
    /// Mint URL recorded for the quote, if any (currently only stored).
    #[allow(dead_code)]
    mint_url: Option<String>,
}
58
/// Controls how a request is fanned out to peers in successive "waves".
#[derive(Debug, Clone, Copy)]
pub struct RequestDispatchConfig {
    /// Number of peers contacted in the first wave.
    pub initial_fanout: usize,
    /// Number of peers contacted in each follow-up (hedge) wave.
    pub hedge_fanout: usize,
    /// Upper bound on the total number of peers contacted; `0` is treated as "all available"
    /// by [`normalize_dispatch_config`].
    pub max_fanout: usize,
    /// Time to wait for an answer between waves, in milliseconds; `0` sends the next wave
    /// without waiting.
    pub hedge_interval_ms: u64,
}

impl Default for RequestDispatchConfig {
    /// Unlimited fan-out with no hedge delay: after normalization every available peer is
    /// contacted in a single wave.
    fn default() -> Self {
        let unlimited = usize::MAX;
        RequestDispatchConfig {
            initial_fanout: unlimited,
            hedge_fanout: unlimited,
            max_fanout: unlimited,
            hedge_interval_ms: 0,
        }
    }
}
86
87pub fn normalize_dispatch_config(
89 dispatch: RequestDispatchConfig,
90 available_peers: usize,
91) -> RequestDispatchConfig {
92 let mut cfg = dispatch;
93 let cap = if cfg.max_fanout == 0 {
94 available_peers
95 } else {
96 cfg.max_fanout.min(available_peers)
97 };
98 cfg.max_fanout = cap;
99 cfg.initial_fanout = if cfg.initial_fanout == 0 {
100 1
101 } else {
102 cfg.initial_fanout.min(cap.max(1))
103 };
104 cfg.hedge_fanout = if cfg.hedge_fanout == 0 {
105 1
106 } else {
107 cfg.hedge_fanout.min(cap.max(1))
108 };
109 cfg
110}
111
112pub fn build_hedged_wave_plan(peer_count: usize, dispatch: RequestDispatchConfig) -> Vec<usize> {
114 if peer_count == 0 {
115 return Vec::new();
116 }
117 let cap = dispatch.max_fanout.min(peer_count);
118 if cap == 0 {
119 return Vec::new();
120 }
121
122 let mut plan = Vec::new();
123 let mut sent = 0usize;
124 let first = dispatch.initial_fanout.min(cap).max(1);
125 plan.push(first);
126 sent += first;
127
128 while sent < cap {
129 let next = dispatch.hedge_fanout.min(cap - sent).max(1);
130 plan.push(next);
131 sent += next;
132 }
133 plan
134}
135
136pub async fn sync_selector_peers(selector: &RwLock<PeerSelector>, current_peer_ids: &[String]) {
138 let mut selector = selector.write().await;
139 let current: HashSet<&str> = current_peer_ids.iter().map(String::as_str).collect();
140 let known: Vec<String> = selector.all_stats().map(|s| s.peer_id.clone()).collect();
141 for peer_id in known {
142 if !current.contains(peer_id.as_str()) {
143 selector.remove_peer(&peer_id);
144 }
145 }
146 for peer_id in current_peer_ids {
147 selector.add_peer(peer_id.clone());
148 }
149}
150
/// Knobs that change how this node answers incoming data requests.
///
/// The default (all zeros) answers every request normally.
#[derive(Debug, Clone, Copy, Default)]
pub struct ResponseBehaviorConfig {
    /// Probability in `[0, 1]` that a response is silently dropped.
    pub drop_response_prob: f64,
    /// Probability in `[0, 1]` that the first payload byte is flipped before sending.
    pub corrupt_response_prob: f64,
    /// Extra delay added before a response is sent, in milliseconds.
    pub extra_delay_ms: u64,
}

impl ResponseBehaviorConfig {
    /// Returns a copy whose two probabilities are clamped into `[0.0, 1.0]`.
    fn normalized(self) -> Self {
        let unit_interval = |p: f64| p.clamp(0.0, 1.0);
        Self {
            drop_response_prob: unit_interval(self.drop_response_prob),
            corrupt_response_prob: unit_interval(self.corrupt_response_prob),
            ..self
        }
    }
}
183
/// Routing, payment and dispatch settings for a [`GenericStore`].
#[derive(Debug, Clone)]
pub struct GenericStoreRoutingConfig {
    /// Strategy the peer selector is constructed with.
    pub selection_strategy: SelectionStrategy,
    /// Passed to the peer selector's `set_fairness`.
    pub fairness_enabled: bool,
    /// Passed to the peer selector's `set_cashu_payment_weight`.
    pub cashu_payment_weight: f64,
    /// Threshold handed to the selector when deciding whether a peer is blocked for payment
    /// defaults (requests from blocked peers are refused).
    pub cashu_payment_default_block_threshold: u64,
    /// Mint URLs this node accepts for quotes; an empty list accepts any mint.
    pub cashu_accepted_mints: Vec<String>,
    /// Preferred mint URL, used when requesting quotes and as a fallback when answering them.
    pub cashu_default_mint: Option<String>,
    /// Base payment cap (sat) for quotes naming a mint this node does not otherwise trust;
    /// `0` disables such quotes.
    pub cashu_peer_suggested_mint_base_cap_sat: u64,
    /// Amount (sat) added to the cap per recorded success of the peer.
    pub cashu_peer_suggested_mint_success_step_sat: u64,
    /// Amount (sat) added to the cap per recorded payment receipt of the peer.
    pub cashu_peer_suggested_mint_receipt_step_sat: u64,
    /// Upper bound (sat) for the cap; `0` means no upper bound.
    pub cashu_peer_suggested_mint_max_cap_sat: u64,
    /// Fan-out / hedging settings for outgoing requests.
    pub dispatch: RequestDispatchConfig,
    /// Response behaviour settings applied when serving requests.
    pub response_behavior: ResponseBehaviorConfig,
}
209
210impl Default for GenericStoreRoutingConfig {
211 fn default() -> Self {
212 Self {
213 selection_strategy: SelectionStrategy::Weighted,
214 fairness_enabled: true,
215 cashu_payment_weight: 0.0,
216 cashu_payment_default_block_threshold: 0,
217 cashu_accepted_mints: Vec::new(),
218 cashu_default_mint: None,
219 cashu_peer_suggested_mint_base_cap_sat: 0,
220 cashu_peer_suggested_mint_success_step_sat: 0,
221 cashu_peer_suggested_mint_receipt_step_sat: 0,
222 cashu_peer_suggested_mint_max_cap_sat: 0,
223 dispatch: RequestDispatchConfig::default(),
224 response_behavior: ResponseBehaviorConfig::default(),
225 }
226 }
227}
228
/// A [`Store`] that serves reads from a local store first and otherwise asks connected peers.
///
/// Type parameters: `S` is the local store, `R` the relay transport and `F` the peer
/// connection factory used by the signaling manager.
pub struct GenericStore<S, R, F>
where
    S: Store + Send + Sync + 'static,
    R: RelayTransport + Send + Sync + 'static,
    F: PeerConnectionFactory + Send + Sync + 'static,
{
    /// Local store consulted before peers; verified peer responses are written back into it.
    local_store: Arc<S>,
    /// Signaling manager providing peer ids and data channels.
    signaling: Arc<SignalingManager<R, F>>,
    /// HTL configuration per peer id, created the first time a peer's signaling message is seen.
    htl_configs: RwLock<HashMap<String, PeerHTLConfig>>,
    /// In-flight data requests, keyed by hash key.
    pending_requests: RwLock<HashMap<String, PendingRequest>>,
    /// In-flight quote negotiations, keyed by hash key.
    pending_quotes: RwLock<HashMap<String, PendingQuoteRequest>>,
    /// Quotes issued to peers, keyed by `(peer id, hash key, quote id)`.
    issued_quotes: RwLock<HashMap<(String, String, u64), IssuedQuote>>,
    /// Next quote id to hand out; starts at 1.
    next_quote_id: RwLock<u64>,
    /// Peer statistics and ordering.
    peer_selector: RwLock<PeerSelector>,
    /// Routing, payment and dispatch settings.
    routing: GenericStoreRoutingConfig,
    /// Overall deadline for one request or quote negotiation.
    request_timeout: Duration,
    /// Enables diagnostic `println!` output.
    debug: bool,
    /// Set to `true` by `start` and `false` by `stop`.
    running: RwLock<bool>,
}
265
266impl<S, R, F> GenericStore<S, R, F>
267where
268 S: Store + Send + Sync + 'static,
269 R: RelayTransport + Send + Sync + 'static,
270 F: PeerConnectionFactory + Send + Sync + 'static,
271{
272 pub fn new(
274 local_store: Arc<S>,
275 signaling: Arc<SignalingManager<R, F>>,
276 request_timeout: Duration,
277 debug: bool,
278 ) -> Self {
279 Self::new_with_routing(
280 local_store,
281 signaling,
282 request_timeout,
283 debug,
284 Default::default(),
285 )
286 }
287
288 pub fn new_with_routing(
290 local_store: Arc<S>,
291 signaling: Arc<SignalingManager<R, F>>,
292 request_timeout: Duration,
293 debug: bool,
294 routing: GenericStoreRoutingConfig,
295 ) -> Self {
296 let mut selector = PeerSelector::with_strategy(routing.selection_strategy);
297 selector.set_fairness(routing.fairness_enabled);
298 selector.set_cashu_payment_weight(routing.cashu_payment_weight);
299 Self {
300 local_store,
301 signaling,
302 htl_configs: RwLock::new(HashMap::new()),
303 pending_requests: RwLock::new(HashMap::new()),
304 pending_quotes: RwLock::new(HashMap::new()),
305 issued_quotes: RwLock::new(HashMap::new()),
306 next_quote_id: RwLock::new(1),
307 peer_selector: RwLock::new(selector),
308 routing,
309 request_timeout,
310 debug,
311 running: RwLock::new(false),
312 }
313 }
314
315 pub async fn start(&self) -> Result<(), TransportError> {
317 *self.running.write().await = true;
318
319 self.signaling.send_hello(vec![]).await?;
321
322 Ok(())
323 }
324
    /// Clears the running flag.
    pub async fn stop(&self) {
        *self.running.write().await = false;
    }
329
330 pub async fn process_signaling(&self, msg: SignalingMessage) -> Result<(), TransportError> {
332 let peer_id = msg.peer_id().to_string();
334 {
335 let mut configs = self.htl_configs.write().await;
336 if !configs.contains_key(&peer_id) {
337 configs.insert(peer_id.clone(), PeerHTLConfig::random());
338 }
339 }
340 self.peer_selector.write().await.add_peer(peer_id);
341
342 self.signaling.handle_message(msg).await
343 }
344
    /// Returns the signaling manager this store was constructed with.
    pub fn signaling(&self) -> &Arc<SignalingManager<R, F>> {
        &self.signaling
    }
349
    /// Returns the configured response behaviour with its probabilities clamped to `[0, 1]`.
    fn response_behavior(&self) -> ResponseBehaviorConfig {
        self.routing.response_behavior.normalized()
    }
353
354 fn deterministic_actor_draw_for(peer_id: &str, hash: &Hash, salt: u64) -> f64 {
355 let mut hasher = DefaultHasher::new();
356 peer_id.hash(&mut hasher);
357 hash.hash(&mut hasher);
358 salt.hash(&mut hasher);
359 let v = hasher.finish();
360 (v as f64) / (u64::MAX as f64)
361 }
362
    /// Computes the deterministic draw for this node, using the signaling manager's own peer id.
    fn deterministic_actor_draw(&self, hash: &Hash, salt: u64) -> f64 {
        Self::deterministic_actor_draw_for(self.signaling.peer_id(), hash, salt)
    }
366
    /// Returns the store key of the peer-metadata pointer slot: the SHA-256 digest of
    /// `PEER_METADATA_POINTER_SLOT_KEY`.
    fn peer_metadata_pointer_slot_hash() -> Hash {
        hashtree_core::sha256(PEER_METADATA_POINTER_SLOT_KEY)
    }
370
371 fn decode_hash_hex(hash_hex: &str) -> Result<Hash, StoreError> {
372 let bytes = hex::decode(hash_hex)
373 .map_err(|e| StoreError::Other(format!("Invalid hash hex: {e}")))?;
374 if bytes.len() != 32 {
375 return Err(StoreError::Other(format!(
376 "Invalid hash length {}, expected 32 bytes",
377 bytes.len()
378 )));
379 }
380 let mut hash = [0u8; 32];
381 hash.copy_from_slice(&bytes);
382 Ok(hash)
383 }
384
385 fn should_drop_response(&self, hash: &Hash) -> bool {
386 let p = self.response_behavior().drop_response_prob;
387 if p <= 0.0 {
388 return false;
389 }
390 self.deterministic_actor_draw(hash, 0xD0_D0_D0_D0_D0_D0_D0_D0) < p
391 }
392
393 fn should_corrupt_response(&self, hash: &Hash) -> bool {
394 let p = self.response_behavior().corrupt_response_prob;
395 if p <= 0.0 {
396 return false;
397 }
398 self.deterministic_actor_draw(hash, 0xC0_C0_C0_C0_C0_C0_C0_C0) < p
399 }
400
401 async fn ordered_connected_peers(&self) -> Vec<String> {
402 let current_peer_ids = self.signaling.peer_ids().await;
403 if current_peer_ids.is_empty() {
404 return Vec::new();
405 }
406
407 sync_selector_peers(&self.peer_selector, ¤t_peer_ids).await;
408 let current_set: HashSet<&str> = current_peer_ids.iter().map(String::as_str).collect();
409 let mut ordered_peer_ids = self.peer_selector.write().await.select_peers();
410 ordered_peer_ids.retain(|peer_id| current_set.contains(peer_id.as_str()));
411 if ordered_peer_ids.is_empty() {
412 let mut fallback = current_peer_ids;
413 fallback.sort();
414 return fallback;
415 }
416 ordered_peer_ids
417 }
418
419 fn requested_quote_mint(&self) -> Option<&str> {
420 if let Some(default_mint) = self.routing.cashu_default_mint.as_deref() {
421 if self.routing.cashu_accepted_mints.is_empty()
422 || self
423 .routing
424 .cashu_accepted_mints
425 .iter()
426 .any(|mint| mint == default_mint)
427 {
428 return Some(default_mint);
429 }
430 }
431
432 self.routing
433 .cashu_accepted_mints
434 .first()
435 .map(String::as_str)
436 }
437
438 fn choose_quote_mint(&self, requested_mint: Option<&str>) -> Option<String> {
439 if let Some(requested_mint) = requested_mint {
440 if self.accepts_quote_mint(Some(requested_mint)) {
441 return Some(requested_mint.to_string());
442 }
443 }
444 if let Some(default_mint) = self.routing.cashu_default_mint.as_ref() {
445 return Some(default_mint.clone());
446 }
447 if let Some(first_mint) = self.routing.cashu_accepted_mints.first() {
448 return Some(first_mint.clone());
449 }
450 requested_mint.map(str::to_string)
451 }
452
453 fn accepts_quote_mint(&self, mint_url: Option<&str>) -> bool {
454 if self.routing.cashu_accepted_mints.is_empty() {
455 return true;
456 }
457
458 let Some(mint_url) = mint_url else {
459 return false;
460 };
461 self.routing
462 .cashu_accepted_mints
463 .iter()
464 .any(|mint| mint == mint_url)
465 }
466
467 fn trusts_quote_mint(&self, mint_url: Option<&str>) -> bool {
468 let Some(mint_url) = mint_url else {
469 return self.routing.cashu_default_mint.is_none()
470 && self.routing.cashu_accepted_mints.is_empty();
471 };
472 self.routing.cashu_default_mint.as_deref() == Some(mint_url)
473 || self
474 .routing
475 .cashu_accepted_mints
476 .iter()
477 .any(|mint| mint == mint_url)
478 }
479
480 async fn peer_suggested_mint_cap_sat(&self, peer_id: &str) -> u64 {
481 let base = self.routing.cashu_peer_suggested_mint_base_cap_sat;
482 if base == 0 {
483 return 0;
484 }
485
486 let selector = self.peer_selector.read().await;
487 let Some(stats) = selector.get_stats(peer_id) else {
488 let max_cap = self.routing.cashu_peer_suggested_mint_max_cap_sat;
489 return if max_cap > 0 { base.min(max_cap) } else { base };
490 };
491
492 if stats.cashu_payment_defaults > 0
493 && stats.cashu_payment_defaults >= stats.cashu_payment_receipts
494 {
495 return 0;
496 }
497
498 let success_bonus = stats
499 .successes
500 .saturating_mul(self.routing.cashu_peer_suggested_mint_success_step_sat);
501 let receipt_bonus = stats
502 .cashu_payment_receipts
503 .saturating_mul(self.routing.cashu_peer_suggested_mint_receipt_step_sat);
504 let mut cap = base
505 .saturating_add(success_bonus)
506 .saturating_add(receipt_bonus);
507 let max_cap = self.routing.cashu_peer_suggested_mint_max_cap_sat;
508 if max_cap > 0 {
509 cap = cap.min(max_cap);
510 }
511 cap
512 }
513
514 async fn should_accept_quote_response(
515 &self,
516 from_peer: &str,
517 preferred_mint_url: Option<&str>,
518 offered_payment_sat: u64,
519 res: &DataQuoteResponse,
520 ) -> bool {
521 let Some(payment_sat) = res.p else {
522 return false;
523 };
524 if payment_sat > offered_payment_sat {
525 return false;
526 }
527
528 let response_mint = res.m.as_deref();
529 if response_mint == preferred_mint_url {
530 return true;
531 }
532 if self.trusts_quote_mint(response_mint) {
533 return true;
534 }
535 if response_mint.is_none() {
536 return false;
537 }
538
539 payment_sat <= self.peer_suggested_mint_cap_sat(from_peer).await
540 }
541
542 async fn issue_quote(
543 &self,
544 peer_id: &str,
545 hash_key: &str,
546 payment_sat: u64,
547 ttl_ms: u32,
548 mint_url: Option<&str>,
549 ) -> u64 {
550 let quote_id = {
551 let mut next = self.next_quote_id.write().await;
552 let quote_id = *next;
553 *next = next.saturating_add(1);
554 quote_id
555 };
556
557 let expires_at = Instant::now() + Duration::from_millis(ttl_ms as u64);
558 self.issued_quotes.write().await.insert(
559 (peer_id.to_string(), hash_key.to_string(), quote_id),
560 IssuedQuote {
561 expires_at,
562 payment_sat,
563 mint_url: mint_url.map(str::to_string),
564 },
565 );
566 quote_id
567 }
568
569 async fn take_valid_quote(&self, peer_id: &str, hash_key: &str, quote_id: u64) -> bool {
570 let key = (peer_id.to_string(), hash_key.to_string(), quote_id);
571 let Some(quote) = self.issued_quotes.write().await.remove(&key) else {
572 return false;
573 };
574 quote.expires_at > Instant::now()
575 }
576
577 async fn send_request_to_peer(
578 &self,
579 peer_id: &str,
580 hash: &Hash,
581 quote_id: Option<u64>,
582 ) -> bool {
583 let channel = match self.signaling.get_channel(peer_id).await {
584 Some(c) => c,
585 None => return false,
586 };
587
588 let htl_config = {
589 let configs = self.htl_configs.read().await;
590 configs
591 .get(peer_id)
592 .cloned()
593 .unwrap_or_else(PeerHTLConfig::random)
594 };
595
596 let send_htl = htl_config.decrement(MAX_HTL);
597 let req = match quote_id {
598 Some(quote_id) => create_request_with_quote(hash, send_htl, quote_id),
599 None => create_request(hash, send_htl),
600 };
601 let request_bytes = encode_request(&req);
602
603 {
604 let mut selector = self.peer_selector.write().await;
605 selector.record_request(peer_id, request_bytes.len() as u64);
606 }
607
608 match channel.send(request_bytes).await {
609 Ok(()) => true,
610 Err(_) => {
611 self.peer_selector.write().await.record_failure(peer_id);
612 false
613 }
614 }
615 }
616
617 async fn send_quote_request_to_peer(
618 &self,
619 peer_id: &str,
620 hash: &Hash,
621 payment_sat: u64,
622 ttl_ms: u32,
623 mint_url: Option<&str>,
624 ) -> bool {
625 let channel = match self.signaling.get_channel(peer_id).await {
626 Some(c) => c,
627 None => return false,
628 };
629
630 let req = create_quote_request(hash, ttl_ms, payment_sat, mint_url);
631 let request_bytes = encode_quote_request(&req);
632
633 match channel.send(request_bytes).await {
634 Ok(()) => true,
635 Err(_) => false,
636 }
637 }
638
    /// Returns the peer count reported by the signaling manager.
    pub async fn peer_count(&self) -> usize {
        self.signaling.peer_count().await
    }
643
    /// Returns the signaling manager's answer to whether more peers are needed.
    pub async fn needs_peers(&self) -> bool {
        self.signaling.needs_peers().await
    }
648
    /// Sends a hello message with an empty list through the signaling manager.
    ///
    /// # Errors
    /// Returns the transport error reported by the signaling manager.
    pub async fn send_hello(&self) -> Result<(), TransportError> {
        self.signaling.send_hello(vec![]).await
    }
653
    /// Forwards `peer_id` and `amount_sat` to the peer selector's `record_cashu_payment`
    /// under its write lock.
    pub async fn record_cashu_payment_for_peer(&self, peer_id: &str, amount_sat: u64) {
        self.peer_selector
            .write()
            .await
            .record_cashu_payment(peer_id, amount_sat);
    }
661
    /// Forwards `peer_id` and `amount_sat` to the peer selector's `record_cashu_receipt`
    /// under its write lock.
    pub async fn record_cashu_receipt_from_peer(&self, peer_id: &str, amount_sat: u64) {
        self.peer_selector
            .write()
            .await
            .record_cashu_receipt(peer_id, amount_sat);
    }
669
    /// Forwards `peer_id` to the peer selector's `record_cashu_payment_default` under its
    /// write lock.
    pub async fn record_cashu_payment_default_from_peer(&self, peer_id: &str) {
        self.peer_selector
            .write()
            .await
            .record_cashu_payment_default(peer_id);
    }
677
    /// Asks the selector whether `peer_id` is blocked for payment defaults, using the
    /// configured `cashu_payment_default_block_threshold`.
    fn should_refuse_requests_from_peer(&self, selector: &PeerSelector, peer_id: &str) -> bool {
        selector.is_peer_blocked_for_payment_defaults(
            peer_id,
            self.routing.cashu_payment_default_block_threshold,
        )
    }
684
    /// Exports the peer selector's current metadata snapshot under a read lock.
    pub async fn peer_metadata_snapshot(&self) -> PeerMetadataSnapshot {
        self.peer_selector
            .read()
            .await
            .export_peer_metadata_snapshot()
    }
692
693 pub async fn persist_peer_metadata(&self) -> Result<Hash, StoreError> {
698 let snapshot = self
699 .peer_selector
700 .read()
701 .await
702 .export_peer_metadata_snapshot();
703 let bytes = serde_json::to_vec(&snapshot).map_err(|e| {
704 StoreError::Other(format!("Failed to encode peer metadata snapshot: {e}"))
705 })?;
706 let snapshot_hash = hashtree_core::sha256(&bytes);
707 let _ = self.local_store.put(snapshot_hash, bytes).await?;
708
709 let pointer_slot = Self::peer_metadata_pointer_slot_hash();
710 let pointer_bytes = hex::encode(snapshot_hash).into_bytes();
711 let _ = self.local_store.delete(&pointer_slot).await?;
712 let _ = self.local_store.put(pointer_slot, pointer_bytes).await?;
713
714 Ok(snapshot_hash)
715 }
716
717 pub async fn load_peer_metadata(&self) -> Result<bool, StoreError> {
719 let pointer_slot = Self::peer_metadata_pointer_slot_hash();
720 let Some(pointer_bytes) = self.local_store.get(&pointer_slot).await? else {
721 return Ok(false);
722 };
723 let pointer_hex = std::str::from_utf8(&pointer_bytes).map_err(|e| {
724 StoreError::Other(format!("Peer metadata pointer is not valid UTF-8: {e}"))
725 })?;
726 let snapshot_hash = Self::decode_hash_hex(pointer_hex.trim())?;
727
728 let Some(snapshot_bytes) = self.local_store.get(&snapshot_hash).await? else {
729 return Ok(false);
730 };
731 let snapshot: PeerMetadataSnapshot =
732 serde_json::from_slice(&snapshot_bytes).map_err(|e| {
733 StoreError::Other(format!("Failed to decode peer metadata snapshot: {e}"))
734 })?;
735 self.peer_selector
736 .write()
737 .await
738 .import_peer_metadata_snapshot(&snapshot);
739 Ok(true)
740 }
741
742 pub async fn get_with_quote(
747 &self,
748 hash: &Hash,
749 payment_sat: u64,
750 quote_ttl: Duration,
751 ) -> Result<Option<Vec<u8>>, StoreError> {
752 if let Some(data) = self.local_store.get(hash).await? {
753 return Ok(Some(data));
754 }
755 Ok(self
756 .request_from_peers_with_quote(hash, payment_sat, quote_ttl)
757 .await)
758 }
759
760 async fn request_from_peers_with_quote(
761 &self,
762 hash: &Hash,
763 payment_sat: u64,
764 quote_ttl: Duration,
765 ) -> Option<Vec<u8>> {
766 let ordered_peer_ids = self.ordered_connected_peers().await;
767 if ordered_peer_ids.is_empty() {
768 return None;
769 }
770
771 if let Some(quote) = self
772 .request_quote_from_peers(hash, payment_sat, quote_ttl, &ordered_peer_ids)
773 .await
774 {
775 if let Some(data) = self
776 .request_from_single_peer(hash, "e.peer_id, Some(quote.quote_id))
777 .await
778 {
779 return Some(data);
780 }
781 }
782
783 self.request_from_ordered_peers(hash, &ordered_peer_ids)
784 .await
785 }
786
    /// Sends quote requests to the ordered peers in hedged waves and returns the first quote
    /// that is delivered through the pending-quote channel.
    ///
    /// Returns `None` when there are no peers, the TTL rounds down to zero milliseconds, the
    /// wave plan is empty, no request could be sent, or `request_timeout` elapses first. The
    /// pending-quote entry for the hash is removed before returning.
    async fn request_quote_from_peers(
        &self,
        hash: &Hash,
        payment_sat: u64,
        quote_ttl: Duration,
        ordered_peer_ids: &[String],
    ) -> Option<NegotiatedQuote> {
        if ordered_peer_ids.is_empty() {
            return None;
        }
        // The TTL travels as u32 milliseconds; longer durations are capped at u32::MAX.
        let ttl_ms = quote_ttl.as_millis().min(u32::MAX as u128) as u32;
        if ttl_ms == 0 {
            return None;
        }
        let requested_mint = self.requested_quote_mint().map(str::to_string);

        let dispatch = normalize_dispatch_config(self.routing.dispatch, ordered_peer_ids.len());
        let wave_plan = build_hedged_wave_plan(ordered_peer_ids.len(), dispatch);
        if wave_plan.is_empty() {
            return None;
        }

        // Register the pending negotiation before sending so a fast response finds it.
        let hash_key = hash_to_key(hash);
        let (tx, rx) = oneshot::channel();
        self.pending_quotes.write().await.insert(
            hash_key.clone(),
            PendingQuoteRequest {
                response_tx: tx,
                preferred_mint_url: requested_mint.clone(),
                offered_payment_sat: payment_sat,
            },
        );

        let mut sent_total = 0usize;
        let mut next_peer_idx = 0usize;
        let mut rx = rx;
        let deadline = Instant::now() + self.request_timeout;

        for (wave_idx, wave_size) in wave_plan.iter().copied().enumerate() {
            // Send this wave's requests to the next slice of peers.
            let from = next_peer_idx;
            let to = (next_peer_idx + wave_size).min(ordered_peer_ids.len());
            for peer_id in &ordered_peer_ids[from..to] {
                if self
                    .send_quote_request_to_peer(
                        peer_id,
                        hash,
                        payment_sat,
                        ttl_ms,
                        requested_mint.as_deref(),
                    )
                    .await
                {
                    sent_total += 1;
                }
            }
            next_peer_idx = to;

            // Nothing has gone out yet: there is nothing to wait for, so move to the next wave.
            if sent_total == 0 {
                if next_peer_idx >= ordered_peer_ids.len() {
                    break;
                }
                continue;
            }

            let now = Instant::now();
            if now >= deadline {
                break;
            }
            let remaining = deadline.saturating_duration_since(now);
            let is_last_wave =
                wave_idx + 1 == wave_plan.len() || next_peer_idx >= ordered_peer_ids.len();
            // The last wave waits out the whole remaining budget; earlier waves wait one hedge
            // interval (or not at all when the interval is zero).
            let wait = if is_last_wave {
                remaining
            } else if dispatch.hedge_interval_ms == 0 {
                Duration::ZERO
            } else {
                Duration::from_millis(dispatch.hedge_interval_ms).min(remaining)
            };

            if wait.is_zero() {
                continue;
            }

            match tokio::time::timeout(wait, &mut rx).await {
                Ok(Ok(Some(quote))) => {
                    let _ = self.pending_quotes.write().await.remove(&hash_key);
                    return Some(quote);
                }
                // An explicit `None` or a dropped sender ends the negotiation.
                Ok(Ok(None)) => break,
                Ok(Err(_)) => break,
                // Hedge interval elapsed without an answer: send the next wave.
                Err(_) => {}
            }
        }

        let _ = self.pending_quotes.write().await.remove(&hash_key);
        None
    }
884
885 async fn request_from_single_peer(
886 &self,
887 hash: &Hash,
888 peer_id: &str,
889 quote_id: Option<u64>,
890 ) -> Option<Vec<u8>> {
891 let hash_key = hash_to_key(hash);
892 let (tx, rx) = oneshot::channel();
893 self.pending_requests.write().await.insert(
894 hash_key.clone(),
895 PendingRequest {
896 response_tx: tx,
897 started_at: Instant::now(),
898 queried_peers: vec![peer_id.to_string()],
899 },
900 );
901
902 let mut rx = rx;
903 if !self.send_request_to_peer(peer_id, hash, quote_id).await {
904 let _ = self.pending_requests.write().await.remove(&hash_key);
905 return None;
906 }
907
908 if let Ok(Ok(Some(data))) = tokio::time::timeout(self.request_timeout, &mut rx).await {
909 if hashtree_core::sha256(&data) == *hash {
910 let _ = self.local_store.put(*hash, data.clone()).await;
911 return Some(data);
912 }
913 }
914
915 if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
916 for peer_id in pending.queried_peers {
917 self.peer_selector.write().await.record_timeout(&peer_id);
918 }
919 }
920 None
921 }
922
923 async fn request_from_ordered_peers(
924 &self,
925 hash: &Hash,
926 ordered_peer_ids: &[String],
927 ) -> Option<Vec<u8>> {
928 let dispatch = normalize_dispatch_config(self.routing.dispatch, ordered_peer_ids.len());
929 let wave_plan = build_hedged_wave_plan(ordered_peer_ids.len(), dispatch);
930 if wave_plan.is_empty() {
931 return None;
932 }
933
934 let hash_key = hash_to_key(hash);
935 let (tx, rx) = oneshot::channel();
936 self.pending_requests.write().await.insert(
937 hash_key.clone(),
938 PendingRequest {
939 response_tx: tx,
940 started_at: Instant::now(),
941 queried_peers: Vec::new(),
942 },
943 );
944
945 let mut sent_total = 0usize;
946 let mut next_peer_idx = 0usize;
947 let mut rx = rx;
948 let deadline = Instant::now() + self.request_timeout;
949
950 for (wave_idx, wave_size) in wave_plan.iter().copied().enumerate() {
951 let from = next_peer_idx;
952 let to = (next_peer_idx + wave_size).min(ordered_peer_ids.len());
953 for peer_id in &ordered_peer_ids[from..to] {
954 if self.send_request_to_peer(peer_id, hash, None).await {
955 sent_total += 1;
956 if let Some(pending) = self.pending_requests.write().await.get_mut(&hash_key) {
957 pending.queried_peers.push(peer_id.clone());
958 }
959 }
960 }
961 next_peer_idx = to;
962
963 if sent_total == 0 {
964 if next_peer_idx >= ordered_peer_ids.len() {
965 break;
966 }
967 continue;
968 }
969
970 let now = Instant::now();
971 if now >= deadline {
972 break;
973 }
974 let remaining = deadline.saturating_duration_since(now);
975 let is_last_wave =
976 wave_idx + 1 == wave_plan.len() || next_peer_idx >= ordered_peer_ids.len();
977 let wait = if is_last_wave {
978 remaining
979 } else if dispatch.hedge_interval_ms == 0 {
980 Duration::ZERO
981 } else {
982 Duration::from_millis(dispatch.hedge_interval_ms).min(remaining)
983 };
984
985 if wait.is_zero() {
986 continue;
987 }
988
989 match tokio::time::timeout(wait, &mut rx).await {
990 Ok(Ok(Some(data))) => {
991 if hashtree_core::sha256(&data) == *hash {
992 let _ = self.local_store.put(*hash, data.clone()).await;
993 return Some(data);
994 }
995 }
996 Ok(Ok(None)) => break,
997 Ok(Err(_)) => break,
998 Err(_) => {
999 }
1001 }
1002 }
1003
1004 if sent_total == 0 {
1005 let _ = self.pending_requests.write().await.remove(&hash_key);
1006 return None;
1007 }
1008
1009 if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
1010 for peer_id in pending.queried_peers {
1011 self.peer_selector.write().await.record_timeout(&peer_id);
1012 }
1013 }
1014 None
1015 }
1016
1017 async fn request_from_peers(&self, hash: &Hash) -> Option<Vec<u8>> {
1019 let ordered_peer_ids = self.ordered_connected_peers().await;
1020 if ordered_peer_ids.is_empty() {
1021 return None;
1022 }
1023 self.request_from_ordered_peers(hash, &ordered_peer_ids)
1024 .await
1025 }
1026
1027 async fn complete_pending_response(&self, from_peer: &str, hash_key: String, payload: Vec<u8>) {
1028 if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
1029 let rtt_ms = pending.started_at.elapsed().as_millis() as u64;
1030 self.peer_selector.write().await.record_success(
1031 from_peer,
1032 rtt_ms,
1033 payload.len() as u64,
1034 );
1035 let _ = pending.response_tx.send(Some(payload));
1036 }
1037 }
1038
1039 async fn handle_quote_response_message(&self, from_peer: &str, res: DataQuoteResponse) {
1040 if !res.a {
1041 return;
1042 }
1043
1044 let Some(quote_id) = res.q else {
1045 return;
1046 };
1047
1048 let hash_key = hash_to_key(&res.h);
1049 let (preferred_mint_url, offered_payment_sat) = {
1050 let pending_quotes = self.pending_quotes.read().await;
1051 let Some(pending) = pending_quotes.get(&hash_key) else {
1052 return;
1053 };
1054 (
1055 pending.preferred_mint_url.clone(),
1056 pending.offered_payment_sat,
1057 )
1058 };
1059 if !self
1060 .should_accept_quote_response(
1061 from_peer,
1062 preferred_mint_url.as_deref(),
1063 offered_payment_sat,
1064 &res,
1065 )
1066 .await
1067 {
1068 return;
1069 }
1070 let mut pending_quotes = self.pending_quotes.write().await;
1071 if let Some(pending) = pending_quotes.remove(&hash_key) {
1072 let _ = pending.response_tx.send(Some(NegotiatedQuote {
1073 peer_id: from_peer.to_string(),
1074 quote_id,
1075 mint_url: res.m,
1076 }));
1077 }
1078 }
1079
1080 async fn handle_response_message(&self, from_peer: &str, res: crate::protocol::DataResponse) {
1081 let hash_key = hash_to_key(&res.h);
1082 let hash = match crate::protocol::bytes_to_hash(&res.h) {
1083 Some(h) => h,
1084 None => return,
1085 };
1086
1087 if hashtree_core::sha256(&res.d) != hash {
1089 self.peer_selector.write().await.record_failure(from_peer);
1090 if self.debug {
1091 println!("[GenericStore] Ignoring invalid response payload for {hash_key}");
1092 }
1093 return;
1094 }
1095
1096 self.complete_pending_response(from_peer, hash_key, res.d)
1097 .await;
1098 }
1099
1100 async fn handle_quote_request_message(&self, from_peer: &str, req: DataQuoteRequest) {
1101 let hash = match crate::protocol::bytes_to_hash(&req.h) {
1102 Some(h) => h,
1103 None => return,
1104 };
1105 let hash_key = hash_to_key(&hash);
1106
1107 {
1108 let selector = self.peer_selector.read().await;
1109 if self.should_refuse_requests_from_peer(&selector, from_peer) {
1110 if self.debug {
1111 println!(
1112 "[GenericStore] Refusing quote request from delinquent peer {}",
1113 from_peer
1114 );
1115 }
1116 return;
1117 }
1118 }
1119
1120 let chosen_mint = self.choose_quote_mint(req.m.as_deref());
1121 let can_serve = self.local_store.has(&hash).await.ok().unwrap_or(false)
1122 && !self.should_drop_response(&hash)
1123 && !self.should_corrupt_response(&hash);
1124
1125 let res = if can_serve {
1126 let quote_id = self
1127 .issue_quote(from_peer, &hash_key, req.p, req.t, chosen_mint.as_deref())
1128 .await;
1129 create_quote_response_available(&hash, quote_id, req.p, req.t, chosen_mint.as_deref())
1130 } else {
1131 create_quote_response_unavailable(&hash)
1132 };
1133 let response_bytes = encode_quote_response(&res);
1134 if let Some(channel) = self.signaling.get_channel(from_peer).await {
1135 let _ = channel.send(response_bytes).await;
1136 }
1137 }
1138
1139 async fn handle_request_message(&self, from_peer: &str, req: crate::protocol::DataRequest) {
1140 let hash = match crate::protocol::bytes_to_hash(&req.h) {
1141 Some(h) => h,
1142 None => return,
1143 };
1144 let hash_key = hash_to_key(&hash);
1145
1146 {
1147 let selector = self.peer_selector.read().await;
1148 if self.should_refuse_requests_from_peer(&selector, from_peer) {
1149 if self.debug {
1150 println!(
1151 "[GenericStore] Refusing request from delinquent peer {}",
1152 from_peer
1153 );
1154 }
1155 return;
1156 }
1157 }
1158
1159 if let Some(quote_id) = req.q {
1160 if !self.take_valid_quote(from_peer, &hash_key, quote_id).await {
1161 if self.debug {
1162 println!(
1163 "[GenericStore] Refusing request with invalid or expired quote {} from {}",
1164 quote_id, from_peer
1165 );
1166 }
1167 return;
1168 }
1169 }
1170
1171 if let Ok(Some(mut data)) = self.local_store.get(&hash).await {
1173 if self.should_drop_response(&hash) {
1174 if self.debug {
1175 println!(
1176 "[GenericStore] Dropping response for {} due to actor profile",
1177 hash_to_key(&hash)
1178 );
1179 }
1180 return;
1181 }
1182
1183 let behavior = self.response_behavior();
1184 if behavior.extra_delay_ms > 0 {
1185 tokio::time::sleep(Duration::from_millis(behavior.extra_delay_ms)).await;
1186 }
1187
1188 if self.should_corrupt_response(&hash) {
1189 if data.is_empty() {
1190 data.push(0x80);
1191 } else {
1192 data[0] ^= 0x80;
1193 }
1194 }
1195
1196 let res = create_response(&hash, data);
1198 let response_bytes = encode_response(&res);
1199 if let Some(channel) = self.signaling.get_channel(from_peer).await {
1200 let _ = channel.send(response_bytes).await;
1201 }
1202 }
1203 }
1205
1206 pub async fn handle_data_message(&self, from_peer: &str, data: &[u8]) {
1208 let parsed = match parse_message(data) {
1209 Some(m) => m,
1210 None => return,
1211 };
1212
1213 match parsed {
1214 DataMessage::Request(req) => {
1215 self.handle_request_message(from_peer, req).await;
1216 }
1217 DataMessage::Response(res) => {
1218 self.handle_response_message(from_peer, res).await;
1219 }
1220 DataMessage::QuoteRequest(req) => {
1221 self.handle_quote_request_message(from_peer, req).await;
1222 }
1223 DataMessage::QuoteResponse(res) => {
1224 self.handle_quote_response_message(from_peer, res).await;
1225 }
1226 }
1227 }
1228}
1229
1230#[async_trait]
1231impl<S, R, F> Store for GenericStore<S, R, F>
1232where
1233 S: Store + Send + Sync + 'static,
1234 R: RelayTransport + Send + Sync + 'static,
1235 F: PeerConnectionFactory + Send + Sync + 'static,
1236{
1237 async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
1238 self.local_store.put(hash, data).await
1239 }
1240
1241 async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
1242 if let Some(data) = self.local_store.get(hash).await? {
1244 return Ok(Some(data));
1245 }
1246
1247 Ok(self.request_from_peers(hash).await)
1249 }
1250
1251 async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
1252 self.local_store.has(hash).await
1253 }
1254
1255 async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
1256 self.local_store.delete(hash).await
1257 }
1258}
1259
1260#[cfg(test)]
1261mod tests {
1262 use super::*;
1263 use hashtree_core::MemoryStore;
1264 use std::sync::Arc;
1265 use std::sync::OnceLock;
1266 use std::time::Duration;
1267
1268 type TestStore = GenericStore<
1269 MemoryStore,
1270 crate::mock::MockRelayTransport,
1271 crate::mock::MockConnectionFactory,
1272 >;
1273
1274 struct TestNode {
1275 store: Arc<TestStore>,
1276 local_store: Arc<MemoryStore>,
1277 transport: Arc<crate::mock::MockRelayTransport>,
1278 }
1279
1280 fn mock_network_lock() -> &'static tokio::sync::Mutex<()> {
1281 static LOCK: OnceLock<tokio::sync::Mutex<()>> = OnceLock::new();
1282 LOCK.get_or_init(|| tokio::sync::Mutex::new(()))
1283 }
1284
1285 fn make_test_store(local_store: Arc<MemoryStore>, node_id: &str) -> TestStore {
1286 make_test_store_with_routing(local_store, node_id, GenericStoreRoutingConfig::default())
1287 }
1288
1289 fn make_test_store_with_routing(
1290 local_store: Arc<MemoryStore>,
1291 node_id: &str,
1292 routing: GenericStoreRoutingConfig,
1293 ) -> TestStore {
1294 let relay = crate::mock::MockRelay::new();
1295 let transport = Arc::new(relay.create_transport(node_id.to_string(), node_id.to_string()));
1296 let conn_factory = Arc::new(crate::mock::MockConnectionFactory::new(
1297 node_id.to_string(),
1298 0,
1299 ));
1300 let signaling = Arc::new(crate::signaling::SignalingManager::new(
1301 node_id.to_string(),
1302 node_id.to_string(),
1303 transport,
1304 conn_factory,
1305 crate::types::PoolSettings::default(),
1306 false,
1307 ));
1308
1309 TestStore::new_with_routing(
1310 local_store,
1311 signaling,
1312 Duration::from_millis(200),
1313 false,
1314 routing,
1315 )
1316 }
1317
1318 fn make_shared_test_node(
1319 relay: Arc<crate::mock::MockRelay>,
1320 node_id: &str,
1321 routing: GenericStoreRoutingConfig,
1322 ) -> TestNode {
1323 let transport = Arc::new(relay.create_transport(node_id.to_string(), node_id.to_string()));
1324 let conn_factory = Arc::new(crate::mock::MockConnectionFactory::new(
1325 node_id.to_string(),
1326 0,
1327 ));
1328 let signaling = Arc::new(crate::signaling::SignalingManager::new(
1329 node_id.to_string(),
1330 node_id.to_string(),
1331 transport.clone(),
1332 conn_factory,
1333 crate::types::PoolSettings::default(),
1334 false,
1335 ));
1336 let local_store = Arc::new(MemoryStore::new());
1337 let store = Arc::new(TestStore::new_with_routing(
1338 local_store.clone(),
1339 signaling,
1340 Duration::from_millis(120),
1341 false,
1342 routing,
1343 ));
1344
1345 TestNode {
1346 store,
1347 local_store,
1348 transport,
1349 }
1350 }
1351
1352 async fn pump_test_signaling(nodes: &[&TestNode]) -> usize {
1353 let mut processed = 0usize;
1354 for node in nodes {
1355 while let Some(msg) = node.transport.try_recv() {
1356 node.store
1357 .process_signaling(msg)
1358 .await
1359 .expect("process signaling");
1360 processed += 1;
1361 }
1362 }
1363 processed
1364 }
1365
1366 async fn pump_test_data(nodes: &[&TestNode]) -> usize {
1367 let mut processed = 0usize;
1368 for node in nodes {
1369 let peer_ids = node.store.signaling().peer_ids().await;
1370 for peer_id in peer_ids {
1371 let Some(channel) = node.store.signaling().get_channel(&peer_id).await else {
1372 continue;
1373 };
1374 while let Some(data) = channel.try_recv() {
1375 node.store.handle_data_message(&peer_id, &data).await;
1376 processed += 1;
1377 }
1378 }
1379 }
1380 processed
1381 }
1382
1383 async fn pump_test_network(nodes: &[&TestNode], max_steps: usize) {
1384 for _ in 0..max_steps {
1385 let signaling = pump_test_signaling(nodes).await;
1386 let data = pump_test_data(nodes).await;
1387 if signaling + data == 0 {
1388 tokio::task::yield_now().await;
1389 }
1390 }
1391 }
1392
1393 async fn run_get_with_pumps(
1394 requester: Arc<TestStore>,
1395 hash: Hash,
1396 nodes: &[&TestNode],
1397 ) -> Option<Vec<u8>> {
1398 let task = tokio::spawn(async move { requester.get(&hash).await.ok().flatten() });
1399 let started = Instant::now();
1400
1401 loop {
1402 if task.is_finished() {
1403 return task.await.expect("request task join");
1404 }
1405
1406 if started.elapsed() > Duration::from_secs(1) {
1407 task.abort();
1408 return None;
1409 }
1410
1411 pump_test_network(nodes, 4).await;
1412 }
1413 }
1414
1415 async fn run_bad_peer_series(strategy: SelectionStrategy) -> usize {
1416 let _guard = mock_network_lock().lock().await;
1417 crate::mock::clear_channel_registry().await;
1418
1419 let relay = crate::mock::MockRelay::new();
1420 let requester = make_shared_test_node(
1421 relay.clone(),
1422 "requester-reject",
1423 GenericStoreRoutingConfig {
1424 selection_strategy: strategy,
1425 fairness_enabled: false,
1426 dispatch: RequestDispatchConfig {
1427 initial_fanout: 1,
1428 hedge_fanout: 1,
1429 max_fanout: 1,
1430 hedge_interval_ms: 5,
1431 },
1432 ..Default::default()
1433 },
1434 );
1435 let bad = make_shared_test_node(
1436 relay.clone(),
1437 "a-bad",
1438 GenericStoreRoutingConfig {
1439 response_behavior: ResponseBehaviorConfig {
1440 drop_response_prob: 1.0,
1441 ..Default::default()
1442 },
1443 ..Default::default()
1444 },
1445 );
1446 let honest = make_shared_test_node(relay, "b-honest", GenericStoreRoutingConfig::default());
1447 let nodes = [&requester, &bad, &honest];
1448
1449 for node in &nodes {
1450 node.transport
1451 .connect(&[])
1452 .await
1453 .expect("connect transport");
1454 node.store.start().await.expect("start store");
1455 }
1456 pump_test_network(&nodes, 24).await;
1457
1458 let mut successes = 0usize;
1459 for round in 0..6 {
1460 let payload = format!("payload-{round}").into_bytes();
1461 let hash = hashtree_core::sha256(&payload);
1462 let _ = bad.local_store.put(hash, payload.clone()).await;
1463 let _ = honest.local_store.put(hash, payload.clone()).await;
1464
1465 let result = run_get_with_pumps(requester.store.clone(), hash, &nodes).await;
1466 if result.as_ref() == Some(&payload) {
1467 successes += 1;
1468 }
1469 }
1470
1471 crate::mock::clear_channel_registry().await;
1472 successes
1473 }
1474
/// With the default dispatch config, seven peers are planned as a single wave of 7.
#[test]
fn test_hedged_wave_plan_flood_all() {
    let dispatch = RequestDispatchConfig::default();
    let waves = build_hedged_wave_plan(7, dispatch);
    assert_eq!(waves, vec![7]);
}
1480
/// With initial fanout 2, hedge fanout 3 and max fanout 8, ten peers are planned as
/// waves of 2, 3 and 3.
#[test]
fn test_hedged_wave_plan_staged() {
    let dispatch = RequestDispatchConfig {
        initial_fanout: 2,
        hedge_fanout: 3,
        max_fanout: 8,
        hedge_interval_ms: 25,
    };
    let waves = build_hedged_wave_plan(10, dispatch);
    assert_eq!(waves, vec![2, 3, 3]);
}
1494
/// `normalized()` maps a drop probability of -1.5 to 0.0 and a corrupt probability
/// of 9.0 to 1.0, and leaves the extra delay unchanged.
#[test]
fn test_response_behavior_normalization_clamps_probs() {
    let normalized = ResponseBehaviorConfig {
        drop_response_prob: -1.5,
        corrupt_response_prob: 9.0,
        extra_delay_ms: 12,
    }
    .normalized();

    assert_eq!(normalized.drop_response_prob, 0.0);
    assert_eq!(normalized.corrupt_response_prob, 1.0);
    assert_eq!(normalized.extra_delay_ms, 12);
}
1507
/// Two draws with the same peer id, hash and salt produce the same value.
#[test]
fn test_actor_draw_is_deterministic_per_peer_hash_and_salt() {
    let hash = hashtree_core::sha256(b"deterministic");
    let first = TestStore::deterministic_actor_draw_for("peer-a", &hash, 7);
    let second = TestStore::deterministic_actor_draw_for("peer-a", &hash, 7);
    let difference = (first - second).abs();
    assert!(difference < f64::EPSILON);
}
1515
1516 #[tokio::test]
1517 async fn test_load_peer_metadata_returns_false_when_missing() {
1518 let local_store = Arc::new(MemoryStore::new());
1519 let store = make_test_store(local_store, "0");
1520 assert!(!store.load_peer_metadata().await.expect("load result"));
1521 }
1522
1523 #[tokio::test]
1524 async fn test_persist_and_load_peer_metadata_with_existing_store_adapter() {
1525 let local_store = Arc::new(MemoryStore::new());
1526 let writer = make_test_store(local_store.clone(), "0");
1527 {
1528 let mut selector = writer.peer_selector.write().await;
1529 selector.add_peer("npub1stable:session-a");
1530 selector.record_request("npub1stable:session-a", 64);
1531 selector.record_success("npub1stable:session-a", 35, 1024);
1532 selector.record_cashu_payment("npub1stable:session-a", 120);
1533 selector.record_cashu_receipt("npub1stable:session-a", 40);
1534 selector.record_cashu_payment_default("npub1stable:session-a");
1535 }
1536
1537 let snapshot_hash = writer
1538 .persist_peer_metadata()
1539 .await
1540 .expect("persist peer metadata");
1541 assert!(local_store
1542 .get(&snapshot_hash)
1543 .await
1544 .expect("snapshot lookup")
1545 .is_some());
1546
1547 let reader = make_test_store(local_store, "1");
1548 assert!(reader
1549 .load_peer_metadata()
1550 .await
1551 .expect("load peer metadata snapshot"));
1552
1553 let mut selector = reader.peer_selector.write().await;
1554 selector.add_peer("npub1stable:session-b");
1555 let stats = selector
1556 .get_stats("npub1stable:session-b")
1557 .expect("restored peer stats");
1558 assert_eq!(stats.requests_sent, 1);
1559 assert_eq!(stats.successes, 1);
1560 assert_eq!(stats.cashu_paid_sat, 120);
1561 assert_eq!(stats.cashu_received_sat, 40);
1562 assert_eq!(stats.cashu_payment_receipts, 1);
1563 assert_eq!(stats.cashu_payment_defaults, 1);
1564 }
1565
1566 #[tokio::test]
1567 async fn test_should_refuse_requests_from_peer_after_payment_defaults() {
1568 let local_store = Arc::new(MemoryStore::new());
1569 let store = make_test_store_with_routing(
1570 local_store,
1571 "0",
1572 GenericStoreRoutingConfig {
1573 cashu_payment_default_block_threshold: 1,
1574 ..Default::default()
1575 },
1576 );
1577 store.record_cashu_payment_default_from_peer("peer-a").await;
1578
1579 let selector = store.peer_selector.read().await;
1580 assert!(store.should_refuse_requests_from_peer(&selector, "peer-a"));
1581 assert!(!store.should_refuse_requests_from_peer(&selector, "peer-b"));
1582 }
1583
1584 #[tokio::test]
1585 async fn test_take_valid_quote_consumes_once_and_rejects_expired_quotes() {
1586 let local_store = Arc::new(MemoryStore::new());
1587 let store = make_test_store(local_store, "0");
1588 let hash = hashtree_core::sha256(b"quote-test");
1589 let hash_key = hash_to_key(&hash);
1590
1591 {
1592 let mut issued = store.issued_quotes.write().await;
1593 issued.insert(
1594 ("peer-a".to_string(), hash_key.clone(), 11),
1595 IssuedQuote {
1596 expires_at: Instant::now() + Duration::from_secs(1),
1597 payment_sat: 5,
1598 mint_url: Some("https://mint-a.example".to_string()),
1599 },
1600 );
1601 issued.insert(
1602 ("peer-a".to_string(), hash_key.clone(), 12),
1603 IssuedQuote {
1604 expires_at: Instant::now() - Duration::from_millis(1),
1605 payment_sat: 5,
1606 mint_url: Some("https://mint-a.example".to_string()),
1607 },
1608 );
1609 }
1610
1611 assert!(store.take_valid_quote("peer-a", &hash_key, 11).await);
1612 assert!(!store.take_valid_quote("peer-a", &hash_key, 11).await);
1613 assert!(!store.take_valid_quote("peer-a", &hash_key, 12).await);
1614 }
1615
1616 async fn run_quote_with_pumps(
1617 requester: Arc<TestStore>,
1618 hash: Hash,
1619 payment_sat: u64,
1620 quote_ttl: Duration,
1621 peer_ids: Vec<String>,
1622 nodes: &[&TestNode],
1623 ) -> Option<NegotiatedQuote> {
1624 let task = tokio::spawn(async move {
1625 requester
1626 .request_quote_from_peers(&hash, payment_sat, quote_ttl, &peer_ids)
1627 .await
1628 });
1629 let started = Instant::now();
1630
1631 loop {
1632 if task.is_finished() {
1633 return task.await.expect("quote task join");
1634 }
1635 if started.elapsed() > Duration::from_secs(1) {
1636 task.abort();
1637 return None;
1638 }
1639 pump_test_network(nodes, 4).await;
1640 }
1641 }
1642
1643 #[tokio::test]
1644 async fn test_request_quote_from_peers_rejects_unaccepted_mint() {
1645 let _guard = mock_network_lock().lock().await;
1646 crate::mock::clear_channel_registry().await;
1647
1648 let relay = crate::mock::MockRelay::new();
1649 let requester = make_shared_test_node(
1650 relay.clone(),
1651 "requester-reject",
1652 GenericStoreRoutingConfig {
1653 cashu_accepted_mints: vec!["https://mint-a.example".to_string()],
1654 cashu_default_mint: Some("https://mint-a.example".to_string()),
1655 dispatch: RequestDispatchConfig {
1656 initial_fanout: 1,
1657 hedge_fanout: 1,
1658 max_fanout: 1,
1659 hedge_interval_ms: 5,
1660 },
1661 ..Default::default()
1662 },
1663 );
1664 let provider = make_shared_test_node(
1665 relay,
1666 "provider-reject",
1667 GenericStoreRoutingConfig {
1668 cashu_accepted_mints: vec!["https://mint-b.example".to_string()],
1669 ..Default::default()
1670 },
1671 );
1672 let nodes = [&requester, &provider];
1673
1674 requester.transport.connect(&[]).await.expect("connect");
1675 provider.transport.connect(&[]).await.expect("connect");
1676 requester.store.start().await.expect("start");
1677 provider.store.start().await.expect("start");
1678 pump_test_network(&nodes, 24).await;
1679
1680 let payload = b"quoted-data".to_vec();
1681 let hash = hashtree_core::sha256(&payload);
1682 provider.local_store.put(hash, payload).await.expect("put");
1683
1684 let quote = run_quote_with_pumps(
1685 requester.store.clone(),
1686 hash,
1687 9,
1688 Duration::from_millis(80),
1689 vec!["provider-reject".to_string()],
1690 &nodes,
1691 )
1692 .await;
1693 assert!(
1694 quote.is_none(),
1695 "expected quote to be rejected on mint mismatch"
1696 );
1697 }
1698
1699 #[tokio::test]
1700 async fn test_request_quote_from_peers_accepts_small_peer_suggested_mint_under_cap() {
1701 let _guard = mock_network_lock().lock().await;
1702 crate::mock::clear_channel_registry().await;
1703
1704 let relay = crate::mock::MockRelay::new();
1705 let requester = make_shared_test_node(
1706 relay.clone(),
1707 "requester-suggested",
1708 GenericStoreRoutingConfig {
1709 cashu_accepted_mints: vec!["https://mint-a.example".to_string()],
1710 cashu_default_mint: Some("https://mint-a.example".to_string()),
1711 cashu_peer_suggested_mint_base_cap_sat: 3,
1712 cashu_peer_suggested_mint_max_cap_sat: 3,
1713 dispatch: RequestDispatchConfig {
1714 initial_fanout: 1,
1715 hedge_fanout: 1,
1716 max_fanout: 1,
1717 hedge_interval_ms: 5,
1718 },
1719 ..Default::default()
1720 },
1721 );
1722 let provider = make_shared_test_node(
1723 relay,
1724 "provider-suggested",
1725 GenericStoreRoutingConfig {
1726 cashu_accepted_mints: vec!["https://mint-b.example".to_string()],
1727 cashu_default_mint: Some("https://mint-b.example".to_string()),
1728 ..Default::default()
1729 },
1730 );
1731 let nodes = [&requester, &provider];
1732
1733 requester.transport.connect(&[]).await.expect("connect");
1734 provider.transport.connect(&[]).await.expect("connect");
1735 requester.store.start().await.expect("start");
1736 provider.store.start().await.expect("start");
1737 pump_test_network(&nodes, 24).await;
1738
1739 let payload = b"quoted-data".to_vec();
1740 let hash = hashtree_core::sha256(&payload);
1741 provider.local_store.put(hash, payload).await.expect("put");
1742
1743 let quote = run_quote_with_pumps(
1744 requester.store.clone(),
1745 hash,
1746 3,
1747 Duration::from_millis(80),
1748 vec!["provider-suggested".to_string()],
1749 &nodes,
1750 )
1751 .await
1752 .expect("expected bounded peer-suggested mint quote");
1753 assert_eq!(quote.mint_url.as_deref(), Some("https://mint-b.example"));
1754 }
1755
1756 #[tokio::test]
1757 async fn test_request_quote_from_peers_scales_peer_suggested_mint_cap_with_reputation() {
1758 let _guard = mock_network_lock().lock().await;
1759 crate::mock::clear_channel_registry().await;
1760
1761 let relay = crate::mock::MockRelay::new();
1762 let requester = make_shared_test_node(
1763 relay.clone(),
1764 "requester-reputation",
1765 GenericStoreRoutingConfig {
1766 cashu_accepted_mints: vec!["https://mint-a.example".to_string()],
1767 cashu_default_mint: Some("https://mint-a.example".to_string()),
1768 cashu_peer_suggested_mint_base_cap_sat: 1,
1769 cashu_peer_suggested_mint_success_step_sat: 1,
1770 cashu_peer_suggested_mint_receipt_step_sat: 2,
1771 cashu_peer_suggested_mint_max_cap_sat: 5,
1772 dispatch: RequestDispatchConfig {
1773 initial_fanout: 1,
1774 hedge_fanout: 1,
1775 max_fanout: 1,
1776 hedge_interval_ms: 5,
1777 },
1778 ..Default::default()
1779 },
1780 );
1781 let provider = make_shared_test_node(
1782 relay,
1783 "provider-reputation",
1784 GenericStoreRoutingConfig {
1785 cashu_accepted_mints: vec!["https://mint-b.example".to_string()],
1786 cashu_default_mint: Some("https://mint-b.example".to_string()),
1787 ..Default::default()
1788 },
1789 );
1790 let nodes = [&requester, &provider];
1791
1792 requester.transport.connect(&[]).await.expect("connect");
1793 provider.transport.connect(&[]).await.expect("connect");
1794 requester.store.start().await.expect("start");
1795 provider.store.start().await.expect("start");
1796 pump_test_network(&nodes, 24).await;
1797
1798 let payload = b"quoted-data".to_vec();
1799 let hash = hashtree_core::sha256(&payload);
1800 provider.local_store.put(hash, payload).await.expect("put");
1801
1802 let quote = run_quote_with_pumps(
1803 requester.store.clone(),
1804 hash,
1805 4,
1806 Duration::from_millis(80),
1807 vec!["provider-reputation".to_string()],
1808 &nodes,
1809 )
1810 .await;
1811 assert!(
1812 quote.is_none(),
1813 "new peer should not get a 4 sat untrusted-mint quote"
1814 );
1815
1816 {
1817 let mut selector = requester.store.peer_selector.write().await;
1818 selector.add_peer("provider-reputation");
1819 selector.record_success("provider-reputation", 20, 1024);
1820 selector.record_cashu_receipt("provider-reputation", 2);
1821 }
1822
1823 let quote = run_quote_with_pumps(
1824 requester.store.clone(),
1825 hash,
1826 4,
1827 Duration::from_millis(80),
1828 vec!["provider-reputation".to_string()],
1829 &nodes,
1830 )
1831 .await
1832 .expect("reputable peer should get larger bounded quote");
1833 assert_eq!(quote.mint_url.as_deref(), Some("https://mint-b.example"));
1834
1835 requester
1836 .store
1837 .record_cashu_payment_default_from_peer("provider-reputation")
1838 .await;
1839 let quote = run_quote_with_pumps(
1840 requester.store.clone(),
1841 hash,
1842 4,
1843 Duration::from_millis(80),
1844 vec!["provider-reputation".to_string()],
1845 &nodes,
1846 )
1847 .await;
1848 assert!(
1849 quote.is_none(),
1850 "peer-suggested mint exposure should drop to zero after defaults exceed receipts"
1851 );
1852 }
1853
1854 #[tokio::test]
1855 async fn test_request_quote_from_peers_returns_matching_mint() {
1856 let _guard = mock_network_lock().lock().await;
1857 crate::mock::clear_channel_registry().await;
1858
1859 let relay = crate::mock::MockRelay::new();
1860 let requester = make_shared_test_node(
1861 relay.clone(),
1862 "requester-match",
1863 GenericStoreRoutingConfig {
1864 cashu_accepted_mints: vec!["https://mint-a.example".to_string()],
1865 cashu_default_mint: Some("https://mint-a.example".to_string()),
1866 dispatch: RequestDispatchConfig {
1867 initial_fanout: 1,
1868 hedge_fanout: 1,
1869 max_fanout: 1,
1870 hedge_interval_ms: 5,
1871 },
1872 ..Default::default()
1873 },
1874 );
1875 let provider = make_shared_test_node(
1876 relay,
1877 "provider-match",
1878 GenericStoreRoutingConfig {
1879 cashu_accepted_mints: vec!["https://mint-a.example".to_string()],
1880 ..Default::default()
1881 },
1882 );
1883 let nodes = [&requester, &provider];
1884
1885 requester.transport.connect(&[]).await.expect("connect");
1886 provider.transport.connect(&[]).await.expect("connect");
1887 requester.store.start().await.expect("start");
1888 provider.store.start().await.expect("start");
1889 pump_test_network(&nodes, 24).await;
1890
1891 let payload = b"quoted-data".to_vec();
1892 let hash = hashtree_core::sha256(&payload);
1893 provider.local_store.put(hash, payload).await.expect("put");
1894
1895 let quote = run_quote_with_pumps(
1896 requester.store.clone(),
1897 hash,
1898 9,
1899 Duration::from_millis(80),
1900 vec!["provider-match".to_string()],
1901 &nodes,
1902 )
1903 .await
1904 .expect("expected quote");
1905 assert_eq!(quote.mint_url.as_deref(), Some("https://mint-a.example"));
1906 }
1907
1908 #[tokio::test]
1909 async fn test_tit_for_tat_store_path_recovers_after_bad_peer_observation() {
1910 let tit_for_tat_successes = run_bad_peer_series(SelectionStrategy::TitForTat).await;
1911
1912 assert!(
1913 tit_for_tat_successes >= 5,
1914 "expected tit-for-tat path to recover after the first consistently bad peer observation (successes={tit_for_tat_successes})"
1915 );
1916 }
1917}
1918
1919pub type SimStore<S> =
1921 GenericStore<S, crate::mock::MockRelayTransport, crate::mock::MockConnectionFactory>;
1922
1923pub type ProductionStore<S> = GenericStore<
1925 S,
1926 crate::nostr::NostrRelayTransport,
1927 crate::real_factory::RealPeerConnectionFactory,
1928>;