1use async_trait::async_trait;
8use std::collections::hash_map::DefaultHasher;
9use std::collections::{HashMap, HashSet};
10use std::future::Future;
11use std::hash::{Hash as _, Hasher};
12use std::ops::Range;
13use std::sync::Arc;
14use std::time::{Duration, Instant};
15use tokio::sync::{oneshot, Mutex, RwLock};
16
17use hashtree_core::{Hash, Store, StoreError};
18
19use crate::peer_selector::{PeerMetadataSnapshot, PeerSelector, SelectionStrategy};
20use crate::protocol::{
21 create_quote_request, create_quote_response_available, create_quote_response_unavailable,
22 create_request, create_request_with_quote, create_response, encode_quote_request,
23 encode_quote_response, encode_request, encode_response, hash_to_key, parse_message,
24 DataMessage, DataQuoteRequest, DataQuoteResponse,
25};
26use crate::signaling::MeshRouter;
27use crate::transport::{PeerLinkFactory, SignalingTransport, TransportError};
28use crate::types::{PeerHTLConfig, SignalingMessage, MAX_HTL};
29
30const PEER_METADATA_POINTER_SLOT_KEY: &[u8] = b"hashtree-webrtc/peer-metadata/latest/v1";
33
34struct PendingRequest {
36 response_tx: oneshot::Sender<Option<Vec<u8>>>,
37 started_at: Instant,
38 queried_peers: Vec<String>,
39}
40
41struct PendingQuoteRequest {
42 response_tx: oneshot::Sender<Option<NegotiatedQuote>>,
43 preferred_mint_url: Option<String>,
44 offered_payment_sat: u64,
45}
46
47#[derive(Debug, Clone)]
48struct NegotiatedQuote {
49 peer_id: String,
50 quote_id: u64,
51 #[allow(dead_code)]
52 mint_url: Option<String>,
53}
54
55struct IssuedQuote {
56 expires_at: Instant,
57 #[allow(dead_code)]
58 payment_sat: u64,
59 #[allow(dead_code)]
60 mint_url: Option<String>,
61}
62
63#[derive(Debug, Clone, Copy)]
69pub struct RequestDispatchConfig {
70 pub initial_fanout: usize,
72 pub hedge_fanout: usize,
74 pub max_fanout: usize,
76 pub hedge_interval_ms: u64,
78}
79
80impl Default for RequestDispatchConfig {
81 fn default() -> Self {
82 Self {
83 initial_fanout: usize::MAX,
84 hedge_fanout: usize::MAX,
85 max_fanout: usize::MAX,
86 hedge_interval_ms: 0,
87 }
88 }
89}
90
91pub fn normalize_dispatch_config(
93 dispatch: RequestDispatchConfig,
94 available_peers: usize,
95) -> RequestDispatchConfig {
96 let mut cfg = dispatch;
97 let cap = if cfg.max_fanout == 0 {
98 available_peers
99 } else {
100 cfg.max_fanout.min(available_peers)
101 };
102 cfg.max_fanout = cap;
103 cfg.initial_fanout = if cfg.initial_fanout == 0 {
104 1
105 } else {
106 cfg.initial_fanout.min(cap.max(1))
107 };
108 cfg.hedge_fanout = if cfg.hedge_fanout == 0 {
109 1
110 } else {
111 cfg.hedge_fanout.min(cap.max(1))
112 };
113 cfg
114}
115
116pub fn build_hedged_wave_plan(peer_count: usize, dispatch: RequestDispatchConfig) -> Vec<usize> {
118 if peer_count == 0 {
119 return Vec::new();
120 }
121 let cap = dispatch.max_fanout.min(peer_count);
122 if cap == 0 {
123 return Vec::new();
124 }
125
126 let mut plan = Vec::new();
127 let mut sent = 0usize;
128 let first = dispatch.initial_fanout.min(cap).max(1);
129 plan.push(first);
130 sent += first;
131
132 while sent < cap {
133 let next = dispatch.hedge_fanout.min(cap - sent).max(1);
134 plan.push(next);
135 sent += next;
136 }
137 plan
138}
139
140#[derive(Debug)]
142pub enum HedgedWaveAction<T> {
143 Continue,
144 Success(T),
145 Abort,
146}
147
148pub async fn run_hedged_waves<T, SendWave, SendWaveFut, WaitWave, WaitWaveFut>(
153 peer_count: usize,
154 dispatch: RequestDispatchConfig,
155 request_timeout: Duration,
156 mut send_wave: SendWave,
157 mut wait_wave: WaitWave,
158) -> Option<T>
159where
160 SendWave: FnMut(Range<usize>) -> SendWaveFut,
161 SendWaveFut: Future<Output = usize>,
162 WaitWave: FnMut(Duration) -> WaitWaveFut,
163 WaitWaveFut: Future<Output = HedgedWaveAction<T>>,
164{
165 let dispatch = normalize_dispatch_config(dispatch, peer_count);
166 let wave_plan = build_hedged_wave_plan(peer_count, dispatch);
167 if wave_plan.is_empty() {
168 return None;
169 }
170
171 let deadline = Instant::now() + request_timeout;
172 let mut sent_total = 0usize;
173 let mut next_peer_idx = 0usize;
174
175 for (wave_idx, wave_size) in wave_plan.iter().copied().enumerate() {
176 let from = next_peer_idx;
177 let to = (next_peer_idx + wave_size).min(peer_count);
178 next_peer_idx = to;
179
180 if from == to {
181 continue;
182 }
183
184 sent_total += send_wave(from..to).await;
185 if sent_total == 0 {
186 if next_peer_idx >= peer_count {
187 break;
188 }
189 continue;
190 }
191
192 let now = Instant::now();
193 if now >= deadline {
194 break;
195 }
196 let remaining = deadline.saturating_duration_since(now);
197 let is_last_wave = wave_idx + 1 == wave_plan.len() || next_peer_idx >= peer_count;
198 let wait = if is_last_wave {
199 remaining
200 } else if dispatch.hedge_interval_ms == 0 {
201 Duration::ZERO
202 } else {
203 Duration::from_millis(dispatch.hedge_interval_ms).min(remaining)
204 };
205
206 if wait.is_zero() {
207 continue;
208 }
209
210 match wait_wave(wait).await {
211 HedgedWaveAction::Continue => {}
212 HedgedWaveAction::Success(value) => return Some(value),
213 HedgedWaveAction::Abort => break,
214 }
215 }
216
217 None
218}
219
220pub async fn sync_selector_peers(selector: &RwLock<PeerSelector>, current_peer_ids: &[String]) {
222 let mut selector = selector.write().await;
223 let current: HashSet<&str> = current_peer_ids.iter().map(String::as_str).collect();
224 let known: Vec<String> = selector.all_stats().map(|s| s.peer_id.clone()).collect();
225 for peer_id in known {
226 if !current.contains(peer_id.as_str()) {
227 selector.remove_peer(&peer_id);
228 }
229 }
230 for peer_id in current_peer_ids {
231 selector.add_peer(peer_id.clone());
232 }
233}
234
235#[derive(Debug, Clone, Copy)]
239pub struct ResponseBehaviorConfig {
240 pub drop_response_prob: f64,
242 pub corrupt_response_prob: f64,
244 pub extra_delay_ms: u64,
246}
247
248impl Default for ResponseBehaviorConfig {
249 fn default() -> Self {
250 Self {
251 drop_response_prob: 0.0,
252 corrupt_response_prob: 0.0,
253 extra_delay_ms: 0,
254 }
255 }
256}
257
258impl ResponseBehaviorConfig {
259 fn normalized(self) -> Self {
260 Self {
261 drop_response_prob: self.drop_response_prob.clamp(0.0, 1.0),
262 corrupt_response_prob: self.corrupt_response_prob.clamp(0.0, 1.0),
263 extra_delay_ms: self.extra_delay_ms,
264 }
265 }
266}
267
268#[derive(Debug, Clone)]
270pub struct GenericStoreRoutingConfig {
271 pub selection_strategy: SelectionStrategy,
272 pub fairness_enabled: bool,
273 pub cashu_payment_weight: f64,
275 pub cashu_payment_default_block_threshold: u64,
278 pub cashu_accepted_mints: Vec<String>,
280 pub cashu_default_mint: Option<String>,
282 pub cashu_peer_suggested_mint_base_cap_sat: u64,
284 pub cashu_peer_suggested_mint_success_step_sat: u64,
286 pub cashu_peer_suggested_mint_receipt_step_sat: u64,
288 pub cashu_peer_suggested_mint_max_cap_sat: u64,
290 pub dispatch: RequestDispatchConfig,
291 pub response_behavior: ResponseBehaviorConfig,
292}
293
294impl Default for GenericStoreRoutingConfig {
295 fn default() -> Self {
296 Self {
297 selection_strategy: SelectionStrategy::Weighted,
298 fairness_enabled: true,
299 cashu_payment_weight: 0.0,
300 cashu_payment_default_block_threshold: 0,
301 cashu_accepted_mints: Vec::new(),
302 cashu_default_mint: None,
303 cashu_peer_suggested_mint_base_cap_sat: 0,
304 cashu_peer_suggested_mint_success_step_sat: 0,
305 cashu_peer_suggested_mint_receipt_step_sat: 0,
306 cashu_peer_suggested_mint_max_cap_sat: 0,
307 dispatch: RequestDispatchConfig::default(),
308 response_behavior: ResponseBehaviorConfig::default(),
309 }
310 }
311}
312
313pub struct GenericStore<S, R, F>
320where
321 S: Store + Send + Sync + 'static,
322 R: SignalingTransport + Send + Sync + 'static,
323 F: PeerLinkFactory + Send + Sync + 'static,
324{
325 local_store: Arc<S>,
327 signaling: Arc<MeshRouter<R, F>>,
329 htl_configs: RwLock<HashMap<String, PeerHTLConfig>>,
331 pending_requests: RwLock<HashMap<String, PendingRequest>>,
333 pending_quotes: RwLock<HashMap<String, PendingQuoteRequest>>,
335 issued_quotes: RwLock<HashMap<(String, String, u64), IssuedQuote>>,
337 next_quote_id: RwLock<u64>,
339 peer_selector: RwLock<PeerSelector>,
341 routing: GenericStoreRoutingConfig,
343 request_timeout: Duration,
345 debug: bool,
347 running: RwLock<bool>,
349}
350
351impl<S, R, F> GenericStore<S, R, F>
352where
353 S: Store + Send + Sync + 'static,
354 R: SignalingTransport + Send + Sync + 'static,
355 F: PeerLinkFactory + Send + Sync + 'static,
356{
357 pub fn new(
359 local_store: Arc<S>,
360 signaling: Arc<MeshRouter<R, F>>,
361 request_timeout: Duration,
362 debug: bool,
363 ) -> Self {
364 Self::new_with_routing(
365 local_store,
366 signaling,
367 request_timeout,
368 debug,
369 Default::default(),
370 )
371 }
372
373 pub fn new_with_routing(
375 local_store: Arc<S>,
376 signaling: Arc<MeshRouter<R, F>>,
377 request_timeout: Duration,
378 debug: bool,
379 routing: GenericStoreRoutingConfig,
380 ) -> Self {
381 let mut selector = PeerSelector::with_strategy(routing.selection_strategy);
382 selector.set_fairness(routing.fairness_enabled);
383 selector.set_cashu_payment_weight(routing.cashu_payment_weight);
384 Self {
385 local_store,
386 signaling,
387 htl_configs: RwLock::new(HashMap::new()),
388 pending_requests: RwLock::new(HashMap::new()),
389 pending_quotes: RwLock::new(HashMap::new()),
390 issued_quotes: RwLock::new(HashMap::new()),
391 next_quote_id: RwLock::new(1),
392 peer_selector: RwLock::new(selector),
393 routing,
394 request_timeout,
395 debug,
396 running: RwLock::new(false),
397 }
398 }
399
400 pub async fn start(&self) -> Result<(), TransportError> {
402 *self.running.write().await = true;
403
404 self.signaling.send_hello(vec![]).await?;
406
407 Ok(())
408 }
409
410 pub async fn stop(&self) {
412 *self.running.write().await = false;
413 }
414
415 pub async fn process_signaling(&self, msg: SignalingMessage) -> Result<(), TransportError> {
417 let peer_id = msg.peer_id().to_string();
419 {
420 let mut configs = self.htl_configs.write().await;
421 if !configs.contains_key(&peer_id) {
422 configs.insert(peer_id.clone(), PeerHTLConfig::random());
423 }
424 }
425 self.peer_selector.write().await.add_peer(peer_id);
426
427 self.signaling.handle_message(msg).await
428 }
429
430 pub fn signaling(&self) -> &Arc<MeshRouter<R, F>> {
432 &self.signaling
433 }
434
435 fn response_behavior(&self) -> ResponseBehaviorConfig {
436 self.routing.response_behavior.normalized()
437 }
438
439 fn deterministic_actor_draw_for(peer_id: &str, hash: &Hash, salt: u64) -> f64 {
440 let mut hasher = DefaultHasher::new();
441 peer_id.hash(&mut hasher);
442 hash.hash(&mut hasher);
443 salt.hash(&mut hasher);
444 let v = hasher.finish();
445 (v as f64) / (u64::MAX as f64)
446 }
447
448 fn deterministic_actor_draw(&self, hash: &Hash, salt: u64) -> f64 {
449 Self::deterministic_actor_draw_for(self.signaling.peer_id(), hash, salt)
450 }
451
452 fn peer_metadata_pointer_slot_hash() -> Hash {
453 hashtree_core::sha256(PEER_METADATA_POINTER_SLOT_KEY)
454 }
455
456 fn decode_hash_hex(hash_hex: &str) -> Result<Hash, StoreError> {
457 let bytes = hex::decode(hash_hex)
458 .map_err(|e| StoreError::Other(format!("Invalid hash hex: {e}")))?;
459 if bytes.len() != 32 {
460 return Err(StoreError::Other(format!(
461 "Invalid hash length {}, expected 32 bytes",
462 bytes.len()
463 )));
464 }
465 let mut hash = [0u8; 32];
466 hash.copy_from_slice(&bytes);
467 Ok(hash)
468 }
469
470 fn should_drop_response(&self, hash: &Hash) -> bool {
471 let p = self.response_behavior().drop_response_prob;
472 if p <= 0.0 {
473 return false;
474 }
475 self.deterministic_actor_draw(hash, 0xD0_D0_D0_D0_D0_D0_D0_D0) < p
476 }
477
478 fn should_corrupt_response(&self, hash: &Hash) -> bool {
479 let p = self.response_behavior().corrupt_response_prob;
480 if p <= 0.0 {
481 return false;
482 }
483 self.deterministic_actor_draw(hash, 0xC0_C0_C0_C0_C0_C0_C0_C0) < p
484 }
485
486 async fn ordered_connected_peers(&self) -> Vec<String> {
487 let current_peer_ids = self.signaling.peer_ids().await;
488 if current_peer_ids.is_empty() {
489 return Vec::new();
490 }
491
492 sync_selector_peers(&self.peer_selector, ¤t_peer_ids).await;
493 let current_set: HashSet<&str> = current_peer_ids.iter().map(String::as_str).collect();
494 let mut ordered_peer_ids = self.peer_selector.write().await.select_peers();
495 ordered_peer_ids.retain(|peer_id| current_set.contains(peer_id.as_str()));
496 if ordered_peer_ids.is_empty() {
497 let mut fallback = current_peer_ids;
498 fallback.sort();
499 return fallback;
500 }
501 ordered_peer_ids
502 }
503
504 fn requested_quote_mint(&self) -> Option<&str> {
505 if let Some(default_mint) = self.routing.cashu_default_mint.as_deref() {
506 if self.routing.cashu_accepted_mints.is_empty()
507 || self
508 .routing
509 .cashu_accepted_mints
510 .iter()
511 .any(|mint| mint == default_mint)
512 {
513 return Some(default_mint);
514 }
515 }
516
517 self.routing
518 .cashu_accepted_mints
519 .first()
520 .map(String::as_str)
521 }
522
523 fn choose_quote_mint(&self, requested_mint: Option<&str>) -> Option<String> {
524 if let Some(requested_mint) = requested_mint {
525 if self.accepts_quote_mint(Some(requested_mint)) {
526 return Some(requested_mint.to_string());
527 }
528 }
529 if let Some(default_mint) = self.routing.cashu_default_mint.as_ref() {
530 return Some(default_mint.clone());
531 }
532 if let Some(first_mint) = self.routing.cashu_accepted_mints.first() {
533 return Some(first_mint.clone());
534 }
535 requested_mint.map(str::to_string)
536 }
537
538 fn accepts_quote_mint(&self, mint_url: Option<&str>) -> bool {
539 if self.routing.cashu_accepted_mints.is_empty() {
540 return true;
541 }
542
543 let Some(mint_url) = mint_url else {
544 return false;
545 };
546 self.routing
547 .cashu_accepted_mints
548 .iter()
549 .any(|mint| mint == mint_url)
550 }
551
552 fn trusts_quote_mint(&self, mint_url: Option<&str>) -> bool {
553 let Some(mint_url) = mint_url else {
554 return self.routing.cashu_default_mint.is_none()
555 && self.routing.cashu_accepted_mints.is_empty();
556 };
557 self.routing.cashu_default_mint.as_deref() == Some(mint_url)
558 || self
559 .routing
560 .cashu_accepted_mints
561 .iter()
562 .any(|mint| mint == mint_url)
563 }
564
565 async fn peer_suggested_mint_cap_sat(&self, peer_id: &str) -> u64 {
566 let base = self.routing.cashu_peer_suggested_mint_base_cap_sat;
567 if base == 0 {
568 return 0;
569 }
570
571 let selector = self.peer_selector.read().await;
572 let Some(stats) = selector.get_stats(peer_id) else {
573 let max_cap = self.routing.cashu_peer_suggested_mint_max_cap_sat;
574 return if max_cap > 0 { base.min(max_cap) } else { base };
575 };
576
577 if stats.cashu_payment_defaults > 0
578 && stats.cashu_payment_defaults >= stats.cashu_payment_receipts
579 {
580 return 0;
581 }
582
583 let success_bonus = stats
584 .successes
585 .saturating_mul(self.routing.cashu_peer_suggested_mint_success_step_sat);
586 let receipt_bonus = stats
587 .cashu_payment_receipts
588 .saturating_mul(self.routing.cashu_peer_suggested_mint_receipt_step_sat);
589 let mut cap = base
590 .saturating_add(success_bonus)
591 .saturating_add(receipt_bonus);
592 let max_cap = self.routing.cashu_peer_suggested_mint_max_cap_sat;
593 if max_cap > 0 {
594 cap = cap.min(max_cap);
595 }
596 cap
597 }
598
599 async fn should_accept_quote_response(
600 &self,
601 from_peer: &str,
602 preferred_mint_url: Option<&str>,
603 offered_payment_sat: u64,
604 res: &DataQuoteResponse,
605 ) -> bool {
606 let Some(payment_sat) = res.p else {
607 return false;
608 };
609 if payment_sat > offered_payment_sat {
610 return false;
611 }
612
613 let response_mint = res.m.as_deref();
614 if response_mint == preferred_mint_url {
615 return true;
616 }
617 if self.trusts_quote_mint(response_mint) {
618 return true;
619 }
620 if response_mint.is_none() {
621 return false;
622 }
623
624 payment_sat <= self.peer_suggested_mint_cap_sat(from_peer).await
625 }
626
627 async fn issue_quote(
628 &self,
629 peer_id: &str,
630 hash_key: &str,
631 payment_sat: u64,
632 ttl_ms: u32,
633 mint_url: Option<&str>,
634 ) -> u64 {
635 let quote_id = {
636 let mut next = self.next_quote_id.write().await;
637 let quote_id = *next;
638 *next = next.saturating_add(1);
639 quote_id
640 };
641
642 let expires_at = Instant::now() + Duration::from_millis(ttl_ms as u64);
643 self.issued_quotes.write().await.insert(
644 (peer_id.to_string(), hash_key.to_string(), quote_id),
645 IssuedQuote {
646 expires_at,
647 payment_sat,
648 mint_url: mint_url.map(str::to_string),
649 },
650 );
651 quote_id
652 }
653
654 async fn take_valid_quote(&self, peer_id: &str, hash_key: &str, quote_id: u64) -> bool {
655 let key = (peer_id.to_string(), hash_key.to_string(), quote_id);
656 let Some(quote) = self.issued_quotes.write().await.remove(&key) else {
657 return false;
658 };
659 quote.expires_at > Instant::now()
660 }
661
662 async fn send_request_to_peer(
663 &self,
664 peer_id: &str,
665 hash: &Hash,
666 quote_id: Option<u64>,
667 ) -> bool {
668 let channel = match self.signaling.get_channel(peer_id).await {
669 Some(c) => c,
670 None => return false,
671 };
672
673 let htl_config = {
674 let configs = self.htl_configs.read().await;
675 configs
676 .get(peer_id)
677 .cloned()
678 .unwrap_or_else(PeerHTLConfig::random)
679 };
680
681 let send_htl = htl_config.decrement(MAX_HTL);
682 let req = match quote_id {
683 Some(quote_id) => create_request_with_quote(hash, send_htl, quote_id),
684 None => create_request(hash, send_htl),
685 };
686 let request_bytes = encode_request(&req);
687
688 {
689 let mut selector = self.peer_selector.write().await;
690 selector.record_request(peer_id, request_bytes.len() as u64);
691 }
692
693 match channel.send(request_bytes).await {
694 Ok(()) => true,
695 Err(_) => {
696 self.peer_selector.write().await.record_failure(peer_id);
697 false
698 }
699 }
700 }
701
702 async fn send_quote_request_to_peer(
703 &self,
704 peer_id: &str,
705 hash: &Hash,
706 payment_sat: u64,
707 ttl_ms: u32,
708 mint_url: Option<&str>,
709 ) -> bool {
710 let channel = match self.signaling.get_channel(peer_id).await {
711 Some(c) => c,
712 None => return false,
713 };
714
715 let req = create_quote_request(hash, ttl_ms, payment_sat, mint_url);
716 let request_bytes = encode_quote_request(&req);
717
718 match channel.send(request_bytes).await {
719 Ok(()) => true,
720 Err(_) => false,
721 }
722 }
723
724 pub async fn peer_count(&self) -> usize {
726 self.signaling.peer_count().await
727 }
728
729 pub async fn needs_peers(&self) -> bool {
731 self.signaling.needs_peers().await
732 }
733
734 pub async fn send_hello(&self) -> Result<(), TransportError> {
736 self.signaling.send_hello(vec![]).await
737 }
738
739 pub async fn record_cashu_payment_for_peer(&self, peer_id: &str, amount_sat: u64) {
741 self.peer_selector
742 .write()
743 .await
744 .record_cashu_payment(peer_id, amount_sat);
745 }
746
747 pub async fn record_cashu_receipt_from_peer(&self, peer_id: &str, amount_sat: u64) {
749 self.peer_selector
750 .write()
751 .await
752 .record_cashu_receipt(peer_id, amount_sat);
753 }
754
755 pub async fn record_cashu_payment_default_from_peer(&self, peer_id: &str) {
757 self.peer_selector
758 .write()
759 .await
760 .record_cashu_payment_default(peer_id);
761 }
762
763 fn should_refuse_requests_from_peer(&self, selector: &PeerSelector, peer_id: &str) -> bool {
764 selector.is_peer_blocked_for_payment_defaults(
765 peer_id,
766 self.routing.cashu_payment_default_block_threshold,
767 )
768 }
769
770 pub async fn peer_metadata_snapshot(&self) -> PeerMetadataSnapshot {
772 self.peer_selector
773 .read()
774 .await
775 .export_peer_metadata_snapshot()
776 }
777
778 pub async fn persist_peer_metadata(&self) -> Result<Hash, StoreError> {
783 let snapshot = self
784 .peer_selector
785 .read()
786 .await
787 .export_peer_metadata_snapshot();
788 let bytes = serde_json::to_vec(&snapshot).map_err(|e| {
789 StoreError::Other(format!("Failed to encode peer metadata snapshot: {e}"))
790 })?;
791 let snapshot_hash = hashtree_core::sha256(&bytes);
792 let _ = self.local_store.put(snapshot_hash, bytes).await?;
793
794 let pointer_slot = Self::peer_metadata_pointer_slot_hash();
795 let pointer_bytes = hex::encode(snapshot_hash).into_bytes();
796 let _ = self.local_store.delete(&pointer_slot).await?;
797 let _ = self.local_store.put(pointer_slot, pointer_bytes).await?;
798
799 Ok(snapshot_hash)
800 }
801
802 pub async fn load_peer_metadata(&self) -> Result<bool, StoreError> {
804 let pointer_slot = Self::peer_metadata_pointer_slot_hash();
805 let Some(pointer_bytes) = self.local_store.get(&pointer_slot).await? else {
806 return Ok(false);
807 };
808 let pointer_hex = std::str::from_utf8(&pointer_bytes).map_err(|e| {
809 StoreError::Other(format!("Peer metadata pointer is not valid UTF-8: {e}"))
810 })?;
811 let snapshot_hash = Self::decode_hash_hex(pointer_hex.trim())?;
812
813 let Some(snapshot_bytes) = self.local_store.get(&snapshot_hash).await? else {
814 return Ok(false);
815 };
816 let snapshot: PeerMetadataSnapshot =
817 serde_json::from_slice(&snapshot_bytes).map_err(|e| {
818 StoreError::Other(format!("Failed to decode peer metadata snapshot: {e}"))
819 })?;
820 self.peer_selector
821 .write()
822 .await
823 .import_peer_metadata_snapshot(&snapshot);
824 Ok(true)
825 }
826
827 pub async fn get_with_quote(
832 &self,
833 hash: &Hash,
834 payment_sat: u64,
835 quote_ttl: Duration,
836 ) -> Result<Option<Vec<u8>>, StoreError> {
837 if let Some(data) = self.local_store.get(hash).await? {
838 return Ok(Some(data));
839 }
840 Ok(self
841 .request_from_peers_with_quote(hash, payment_sat, quote_ttl)
842 .await)
843 }
844
845 async fn request_from_peers_with_quote(
846 &self,
847 hash: &Hash,
848 payment_sat: u64,
849 quote_ttl: Duration,
850 ) -> Option<Vec<u8>> {
851 let ordered_peer_ids = self.ordered_connected_peers().await;
852 if ordered_peer_ids.is_empty() {
853 return None;
854 }
855
856 if let Some(quote) = self
857 .request_quote_from_peers(hash, payment_sat, quote_ttl, &ordered_peer_ids)
858 .await
859 {
860 if let Some(data) = self
861 .request_from_single_peer(hash, "e.peer_id, Some(quote.quote_id))
862 .await
863 {
864 return Some(data);
865 }
866 }
867
868 self.request_from_ordered_peers(hash, &ordered_peer_ids)
869 .await
870 }
871
872 async fn request_quote_from_peers(
873 &self,
874 hash: &Hash,
875 payment_sat: u64,
876 quote_ttl: Duration,
877 ordered_peer_ids: &[String],
878 ) -> Option<NegotiatedQuote> {
879 if ordered_peer_ids.is_empty() {
880 return None;
881 }
882 let ttl_ms = quote_ttl.as_millis().min(u32::MAX as u128) as u32;
883 if ttl_ms == 0 {
884 return None;
885 }
886 let requested_mint = self.requested_quote_mint().map(str::to_string);
887
888 let hash_key = hash_to_key(hash);
889 let (tx, rx) = oneshot::channel();
890 self.pending_quotes.write().await.insert(
891 hash_key.clone(),
892 PendingQuoteRequest {
893 response_tx: tx,
894 preferred_mint_url: requested_mint.clone(),
895 offered_payment_sat: payment_sat,
896 },
897 );
898
899 let rx = Arc::new(Mutex::new(rx));
900 let result = run_hedged_waves(
901 ordered_peer_ids.len(),
902 self.routing.dispatch,
903 self.request_timeout,
904 |range| {
905 let wave_peer_ids = ordered_peer_ids[range].to_vec();
906 let requested_mint = requested_mint.clone();
907 let hash = *hash;
908 async move {
909 let mut sent = 0usize;
910 for peer_id in wave_peer_ids {
911 if self
912 .send_quote_request_to_peer(
913 &peer_id,
914 &hash,
915 payment_sat,
916 ttl_ms,
917 requested_mint.as_deref(),
918 )
919 .await
920 {
921 sent += 1;
922 }
923 }
924 sent
925 }
926 },
927 |wait| {
928 let rx = rx.clone();
929 async move {
930 let mut rx = rx.lock().await;
931 match tokio::time::timeout(wait, &mut *rx).await {
932 Ok(Ok(Some(quote))) => HedgedWaveAction::Success(quote),
933 Ok(Ok(None)) | Ok(Err(_)) => HedgedWaveAction::Abort,
934 Err(_) => HedgedWaveAction::Continue,
935 }
936 }
937 },
938 )
939 .await;
940 let _ = self.pending_quotes.write().await.remove(&hash_key);
941 result
942 }
943
944 async fn request_from_single_peer(
945 &self,
946 hash: &Hash,
947 peer_id: &str,
948 quote_id: Option<u64>,
949 ) -> Option<Vec<u8>> {
950 let hash_key = hash_to_key(hash);
951 let (tx, rx) = oneshot::channel();
952 self.pending_requests.write().await.insert(
953 hash_key.clone(),
954 PendingRequest {
955 response_tx: tx,
956 started_at: Instant::now(),
957 queried_peers: vec![peer_id.to_string()],
958 },
959 );
960
961 let mut rx = rx;
962 if !self.send_request_to_peer(peer_id, hash, quote_id).await {
963 let _ = self.pending_requests.write().await.remove(&hash_key);
964 return None;
965 }
966
967 if let Ok(Ok(Some(data))) = tokio::time::timeout(self.request_timeout, &mut rx).await {
968 if hashtree_core::sha256(&data) == *hash {
969 let _ = self.local_store.put(*hash, data.clone()).await;
970 return Some(data);
971 }
972 }
973
974 if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
975 for peer_id in pending.queried_peers {
976 self.peer_selector.write().await.record_timeout(&peer_id);
977 }
978 }
979 None
980 }
981
982 async fn request_from_ordered_peers(
983 &self,
984 hash: &Hash,
985 ordered_peer_ids: &[String],
986 ) -> Option<Vec<u8>> {
987 let hash_key = hash_to_key(hash);
988 let (tx, rx) = oneshot::channel();
989 self.pending_requests.write().await.insert(
990 hash_key.clone(),
991 PendingRequest {
992 response_tx: tx,
993 started_at: Instant::now(),
994 queried_peers: Vec::new(),
995 },
996 );
997
998 let rx = Arc::new(Mutex::new(rx));
999 let result = run_hedged_waves(
1000 ordered_peer_ids.len(),
1001 self.routing.dispatch,
1002 self.request_timeout,
1003 |range| {
1004 let wave_peer_ids = ordered_peer_ids[range].to_vec();
1005 let hash = *hash;
1006 let hash_key = hash_key.clone();
1007 async move {
1008 let mut sent = 0usize;
1009 for peer_id in wave_peer_ids {
1010 if self.send_request_to_peer(&peer_id, &hash, None).await {
1011 sent += 1;
1012 if let Some(pending) =
1013 self.pending_requests.write().await.get_mut(&hash_key)
1014 {
1015 pending.queried_peers.push(peer_id);
1016 }
1017 }
1018 }
1019 sent
1020 }
1021 },
1022 |wait| {
1023 let rx = rx.clone();
1024 async move {
1025 let mut rx = rx.lock().await;
1026 match tokio::time::timeout(wait, &mut *rx).await {
1027 Ok(Ok(Some(data))) if hashtree_core::sha256(&data) == *hash => {
1028 HedgedWaveAction::Success(data)
1029 }
1030 Ok(Ok(Some(_))) => HedgedWaveAction::Continue,
1031 Ok(Ok(None)) | Ok(Err(_)) => HedgedWaveAction::Abort,
1032 Err(_) => HedgedWaveAction::Continue,
1033 }
1034 }
1035 },
1036 )
1037 .await;
1038
1039 let Some(data) = result else {
1040 if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
1041 for peer_id in pending.queried_peers {
1042 self.peer_selector.write().await.record_timeout(&peer_id);
1043 }
1044 }
1045 return None;
1046 };
1047
1048 let _ = self.local_store.put(*hash, data.clone()).await;
1049 Some(data)
1050 }
1051
1052 async fn request_from_peers(&self, hash: &Hash) -> Option<Vec<u8>> {
1054 let ordered_peer_ids = self.ordered_connected_peers().await;
1055 if ordered_peer_ids.is_empty() {
1056 return None;
1057 }
1058 self.request_from_ordered_peers(hash, &ordered_peer_ids)
1059 .await
1060 }
1061
1062 async fn complete_pending_response(&self, from_peer: &str, hash_key: String, payload: Vec<u8>) {
1063 if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
1064 let rtt_ms = pending.started_at.elapsed().as_millis() as u64;
1065 self.peer_selector.write().await.record_success(
1066 from_peer,
1067 rtt_ms,
1068 payload.len() as u64,
1069 );
1070 let _ = pending.response_tx.send(Some(payload));
1071 }
1072 }
1073
1074 async fn handle_quote_response_message(&self, from_peer: &str, res: DataQuoteResponse) {
1075 if !res.a {
1076 return;
1077 }
1078
1079 let Some(quote_id) = res.q else {
1080 return;
1081 };
1082
1083 let hash_key = hash_to_key(&res.h);
1084 let (preferred_mint_url, offered_payment_sat) = {
1085 let pending_quotes = self.pending_quotes.read().await;
1086 let Some(pending) = pending_quotes.get(&hash_key) else {
1087 return;
1088 };
1089 (
1090 pending.preferred_mint_url.clone(),
1091 pending.offered_payment_sat,
1092 )
1093 };
1094 if !self
1095 .should_accept_quote_response(
1096 from_peer,
1097 preferred_mint_url.as_deref(),
1098 offered_payment_sat,
1099 &res,
1100 )
1101 .await
1102 {
1103 return;
1104 }
1105 let mut pending_quotes = self.pending_quotes.write().await;
1106 if let Some(pending) = pending_quotes.remove(&hash_key) {
1107 let _ = pending.response_tx.send(Some(NegotiatedQuote {
1108 peer_id: from_peer.to_string(),
1109 quote_id,
1110 mint_url: res.m,
1111 }));
1112 }
1113 }
1114
1115 async fn handle_response_message(&self, from_peer: &str, res: crate::protocol::DataResponse) {
1116 let hash_key = hash_to_key(&res.h);
1117 let hash = match crate::protocol::bytes_to_hash(&res.h) {
1118 Some(h) => h,
1119 None => return,
1120 };
1121
1122 if hashtree_core::sha256(&res.d) != hash {
1124 self.peer_selector.write().await.record_failure(from_peer);
1125 if self.debug {
1126 println!("[GenericStore] Ignoring invalid response payload for {hash_key}");
1127 }
1128 return;
1129 }
1130
1131 self.complete_pending_response(from_peer, hash_key, res.d)
1132 .await;
1133 }
1134
1135 async fn handle_quote_request_message(&self, from_peer: &str, req: DataQuoteRequest) {
1136 let hash = match crate::protocol::bytes_to_hash(&req.h) {
1137 Some(h) => h,
1138 None => return,
1139 };
1140 let hash_key = hash_to_key(&hash);
1141
1142 {
1143 let selector = self.peer_selector.read().await;
1144 if self.should_refuse_requests_from_peer(&selector, from_peer) {
1145 if self.debug {
1146 println!(
1147 "[GenericStore] Refusing quote request from delinquent peer {}",
1148 from_peer
1149 );
1150 }
1151 return;
1152 }
1153 }
1154
1155 let chosen_mint = self.choose_quote_mint(req.m.as_deref());
1156 let can_serve = self.local_store.has(&hash).await.ok().unwrap_or(false)
1157 && !self.should_drop_response(&hash)
1158 && !self.should_corrupt_response(&hash);
1159
1160 let res = if can_serve {
1161 let quote_id = self
1162 .issue_quote(from_peer, &hash_key, req.p, req.t, chosen_mint.as_deref())
1163 .await;
1164 create_quote_response_available(&hash, quote_id, req.p, req.t, chosen_mint.as_deref())
1165 } else {
1166 create_quote_response_unavailable(&hash)
1167 };
1168 let response_bytes = encode_quote_response(&res);
1169 if let Some(channel) = self.signaling.get_channel(from_peer).await {
1170 let _ = channel.send(response_bytes).await;
1171 }
1172 }
1173
1174 async fn handle_request_message(&self, from_peer: &str, req: crate::protocol::DataRequest) {
1175 let hash = match crate::protocol::bytes_to_hash(&req.h) {
1176 Some(h) => h,
1177 None => return,
1178 };
1179 let hash_key = hash_to_key(&hash);
1180
1181 {
1182 let selector = self.peer_selector.read().await;
1183 if self.should_refuse_requests_from_peer(&selector, from_peer) {
1184 if self.debug {
1185 println!(
1186 "[GenericStore] Refusing request from delinquent peer {}",
1187 from_peer
1188 );
1189 }
1190 return;
1191 }
1192 }
1193
1194 if let Some(quote_id) = req.q {
1195 if !self.take_valid_quote(from_peer, &hash_key, quote_id).await {
1196 if self.debug {
1197 println!(
1198 "[GenericStore] Refusing request with invalid or expired quote {} from {}",
1199 quote_id, from_peer
1200 );
1201 }
1202 return;
1203 }
1204 }
1205
1206 if let Ok(Some(mut data)) = self.local_store.get(&hash).await {
1208 if self.should_drop_response(&hash) {
1209 if self.debug {
1210 println!(
1211 "[GenericStore] Dropping response for {} due to actor profile",
1212 hash_to_key(&hash)
1213 );
1214 }
1215 return;
1216 }
1217
1218 let behavior = self.response_behavior();
1219 if behavior.extra_delay_ms > 0 {
1220 tokio::time::sleep(Duration::from_millis(behavior.extra_delay_ms)).await;
1221 }
1222
1223 if self.should_corrupt_response(&hash) {
1224 if data.is_empty() {
1225 data.push(0x80);
1226 } else {
1227 data[0] ^= 0x80;
1228 }
1229 }
1230
1231 let res = create_response(&hash, data);
1233 let response_bytes = encode_response(&res);
1234 if let Some(channel) = self.signaling.get_channel(from_peer).await {
1235 let _ = channel.send(response_bytes).await;
1236 }
1237 }
1238 }
1240
1241 pub async fn handle_data_message(&self, from_peer: &str, data: &[u8]) {
1243 let parsed = match parse_message(data) {
1244 Some(m) => m,
1245 None => return,
1246 };
1247
1248 match parsed {
1249 DataMessage::Request(req) => {
1250 self.handle_request_message(from_peer, req).await;
1251 }
1252 DataMessage::Response(res) => {
1253 self.handle_response_message(from_peer, res).await;
1254 }
1255 DataMessage::QuoteRequest(req) => {
1256 self.handle_quote_request_message(from_peer, req).await;
1257 }
1258 DataMessage::QuoteResponse(res) => {
1259 self.handle_quote_response_message(from_peer, res).await;
1260 }
1261 }
1262 }
1263}
1264
1265#[async_trait]
1266impl<S, R, F> Store for GenericStore<S, R, F>
1267where
1268 S: Store + Send + Sync + 'static,
1269 R: SignalingTransport + Send + Sync + 'static,
1270 F: PeerLinkFactory + Send + Sync + 'static,
1271{
1272 async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
1273 self.local_store.put(hash, data).await
1274 }
1275
1276 async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
1277 if let Some(data) = self.local_store.get(hash).await? {
1279 return Ok(Some(data));
1280 }
1281
1282 Ok(self.request_from_peers(hash).await)
1284 }
1285
1286 async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
1287 self.local_store.has(hash).await
1288 }
1289
1290 async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
1291 self.local_store.delete(hash).await
1292 }
1293}
1294
1295#[cfg(test)]
1296mod tests {
1297 use super::*;
1298 use hashtree_core::MemoryStore;
1299 use std::sync::Arc;
1300 use std::sync::OnceLock;
1301 use std::time::Duration;
1302
1303 type TestStore = GenericStore<
1304 MemoryStore,
1305 crate::mock::MockRelayTransport,
1306 crate::mock::MockConnectionFactory,
1307 >;
1308
1309 struct TestNode {
1310 store: Arc<TestStore>,
1311 local_store: Arc<MemoryStore>,
1312 transport: Arc<crate::mock::MockRelayTransport>,
1313 }
1314
1315 fn mock_network_lock() -> &'static tokio::sync::Mutex<()> {
1316 static LOCK: OnceLock<tokio::sync::Mutex<()>> = OnceLock::new();
1317 LOCK.get_or_init(|| tokio::sync::Mutex::new(()))
1318 }
1319
1320 fn make_test_store(local_store: Arc<MemoryStore>, node_id: &str) -> TestStore {
1321 make_test_store_with_routing(local_store, node_id, GenericStoreRoutingConfig::default())
1322 }
1323
1324 fn make_test_store_with_routing(
1325 local_store: Arc<MemoryStore>,
1326 node_id: &str,
1327 routing: GenericStoreRoutingConfig,
1328 ) -> TestStore {
1329 let relay = crate::mock::MockRelay::new();
1330 let transport = Arc::new(relay.create_transport(node_id.to_string(), node_id.to_string()));
1331 let conn_factory = Arc::new(crate::mock::MockConnectionFactory::new(
1332 node_id.to_string(),
1333 0,
1334 ));
1335 let signaling = Arc::new(crate::signaling::MeshRouter::new(
1336 node_id.to_string(),
1337 node_id.to_string(),
1338 transport,
1339 conn_factory,
1340 crate::types::PoolSettings::default(),
1341 false,
1342 ));
1343
1344 TestStore::new_with_routing(
1345 local_store,
1346 signaling,
1347 Duration::from_millis(200),
1348 false,
1349 routing,
1350 )
1351 }
1352
1353 fn make_shared_test_node(
1354 relay: Arc<crate::mock::MockRelay>,
1355 node_id: &str,
1356 routing: GenericStoreRoutingConfig,
1357 ) -> TestNode {
1358 let transport = Arc::new(relay.create_transport(node_id.to_string(), node_id.to_string()));
1359 let conn_factory = Arc::new(crate::mock::MockConnectionFactory::new(
1360 node_id.to_string(),
1361 0,
1362 ));
1363 let signaling = Arc::new(crate::signaling::MeshRouter::new(
1364 node_id.to_string(),
1365 node_id.to_string(),
1366 transport.clone(),
1367 conn_factory,
1368 crate::types::PoolSettings::default(),
1369 false,
1370 ));
1371 let local_store = Arc::new(MemoryStore::new());
1372 let store = Arc::new(TestStore::new_with_routing(
1373 local_store.clone(),
1374 signaling,
1375 Duration::from_millis(120),
1376 false,
1377 routing,
1378 ));
1379
1380 TestNode {
1381 store,
1382 local_store,
1383 transport,
1384 }
1385 }
1386
1387 async fn pump_test_signaling(nodes: &[&TestNode]) -> usize {
1388 let mut processed = 0usize;
1389 for node in nodes {
1390 while let Some(msg) = node.transport.try_recv() {
1391 node.store
1392 .process_signaling(msg)
1393 .await
1394 .expect("process signaling");
1395 processed += 1;
1396 }
1397 }
1398 processed
1399 }
1400
1401 async fn pump_test_data(nodes: &[&TestNode]) -> usize {
1402 let mut processed = 0usize;
1403 for node in nodes {
1404 let peer_ids = node.store.signaling().peer_ids().await;
1405 for peer_id in peer_ids {
1406 let Some(channel) = node.store.signaling().get_channel(&peer_id).await else {
1407 continue;
1408 };
1409 while let Some(data) = channel.try_recv() {
1410 node.store.handle_data_message(&peer_id, &data).await;
1411 processed += 1;
1412 }
1413 }
1414 }
1415 processed
1416 }
1417
1418 async fn pump_test_network(nodes: &[&TestNode], max_steps: usize) {
1419 for _ in 0..max_steps {
1420 let signaling = pump_test_signaling(nodes).await;
1421 let data = pump_test_data(nodes).await;
1422 if signaling + data == 0 {
1423 tokio::task::yield_now().await;
1424 }
1425 }
1426 }
1427
1428 async fn run_get_with_pumps(
1429 requester: Arc<TestStore>,
1430 hash: Hash,
1431 nodes: &[&TestNode],
1432 ) -> Option<Vec<u8>> {
1433 let task = tokio::spawn(async move { requester.get(&hash).await.ok().flatten() });
1434 let started = Instant::now();
1435
1436 loop {
1437 if task.is_finished() {
1438 return task.await.expect("request task join");
1439 }
1440
1441 if started.elapsed() > Duration::from_secs(1) {
1442 task.abort();
1443 return None;
1444 }
1445
1446 pump_test_network(nodes, 4).await;
1447 }
1448 }
1449
1450 async fn run_bad_peer_series(strategy: SelectionStrategy) -> usize {
1451 let _guard = mock_network_lock().lock().await;
1452 crate::mock::clear_channel_registry().await;
1453
1454 let relay = crate::mock::MockRelay::new();
1455 let requester = make_shared_test_node(
1456 relay.clone(),
1457 "requester-reject",
1458 GenericStoreRoutingConfig {
1459 selection_strategy: strategy,
1460 fairness_enabled: false,
1461 dispatch: RequestDispatchConfig {
1462 initial_fanout: 1,
1463 hedge_fanout: 1,
1464 max_fanout: 1,
1465 hedge_interval_ms: 5,
1466 },
1467 ..Default::default()
1468 },
1469 );
1470 let bad = make_shared_test_node(
1471 relay.clone(),
1472 "a-bad",
1473 GenericStoreRoutingConfig {
1474 response_behavior: ResponseBehaviorConfig {
1475 drop_response_prob: 1.0,
1476 ..Default::default()
1477 },
1478 ..Default::default()
1479 },
1480 );
1481 let honest = make_shared_test_node(relay, "b-honest", GenericStoreRoutingConfig::default());
1482 let nodes = [&requester, &bad, &honest];
1483
1484 for node in &nodes {
1485 node.transport
1486 .connect(&[])
1487 .await
1488 .expect("connect transport");
1489 node.store.start().await.expect("start store");
1490 }
1491 pump_test_network(&nodes, 24).await;
1492
1493 let mut successes = 0usize;
1494 for round in 0..6 {
1495 let payload = format!("payload-{round}").into_bytes();
1496 let hash = hashtree_core::sha256(&payload);
1497 let _ = bad.local_store.put(hash, payload.clone()).await;
1498 let _ = honest.local_store.put(hash, payload.clone()).await;
1499
1500 let result = run_get_with_pumps(requester.store.clone(), hash, &nodes).await;
1501 if result.as_ref() == Some(&payload) {
1502 successes += 1;
1503 }
1504 }
1505
1506 crate::mock::clear_channel_registry().await;
1507 successes
1508 }
1509
1510 #[test]
1511 fn test_hedged_wave_plan_flood_all() {
1512 let plan = build_hedged_wave_plan(7, RequestDispatchConfig::default());
1513 assert_eq!(plan, vec![7]);
1514 }
1515
1516 #[test]
1517 fn test_hedged_wave_plan_staged() {
1518 let plan = build_hedged_wave_plan(
1519 10,
1520 RequestDispatchConfig {
1521 initial_fanout: 2,
1522 hedge_fanout: 3,
1523 max_fanout: 8,
1524 hedge_interval_ms: 25,
1525 },
1526 );
1527 assert_eq!(plan, vec![2, 3, 3]);
1528 }
1529
1530 #[tokio::test]
1531 async fn test_run_hedged_waves_uses_zero_interval_as_immediate_hedge() {
1532 let waits = Arc::new(std::sync::Mutex::new(Vec::new()));
1533 let waits_clone = waits.clone();
1534
1535 let result = run_hedged_waves(
1536 3,
1537 RequestDispatchConfig {
1538 initial_fanout: 1,
1539 hedge_fanout: 1,
1540 max_fanout: 3,
1541 hedge_interval_ms: 0,
1542 },
1543 Duration::from_millis(25),
1544 |_range| async { 1 },
1545 move |wait| {
1546 let waits = waits_clone.clone();
1547 async move {
1548 waits.lock().expect("wait lock").push(wait);
1549 HedgedWaveAction::<()>::Continue
1550 }
1551 },
1552 )
1553 .await;
1554
1555 assert!(result.is_none());
1556 let waits = waits.lock().expect("wait lock");
1557 assert_eq!(waits.len(), 1);
1558 assert!(waits[0] <= Duration::from_millis(25));
1559 }
1560
1561 #[test]
1562 fn test_response_behavior_normalization_clamps_probs() {
1563 let raw = ResponseBehaviorConfig {
1564 drop_response_prob: -1.5,
1565 corrupt_response_prob: 9.0,
1566 extra_delay_ms: 12,
1567 };
1568 let normalized = raw.normalized();
1569 assert_eq!(normalized.drop_response_prob, 0.0);
1570 assert_eq!(normalized.corrupt_response_prob, 1.0);
1571 assert_eq!(normalized.extra_delay_ms, 12);
1572 }
1573
1574 #[test]
1575 fn test_actor_draw_is_deterministic_per_peer_hash_and_salt() {
1576 let hash = hashtree_core::sha256(b"deterministic");
1577 let a = TestStore::deterministic_actor_draw_for("peer-a", &hash, 7);
1578 let b = TestStore::deterministic_actor_draw_for("peer-a", &hash, 7);
1579 assert!((a - b).abs() < f64::EPSILON);
1580 }
1581
1582 #[tokio::test]
1583 async fn test_load_peer_metadata_returns_false_when_missing() {
1584 let local_store = Arc::new(MemoryStore::new());
1585 let store = make_test_store(local_store, "0");
1586 assert!(!store.load_peer_metadata().await.expect("load result"));
1587 }
1588
1589 #[tokio::test]
1590 async fn test_persist_and_load_peer_metadata_with_existing_store_adapter() {
1591 let local_store = Arc::new(MemoryStore::new());
1592 let writer = make_test_store(local_store.clone(), "0");
1593 {
1594 let mut selector = writer.peer_selector.write().await;
1595 selector.add_peer("npub1stable:session-a");
1596 selector.record_request("npub1stable:session-a", 64);
1597 selector.record_success("npub1stable:session-a", 35, 1024);
1598 selector.record_cashu_payment("npub1stable:session-a", 120);
1599 selector.record_cashu_receipt("npub1stable:session-a", 40);
1600 selector.record_cashu_payment_default("npub1stable:session-a");
1601 }
1602
1603 let snapshot_hash = writer
1604 .persist_peer_metadata()
1605 .await
1606 .expect("persist peer metadata");
1607 assert!(local_store
1608 .get(&snapshot_hash)
1609 .await
1610 .expect("snapshot lookup")
1611 .is_some());
1612
1613 let reader = make_test_store(local_store, "1");
1614 assert!(reader
1615 .load_peer_metadata()
1616 .await
1617 .expect("load peer metadata snapshot"));
1618
1619 let mut selector = reader.peer_selector.write().await;
1620 selector.add_peer("npub1stable:session-b");
1621 let stats = selector
1622 .get_stats("npub1stable:session-b")
1623 .expect("restored peer stats");
1624 assert_eq!(stats.requests_sent, 1);
1625 assert_eq!(stats.successes, 1);
1626 assert_eq!(stats.cashu_paid_sat, 120);
1627 assert_eq!(stats.cashu_received_sat, 40);
1628 assert_eq!(stats.cashu_payment_receipts, 1);
1629 assert_eq!(stats.cashu_payment_defaults, 1);
1630 }
1631
1632 #[tokio::test]
1633 async fn test_should_refuse_requests_from_peer_after_payment_defaults() {
1634 let local_store = Arc::new(MemoryStore::new());
1635 let store = make_test_store_with_routing(
1636 local_store,
1637 "0",
1638 GenericStoreRoutingConfig {
1639 cashu_payment_default_block_threshold: 1,
1640 ..Default::default()
1641 },
1642 );
1643 store.record_cashu_payment_default_from_peer("peer-a").await;
1644
1645 let selector = store.peer_selector.read().await;
1646 assert!(store.should_refuse_requests_from_peer(&selector, "peer-a"));
1647 assert!(!store.should_refuse_requests_from_peer(&selector, "peer-b"));
1648 }
1649
1650 #[tokio::test]
1651 async fn test_take_valid_quote_consumes_once_and_rejects_expired_quotes() {
1652 let local_store = Arc::new(MemoryStore::new());
1653 let store = make_test_store(local_store, "0");
1654 let hash = hashtree_core::sha256(b"quote-test");
1655 let hash_key = hash_to_key(&hash);
1656
1657 {
1658 let mut issued = store.issued_quotes.write().await;
1659 issued.insert(
1660 ("peer-a".to_string(), hash_key.clone(), 11),
1661 IssuedQuote {
1662 expires_at: Instant::now() + Duration::from_secs(1),
1663 payment_sat: 5,
1664 mint_url: Some("https://mint-a.example".to_string()),
1665 },
1666 );
1667 issued.insert(
1668 ("peer-a".to_string(), hash_key.clone(), 12),
1669 IssuedQuote {
1670 expires_at: Instant::now() - Duration::from_millis(1),
1671 payment_sat: 5,
1672 mint_url: Some("https://mint-a.example".to_string()),
1673 },
1674 );
1675 }
1676
1677 assert!(store.take_valid_quote("peer-a", &hash_key, 11).await);
1678 assert!(!store.take_valid_quote("peer-a", &hash_key, 11).await);
1679 assert!(!store.take_valid_quote("peer-a", &hash_key, 12).await);
1680 }
1681
1682 async fn run_quote_with_pumps(
1683 requester: Arc<TestStore>,
1684 hash: Hash,
1685 payment_sat: u64,
1686 quote_ttl: Duration,
1687 peer_ids: Vec<String>,
1688 nodes: &[&TestNode],
1689 ) -> Option<NegotiatedQuote> {
1690 let task = tokio::spawn(async move {
1691 requester
1692 .request_quote_from_peers(&hash, payment_sat, quote_ttl, &peer_ids)
1693 .await
1694 });
1695 let started = Instant::now();
1696
1697 loop {
1698 if task.is_finished() {
1699 return task.await.expect("quote task join");
1700 }
1701 if started.elapsed() > Duration::from_secs(1) {
1702 task.abort();
1703 return None;
1704 }
1705 pump_test_network(nodes, 4).await;
1706 }
1707 }
1708
1709 #[tokio::test]
1710 async fn test_request_quote_from_peers_rejects_unaccepted_mint() {
1711 let _guard = mock_network_lock().lock().await;
1712 crate::mock::clear_channel_registry().await;
1713
1714 let relay = crate::mock::MockRelay::new();
1715 let requester = make_shared_test_node(
1716 relay.clone(),
1717 "requester-reject",
1718 GenericStoreRoutingConfig {
1719 cashu_accepted_mints: vec!["https://mint-a.example".to_string()],
1720 cashu_default_mint: Some("https://mint-a.example".to_string()),
1721 dispatch: RequestDispatchConfig {
1722 initial_fanout: 1,
1723 hedge_fanout: 1,
1724 max_fanout: 1,
1725 hedge_interval_ms: 5,
1726 },
1727 ..Default::default()
1728 },
1729 );
1730 let provider = make_shared_test_node(
1731 relay,
1732 "provider-reject",
1733 GenericStoreRoutingConfig {
1734 cashu_accepted_mints: vec!["https://mint-b.example".to_string()],
1735 ..Default::default()
1736 },
1737 );
1738 let nodes = [&requester, &provider];
1739
1740 requester.transport.connect(&[]).await.expect("connect");
1741 provider.transport.connect(&[]).await.expect("connect");
1742 requester.store.start().await.expect("start");
1743 provider.store.start().await.expect("start");
1744 pump_test_network(&nodes, 24).await;
1745
1746 let payload = b"quoted-data".to_vec();
1747 let hash = hashtree_core::sha256(&payload);
1748 provider.local_store.put(hash, payload).await.expect("put");
1749
1750 let quote = run_quote_with_pumps(
1751 requester.store.clone(),
1752 hash,
1753 9,
1754 Duration::from_millis(80),
1755 vec!["provider-reject".to_string()],
1756 &nodes,
1757 )
1758 .await;
1759 assert!(
1760 quote.is_none(),
1761 "expected quote to be rejected on mint mismatch"
1762 );
1763 }
1764
1765 #[tokio::test]
1766 async fn test_request_quote_from_peers_accepts_small_peer_suggested_mint_under_cap() {
1767 let _guard = mock_network_lock().lock().await;
1768 crate::mock::clear_channel_registry().await;
1769
1770 let relay = crate::mock::MockRelay::new();
1771 let requester = make_shared_test_node(
1772 relay.clone(),
1773 "requester-suggested",
1774 GenericStoreRoutingConfig {
1775 cashu_accepted_mints: vec!["https://mint-a.example".to_string()],
1776 cashu_default_mint: Some("https://mint-a.example".to_string()),
1777 cashu_peer_suggested_mint_base_cap_sat: 3,
1778 cashu_peer_suggested_mint_max_cap_sat: 3,
1779 dispatch: RequestDispatchConfig {
1780 initial_fanout: 1,
1781 hedge_fanout: 1,
1782 max_fanout: 1,
1783 hedge_interval_ms: 5,
1784 },
1785 ..Default::default()
1786 },
1787 );
1788 let provider = make_shared_test_node(
1789 relay,
1790 "provider-suggested",
1791 GenericStoreRoutingConfig {
1792 cashu_accepted_mints: vec!["https://mint-b.example".to_string()],
1793 cashu_default_mint: Some("https://mint-b.example".to_string()),
1794 ..Default::default()
1795 },
1796 );
1797 let nodes = [&requester, &provider];
1798
1799 requester.transport.connect(&[]).await.expect("connect");
1800 provider.transport.connect(&[]).await.expect("connect");
1801 requester.store.start().await.expect("start");
1802 provider.store.start().await.expect("start");
1803 pump_test_network(&nodes, 24).await;
1804
1805 let payload = b"quoted-data".to_vec();
1806 let hash = hashtree_core::sha256(&payload);
1807 provider.local_store.put(hash, payload).await.expect("put");
1808
1809 let quote = run_quote_with_pumps(
1810 requester.store.clone(),
1811 hash,
1812 3,
1813 Duration::from_millis(80),
1814 vec!["provider-suggested".to_string()],
1815 &nodes,
1816 )
1817 .await
1818 .expect("expected bounded peer-suggested mint quote");
1819 assert_eq!(quote.mint_url.as_deref(), Some("https://mint-b.example"));
1820 }
1821
1822 #[tokio::test]
1823 async fn test_request_quote_from_peers_scales_peer_suggested_mint_cap_with_reputation() {
1824 let _guard = mock_network_lock().lock().await;
1825 crate::mock::clear_channel_registry().await;
1826
1827 let relay = crate::mock::MockRelay::new();
1828 let requester = make_shared_test_node(
1829 relay.clone(),
1830 "requester-reputation",
1831 GenericStoreRoutingConfig {
1832 cashu_accepted_mints: vec!["https://mint-a.example".to_string()],
1833 cashu_default_mint: Some("https://mint-a.example".to_string()),
1834 cashu_peer_suggested_mint_base_cap_sat: 1,
1835 cashu_peer_suggested_mint_success_step_sat: 1,
1836 cashu_peer_suggested_mint_receipt_step_sat: 2,
1837 cashu_peer_suggested_mint_max_cap_sat: 5,
1838 dispatch: RequestDispatchConfig {
1839 initial_fanout: 1,
1840 hedge_fanout: 1,
1841 max_fanout: 1,
1842 hedge_interval_ms: 5,
1843 },
1844 ..Default::default()
1845 },
1846 );
1847 let provider = make_shared_test_node(
1848 relay,
1849 "provider-reputation",
1850 GenericStoreRoutingConfig {
1851 cashu_accepted_mints: vec!["https://mint-b.example".to_string()],
1852 cashu_default_mint: Some("https://mint-b.example".to_string()),
1853 ..Default::default()
1854 },
1855 );
1856 let nodes = [&requester, &provider];
1857
1858 requester.transport.connect(&[]).await.expect("connect");
1859 provider.transport.connect(&[]).await.expect("connect");
1860 requester.store.start().await.expect("start");
1861 provider.store.start().await.expect("start");
1862 pump_test_network(&nodes, 24).await;
1863
1864 let payload = b"quoted-data".to_vec();
1865 let hash = hashtree_core::sha256(&payload);
1866 provider.local_store.put(hash, payload).await.expect("put");
1867
1868 let quote = run_quote_with_pumps(
1869 requester.store.clone(),
1870 hash,
1871 4,
1872 Duration::from_millis(80),
1873 vec!["provider-reputation".to_string()],
1874 &nodes,
1875 )
1876 .await;
1877 assert!(
1878 quote.is_none(),
1879 "new peer should not get a 4 sat untrusted-mint quote"
1880 );
1881
1882 {
1883 let mut selector = requester.store.peer_selector.write().await;
1884 selector.add_peer("provider-reputation");
1885 selector.record_success("provider-reputation", 20, 1024);
1886 selector.record_cashu_receipt("provider-reputation", 2);
1887 }
1888
1889 let quote = run_quote_with_pumps(
1890 requester.store.clone(),
1891 hash,
1892 4,
1893 Duration::from_millis(80),
1894 vec!["provider-reputation".to_string()],
1895 &nodes,
1896 )
1897 .await
1898 .expect("reputable peer should get larger bounded quote");
1899 assert_eq!(quote.mint_url.as_deref(), Some("https://mint-b.example"));
1900
1901 requester
1902 .store
1903 .record_cashu_payment_default_from_peer("provider-reputation")
1904 .await;
1905 let quote = run_quote_with_pumps(
1906 requester.store.clone(),
1907 hash,
1908 4,
1909 Duration::from_millis(80),
1910 vec!["provider-reputation".to_string()],
1911 &nodes,
1912 )
1913 .await;
1914 assert!(
1915 quote.is_none(),
1916 "peer-suggested mint exposure should drop to zero after defaults exceed receipts"
1917 );
1918 }
1919
1920 #[tokio::test]
1921 async fn test_request_quote_from_peers_returns_matching_mint() {
1922 let _guard = mock_network_lock().lock().await;
1923 crate::mock::clear_channel_registry().await;
1924
1925 let relay = crate::mock::MockRelay::new();
1926 let requester = make_shared_test_node(
1927 relay.clone(),
1928 "requester-match",
1929 GenericStoreRoutingConfig {
1930 cashu_accepted_mints: vec!["https://mint-a.example".to_string()],
1931 cashu_default_mint: Some("https://mint-a.example".to_string()),
1932 dispatch: RequestDispatchConfig {
1933 initial_fanout: 1,
1934 hedge_fanout: 1,
1935 max_fanout: 1,
1936 hedge_interval_ms: 5,
1937 },
1938 ..Default::default()
1939 },
1940 );
1941 let provider = make_shared_test_node(
1942 relay,
1943 "provider-match",
1944 GenericStoreRoutingConfig {
1945 cashu_accepted_mints: vec!["https://mint-a.example".to_string()],
1946 ..Default::default()
1947 },
1948 );
1949 let nodes = [&requester, &provider];
1950
1951 requester.transport.connect(&[]).await.expect("connect");
1952 provider.transport.connect(&[]).await.expect("connect");
1953 requester.store.start().await.expect("start");
1954 provider.store.start().await.expect("start");
1955 pump_test_network(&nodes, 24).await;
1956
1957 let payload = b"quoted-data".to_vec();
1958 let hash = hashtree_core::sha256(&payload);
1959 provider.local_store.put(hash, payload).await.expect("put");
1960
1961 let quote = run_quote_with_pumps(
1962 requester.store.clone(),
1963 hash,
1964 9,
1965 Duration::from_millis(80),
1966 vec!["provider-match".to_string()],
1967 &nodes,
1968 )
1969 .await
1970 .expect("expected quote");
1971 assert_eq!(quote.mint_url.as_deref(), Some("https://mint-a.example"));
1972 }
1973
1974 #[tokio::test]
1975 async fn test_tit_for_tat_store_path_recovers_after_bad_peer_observation() {
1976 let tit_for_tat_successes = run_bad_peer_series(SelectionStrategy::TitForTat).await;
1977
1978 assert!(
1979 tit_for_tat_successes >= 5,
1980 "expected tit-for-tat path to recover after the first consistently bad peer observation (successes={tit_for_tat_successes})"
1981 );
1982 }
1983}
1984
1985pub type SimStore<S> =
1987 GenericStore<S, crate::mock::MockRelayTransport, crate::mock::MockConnectionFactory>;
1988
1989pub type ProductionStore<S> = GenericStore<
1991 S,
1992 crate::nostr::NostrRelayTransport,
1993 crate::real_factory::RealPeerConnectionFactory,
1994>;