1use async_trait::async_trait;
8use std::collections::hash_map::DefaultHasher;
9use std::collections::{HashMap, HashSet};
10use std::hash::{Hash as _, Hasher};
11use std::sync::Arc;
12use std::time::{Duration, Instant};
13use tokio::sync::{oneshot, RwLock};
14
15use hashtree_core::{Hash, Store, StoreError};
16
17use crate::peer_selector::{PeerMetadataSnapshot, PeerSelector, SelectionStrategy};
18use crate::protocol::{
19 create_quote_request, create_quote_response_available, create_quote_response_unavailable,
20 create_request, create_request_with_quote, create_response, encode_quote_request,
21 encode_quote_response, encode_request, encode_response, hash_to_key, parse_message,
22 DataMessage, DataQuoteRequest, DataQuoteResponse,
23};
24use crate::signaling::MeshRouter;
25use crate::transport::{PeerLinkFactory, SignalingTransport, TransportError};
26use crate::types::{PeerHTLConfig, SignalingMessage, MAX_HTL};
27
28const PEER_METADATA_POINTER_SLOT_KEY: &[u8] = b"hashtree-webrtc/peer-metadata/latest/v1";
29
30struct PendingRequest {
32 response_tx: oneshot::Sender<Option<Vec<u8>>>,
33 started_at: Instant,
34 queried_peers: Vec<String>,
35}
36
37struct PendingQuoteRequest {
38 response_tx: oneshot::Sender<Option<NegotiatedQuote>>,
39 preferred_mint_url: Option<String>,
40 offered_payment_sat: u64,
41}
42
43#[derive(Debug, Clone)]
44struct NegotiatedQuote {
45 peer_id: String,
46 quote_id: u64,
47 #[allow(dead_code)]
48 mint_url: Option<String>,
49}
50
51struct IssuedQuote {
52 expires_at: Instant,
53 #[allow(dead_code)]
54 payment_sat: u64,
55 #[allow(dead_code)]
56 mint_url: Option<String>,
57}
58
59#[derive(Debug, Clone, Copy)]
65pub struct RequestDispatchConfig {
66 pub initial_fanout: usize,
68 pub hedge_fanout: usize,
70 pub max_fanout: usize,
72 pub hedge_interval_ms: u64,
74}
75
76impl Default for RequestDispatchConfig {
77 fn default() -> Self {
78 Self {
79 initial_fanout: usize::MAX,
80 hedge_fanout: usize::MAX,
81 max_fanout: usize::MAX,
82 hedge_interval_ms: 0,
83 }
84 }
85}
86
87pub fn normalize_dispatch_config(
89 dispatch: RequestDispatchConfig,
90 available_peers: usize,
91) -> RequestDispatchConfig {
92 let mut cfg = dispatch;
93 let cap = if cfg.max_fanout == 0 {
94 available_peers
95 } else {
96 cfg.max_fanout.min(available_peers)
97 };
98 cfg.max_fanout = cap;
99 cfg.initial_fanout = if cfg.initial_fanout == 0 {
100 1
101 } else {
102 cfg.initial_fanout.min(cap.max(1))
103 };
104 cfg.hedge_fanout = if cfg.hedge_fanout == 0 {
105 1
106 } else {
107 cfg.hedge_fanout.min(cap.max(1))
108 };
109 cfg
110}
111
112pub fn build_hedged_wave_plan(peer_count: usize, dispatch: RequestDispatchConfig) -> Vec<usize> {
114 if peer_count == 0 {
115 return Vec::new();
116 }
117 let cap = dispatch.max_fanout.min(peer_count);
118 if cap == 0 {
119 return Vec::new();
120 }
121
122 let mut plan = Vec::new();
123 let mut sent = 0usize;
124 let first = dispatch.initial_fanout.min(cap).max(1);
125 plan.push(first);
126 sent += first;
127
128 while sent < cap {
129 let next = dispatch.hedge_fanout.min(cap - sent).max(1);
130 plan.push(next);
131 sent += next;
132 }
133 plan
134}
135
136pub async fn sync_selector_peers(selector: &RwLock<PeerSelector>, current_peer_ids: &[String]) {
138 let mut selector = selector.write().await;
139 let current: HashSet<&str> = current_peer_ids.iter().map(String::as_str).collect();
140 let known: Vec<String> = selector.all_stats().map(|s| s.peer_id.clone()).collect();
141 for peer_id in known {
142 if !current.contains(peer_id.as_str()) {
143 selector.remove_peer(&peer_id);
144 }
145 }
146 for peer_id in current_peer_ids {
147 selector.add_peer(peer_id.clone());
148 }
149}
150
151#[derive(Debug, Clone, Copy)]
155pub struct ResponseBehaviorConfig {
156 pub drop_response_prob: f64,
158 pub corrupt_response_prob: f64,
160 pub extra_delay_ms: u64,
162}
163
164impl Default for ResponseBehaviorConfig {
165 fn default() -> Self {
166 Self {
167 drop_response_prob: 0.0,
168 corrupt_response_prob: 0.0,
169 extra_delay_ms: 0,
170 }
171 }
172}
173
174impl ResponseBehaviorConfig {
175 fn normalized(self) -> Self {
176 Self {
177 drop_response_prob: self.drop_response_prob.clamp(0.0, 1.0),
178 corrupt_response_prob: self.corrupt_response_prob.clamp(0.0, 1.0),
179 extra_delay_ms: self.extra_delay_ms,
180 }
181 }
182}
183
184#[derive(Debug, Clone)]
186pub struct GenericStoreRoutingConfig {
187 pub selection_strategy: SelectionStrategy,
188 pub fairness_enabled: bool,
189 pub cashu_payment_weight: f64,
191 pub cashu_payment_default_block_threshold: u64,
194 pub cashu_accepted_mints: Vec<String>,
196 pub cashu_default_mint: Option<String>,
198 pub cashu_peer_suggested_mint_base_cap_sat: u64,
200 pub cashu_peer_suggested_mint_success_step_sat: u64,
202 pub cashu_peer_suggested_mint_receipt_step_sat: u64,
204 pub cashu_peer_suggested_mint_max_cap_sat: u64,
206 pub dispatch: RequestDispatchConfig,
207 pub response_behavior: ResponseBehaviorConfig,
208}
209
210impl Default for GenericStoreRoutingConfig {
211 fn default() -> Self {
212 Self {
213 selection_strategy: SelectionStrategy::Weighted,
214 fairness_enabled: true,
215 cashu_payment_weight: 0.0,
216 cashu_payment_default_block_threshold: 0,
217 cashu_accepted_mints: Vec::new(),
218 cashu_default_mint: None,
219 cashu_peer_suggested_mint_base_cap_sat: 0,
220 cashu_peer_suggested_mint_success_step_sat: 0,
221 cashu_peer_suggested_mint_receipt_step_sat: 0,
222 cashu_peer_suggested_mint_max_cap_sat: 0,
223 dispatch: RequestDispatchConfig::default(),
224 response_behavior: ResponseBehaviorConfig::default(),
225 }
226 }
227}
228
229pub struct GenericStore<S, R, F>
236where
237 S: Store + Send + Sync + 'static,
238 R: SignalingTransport + Send + Sync + 'static,
239 F: PeerLinkFactory + Send + Sync + 'static,
240{
241 local_store: Arc<S>,
243 signaling: Arc<MeshRouter<R, F>>,
245 htl_configs: RwLock<HashMap<String, PeerHTLConfig>>,
247 pending_requests: RwLock<HashMap<String, PendingRequest>>,
249 pending_quotes: RwLock<HashMap<String, PendingQuoteRequest>>,
251 issued_quotes: RwLock<HashMap<(String, String, u64), IssuedQuote>>,
253 next_quote_id: RwLock<u64>,
255 peer_selector: RwLock<PeerSelector>,
257 routing: GenericStoreRoutingConfig,
259 request_timeout: Duration,
261 debug: bool,
263 running: RwLock<bool>,
265}
266
267impl<S, R, F> GenericStore<S, R, F>
268where
269 S: Store + Send + Sync + 'static,
270 R: SignalingTransport + Send + Sync + 'static,
271 F: PeerLinkFactory + Send + Sync + 'static,
272{
273 pub fn new(
275 local_store: Arc<S>,
276 signaling: Arc<MeshRouter<R, F>>,
277 request_timeout: Duration,
278 debug: bool,
279 ) -> Self {
280 Self::new_with_routing(
281 local_store,
282 signaling,
283 request_timeout,
284 debug,
285 Default::default(),
286 )
287 }
288
289 pub fn new_with_routing(
291 local_store: Arc<S>,
292 signaling: Arc<MeshRouter<R, F>>,
293 request_timeout: Duration,
294 debug: bool,
295 routing: GenericStoreRoutingConfig,
296 ) -> Self {
297 let mut selector = PeerSelector::with_strategy(routing.selection_strategy);
298 selector.set_fairness(routing.fairness_enabled);
299 selector.set_cashu_payment_weight(routing.cashu_payment_weight);
300 Self {
301 local_store,
302 signaling,
303 htl_configs: RwLock::new(HashMap::new()),
304 pending_requests: RwLock::new(HashMap::new()),
305 pending_quotes: RwLock::new(HashMap::new()),
306 issued_quotes: RwLock::new(HashMap::new()),
307 next_quote_id: RwLock::new(1),
308 peer_selector: RwLock::new(selector),
309 routing,
310 request_timeout,
311 debug,
312 running: RwLock::new(false),
313 }
314 }
315
316 pub async fn start(&self) -> Result<(), TransportError> {
318 *self.running.write().await = true;
319
320 self.signaling.send_hello(vec![]).await?;
322
323 Ok(())
324 }
325
326 pub async fn stop(&self) {
328 *self.running.write().await = false;
329 }
330
331 pub async fn process_signaling(&self, msg: SignalingMessage) -> Result<(), TransportError> {
333 let peer_id = msg.peer_id().to_string();
335 {
336 let mut configs = self.htl_configs.write().await;
337 if !configs.contains_key(&peer_id) {
338 configs.insert(peer_id.clone(), PeerHTLConfig::random());
339 }
340 }
341 self.peer_selector.write().await.add_peer(peer_id);
342
343 self.signaling.handle_message(msg).await
344 }
345
346 pub fn signaling(&self) -> &Arc<MeshRouter<R, F>> {
348 &self.signaling
349 }
350
351 fn response_behavior(&self) -> ResponseBehaviorConfig {
352 self.routing.response_behavior.normalized()
353 }
354
355 fn deterministic_actor_draw_for(peer_id: &str, hash: &Hash, salt: u64) -> f64 {
356 let mut hasher = DefaultHasher::new();
357 peer_id.hash(&mut hasher);
358 hash.hash(&mut hasher);
359 salt.hash(&mut hasher);
360 let v = hasher.finish();
361 (v as f64) / (u64::MAX as f64)
362 }
363
364 fn deterministic_actor_draw(&self, hash: &Hash, salt: u64) -> f64 {
365 Self::deterministic_actor_draw_for(self.signaling.peer_id(), hash, salt)
366 }
367
368 fn peer_metadata_pointer_slot_hash() -> Hash {
369 hashtree_core::sha256(PEER_METADATA_POINTER_SLOT_KEY)
370 }
371
372 fn decode_hash_hex(hash_hex: &str) -> Result<Hash, StoreError> {
373 let bytes = hex::decode(hash_hex)
374 .map_err(|e| StoreError::Other(format!("Invalid hash hex: {e}")))?;
375 if bytes.len() != 32 {
376 return Err(StoreError::Other(format!(
377 "Invalid hash length {}, expected 32 bytes",
378 bytes.len()
379 )));
380 }
381 let mut hash = [0u8; 32];
382 hash.copy_from_slice(&bytes);
383 Ok(hash)
384 }
385
386 fn should_drop_response(&self, hash: &Hash) -> bool {
387 let p = self.response_behavior().drop_response_prob;
388 if p <= 0.0 {
389 return false;
390 }
391 self.deterministic_actor_draw(hash, 0xD0_D0_D0_D0_D0_D0_D0_D0) < p
392 }
393
394 fn should_corrupt_response(&self, hash: &Hash) -> bool {
395 let p = self.response_behavior().corrupt_response_prob;
396 if p <= 0.0 {
397 return false;
398 }
399 self.deterministic_actor_draw(hash, 0xC0_C0_C0_C0_C0_C0_C0_C0) < p
400 }
401
402 async fn ordered_connected_peers(&self) -> Vec<String> {
403 let current_peer_ids = self.signaling.peer_ids().await;
404 if current_peer_ids.is_empty() {
405 return Vec::new();
406 }
407
408 sync_selector_peers(&self.peer_selector, ¤t_peer_ids).await;
409 let current_set: HashSet<&str> = current_peer_ids.iter().map(String::as_str).collect();
410 let mut ordered_peer_ids = self.peer_selector.write().await.select_peers();
411 ordered_peer_ids.retain(|peer_id| current_set.contains(peer_id.as_str()));
412 if ordered_peer_ids.is_empty() {
413 let mut fallback = current_peer_ids;
414 fallback.sort();
415 return fallback;
416 }
417 ordered_peer_ids
418 }
419
420 fn requested_quote_mint(&self) -> Option<&str> {
421 if let Some(default_mint) = self.routing.cashu_default_mint.as_deref() {
422 if self.routing.cashu_accepted_mints.is_empty()
423 || self
424 .routing
425 .cashu_accepted_mints
426 .iter()
427 .any(|mint| mint == default_mint)
428 {
429 return Some(default_mint);
430 }
431 }
432
433 self.routing
434 .cashu_accepted_mints
435 .first()
436 .map(String::as_str)
437 }
438
439 fn choose_quote_mint(&self, requested_mint: Option<&str>) -> Option<String> {
440 if let Some(requested_mint) = requested_mint {
441 if self.accepts_quote_mint(Some(requested_mint)) {
442 return Some(requested_mint.to_string());
443 }
444 }
445 if let Some(default_mint) = self.routing.cashu_default_mint.as_ref() {
446 return Some(default_mint.clone());
447 }
448 if let Some(first_mint) = self.routing.cashu_accepted_mints.first() {
449 return Some(first_mint.clone());
450 }
451 requested_mint.map(str::to_string)
452 }
453
454 fn accepts_quote_mint(&self, mint_url: Option<&str>) -> bool {
455 if self.routing.cashu_accepted_mints.is_empty() {
456 return true;
457 }
458
459 let Some(mint_url) = mint_url else {
460 return false;
461 };
462 self.routing
463 .cashu_accepted_mints
464 .iter()
465 .any(|mint| mint == mint_url)
466 }
467
468 fn trusts_quote_mint(&self, mint_url: Option<&str>) -> bool {
469 let Some(mint_url) = mint_url else {
470 return self.routing.cashu_default_mint.is_none()
471 && self.routing.cashu_accepted_mints.is_empty();
472 };
473 self.routing.cashu_default_mint.as_deref() == Some(mint_url)
474 || self
475 .routing
476 .cashu_accepted_mints
477 .iter()
478 .any(|mint| mint == mint_url)
479 }
480
481 async fn peer_suggested_mint_cap_sat(&self, peer_id: &str) -> u64 {
482 let base = self.routing.cashu_peer_suggested_mint_base_cap_sat;
483 if base == 0 {
484 return 0;
485 }
486
487 let selector = self.peer_selector.read().await;
488 let Some(stats) = selector.get_stats(peer_id) else {
489 let max_cap = self.routing.cashu_peer_suggested_mint_max_cap_sat;
490 return if max_cap > 0 { base.min(max_cap) } else { base };
491 };
492
493 if stats.cashu_payment_defaults > 0
494 && stats.cashu_payment_defaults >= stats.cashu_payment_receipts
495 {
496 return 0;
497 }
498
499 let success_bonus = stats
500 .successes
501 .saturating_mul(self.routing.cashu_peer_suggested_mint_success_step_sat);
502 let receipt_bonus = stats
503 .cashu_payment_receipts
504 .saturating_mul(self.routing.cashu_peer_suggested_mint_receipt_step_sat);
505 let mut cap = base
506 .saturating_add(success_bonus)
507 .saturating_add(receipt_bonus);
508 let max_cap = self.routing.cashu_peer_suggested_mint_max_cap_sat;
509 if max_cap > 0 {
510 cap = cap.min(max_cap);
511 }
512 cap
513 }
514
515 async fn should_accept_quote_response(
516 &self,
517 from_peer: &str,
518 preferred_mint_url: Option<&str>,
519 offered_payment_sat: u64,
520 res: &DataQuoteResponse,
521 ) -> bool {
522 let Some(payment_sat) = res.p else {
523 return false;
524 };
525 if payment_sat > offered_payment_sat {
526 return false;
527 }
528
529 let response_mint = res.m.as_deref();
530 if response_mint == preferred_mint_url {
531 return true;
532 }
533 if self.trusts_quote_mint(response_mint) {
534 return true;
535 }
536 if response_mint.is_none() {
537 return false;
538 }
539
540 payment_sat <= self.peer_suggested_mint_cap_sat(from_peer).await
541 }
542
543 async fn issue_quote(
544 &self,
545 peer_id: &str,
546 hash_key: &str,
547 payment_sat: u64,
548 ttl_ms: u32,
549 mint_url: Option<&str>,
550 ) -> u64 {
551 let quote_id = {
552 let mut next = self.next_quote_id.write().await;
553 let quote_id = *next;
554 *next = next.saturating_add(1);
555 quote_id
556 };
557
558 let expires_at = Instant::now() + Duration::from_millis(ttl_ms as u64);
559 self.issued_quotes.write().await.insert(
560 (peer_id.to_string(), hash_key.to_string(), quote_id),
561 IssuedQuote {
562 expires_at,
563 payment_sat,
564 mint_url: mint_url.map(str::to_string),
565 },
566 );
567 quote_id
568 }
569
570 async fn take_valid_quote(&self, peer_id: &str, hash_key: &str, quote_id: u64) -> bool {
571 let key = (peer_id.to_string(), hash_key.to_string(), quote_id);
572 let Some(quote) = self.issued_quotes.write().await.remove(&key) else {
573 return false;
574 };
575 quote.expires_at > Instant::now()
576 }
577
578 async fn send_request_to_peer(
579 &self,
580 peer_id: &str,
581 hash: &Hash,
582 quote_id: Option<u64>,
583 ) -> bool {
584 let channel = match self.signaling.get_channel(peer_id).await {
585 Some(c) => c,
586 None => return false,
587 };
588
589 let htl_config = {
590 let configs = self.htl_configs.read().await;
591 configs
592 .get(peer_id)
593 .cloned()
594 .unwrap_or_else(PeerHTLConfig::random)
595 };
596
597 let send_htl = htl_config.decrement(MAX_HTL);
598 let req = match quote_id {
599 Some(quote_id) => create_request_with_quote(hash, send_htl, quote_id),
600 None => create_request(hash, send_htl),
601 };
602 let request_bytes = encode_request(&req);
603
604 {
605 let mut selector = self.peer_selector.write().await;
606 selector.record_request(peer_id, request_bytes.len() as u64);
607 }
608
609 match channel.send(request_bytes).await {
610 Ok(()) => true,
611 Err(_) => {
612 self.peer_selector.write().await.record_failure(peer_id);
613 false
614 }
615 }
616 }
617
618 async fn send_quote_request_to_peer(
619 &self,
620 peer_id: &str,
621 hash: &Hash,
622 payment_sat: u64,
623 ttl_ms: u32,
624 mint_url: Option<&str>,
625 ) -> bool {
626 let channel = match self.signaling.get_channel(peer_id).await {
627 Some(c) => c,
628 None => return false,
629 };
630
631 let req = create_quote_request(hash, ttl_ms, payment_sat, mint_url);
632 let request_bytes = encode_quote_request(&req);
633
634 match channel.send(request_bytes).await {
635 Ok(()) => true,
636 Err(_) => false,
637 }
638 }
639
640 pub async fn peer_count(&self) -> usize {
642 self.signaling.peer_count().await
643 }
644
645 pub async fn needs_peers(&self) -> bool {
647 self.signaling.needs_peers().await
648 }
649
650 pub async fn send_hello(&self) -> Result<(), TransportError> {
652 self.signaling.send_hello(vec![]).await
653 }
654
655 pub async fn record_cashu_payment_for_peer(&self, peer_id: &str, amount_sat: u64) {
657 self.peer_selector
658 .write()
659 .await
660 .record_cashu_payment(peer_id, amount_sat);
661 }
662
663 pub async fn record_cashu_receipt_from_peer(&self, peer_id: &str, amount_sat: u64) {
665 self.peer_selector
666 .write()
667 .await
668 .record_cashu_receipt(peer_id, amount_sat);
669 }
670
671 pub async fn record_cashu_payment_default_from_peer(&self, peer_id: &str) {
673 self.peer_selector
674 .write()
675 .await
676 .record_cashu_payment_default(peer_id);
677 }
678
679 fn should_refuse_requests_from_peer(&self, selector: &PeerSelector, peer_id: &str) -> bool {
680 selector.is_peer_blocked_for_payment_defaults(
681 peer_id,
682 self.routing.cashu_payment_default_block_threshold,
683 )
684 }
685
686 pub async fn peer_metadata_snapshot(&self) -> PeerMetadataSnapshot {
688 self.peer_selector
689 .read()
690 .await
691 .export_peer_metadata_snapshot()
692 }
693
694 pub async fn persist_peer_metadata(&self) -> Result<Hash, StoreError> {
699 let snapshot = self
700 .peer_selector
701 .read()
702 .await
703 .export_peer_metadata_snapshot();
704 let bytes = serde_json::to_vec(&snapshot).map_err(|e| {
705 StoreError::Other(format!("Failed to encode peer metadata snapshot: {e}"))
706 })?;
707 let snapshot_hash = hashtree_core::sha256(&bytes);
708 let _ = self.local_store.put(snapshot_hash, bytes).await?;
709
710 let pointer_slot = Self::peer_metadata_pointer_slot_hash();
711 let pointer_bytes = hex::encode(snapshot_hash).into_bytes();
712 let _ = self.local_store.delete(&pointer_slot).await?;
713 let _ = self.local_store.put(pointer_slot, pointer_bytes).await?;
714
715 Ok(snapshot_hash)
716 }
717
718 pub async fn load_peer_metadata(&self) -> Result<bool, StoreError> {
720 let pointer_slot = Self::peer_metadata_pointer_slot_hash();
721 let Some(pointer_bytes) = self.local_store.get(&pointer_slot).await? else {
722 return Ok(false);
723 };
724 let pointer_hex = std::str::from_utf8(&pointer_bytes).map_err(|e| {
725 StoreError::Other(format!("Peer metadata pointer is not valid UTF-8: {e}"))
726 })?;
727 let snapshot_hash = Self::decode_hash_hex(pointer_hex.trim())?;
728
729 let Some(snapshot_bytes) = self.local_store.get(&snapshot_hash).await? else {
730 return Ok(false);
731 };
732 let snapshot: PeerMetadataSnapshot =
733 serde_json::from_slice(&snapshot_bytes).map_err(|e| {
734 StoreError::Other(format!("Failed to decode peer metadata snapshot: {e}"))
735 })?;
736 self.peer_selector
737 .write()
738 .await
739 .import_peer_metadata_snapshot(&snapshot);
740 Ok(true)
741 }
742
743 pub async fn get_with_quote(
748 &self,
749 hash: &Hash,
750 payment_sat: u64,
751 quote_ttl: Duration,
752 ) -> Result<Option<Vec<u8>>, StoreError> {
753 if let Some(data) = self.local_store.get(hash).await? {
754 return Ok(Some(data));
755 }
756 Ok(self
757 .request_from_peers_with_quote(hash, payment_sat, quote_ttl)
758 .await)
759 }
760
761 async fn request_from_peers_with_quote(
762 &self,
763 hash: &Hash,
764 payment_sat: u64,
765 quote_ttl: Duration,
766 ) -> Option<Vec<u8>> {
767 let ordered_peer_ids = self.ordered_connected_peers().await;
768 if ordered_peer_ids.is_empty() {
769 return None;
770 }
771
772 if let Some(quote) = self
773 .request_quote_from_peers(hash, payment_sat, quote_ttl, &ordered_peer_ids)
774 .await
775 {
776 if let Some(data) = self
777 .request_from_single_peer(hash, "e.peer_id, Some(quote.quote_id))
778 .await
779 {
780 return Some(data);
781 }
782 }
783
784 self.request_from_ordered_peers(hash, &ordered_peer_ids)
785 .await
786 }
787
788 async fn request_quote_from_peers(
789 &self,
790 hash: &Hash,
791 payment_sat: u64,
792 quote_ttl: Duration,
793 ordered_peer_ids: &[String],
794 ) -> Option<NegotiatedQuote> {
795 if ordered_peer_ids.is_empty() {
796 return None;
797 }
798 let ttl_ms = quote_ttl.as_millis().min(u32::MAX as u128) as u32;
799 if ttl_ms == 0 {
800 return None;
801 }
802 let requested_mint = self.requested_quote_mint().map(str::to_string);
803
804 let dispatch = normalize_dispatch_config(self.routing.dispatch, ordered_peer_ids.len());
805 let wave_plan = build_hedged_wave_plan(ordered_peer_ids.len(), dispatch);
806 if wave_plan.is_empty() {
807 return None;
808 }
809
810 let hash_key = hash_to_key(hash);
811 let (tx, rx) = oneshot::channel();
812 self.pending_quotes.write().await.insert(
813 hash_key.clone(),
814 PendingQuoteRequest {
815 response_tx: tx,
816 preferred_mint_url: requested_mint.clone(),
817 offered_payment_sat: payment_sat,
818 },
819 );
820
821 let mut sent_total = 0usize;
822 let mut next_peer_idx = 0usize;
823 let mut rx = rx;
824 let deadline = Instant::now() + self.request_timeout;
825
826 for (wave_idx, wave_size) in wave_plan.iter().copied().enumerate() {
827 let from = next_peer_idx;
828 let to = (next_peer_idx + wave_size).min(ordered_peer_ids.len());
829 for peer_id in &ordered_peer_ids[from..to] {
830 if self
831 .send_quote_request_to_peer(
832 peer_id,
833 hash,
834 payment_sat,
835 ttl_ms,
836 requested_mint.as_deref(),
837 )
838 .await
839 {
840 sent_total += 1;
841 }
842 }
843 next_peer_idx = to;
844
845 if sent_total == 0 {
846 if next_peer_idx >= ordered_peer_ids.len() {
847 break;
848 }
849 continue;
850 }
851
852 let now = Instant::now();
853 if now >= deadline {
854 break;
855 }
856 let remaining = deadline.saturating_duration_since(now);
857 let is_last_wave =
858 wave_idx + 1 == wave_plan.len() || next_peer_idx >= ordered_peer_ids.len();
859 let wait = if is_last_wave {
860 remaining
861 } else if dispatch.hedge_interval_ms == 0 {
862 Duration::ZERO
863 } else {
864 Duration::from_millis(dispatch.hedge_interval_ms).min(remaining)
865 };
866
867 if wait.is_zero() {
868 continue;
869 }
870
871 match tokio::time::timeout(wait, &mut rx).await {
872 Ok(Ok(Some(quote))) => {
873 let _ = self.pending_quotes.write().await.remove(&hash_key);
874 return Some(quote);
875 }
876 Ok(Ok(None)) => break,
877 Ok(Err(_)) => break,
878 Err(_) => {}
879 }
880 }
881
882 let _ = self.pending_quotes.write().await.remove(&hash_key);
883 None
884 }
885
886 async fn request_from_single_peer(
887 &self,
888 hash: &Hash,
889 peer_id: &str,
890 quote_id: Option<u64>,
891 ) -> Option<Vec<u8>> {
892 let hash_key = hash_to_key(hash);
893 let (tx, rx) = oneshot::channel();
894 self.pending_requests.write().await.insert(
895 hash_key.clone(),
896 PendingRequest {
897 response_tx: tx,
898 started_at: Instant::now(),
899 queried_peers: vec![peer_id.to_string()],
900 },
901 );
902
903 let mut rx = rx;
904 if !self.send_request_to_peer(peer_id, hash, quote_id).await {
905 let _ = self.pending_requests.write().await.remove(&hash_key);
906 return None;
907 }
908
909 if let Ok(Ok(Some(data))) = tokio::time::timeout(self.request_timeout, &mut rx).await {
910 if hashtree_core::sha256(&data) == *hash {
911 let _ = self.local_store.put(*hash, data.clone()).await;
912 return Some(data);
913 }
914 }
915
916 if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
917 for peer_id in pending.queried_peers {
918 self.peer_selector.write().await.record_timeout(&peer_id);
919 }
920 }
921 None
922 }
923
924 async fn request_from_ordered_peers(
925 &self,
926 hash: &Hash,
927 ordered_peer_ids: &[String],
928 ) -> Option<Vec<u8>> {
929 let dispatch = normalize_dispatch_config(self.routing.dispatch, ordered_peer_ids.len());
930 let wave_plan = build_hedged_wave_plan(ordered_peer_ids.len(), dispatch);
931 if wave_plan.is_empty() {
932 return None;
933 }
934
935 let hash_key = hash_to_key(hash);
936 let (tx, rx) = oneshot::channel();
937 self.pending_requests.write().await.insert(
938 hash_key.clone(),
939 PendingRequest {
940 response_tx: tx,
941 started_at: Instant::now(),
942 queried_peers: Vec::new(),
943 },
944 );
945
946 let mut sent_total = 0usize;
947 let mut next_peer_idx = 0usize;
948 let mut rx = rx;
949 let deadline = Instant::now() + self.request_timeout;
950
951 for (wave_idx, wave_size) in wave_plan.iter().copied().enumerate() {
952 let from = next_peer_idx;
953 let to = (next_peer_idx + wave_size).min(ordered_peer_ids.len());
954 for peer_id in &ordered_peer_ids[from..to] {
955 if self.send_request_to_peer(peer_id, hash, None).await {
956 sent_total += 1;
957 if let Some(pending) = self.pending_requests.write().await.get_mut(&hash_key) {
958 pending.queried_peers.push(peer_id.clone());
959 }
960 }
961 }
962 next_peer_idx = to;
963
964 if sent_total == 0 {
965 if next_peer_idx >= ordered_peer_ids.len() {
966 break;
967 }
968 continue;
969 }
970
971 let now = Instant::now();
972 if now >= deadline {
973 break;
974 }
975 let remaining = deadline.saturating_duration_since(now);
976 let is_last_wave =
977 wave_idx + 1 == wave_plan.len() || next_peer_idx >= ordered_peer_ids.len();
978 let wait = if is_last_wave {
979 remaining
980 } else if dispatch.hedge_interval_ms == 0 {
981 Duration::ZERO
982 } else {
983 Duration::from_millis(dispatch.hedge_interval_ms).min(remaining)
984 };
985
986 if wait.is_zero() {
987 continue;
988 }
989
990 match tokio::time::timeout(wait, &mut rx).await {
991 Ok(Ok(Some(data))) => {
992 if hashtree_core::sha256(&data) == *hash {
993 let _ = self.local_store.put(*hash, data.clone()).await;
994 return Some(data);
995 }
996 }
997 Ok(Ok(None)) => break,
998 Ok(Err(_)) => break,
999 Err(_) => {
1000 }
1002 }
1003 }
1004
1005 if sent_total == 0 {
1006 let _ = self.pending_requests.write().await.remove(&hash_key);
1007 return None;
1008 }
1009
1010 if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
1011 for peer_id in pending.queried_peers {
1012 self.peer_selector.write().await.record_timeout(&peer_id);
1013 }
1014 }
1015 None
1016 }
1017
1018 async fn request_from_peers(&self, hash: &Hash) -> Option<Vec<u8>> {
1020 let ordered_peer_ids = self.ordered_connected_peers().await;
1021 if ordered_peer_ids.is_empty() {
1022 return None;
1023 }
1024 self.request_from_ordered_peers(hash, &ordered_peer_ids)
1025 .await
1026 }
1027
1028 async fn complete_pending_response(&self, from_peer: &str, hash_key: String, payload: Vec<u8>) {
1029 if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
1030 let rtt_ms = pending.started_at.elapsed().as_millis() as u64;
1031 self.peer_selector.write().await.record_success(
1032 from_peer,
1033 rtt_ms,
1034 payload.len() as u64,
1035 );
1036 let _ = pending.response_tx.send(Some(payload));
1037 }
1038 }
1039
1040 async fn handle_quote_response_message(&self, from_peer: &str, res: DataQuoteResponse) {
1041 if !res.a {
1042 return;
1043 }
1044
1045 let Some(quote_id) = res.q else {
1046 return;
1047 };
1048
1049 let hash_key = hash_to_key(&res.h);
1050 let (preferred_mint_url, offered_payment_sat) = {
1051 let pending_quotes = self.pending_quotes.read().await;
1052 let Some(pending) = pending_quotes.get(&hash_key) else {
1053 return;
1054 };
1055 (
1056 pending.preferred_mint_url.clone(),
1057 pending.offered_payment_sat,
1058 )
1059 };
1060 if !self
1061 .should_accept_quote_response(
1062 from_peer,
1063 preferred_mint_url.as_deref(),
1064 offered_payment_sat,
1065 &res,
1066 )
1067 .await
1068 {
1069 return;
1070 }
1071 let mut pending_quotes = self.pending_quotes.write().await;
1072 if let Some(pending) = pending_quotes.remove(&hash_key) {
1073 let _ = pending.response_tx.send(Some(NegotiatedQuote {
1074 peer_id: from_peer.to_string(),
1075 quote_id,
1076 mint_url: res.m,
1077 }));
1078 }
1079 }
1080
1081 async fn handle_response_message(&self, from_peer: &str, res: crate::protocol::DataResponse) {
1082 let hash_key = hash_to_key(&res.h);
1083 let hash = match crate::protocol::bytes_to_hash(&res.h) {
1084 Some(h) => h,
1085 None => return,
1086 };
1087
1088 if hashtree_core::sha256(&res.d) != hash {
1090 self.peer_selector.write().await.record_failure(from_peer);
1091 if self.debug {
1092 println!("[GenericStore] Ignoring invalid response payload for {hash_key}");
1093 }
1094 return;
1095 }
1096
1097 self.complete_pending_response(from_peer, hash_key, res.d)
1098 .await;
1099 }
1100
1101 async fn handle_quote_request_message(&self, from_peer: &str, req: DataQuoteRequest) {
1102 let hash = match crate::protocol::bytes_to_hash(&req.h) {
1103 Some(h) => h,
1104 None => return,
1105 };
1106 let hash_key = hash_to_key(&hash);
1107
1108 {
1109 let selector = self.peer_selector.read().await;
1110 if self.should_refuse_requests_from_peer(&selector, from_peer) {
1111 if self.debug {
1112 println!(
1113 "[GenericStore] Refusing quote request from delinquent peer {}",
1114 from_peer
1115 );
1116 }
1117 return;
1118 }
1119 }
1120
1121 let chosen_mint = self.choose_quote_mint(req.m.as_deref());
1122 let can_serve = self.local_store.has(&hash).await.ok().unwrap_or(false)
1123 && !self.should_drop_response(&hash)
1124 && !self.should_corrupt_response(&hash);
1125
1126 let res = if can_serve {
1127 let quote_id = self
1128 .issue_quote(from_peer, &hash_key, req.p, req.t, chosen_mint.as_deref())
1129 .await;
1130 create_quote_response_available(&hash, quote_id, req.p, req.t, chosen_mint.as_deref())
1131 } else {
1132 create_quote_response_unavailable(&hash)
1133 };
1134 let response_bytes = encode_quote_response(&res);
1135 if let Some(channel) = self.signaling.get_channel(from_peer).await {
1136 let _ = channel.send(response_bytes).await;
1137 }
1138 }
1139
1140 async fn handle_request_message(&self, from_peer: &str, req: crate::protocol::DataRequest) {
1141 let hash = match crate::protocol::bytes_to_hash(&req.h) {
1142 Some(h) => h,
1143 None => return,
1144 };
1145 let hash_key = hash_to_key(&hash);
1146
1147 {
1148 let selector = self.peer_selector.read().await;
1149 if self.should_refuse_requests_from_peer(&selector, from_peer) {
1150 if self.debug {
1151 println!(
1152 "[GenericStore] Refusing request from delinquent peer {}",
1153 from_peer
1154 );
1155 }
1156 return;
1157 }
1158 }
1159
1160 if let Some(quote_id) = req.q {
1161 if !self.take_valid_quote(from_peer, &hash_key, quote_id).await {
1162 if self.debug {
1163 println!(
1164 "[GenericStore] Refusing request with invalid or expired quote {} from {}",
1165 quote_id, from_peer
1166 );
1167 }
1168 return;
1169 }
1170 }
1171
1172 if let Ok(Some(mut data)) = self.local_store.get(&hash).await {
1174 if self.should_drop_response(&hash) {
1175 if self.debug {
1176 println!(
1177 "[GenericStore] Dropping response for {} due to actor profile",
1178 hash_to_key(&hash)
1179 );
1180 }
1181 return;
1182 }
1183
1184 let behavior = self.response_behavior();
1185 if behavior.extra_delay_ms > 0 {
1186 tokio::time::sleep(Duration::from_millis(behavior.extra_delay_ms)).await;
1187 }
1188
1189 if self.should_corrupt_response(&hash) {
1190 if data.is_empty() {
1191 data.push(0x80);
1192 } else {
1193 data[0] ^= 0x80;
1194 }
1195 }
1196
1197 let res = create_response(&hash, data);
1199 let response_bytes = encode_response(&res);
1200 if let Some(channel) = self.signaling.get_channel(from_peer).await {
1201 let _ = channel.send(response_bytes).await;
1202 }
1203 }
1204 }
1206
1207 pub async fn handle_data_message(&self, from_peer: &str, data: &[u8]) {
1209 let parsed = match parse_message(data) {
1210 Some(m) => m,
1211 None => return,
1212 };
1213
1214 match parsed {
1215 DataMessage::Request(req) => {
1216 self.handle_request_message(from_peer, req).await;
1217 }
1218 DataMessage::Response(res) => {
1219 self.handle_response_message(from_peer, res).await;
1220 }
1221 DataMessage::QuoteRequest(req) => {
1222 self.handle_quote_request_message(from_peer, req).await;
1223 }
1224 DataMessage::QuoteResponse(res) => {
1225 self.handle_quote_response_message(from_peer, res).await;
1226 }
1227 }
1228 }
1229}
1230
1231#[async_trait]
1232impl<S, R, F> Store for GenericStore<S, R, F>
1233where
1234 S: Store + Send + Sync + 'static,
1235 R: SignalingTransport + Send + Sync + 'static,
1236 F: PeerLinkFactory + Send + Sync + 'static,
1237{
1238 async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
1239 self.local_store.put(hash, data).await
1240 }
1241
1242 async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
1243 if let Some(data) = self.local_store.get(hash).await? {
1245 return Ok(Some(data));
1246 }
1247
1248 Ok(self.request_from_peers(hash).await)
1250 }
1251
1252 async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
1253 self.local_store.has(hash).await
1254 }
1255
1256 async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
1257 self.local_store.delete(hash).await
1258 }
1259}
1260
1261#[cfg(test)]
1262mod tests {
1263 use super::*;
1264 use hashtree_core::MemoryStore;
1265 use std::sync::Arc;
1266 use std::sync::OnceLock;
1267 use std::time::Duration;
1268
1269 type TestStore = GenericStore<
1270 MemoryStore,
1271 crate::mock::MockRelayTransport,
1272 crate::mock::MockConnectionFactory,
1273 >;
1274
1275 struct TestNode {
1276 store: Arc<TestStore>,
1277 local_store: Arc<MemoryStore>,
1278 transport: Arc<crate::mock::MockRelayTransport>,
1279 }
1280
1281 fn mock_network_lock() -> &'static tokio::sync::Mutex<()> {
1282 static LOCK: OnceLock<tokio::sync::Mutex<()>> = OnceLock::new();
1283 LOCK.get_or_init(|| tokio::sync::Mutex::new(()))
1284 }
1285
1286 fn make_test_store(local_store: Arc<MemoryStore>, node_id: &str) -> TestStore {
1287 make_test_store_with_routing(local_store, node_id, GenericStoreRoutingConfig::default())
1288 }
1289
1290 fn make_test_store_with_routing(
1291 local_store: Arc<MemoryStore>,
1292 node_id: &str,
1293 routing: GenericStoreRoutingConfig,
1294 ) -> TestStore {
1295 let relay = crate::mock::MockRelay::new();
1296 let transport = Arc::new(relay.create_transport(node_id.to_string(), node_id.to_string()));
1297 let conn_factory = Arc::new(crate::mock::MockConnectionFactory::new(
1298 node_id.to_string(),
1299 0,
1300 ));
1301 let signaling = Arc::new(crate::signaling::MeshRouter::new(
1302 node_id.to_string(),
1303 node_id.to_string(),
1304 transport,
1305 conn_factory,
1306 crate::types::PoolSettings::default(),
1307 false,
1308 ));
1309
1310 TestStore::new_with_routing(
1311 local_store,
1312 signaling,
1313 Duration::from_millis(200),
1314 false,
1315 routing,
1316 )
1317 }
1318
1319 fn make_shared_test_node(
1320 relay: Arc<crate::mock::MockRelay>,
1321 node_id: &str,
1322 routing: GenericStoreRoutingConfig,
1323 ) -> TestNode {
1324 let transport = Arc::new(relay.create_transport(node_id.to_string(), node_id.to_string()));
1325 let conn_factory = Arc::new(crate::mock::MockConnectionFactory::new(
1326 node_id.to_string(),
1327 0,
1328 ));
1329 let signaling = Arc::new(crate::signaling::MeshRouter::new(
1330 node_id.to_string(),
1331 node_id.to_string(),
1332 transport.clone(),
1333 conn_factory,
1334 crate::types::PoolSettings::default(),
1335 false,
1336 ));
1337 let local_store = Arc::new(MemoryStore::new());
1338 let store = Arc::new(TestStore::new_with_routing(
1339 local_store.clone(),
1340 signaling,
1341 Duration::from_millis(120),
1342 false,
1343 routing,
1344 ));
1345
1346 TestNode {
1347 store,
1348 local_store,
1349 transport,
1350 }
1351 }
1352
1353 async fn pump_test_signaling(nodes: &[&TestNode]) -> usize {
1354 let mut processed = 0usize;
1355 for node in nodes {
1356 while let Some(msg) = node.transport.try_recv() {
1357 node.store
1358 .process_signaling(msg)
1359 .await
1360 .expect("process signaling");
1361 processed += 1;
1362 }
1363 }
1364 processed
1365 }
1366
1367 async fn pump_test_data(nodes: &[&TestNode]) -> usize {
1368 let mut processed = 0usize;
1369 for node in nodes {
1370 let peer_ids = node.store.signaling().peer_ids().await;
1371 for peer_id in peer_ids {
1372 let Some(channel) = node.store.signaling().get_channel(&peer_id).await else {
1373 continue;
1374 };
1375 while let Some(data) = channel.try_recv() {
1376 node.store.handle_data_message(&peer_id, &data).await;
1377 processed += 1;
1378 }
1379 }
1380 }
1381 processed
1382 }
1383
1384 async fn pump_test_network(nodes: &[&TestNode], max_steps: usize) {
1385 for _ in 0..max_steps {
1386 let signaling = pump_test_signaling(nodes).await;
1387 let data = pump_test_data(nodes).await;
1388 if signaling + data == 0 {
1389 tokio::task::yield_now().await;
1390 }
1391 }
1392 }
1393
1394 async fn run_get_with_pumps(
1395 requester: Arc<TestStore>,
1396 hash: Hash,
1397 nodes: &[&TestNode],
1398 ) -> Option<Vec<u8>> {
1399 let task = tokio::spawn(async move { requester.get(&hash).await.ok().flatten() });
1400 let started = Instant::now();
1401
1402 loop {
1403 if task.is_finished() {
1404 return task.await.expect("request task join");
1405 }
1406
1407 if started.elapsed() > Duration::from_secs(1) {
1408 task.abort();
1409 return None;
1410 }
1411
1412 pump_test_network(nodes, 4).await;
1413 }
1414 }
1415
1416 async fn run_bad_peer_series(strategy: SelectionStrategy) -> usize {
1417 let _guard = mock_network_lock().lock().await;
1418 crate::mock::clear_channel_registry().await;
1419
1420 let relay = crate::mock::MockRelay::new();
1421 let requester = make_shared_test_node(
1422 relay.clone(),
1423 "requester-reject",
1424 GenericStoreRoutingConfig {
1425 selection_strategy: strategy,
1426 fairness_enabled: false,
1427 dispatch: RequestDispatchConfig {
1428 initial_fanout: 1,
1429 hedge_fanout: 1,
1430 max_fanout: 1,
1431 hedge_interval_ms: 5,
1432 },
1433 ..Default::default()
1434 },
1435 );
1436 let bad = make_shared_test_node(
1437 relay.clone(),
1438 "a-bad",
1439 GenericStoreRoutingConfig {
1440 response_behavior: ResponseBehaviorConfig {
1441 drop_response_prob: 1.0,
1442 ..Default::default()
1443 },
1444 ..Default::default()
1445 },
1446 );
1447 let honest = make_shared_test_node(relay, "b-honest", GenericStoreRoutingConfig::default());
1448 let nodes = [&requester, &bad, &honest];
1449
1450 for node in &nodes {
1451 node.transport
1452 .connect(&[])
1453 .await
1454 .expect("connect transport");
1455 node.store.start().await.expect("start store");
1456 }
1457 pump_test_network(&nodes, 24).await;
1458
1459 let mut successes = 0usize;
1460 for round in 0..6 {
1461 let payload = format!("payload-{round}").into_bytes();
1462 let hash = hashtree_core::sha256(&payload);
1463 let _ = bad.local_store.put(hash, payload.clone()).await;
1464 let _ = honest.local_store.put(hash, payload.clone()).await;
1465
1466 let result = run_get_with_pumps(requester.store.clone(), hash, &nodes).await;
1467 if result.as_ref() == Some(&payload) {
1468 successes += 1;
1469 }
1470 }
1471
1472 crate::mock::clear_channel_registry().await;
1473 successes
1474 }
1475
1476 #[test]
1477 fn test_hedged_wave_plan_flood_all() {
1478 let plan = build_hedged_wave_plan(7, RequestDispatchConfig::default());
1479 assert_eq!(plan, vec![7]);
1480 }
1481
1482 #[test]
1483 fn test_hedged_wave_plan_staged() {
1484 let plan = build_hedged_wave_plan(
1485 10,
1486 RequestDispatchConfig {
1487 initial_fanout: 2,
1488 hedge_fanout: 3,
1489 max_fanout: 8,
1490 hedge_interval_ms: 25,
1491 },
1492 );
1493 assert_eq!(plan, vec![2, 3, 3]);
1494 }
1495
1496 #[test]
1497 fn test_response_behavior_normalization_clamps_probs() {
1498 let raw = ResponseBehaviorConfig {
1499 drop_response_prob: -1.5,
1500 corrupt_response_prob: 9.0,
1501 extra_delay_ms: 12,
1502 };
1503 let normalized = raw.normalized();
1504 assert_eq!(normalized.drop_response_prob, 0.0);
1505 assert_eq!(normalized.corrupt_response_prob, 1.0);
1506 assert_eq!(normalized.extra_delay_ms, 12);
1507 }
1508
1509 #[test]
1510 fn test_actor_draw_is_deterministic_per_peer_hash_and_salt() {
1511 let hash = hashtree_core::sha256(b"deterministic");
1512 let a = TestStore::deterministic_actor_draw_for("peer-a", &hash, 7);
1513 let b = TestStore::deterministic_actor_draw_for("peer-a", &hash, 7);
1514 assert!((a - b).abs() < f64::EPSILON);
1515 }
1516
1517 #[tokio::test]
1518 async fn test_load_peer_metadata_returns_false_when_missing() {
1519 let local_store = Arc::new(MemoryStore::new());
1520 let store = make_test_store(local_store, "0");
1521 assert!(!store.load_peer_metadata().await.expect("load result"));
1522 }
1523
1524 #[tokio::test]
1525 async fn test_persist_and_load_peer_metadata_with_existing_store_adapter() {
1526 let local_store = Arc::new(MemoryStore::new());
1527 let writer = make_test_store(local_store.clone(), "0");
1528 {
1529 let mut selector = writer.peer_selector.write().await;
1530 selector.add_peer("npub1stable:session-a");
1531 selector.record_request("npub1stable:session-a", 64);
1532 selector.record_success("npub1stable:session-a", 35, 1024);
1533 selector.record_cashu_payment("npub1stable:session-a", 120);
1534 selector.record_cashu_receipt("npub1stable:session-a", 40);
1535 selector.record_cashu_payment_default("npub1stable:session-a");
1536 }
1537
1538 let snapshot_hash = writer
1539 .persist_peer_metadata()
1540 .await
1541 .expect("persist peer metadata");
1542 assert!(local_store
1543 .get(&snapshot_hash)
1544 .await
1545 .expect("snapshot lookup")
1546 .is_some());
1547
1548 let reader = make_test_store(local_store, "1");
1549 assert!(reader
1550 .load_peer_metadata()
1551 .await
1552 .expect("load peer metadata snapshot"));
1553
1554 let mut selector = reader.peer_selector.write().await;
1555 selector.add_peer("npub1stable:session-b");
1556 let stats = selector
1557 .get_stats("npub1stable:session-b")
1558 .expect("restored peer stats");
1559 assert_eq!(stats.requests_sent, 1);
1560 assert_eq!(stats.successes, 1);
1561 assert_eq!(stats.cashu_paid_sat, 120);
1562 assert_eq!(stats.cashu_received_sat, 40);
1563 assert_eq!(stats.cashu_payment_receipts, 1);
1564 assert_eq!(stats.cashu_payment_defaults, 1);
1565 }
1566
1567 #[tokio::test]
1568 async fn test_should_refuse_requests_from_peer_after_payment_defaults() {
1569 let local_store = Arc::new(MemoryStore::new());
1570 let store = make_test_store_with_routing(
1571 local_store,
1572 "0",
1573 GenericStoreRoutingConfig {
1574 cashu_payment_default_block_threshold: 1,
1575 ..Default::default()
1576 },
1577 );
1578 store.record_cashu_payment_default_from_peer("peer-a").await;
1579
1580 let selector = store.peer_selector.read().await;
1581 assert!(store.should_refuse_requests_from_peer(&selector, "peer-a"));
1582 assert!(!store.should_refuse_requests_from_peer(&selector, "peer-b"));
1583 }
1584
1585 #[tokio::test]
1586 async fn test_take_valid_quote_consumes_once_and_rejects_expired_quotes() {
1587 let local_store = Arc::new(MemoryStore::new());
1588 let store = make_test_store(local_store, "0");
1589 let hash = hashtree_core::sha256(b"quote-test");
1590 let hash_key = hash_to_key(&hash);
1591
1592 {
1593 let mut issued = store.issued_quotes.write().await;
1594 issued.insert(
1595 ("peer-a".to_string(), hash_key.clone(), 11),
1596 IssuedQuote {
1597 expires_at: Instant::now() + Duration::from_secs(1),
1598 payment_sat: 5,
1599 mint_url: Some("https://mint-a.example".to_string()),
1600 },
1601 );
1602 issued.insert(
1603 ("peer-a".to_string(), hash_key.clone(), 12),
1604 IssuedQuote {
1605 expires_at: Instant::now() - Duration::from_millis(1),
1606 payment_sat: 5,
1607 mint_url: Some("https://mint-a.example".to_string()),
1608 },
1609 );
1610 }
1611
1612 assert!(store.take_valid_quote("peer-a", &hash_key, 11).await);
1613 assert!(!store.take_valid_quote("peer-a", &hash_key, 11).await);
1614 assert!(!store.take_valid_quote("peer-a", &hash_key, 12).await);
1615 }
1616
1617 async fn run_quote_with_pumps(
1618 requester: Arc<TestStore>,
1619 hash: Hash,
1620 payment_sat: u64,
1621 quote_ttl: Duration,
1622 peer_ids: Vec<String>,
1623 nodes: &[&TestNode],
1624 ) -> Option<NegotiatedQuote> {
1625 let task = tokio::spawn(async move {
1626 requester
1627 .request_quote_from_peers(&hash, payment_sat, quote_ttl, &peer_ids)
1628 .await
1629 });
1630 let started = Instant::now();
1631
1632 loop {
1633 if task.is_finished() {
1634 return task.await.expect("quote task join");
1635 }
1636 if started.elapsed() > Duration::from_secs(1) {
1637 task.abort();
1638 return None;
1639 }
1640 pump_test_network(nodes, 4).await;
1641 }
1642 }
1643
1644 #[tokio::test]
1645 async fn test_request_quote_from_peers_rejects_unaccepted_mint() {
1646 let _guard = mock_network_lock().lock().await;
1647 crate::mock::clear_channel_registry().await;
1648
1649 let relay = crate::mock::MockRelay::new();
1650 let requester = make_shared_test_node(
1651 relay.clone(),
1652 "requester-reject",
1653 GenericStoreRoutingConfig {
1654 cashu_accepted_mints: vec!["https://mint-a.example".to_string()],
1655 cashu_default_mint: Some("https://mint-a.example".to_string()),
1656 dispatch: RequestDispatchConfig {
1657 initial_fanout: 1,
1658 hedge_fanout: 1,
1659 max_fanout: 1,
1660 hedge_interval_ms: 5,
1661 },
1662 ..Default::default()
1663 },
1664 );
1665 let provider = make_shared_test_node(
1666 relay,
1667 "provider-reject",
1668 GenericStoreRoutingConfig {
1669 cashu_accepted_mints: vec!["https://mint-b.example".to_string()],
1670 ..Default::default()
1671 },
1672 );
1673 let nodes = [&requester, &provider];
1674
1675 requester.transport.connect(&[]).await.expect("connect");
1676 provider.transport.connect(&[]).await.expect("connect");
1677 requester.store.start().await.expect("start");
1678 provider.store.start().await.expect("start");
1679 pump_test_network(&nodes, 24).await;
1680
1681 let payload = b"quoted-data".to_vec();
1682 let hash = hashtree_core::sha256(&payload);
1683 provider.local_store.put(hash, payload).await.expect("put");
1684
1685 let quote = run_quote_with_pumps(
1686 requester.store.clone(),
1687 hash,
1688 9,
1689 Duration::from_millis(80),
1690 vec!["provider-reject".to_string()],
1691 &nodes,
1692 )
1693 .await;
1694 assert!(
1695 quote.is_none(),
1696 "expected quote to be rejected on mint mismatch"
1697 );
1698 }
1699
1700 #[tokio::test]
1701 async fn test_request_quote_from_peers_accepts_small_peer_suggested_mint_under_cap() {
1702 let _guard = mock_network_lock().lock().await;
1703 crate::mock::clear_channel_registry().await;
1704
1705 let relay = crate::mock::MockRelay::new();
1706 let requester = make_shared_test_node(
1707 relay.clone(),
1708 "requester-suggested",
1709 GenericStoreRoutingConfig {
1710 cashu_accepted_mints: vec!["https://mint-a.example".to_string()],
1711 cashu_default_mint: Some("https://mint-a.example".to_string()),
1712 cashu_peer_suggested_mint_base_cap_sat: 3,
1713 cashu_peer_suggested_mint_max_cap_sat: 3,
1714 dispatch: RequestDispatchConfig {
1715 initial_fanout: 1,
1716 hedge_fanout: 1,
1717 max_fanout: 1,
1718 hedge_interval_ms: 5,
1719 },
1720 ..Default::default()
1721 },
1722 );
1723 let provider = make_shared_test_node(
1724 relay,
1725 "provider-suggested",
1726 GenericStoreRoutingConfig {
1727 cashu_accepted_mints: vec!["https://mint-b.example".to_string()],
1728 cashu_default_mint: Some("https://mint-b.example".to_string()),
1729 ..Default::default()
1730 },
1731 );
1732 let nodes = [&requester, &provider];
1733
1734 requester.transport.connect(&[]).await.expect("connect");
1735 provider.transport.connect(&[]).await.expect("connect");
1736 requester.store.start().await.expect("start");
1737 provider.store.start().await.expect("start");
1738 pump_test_network(&nodes, 24).await;
1739
1740 let payload = b"quoted-data".to_vec();
1741 let hash = hashtree_core::sha256(&payload);
1742 provider.local_store.put(hash, payload).await.expect("put");
1743
1744 let quote = run_quote_with_pumps(
1745 requester.store.clone(),
1746 hash,
1747 3,
1748 Duration::from_millis(80),
1749 vec!["provider-suggested".to_string()],
1750 &nodes,
1751 )
1752 .await
1753 .expect("expected bounded peer-suggested mint quote");
1754 assert_eq!(quote.mint_url.as_deref(), Some("https://mint-b.example"));
1755 }
1756
1757 #[tokio::test]
1758 async fn test_request_quote_from_peers_scales_peer_suggested_mint_cap_with_reputation() {
1759 let _guard = mock_network_lock().lock().await;
1760 crate::mock::clear_channel_registry().await;
1761
1762 let relay = crate::mock::MockRelay::new();
1763 let requester = make_shared_test_node(
1764 relay.clone(),
1765 "requester-reputation",
1766 GenericStoreRoutingConfig {
1767 cashu_accepted_mints: vec!["https://mint-a.example".to_string()],
1768 cashu_default_mint: Some("https://mint-a.example".to_string()),
1769 cashu_peer_suggested_mint_base_cap_sat: 1,
1770 cashu_peer_suggested_mint_success_step_sat: 1,
1771 cashu_peer_suggested_mint_receipt_step_sat: 2,
1772 cashu_peer_suggested_mint_max_cap_sat: 5,
1773 dispatch: RequestDispatchConfig {
1774 initial_fanout: 1,
1775 hedge_fanout: 1,
1776 max_fanout: 1,
1777 hedge_interval_ms: 5,
1778 },
1779 ..Default::default()
1780 },
1781 );
1782 let provider = make_shared_test_node(
1783 relay,
1784 "provider-reputation",
1785 GenericStoreRoutingConfig {
1786 cashu_accepted_mints: vec!["https://mint-b.example".to_string()],
1787 cashu_default_mint: Some("https://mint-b.example".to_string()),
1788 ..Default::default()
1789 },
1790 );
1791 let nodes = [&requester, &provider];
1792
1793 requester.transport.connect(&[]).await.expect("connect");
1794 provider.transport.connect(&[]).await.expect("connect");
1795 requester.store.start().await.expect("start");
1796 provider.store.start().await.expect("start");
1797 pump_test_network(&nodes, 24).await;
1798
1799 let payload = b"quoted-data".to_vec();
1800 let hash = hashtree_core::sha256(&payload);
1801 provider.local_store.put(hash, payload).await.expect("put");
1802
1803 let quote = run_quote_with_pumps(
1804 requester.store.clone(),
1805 hash,
1806 4,
1807 Duration::from_millis(80),
1808 vec!["provider-reputation".to_string()],
1809 &nodes,
1810 )
1811 .await;
1812 assert!(
1813 quote.is_none(),
1814 "new peer should not get a 4 sat untrusted-mint quote"
1815 );
1816
1817 {
1818 let mut selector = requester.store.peer_selector.write().await;
1819 selector.add_peer("provider-reputation");
1820 selector.record_success("provider-reputation", 20, 1024);
1821 selector.record_cashu_receipt("provider-reputation", 2);
1822 }
1823
1824 let quote = run_quote_with_pumps(
1825 requester.store.clone(),
1826 hash,
1827 4,
1828 Duration::from_millis(80),
1829 vec!["provider-reputation".to_string()],
1830 &nodes,
1831 )
1832 .await
1833 .expect("reputable peer should get larger bounded quote");
1834 assert_eq!(quote.mint_url.as_deref(), Some("https://mint-b.example"));
1835
1836 requester
1837 .store
1838 .record_cashu_payment_default_from_peer("provider-reputation")
1839 .await;
1840 let quote = run_quote_with_pumps(
1841 requester.store.clone(),
1842 hash,
1843 4,
1844 Duration::from_millis(80),
1845 vec!["provider-reputation".to_string()],
1846 &nodes,
1847 )
1848 .await;
1849 assert!(
1850 quote.is_none(),
1851 "peer-suggested mint exposure should drop to zero after defaults exceed receipts"
1852 );
1853 }
1854
1855 #[tokio::test]
1856 async fn test_request_quote_from_peers_returns_matching_mint() {
1857 let _guard = mock_network_lock().lock().await;
1858 crate::mock::clear_channel_registry().await;
1859
1860 let relay = crate::mock::MockRelay::new();
1861 let requester = make_shared_test_node(
1862 relay.clone(),
1863 "requester-match",
1864 GenericStoreRoutingConfig {
1865 cashu_accepted_mints: vec!["https://mint-a.example".to_string()],
1866 cashu_default_mint: Some("https://mint-a.example".to_string()),
1867 dispatch: RequestDispatchConfig {
1868 initial_fanout: 1,
1869 hedge_fanout: 1,
1870 max_fanout: 1,
1871 hedge_interval_ms: 5,
1872 },
1873 ..Default::default()
1874 },
1875 );
1876 let provider = make_shared_test_node(
1877 relay,
1878 "provider-match",
1879 GenericStoreRoutingConfig {
1880 cashu_accepted_mints: vec!["https://mint-a.example".to_string()],
1881 ..Default::default()
1882 },
1883 );
1884 let nodes = [&requester, &provider];
1885
1886 requester.transport.connect(&[]).await.expect("connect");
1887 provider.transport.connect(&[]).await.expect("connect");
1888 requester.store.start().await.expect("start");
1889 provider.store.start().await.expect("start");
1890 pump_test_network(&nodes, 24).await;
1891
1892 let payload = b"quoted-data".to_vec();
1893 let hash = hashtree_core::sha256(&payload);
1894 provider.local_store.put(hash, payload).await.expect("put");
1895
1896 let quote = run_quote_with_pumps(
1897 requester.store.clone(),
1898 hash,
1899 9,
1900 Duration::from_millis(80),
1901 vec!["provider-match".to_string()],
1902 &nodes,
1903 )
1904 .await
1905 .expect("expected quote");
1906 assert_eq!(quote.mint_url.as_deref(), Some("https://mint-a.example"));
1907 }
1908
1909 #[tokio::test]
1910 async fn test_tit_for_tat_store_path_recovers_after_bad_peer_observation() {
1911 let tit_for_tat_successes = run_bad_peer_series(SelectionStrategy::TitForTat).await;
1912
1913 assert!(
1914 tit_for_tat_successes >= 5,
1915 "expected tit-for-tat path to recover after the first consistently bad peer observation (successes={tit_for_tat_successes})"
1916 );
1917 }
1918}
1919
1920pub type SimStore<S> =
1922 GenericStore<S, crate::mock::MockRelayTransport, crate::mock::MockConnectionFactory>;
1923
1924pub type ProductionStore<S> = GenericStore<
1926 S,
1927 crate::nostr::NostrRelayTransport,
1928 crate::real_factory::RealPeerConnectionFactory,
1929>;