1use async_trait::async_trait;
8use std::collections::hash_map::DefaultHasher;
9use std::collections::{HashMap, HashSet};
10use std::future::Future;
11use std::hash::{Hash as _, Hasher};
12use std::ops::Range;
13use std::sync::Arc;
14use std::time::{Duration, Instant};
15use tokio::sync::{oneshot, Mutex, RwLock};
16
17use hashtree_core::{Hash, Store, StoreError};
18
19use crate::peer_selector::{PeerMetadataSnapshot, PeerSelector, SelectionStrategy};
20use crate::protocol::{
21 create_quote_request, create_quote_response_available, create_quote_response_unavailable,
22 create_request, create_request_with_quote, create_response, encode_quote_request,
23 encode_quote_response, encode_request, encode_response, hash_to_key, parse_message,
24 DataMessage, DataQuoteRequest, DataQuoteResponse,
25};
26use crate::signaling::MeshRouter;
27use crate::transport::{PeerLinkFactory, SignalingTransport, TransportError};
28use crate::types::{PeerHTLConfig, SignalingMessage, MAX_HTL};
29
30const PEER_METADATA_POINTER_SLOT_KEY: &[u8] = b"hashtree-webrtc/peer-metadata/latest/v1";
33
34struct PendingRequest {
36 response_tx: oneshot::Sender<Option<Vec<u8>>>,
37 started_at: Instant,
38 queried_peers: Vec<String>,
39}
40
41struct PendingQuoteRequest {
42 response_tx: oneshot::Sender<Option<NegotiatedQuote>>,
43 preferred_mint_url: Option<String>,
44 offered_payment_sat: u64,
45}
46
47#[derive(Debug, Clone)]
48struct NegotiatedQuote {
49 peer_id: String,
50 quote_id: u64,
51 #[allow(dead_code)]
52 mint_url: Option<String>,
53}
54
55struct IssuedQuote {
56 expires_at: Instant,
57 #[allow(dead_code)]
58 payment_sat: u64,
59 #[allow(dead_code)]
60 mint_url: Option<String>,
61}
62
63#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
65pub struct DataPumpStats {
66 pub processed: usize,
67 pub request_messages: usize,
68 pub response_messages: usize,
69 pub quote_request_messages: u64,
70 pub quote_response_messages: u64,
71 pub processed_bytes: u64,
72}
73
74#[derive(Debug, Clone, Copy)]
80pub struct RequestDispatchConfig {
81 pub initial_fanout: usize,
83 pub hedge_fanout: usize,
85 pub max_fanout: usize,
87 pub hedge_interval_ms: u64,
89}
90
91impl Default for RequestDispatchConfig {
92 fn default() -> Self {
93 Self {
94 initial_fanout: usize::MAX,
95 hedge_fanout: usize::MAX,
96 max_fanout: usize::MAX,
97 hedge_interval_ms: 0,
98 }
99 }
100}
101
102pub fn normalize_dispatch_config(
104 dispatch: RequestDispatchConfig,
105 available_peers: usize,
106) -> RequestDispatchConfig {
107 let mut cfg = dispatch;
108 let cap = if cfg.max_fanout == 0 {
109 available_peers
110 } else {
111 cfg.max_fanout.min(available_peers)
112 };
113 cfg.max_fanout = cap;
114 cfg.initial_fanout = if cfg.initial_fanout == 0 {
115 1
116 } else {
117 cfg.initial_fanout.min(cap.max(1))
118 };
119 cfg.hedge_fanout = if cfg.hedge_fanout == 0 {
120 1
121 } else {
122 cfg.hedge_fanout.min(cap.max(1))
123 };
124 cfg
125}
126
127pub fn build_hedged_wave_plan(peer_count: usize, dispatch: RequestDispatchConfig) -> Vec<usize> {
129 if peer_count == 0 {
130 return Vec::new();
131 }
132 let cap = dispatch.max_fanout.min(peer_count);
133 if cap == 0 {
134 return Vec::new();
135 }
136
137 let mut plan = Vec::new();
138 let mut sent = 0usize;
139 let first = dispatch.initial_fanout.min(cap).max(1);
140 plan.push(first);
141 sent += first;
142
143 while sent < cap {
144 let next = dispatch.hedge_fanout.min(cap - sent).max(1);
145 plan.push(next);
146 sent += next;
147 }
148 plan
149}
150
151#[derive(Debug)]
153pub enum HedgedWaveAction<T> {
154 Continue,
155 Success(T),
156 Abort,
157}
158
159pub async fn run_hedged_waves<T, SendWave, SendWaveFut, WaitWave, WaitWaveFut>(
164 peer_count: usize,
165 dispatch: RequestDispatchConfig,
166 request_timeout: Duration,
167 mut send_wave: SendWave,
168 mut wait_wave: WaitWave,
169) -> Option<T>
170where
171 SendWave: FnMut(Range<usize>) -> SendWaveFut,
172 SendWaveFut: Future<Output = usize>,
173 WaitWave: FnMut(Duration) -> WaitWaveFut,
174 WaitWaveFut: Future<Output = HedgedWaveAction<T>>,
175{
176 let dispatch = normalize_dispatch_config(dispatch, peer_count);
177 let wave_plan = build_hedged_wave_plan(peer_count, dispatch);
178 if wave_plan.is_empty() {
179 return None;
180 }
181
182 let deadline = Instant::now() + request_timeout;
183 let mut sent_total = 0usize;
184 let mut next_peer_idx = 0usize;
185
186 for (wave_idx, wave_size) in wave_plan.iter().copied().enumerate() {
187 let from = next_peer_idx;
188 let to = (next_peer_idx + wave_size).min(peer_count);
189 next_peer_idx = to;
190
191 if from == to {
192 continue;
193 }
194
195 sent_total += send_wave(from..to).await;
196 if sent_total == 0 {
197 if next_peer_idx >= peer_count {
198 break;
199 }
200 continue;
201 }
202
203 let now = Instant::now();
204 if now >= deadline {
205 break;
206 }
207 let remaining = deadline.saturating_duration_since(now);
208 let is_last_wave = wave_idx + 1 == wave_plan.len() || next_peer_idx >= peer_count;
209 let wait = if is_last_wave {
210 remaining
211 } else if dispatch.hedge_interval_ms == 0 {
212 Duration::ZERO
213 } else {
214 Duration::from_millis(dispatch.hedge_interval_ms).min(remaining)
215 };
216
217 if wait.is_zero() {
218 continue;
219 }
220
221 match wait_wave(wait).await {
222 HedgedWaveAction::Continue => {}
223 HedgedWaveAction::Success(value) => return Some(value),
224 HedgedWaveAction::Abort => break,
225 }
226 }
227
228 None
229}
230
231pub async fn sync_selector_peers(selector: &RwLock<PeerSelector>, current_peer_ids: &[String]) {
233 let mut selector = selector.write().await;
234 let current: HashSet<&str> = current_peer_ids.iter().map(String::as_str).collect();
235 let known: Vec<String> = selector.all_stats().map(|s| s.peer_id.clone()).collect();
236 for peer_id in known {
237 if !current.contains(peer_id.as_str()) {
238 selector.remove_peer(&peer_id);
239 }
240 }
241 for peer_id in current_peer_ids {
242 selector.add_peer(peer_id.clone());
243 }
244}
245
246#[derive(Debug, Clone, Copy)]
250pub struct ResponseBehaviorConfig {
251 pub drop_response_prob: f64,
253 pub corrupt_response_prob: f64,
255 pub extra_delay_ms: u64,
257}
258
259impl Default for ResponseBehaviorConfig {
260 fn default() -> Self {
261 Self {
262 drop_response_prob: 0.0,
263 corrupt_response_prob: 0.0,
264 extra_delay_ms: 0,
265 }
266 }
267}
268
269impl ResponseBehaviorConfig {
270 fn normalized(self) -> Self {
271 Self {
272 drop_response_prob: self.drop_response_prob.clamp(0.0, 1.0),
273 corrupt_response_prob: self.corrupt_response_prob.clamp(0.0, 1.0),
274 extra_delay_ms: self.extra_delay_ms,
275 }
276 }
277}
278
279#[derive(Debug, Clone)]
281pub struct MeshRoutingConfig {
282 pub selection_strategy: SelectionStrategy,
283 pub fairness_enabled: bool,
284 pub cashu_payment_weight: f64,
286 pub cashu_payment_default_block_threshold: u64,
289 pub cashu_accepted_mints: Vec<String>,
291 pub cashu_default_mint: Option<String>,
293 pub cashu_peer_suggested_mint_base_cap_sat: u64,
295 pub cashu_peer_suggested_mint_success_step_sat: u64,
297 pub cashu_peer_suggested_mint_receipt_step_sat: u64,
299 pub cashu_peer_suggested_mint_max_cap_sat: u64,
301 pub dispatch: RequestDispatchConfig,
302 pub response_behavior: ResponseBehaviorConfig,
303}
304
305impl Default for MeshRoutingConfig {
306 fn default() -> Self {
307 Self {
308 selection_strategy: SelectionStrategy::Weighted,
309 fairness_enabled: true,
310 cashu_payment_weight: 0.0,
311 cashu_payment_default_block_threshold: 0,
312 cashu_accepted_mints: Vec::new(),
313 cashu_default_mint: None,
314 cashu_peer_suggested_mint_base_cap_sat: 0,
315 cashu_peer_suggested_mint_success_step_sat: 0,
316 cashu_peer_suggested_mint_receipt_step_sat: 0,
317 cashu_peer_suggested_mint_max_cap_sat: 0,
318 dispatch: RequestDispatchConfig::default(),
319 response_behavior: ResponseBehaviorConfig::default(),
320 }
321 }
322}
323
324pub struct MeshStoreCore<S, R, F>
331where
332 S: Store + Send + Sync + 'static,
333 R: SignalingTransport + Send + Sync + 'static,
334 F: PeerLinkFactory + Send + Sync + 'static,
335{
336 local_store: Arc<S>,
338 signaling: Arc<MeshRouter<R, F>>,
340 htl_configs: RwLock<HashMap<String, PeerHTLConfig>>,
342 pending_requests: RwLock<HashMap<String, PendingRequest>>,
344 pending_quotes: RwLock<HashMap<String, PendingQuoteRequest>>,
346 issued_quotes: RwLock<HashMap<(String, String, u64), IssuedQuote>>,
348 next_quote_id: RwLock<u64>,
350 peer_selector: RwLock<PeerSelector>,
352 routing: MeshRoutingConfig,
354 request_timeout: Duration,
356 debug: bool,
358 running: RwLock<bool>,
360}
361
362impl<S, R, F> MeshStoreCore<S, R, F>
363where
364 S: Store + Send + Sync + 'static,
365 R: SignalingTransport + Send + Sync + 'static,
366 F: PeerLinkFactory + Send + Sync + 'static,
367{
368 pub fn new(
370 local_store: Arc<S>,
371 signaling: Arc<MeshRouter<R, F>>,
372 request_timeout: Duration,
373 debug: bool,
374 ) -> Self {
375 Self::new_with_routing(
376 local_store,
377 signaling,
378 request_timeout,
379 debug,
380 Default::default(),
381 )
382 }
383
384 pub fn new_with_routing(
386 local_store: Arc<S>,
387 signaling: Arc<MeshRouter<R, F>>,
388 request_timeout: Duration,
389 debug: bool,
390 routing: MeshRoutingConfig,
391 ) -> Self {
392 let mut selector = PeerSelector::with_strategy(routing.selection_strategy);
393 selector.set_fairness(routing.fairness_enabled);
394 selector.set_cashu_payment_weight(routing.cashu_payment_weight);
395 Self {
396 local_store,
397 signaling,
398 htl_configs: RwLock::new(HashMap::new()),
399 pending_requests: RwLock::new(HashMap::new()),
400 pending_quotes: RwLock::new(HashMap::new()),
401 issued_quotes: RwLock::new(HashMap::new()),
402 next_quote_id: RwLock::new(1),
403 peer_selector: RwLock::new(selector),
404 routing,
405 request_timeout,
406 debug,
407 running: RwLock::new(false),
408 }
409 }
410
411 pub async fn start(&self) -> Result<(), TransportError> {
413 *self.running.write().await = true;
414
415 self.signaling.send_hello(vec![]).await?;
417
418 Ok(())
419 }
420
421 pub async fn stop(&self) {
423 *self.running.write().await = false;
424 }
425
426 pub async fn process_signaling(&self, msg: SignalingMessage) -> Result<(), TransportError> {
428 let peer_id = msg.peer_id().to_string();
430 {
431 let mut configs = self.htl_configs.write().await;
432 if !configs.contains_key(&peer_id) {
433 configs.insert(peer_id.clone(), PeerHTLConfig::random());
434 }
435 }
436 self.peer_selector.write().await.add_peer(peer_id);
437
438 self.signaling.handle_message(msg).await
439 }
440
441 pub fn signaling(&self) -> &Arc<MeshRouter<R, F>> {
443 &self.signaling
444 }
445
446 fn response_behavior(&self) -> ResponseBehaviorConfig {
447 self.routing.response_behavior.normalized()
448 }
449
450 fn deterministic_actor_draw_for(peer_id: &str, hash: &Hash, salt: u64) -> f64 {
451 let mut hasher = DefaultHasher::new();
452 peer_id.hash(&mut hasher);
453 hash.hash(&mut hasher);
454 salt.hash(&mut hasher);
455 let v = hasher.finish();
456 (v as f64) / (u64::MAX as f64)
457 }
458
459 fn deterministic_actor_draw(&self, hash: &Hash, salt: u64) -> f64 {
460 Self::deterministic_actor_draw_for(self.signaling.peer_id(), hash, salt)
461 }
462
463 fn peer_metadata_pointer_slot_hash() -> Hash {
464 hashtree_core::sha256(PEER_METADATA_POINTER_SLOT_KEY)
465 }
466
467 fn decode_hash_hex(hash_hex: &str) -> Result<Hash, StoreError> {
468 let bytes = hex::decode(hash_hex)
469 .map_err(|e| StoreError::Other(format!("Invalid hash hex: {e}")))?;
470 if bytes.len() != 32 {
471 return Err(StoreError::Other(format!(
472 "Invalid hash length {}, expected 32 bytes",
473 bytes.len()
474 )));
475 }
476 let mut hash = [0u8; 32];
477 hash.copy_from_slice(&bytes);
478 Ok(hash)
479 }
480
481 fn should_drop_response(&self, hash: &Hash) -> bool {
482 let p = self.response_behavior().drop_response_prob;
483 if p <= 0.0 {
484 return false;
485 }
486 self.deterministic_actor_draw(hash, 0xD0_D0_D0_D0_D0_D0_D0_D0) < p
487 }
488
489 fn should_corrupt_response(&self, hash: &Hash) -> bool {
490 let p = self.response_behavior().corrupt_response_prob;
491 if p <= 0.0 {
492 return false;
493 }
494 self.deterministic_actor_draw(hash, 0xC0_C0_C0_C0_C0_C0_C0_C0) < p
495 }
496
497 async fn ordered_connected_peers(&self) -> Vec<String> {
498 let current_peer_ids = self.signaling.peer_ids().await;
499 if current_peer_ids.is_empty() {
500 return Vec::new();
501 }
502
503 sync_selector_peers(&self.peer_selector, ¤t_peer_ids).await;
504 let current_set: HashSet<&str> = current_peer_ids.iter().map(String::as_str).collect();
505 let mut ordered_peer_ids = self.peer_selector.write().await.select_peers();
506 ordered_peer_ids.retain(|peer_id| current_set.contains(peer_id.as_str()));
507 if ordered_peer_ids.is_empty() {
508 let mut fallback = current_peer_ids;
509 fallback.sort();
510 return fallback;
511 }
512 ordered_peer_ids
513 }
514
515 fn requested_quote_mint(&self) -> Option<&str> {
516 if let Some(default_mint) = self.routing.cashu_default_mint.as_deref() {
517 if self.routing.cashu_accepted_mints.is_empty()
518 || self
519 .routing
520 .cashu_accepted_mints
521 .iter()
522 .any(|mint| mint == default_mint)
523 {
524 return Some(default_mint);
525 }
526 }
527
528 self.routing
529 .cashu_accepted_mints
530 .first()
531 .map(String::as_str)
532 }
533
534 fn choose_quote_mint(&self, requested_mint: Option<&str>) -> Option<String> {
535 if let Some(requested_mint) = requested_mint {
536 if self.accepts_quote_mint(Some(requested_mint)) {
537 return Some(requested_mint.to_string());
538 }
539 }
540 if let Some(default_mint) = self.routing.cashu_default_mint.as_ref() {
541 return Some(default_mint.clone());
542 }
543 if let Some(first_mint) = self.routing.cashu_accepted_mints.first() {
544 return Some(first_mint.clone());
545 }
546 requested_mint.map(str::to_string)
547 }
548
549 fn accepts_quote_mint(&self, mint_url: Option<&str>) -> bool {
550 if self.routing.cashu_accepted_mints.is_empty() {
551 return true;
552 }
553
554 let Some(mint_url) = mint_url else {
555 return false;
556 };
557 self.routing
558 .cashu_accepted_mints
559 .iter()
560 .any(|mint| mint == mint_url)
561 }
562
563 fn trusts_quote_mint(&self, mint_url: Option<&str>) -> bool {
564 let Some(mint_url) = mint_url else {
565 return self.routing.cashu_default_mint.is_none()
566 && self.routing.cashu_accepted_mints.is_empty();
567 };
568 self.routing.cashu_default_mint.as_deref() == Some(mint_url)
569 || self
570 .routing
571 .cashu_accepted_mints
572 .iter()
573 .any(|mint| mint == mint_url)
574 }
575
576 async fn peer_suggested_mint_cap_sat(&self, peer_id: &str) -> u64 {
577 let base = self.routing.cashu_peer_suggested_mint_base_cap_sat;
578 if base == 0 {
579 return 0;
580 }
581
582 let selector = self.peer_selector.read().await;
583 let Some(stats) = selector.get_stats(peer_id) else {
584 let max_cap = self.routing.cashu_peer_suggested_mint_max_cap_sat;
585 return if max_cap > 0 { base.min(max_cap) } else { base };
586 };
587
588 if stats.cashu_payment_defaults > 0
589 && stats.cashu_payment_defaults >= stats.cashu_payment_receipts
590 {
591 return 0;
592 }
593
594 let success_bonus = stats
595 .successes
596 .saturating_mul(self.routing.cashu_peer_suggested_mint_success_step_sat);
597 let receipt_bonus = stats
598 .cashu_payment_receipts
599 .saturating_mul(self.routing.cashu_peer_suggested_mint_receipt_step_sat);
600 let mut cap = base
601 .saturating_add(success_bonus)
602 .saturating_add(receipt_bonus);
603 let max_cap = self.routing.cashu_peer_suggested_mint_max_cap_sat;
604 if max_cap > 0 {
605 cap = cap.min(max_cap);
606 }
607 cap
608 }
609
610 async fn should_accept_quote_response(
611 &self,
612 from_peer: &str,
613 preferred_mint_url: Option<&str>,
614 offered_payment_sat: u64,
615 res: &DataQuoteResponse,
616 ) -> bool {
617 let Some(payment_sat) = res.p else {
618 return false;
619 };
620 if payment_sat > offered_payment_sat {
621 return false;
622 }
623
624 let response_mint = res.m.as_deref();
625 if response_mint == preferred_mint_url {
626 return true;
627 }
628 if self.trusts_quote_mint(response_mint) {
629 return true;
630 }
631 if response_mint.is_none() {
632 return false;
633 }
634
635 payment_sat <= self.peer_suggested_mint_cap_sat(from_peer).await
636 }
637
638 async fn issue_quote(
639 &self,
640 peer_id: &str,
641 hash_key: &str,
642 payment_sat: u64,
643 ttl_ms: u32,
644 mint_url: Option<&str>,
645 ) -> u64 {
646 let quote_id = {
647 let mut next = self.next_quote_id.write().await;
648 let quote_id = *next;
649 *next = next.saturating_add(1);
650 quote_id
651 };
652
653 let expires_at = Instant::now() + Duration::from_millis(ttl_ms as u64);
654 self.issued_quotes.write().await.insert(
655 (peer_id.to_string(), hash_key.to_string(), quote_id),
656 IssuedQuote {
657 expires_at,
658 payment_sat,
659 mint_url: mint_url.map(str::to_string),
660 },
661 );
662 quote_id
663 }
664
665 async fn take_valid_quote(&self, peer_id: &str, hash_key: &str, quote_id: u64) -> bool {
666 let key = (peer_id.to_string(), hash_key.to_string(), quote_id);
667 let Some(quote) = self.issued_quotes.write().await.remove(&key) else {
668 return false;
669 };
670 quote.expires_at > Instant::now()
671 }
672
673 async fn send_request_to_peer(
674 &self,
675 peer_id: &str,
676 hash: &Hash,
677 quote_id: Option<u64>,
678 ) -> bool {
679 let channel = match self.signaling.get_channel(peer_id).await {
680 Some(c) => c,
681 None => return false,
682 };
683
684 let htl_config = {
685 let configs = self.htl_configs.read().await;
686 configs
687 .get(peer_id)
688 .cloned()
689 .unwrap_or_else(PeerHTLConfig::random)
690 };
691
692 let send_htl = htl_config.decrement(MAX_HTL);
693 let req = match quote_id {
694 Some(quote_id) => create_request_with_quote(hash, send_htl, quote_id),
695 None => create_request(hash, send_htl),
696 };
697 let request_bytes = encode_request(&req);
698
699 {
700 let mut selector = self.peer_selector.write().await;
701 selector.record_request(peer_id, request_bytes.len() as u64);
702 }
703
704 match channel.send(request_bytes).await {
705 Ok(()) => true,
706 Err(_) => {
707 self.peer_selector.write().await.record_failure(peer_id);
708 false
709 }
710 }
711 }
712
713 async fn send_quote_request_to_peer(
714 &self,
715 peer_id: &str,
716 hash: &Hash,
717 payment_sat: u64,
718 ttl_ms: u32,
719 mint_url: Option<&str>,
720 ) -> bool {
721 let channel = match self.signaling.get_channel(peer_id).await {
722 Some(c) => c,
723 None => return false,
724 };
725
726 let req = create_quote_request(hash, ttl_ms, payment_sat, mint_url);
727 let request_bytes = encode_quote_request(&req);
728
729 match channel.send(request_bytes).await {
730 Ok(()) => true,
731 Err(_) => false,
732 }
733 }
734
735 pub async fn peer_count(&self) -> usize {
737 self.signaling.peer_count().await
738 }
739
740 pub async fn needs_peers(&self) -> bool {
742 self.signaling.needs_peers().await
743 }
744
745 pub async fn send_hello(&self) -> Result<(), TransportError> {
747 self.signaling.send_hello(vec![]).await
748 }
749
750 pub async fn drain_available_data_messages(&self) -> DataPumpStats {
755 let mut stats = DataPumpStats::default();
756 let peer_ids = self.signaling.peer_ids().await;
757 for peer_id in peer_ids {
758 let Some(channel) = self.signaling.get_channel(&peer_id).await else {
759 continue;
760 };
761
762 while let Some(data) = channel.try_recv() {
763 stats.processed += 1;
764 stats.processed_bytes += data.len() as u64;
765 if let Some(msg) = parse_message(&data) {
766 match msg {
767 DataMessage::Request(_) => stats.request_messages += 1,
768 DataMessage::Response(_) => stats.response_messages += 1,
769 DataMessage::QuoteRequest(_) => stats.quote_request_messages += 1,
770 DataMessage::QuoteResponse(_) => stats.quote_response_messages += 1,
771 }
772 }
773 self.handle_data_message(&peer_id, &data).await;
774 }
775 }
776 stats
777 }
778
779 pub async fn record_cashu_payment_for_peer(&self, peer_id: &str, amount_sat: u64) {
781 self.peer_selector
782 .write()
783 .await
784 .record_cashu_payment(peer_id, amount_sat);
785 }
786
787 pub async fn record_cashu_receipt_from_peer(&self, peer_id: &str, amount_sat: u64) {
789 self.peer_selector
790 .write()
791 .await
792 .record_cashu_receipt(peer_id, amount_sat);
793 }
794
795 pub async fn record_cashu_payment_default_from_peer(&self, peer_id: &str) {
797 self.peer_selector
798 .write()
799 .await
800 .record_cashu_payment_default(peer_id);
801 }
802
803 pub async fn selector_summary(&self) -> crate::peer_selector::SelectorSummary {
805 self.peer_selector.read().await.summary()
806 }
807
808 fn should_refuse_requests_from_peer(&self, selector: &PeerSelector, peer_id: &str) -> bool {
809 selector.is_peer_blocked_for_payment_defaults(
810 peer_id,
811 self.routing.cashu_payment_default_block_threshold,
812 )
813 }
814
815 pub async fn peer_metadata_snapshot(&self) -> PeerMetadataSnapshot {
817 self.peer_selector
818 .read()
819 .await
820 .export_peer_metadata_snapshot()
821 }
822
823 pub async fn persist_peer_metadata(&self) -> Result<Hash, StoreError> {
828 let snapshot = self
829 .peer_selector
830 .read()
831 .await
832 .export_peer_metadata_snapshot();
833 let bytes = serde_json::to_vec(&snapshot).map_err(|e| {
834 StoreError::Other(format!("Failed to encode peer metadata snapshot: {e}"))
835 })?;
836 let snapshot_hash = hashtree_core::sha256(&bytes);
837 let _ = self.local_store.put(snapshot_hash, bytes).await?;
838
839 let pointer_slot = Self::peer_metadata_pointer_slot_hash();
840 let pointer_bytes = hex::encode(snapshot_hash).into_bytes();
841 let _ = self.local_store.delete(&pointer_slot).await?;
842 let _ = self.local_store.put(pointer_slot, pointer_bytes).await?;
843
844 Ok(snapshot_hash)
845 }
846
847 pub async fn load_peer_metadata(&self) -> Result<bool, StoreError> {
849 let pointer_slot = Self::peer_metadata_pointer_slot_hash();
850 let Some(pointer_bytes) = self.local_store.get(&pointer_slot).await? else {
851 return Ok(false);
852 };
853 let pointer_hex = std::str::from_utf8(&pointer_bytes).map_err(|e| {
854 StoreError::Other(format!("Peer metadata pointer is not valid UTF-8: {e}"))
855 })?;
856 let snapshot_hash = Self::decode_hash_hex(pointer_hex.trim())?;
857
858 let Some(snapshot_bytes) = self.local_store.get(&snapshot_hash).await? else {
859 return Ok(false);
860 };
861 let snapshot: PeerMetadataSnapshot =
862 serde_json::from_slice(&snapshot_bytes).map_err(|e| {
863 StoreError::Other(format!("Failed to decode peer metadata snapshot: {e}"))
864 })?;
865 self.peer_selector
866 .write()
867 .await
868 .import_peer_metadata_snapshot(&snapshot);
869 Ok(true)
870 }
871
872 pub async fn get_with_quote(
877 &self,
878 hash: &Hash,
879 payment_sat: u64,
880 quote_ttl: Duration,
881 ) -> Result<Option<Vec<u8>>, StoreError> {
882 if let Some(data) = self.local_store.get(hash).await? {
883 return Ok(Some(data));
884 }
885 Ok(self
886 .request_from_peers_with_quote(hash, payment_sat, quote_ttl)
887 .await)
888 }
889
890 async fn request_from_peers_with_quote(
891 &self,
892 hash: &Hash,
893 payment_sat: u64,
894 quote_ttl: Duration,
895 ) -> Option<Vec<u8>> {
896 let ordered_peer_ids = self.ordered_connected_peers().await;
897 if ordered_peer_ids.is_empty() {
898 return None;
899 }
900
901 if let Some(quote) = self
902 .request_quote_from_peers(hash, payment_sat, quote_ttl, &ordered_peer_ids)
903 .await
904 {
905 if let Some(data) = self
906 .request_from_single_peer(hash, "e.peer_id, Some(quote.quote_id))
907 .await
908 {
909 return Some(data);
910 }
911 }
912
913 self.request_from_ordered_peers(hash, &ordered_peer_ids)
914 .await
915 }
916
917 async fn request_quote_from_peers(
918 &self,
919 hash: &Hash,
920 payment_sat: u64,
921 quote_ttl: Duration,
922 ordered_peer_ids: &[String],
923 ) -> Option<NegotiatedQuote> {
924 if ordered_peer_ids.is_empty() {
925 return None;
926 }
927 let ttl_ms = quote_ttl.as_millis().min(u32::MAX as u128) as u32;
928 if ttl_ms == 0 {
929 return None;
930 }
931 let requested_mint = self.requested_quote_mint().map(str::to_string);
932
933 let hash_key = hash_to_key(hash);
934 let (tx, rx) = oneshot::channel();
935 self.pending_quotes.write().await.insert(
936 hash_key.clone(),
937 PendingQuoteRequest {
938 response_tx: tx,
939 preferred_mint_url: requested_mint.clone(),
940 offered_payment_sat: payment_sat,
941 },
942 );
943
944 let rx = Arc::new(Mutex::new(rx));
945 let result = run_hedged_waves(
946 ordered_peer_ids.len(),
947 self.routing.dispatch,
948 self.request_timeout,
949 |range| {
950 let wave_peer_ids = ordered_peer_ids[range].to_vec();
951 let requested_mint = requested_mint.clone();
952 let hash = *hash;
953 async move {
954 let mut sent = 0usize;
955 for peer_id in wave_peer_ids {
956 if self
957 .send_quote_request_to_peer(
958 &peer_id,
959 &hash,
960 payment_sat,
961 ttl_ms,
962 requested_mint.as_deref(),
963 )
964 .await
965 {
966 sent += 1;
967 }
968 }
969 sent
970 }
971 },
972 |wait| {
973 let rx = rx.clone();
974 async move {
975 let mut rx = rx.lock().await;
976 match tokio::time::timeout(wait, &mut *rx).await {
977 Ok(Ok(Some(quote))) => HedgedWaveAction::Success(quote),
978 Ok(Ok(None)) | Ok(Err(_)) => HedgedWaveAction::Abort,
979 Err(_) => HedgedWaveAction::Continue,
980 }
981 }
982 },
983 )
984 .await;
985 let _ = self.pending_quotes.write().await.remove(&hash_key);
986 result
987 }
988
989 async fn request_from_single_peer(
990 &self,
991 hash: &Hash,
992 peer_id: &str,
993 quote_id: Option<u64>,
994 ) -> Option<Vec<u8>> {
995 let hash_key = hash_to_key(hash);
996 let (tx, rx) = oneshot::channel();
997 self.pending_requests.write().await.insert(
998 hash_key.clone(),
999 PendingRequest {
1000 response_tx: tx,
1001 started_at: Instant::now(),
1002 queried_peers: vec![peer_id.to_string()],
1003 },
1004 );
1005
1006 let mut rx = rx;
1007 if !self.send_request_to_peer(peer_id, hash, quote_id).await {
1008 let _ = self.pending_requests.write().await.remove(&hash_key);
1009 return None;
1010 }
1011
1012 if let Ok(Ok(Some(data))) = tokio::time::timeout(self.request_timeout, &mut rx).await {
1013 if hashtree_core::sha256(&data) == *hash {
1014 let _ = self.local_store.put(*hash, data.clone()).await;
1015 return Some(data);
1016 }
1017 }
1018
1019 if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
1020 for peer_id in pending.queried_peers {
1021 self.peer_selector.write().await.record_timeout(&peer_id);
1022 }
1023 }
1024 None
1025 }
1026
1027 async fn request_from_ordered_peers(
1028 &self,
1029 hash: &Hash,
1030 ordered_peer_ids: &[String],
1031 ) -> Option<Vec<u8>> {
1032 let hash_key = hash_to_key(hash);
1033 let (tx, rx) = oneshot::channel();
1034 self.pending_requests.write().await.insert(
1035 hash_key.clone(),
1036 PendingRequest {
1037 response_tx: tx,
1038 started_at: Instant::now(),
1039 queried_peers: Vec::new(),
1040 },
1041 );
1042
1043 let rx = Arc::new(Mutex::new(rx));
1044 let result = run_hedged_waves(
1045 ordered_peer_ids.len(),
1046 self.routing.dispatch,
1047 self.request_timeout,
1048 |range| {
1049 let wave_peer_ids = ordered_peer_ids[range].to_vec();
1050 let hash = *hash;
1051 let hash_key = hash_key.clone();
1052 async move {
1053 let mut sent = 0usize;
1054 for peer_id in wave_peer_ids {
1055 if self.send_request_to_peer(&peer_id, &hash, None).await {
1056 sent += 1;
1057 if let Some(pending) =
1058 self.pending_requests.write().await.get_mut(&hash_key)
1059 {
1060 pending.queried_peers.push(peer_id);
1061 }
1062 }
1063 }
1064 sent
1065 }
1066 },
1067 |wait| {
1068 let rx = rx.clone();
1069 async move {
1070 let mut rx = rx.lock().await;
1071 match tokio::time::timeout(wait, &mut *rx).await {
1072 Ok(Ok(Some(data))) if hashtree_core::sha256(&data) == *hash => {
1073 HedgedWaveAction::Success(data)
1074 }
1075 Ok(Ok(Some(_))) => HedgedWaveAction::Continue,
1076 Ok(Ok(None)) | Ok(Err(_)) => HedgedWaveAction::Abort,
1077 Err(_) => HedgedWaveAction::Continue,
1078 }
1079 }
1080 },
1081 )
1082 .await;
1083
1084 let Some(data) = result else {
1085 if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
1086 for peer_id in pending.queried_peers {
1087 self.peer_selector.write().await.record_timeout(&peer_id);
1088 }
1089 }
1090 return None;
1091 };
1092
1093 let _ = self.local_store.put(*hash, data.clone()).await;
1094 Some(data)
1095 }
1096
1097 async fn request_from_peers(&self, hash: &Hash) -> Option<Vec<u8>> {
1099 let ordered_peer_ids = self.ordered_connected_peers().await;
1100 if ordered_peer_ids.is_empty() {
1101 return None;
1102 }
1103 self.request_from_ordered_peers(hash, &ordered_peer_ids)
1104 .await
1105 }
1106
1107 async fn complete_pending_response(&self, from_peer: &str, hash_key: String, payload: Vec<u8>) {
1108 if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
1109 let rtt_ms = pending.started_at.elapsed().as_millis() as u64;
1110 self.peer_selector.write().await.record_success(
1111 from_peer,
1112 rtt_ms,
1113 payload.len() as u64,
1114 );
1115 let _ = pending.response_tx.send(Some(payload));
1116 }
1117 }
1118
1119 async fn handle_quote_response_message(&self, from_peer: &str, res: DataQuoteResponse) {
1120 if !res.a {
1121 return;
1122 }
1123
1124 let Some(quote_id) = res.q else {
1125 return;
1126 };
1127
1128 let hash_key = hash_to_key(&res.h);
1129 let (preferred_mint_url, offered_payment_sat) = {
1130 let pending_quotes = self.pending_quotes.read().await;
1131 let Some(pending) = pending_quotes.get(&hash_key) else {
1132 return;
1133 };
1134 (
1135 pending.preferred_mint_url.clone(),
1136 pending.offered_payment_sat,
1137 )
1138 };
1139 if !self
1140 .should_accept_quote_response(
1141 from_peer,
1142 preferred_mint_url.as_deref(),
1143 offered_payment_sat,
1144 &res,
1145 )
1146 .await
1147 {
1148 return;
1149 }
1150 let mut pending_quotes = self.pending_quotes.write().await;
1151 if let Some(pending) = pending_quotes.remove(&hash_key) {
1152 let _ = pending.response_tx.send(Some(NegotiatedQuote {
1153 peer_id: from_peer.to_string(),
1154 quote_id,
1155 mint_url: res.m,
1156 }));
1157 }
1158 }
1159
1160 async fn handle_response_message(&self, from_peer: &str, res: crate::protocol::DataResponse) {
1161 let hash_key = hash_to_key(&res.h);
1162 let hash = match crate::protocol::bytes_to_hash(&res.h) {
1163 Some(h) => h,
1164 None => return,
1165 };
1166
1167 if hashtree_core::sha256(&res.d) != hash {
1169 self.peer_selector.write().await.record_failure(from_peer);
1170 if self.debug {
1171 println!("[MeshStoreCore] Ignoring invalid response payload for {hash_key}");
1172 }
1173 return;
1174 }
1175
1176 self.complete_pending_response(from_peer, hash_key, res.d)
1177 .await;
1178 }
1179
1180 async fn handle_quote_request_message(&self, from_peer: &str, req: DataQuoteRequest) {
1181 let hash = match crate::protocol::bytes_to_hash(&req.h) {
1182 Some(h) => h,
1183 None => return,
1184 };
1185 let hash_key = hash_to_key(&hash);
1186
1187 {
1188 let selector = self.peer_selector.read().await;
1189 if self.should_refuse_requests_from_peer(&selector, from_peer) {
1190 if self.debug {
1191 println!(
1192 "[MeshStoreCore] Refusing quote request from delinquent peer {}",
1193 from_peer
1194 );
1195 }
1196 return;
1197 }
1198 }
1199
1200 let chosen_mint = self.choose_quote_mint(req.m.as_deref());
1201 let can_serve = self.local_store.has(&hash).await.ok().unwrap_or(false)
1202 && !self.should_drop_response(&hash)
1203 && !self.should_corrupt_response(&hash);
1204
1205 let res = if can_serve {
1206 let quote_id = self
1207 .issue_quote(from_peer, &hash_key, req.p, req.t, chosen_mint.as_deref())
1208 .await;
1209 create_quote_response_available(&hash, quote_id, req.p, req.t, chosen_mint.as_deref())
1210 } else {
1211 create_quote_response_unavailable(&hash)
1212 };
1213 let response_bytes = encode_quote_response(&res);
1214 if let Some(channel) = self.signaling.get_channel(from_peer).await {
1215 let _ = channel.send(response_bytes).await;
1216 }
1217 }
1218
1219 async fn handle_request_message(&self, from_peer: &str, req: crate::protocol::DataRequest) {
1220 let hash = match crate::protocol::bytes_to_hash(&req.h) {
1221 Some(h) => h,
1222 None => return,
1223 };
1224 let hash_key = hash_to_key(&hash);
1225
1226 {
1227 let selector = self.peer_selector.read().await;
1228 if self.should_refuse_requests_from_peer(&selector, from_peer) {
1229 if self.debug {
1230 println!(
1231 "[MeshStoreCore] Refusing request from delinquent peer {}",
1232 from_peer
1233 );
1234 }
1235 return;
1236 }
1237 }
1238
1239 if let Some(quote_id) = req.q {
1240 if !self.take_valid_quote(from_peer, &hash_key, quote_id).await {
1241 if self.debug {
1242 println!(
1243 "[MeshStoreCore] Refusing request with invalid or expired quote {} from {}",
1244 quote_id, from_peer
1245 );
1246 }
1247 return;
1248 }
1249 }
1250
1251 if let Ok(Some(mut data)) = self.local_store.get(&hash).await {
1253 if self.should_drop_response(&hash) {
1254 if self.debug {
1255 println!(
1256 "[MeshStoreCore] Dropping response for {} due to actor profile",
1257 hash_to_key(&hash)
1258 );
1259 }
1260 return;
1261 }
1262
1263 let behavior = self.response_behavior();
1264 if behavior.extra_delay_ms > 0 {
1265 tokio::time::sleep(Duration::from_millis(behavior.extra_delay_ms)).await;
1266 }
1267
1268 if self.should_corrupt_response(&hash) {
1269 if data.is_empty() {
1270 data.push(0x80);
1271 } else {
1272 data[0] ^= 0x80;
1273 }
1274 }
1275
1276 let res = create_response(&hash, data);
1278 let response_bytes = encode_response(&res);
1279 if let Some(channel) = self.signaling.get_channel(from_peer).await {
1280 let _ = channel.send(response_bytes).await;
1281 }
1282 }
1283 }
1285
1286 pub async fn handle_data_message(&self, from_peer: &str, data: &[u8]) {
1288 let parsed = match parse_message(data) {
1289 Some(m) => m,
1290 None => return,
1291 };
1292
1293 match parsed {
1294 DataMessage::Request(req) => {
1295 self.handle_request_message(from_peer, req).await;
1296 }
1297 DataMessage::Response(res) => {
1298 self.handle_response_message(from_peer, res).await;
1299 }
1300 DataMessage::QuoteRequest(req) => {
1301 self.handle_quote_request_message(from_peer, req).await;
1302 }
1303 DataMessage::QuoteResponse(res) => {
1304 self.handle_quote_response_message(from_peer, res).await;
1305 }
1306 }
1307 }
1308}
1309
1310#[async_trait]
1311impl<S, R, F> Store for MeshStoreCore<S, R, F>
1312where
1313 S: Store + Send + Sync + 'static,
1314 R: SignalingTransport + Send + Sync + 'static,
1315 F: PeerLinkFactory + Send + Sync + 'static,
1316{
1317 async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
1318 self.local_store.put(hash, data).await
1319 }
1320
1321 async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
1322 if let Some(data) = self.local_store.get(hash).await? {
1324 return Ok(Some(data));
1325 }
1326
1327 Ok(self.request_from_peers(hash).await)
1329 }
1330
1331 async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
1332 self.local_store.has(hash).await
1333 }
1334
1335 async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
1336 self.local_store.delete(hash).await
1337 }
1338}
1339
1340#[cfg(test)]
1341mod tests {
1342 use super::*;
1343 use hashtree_core::MemoryStore;
1344 use std::sync::Arc;
1345 use std::sync::OnceLock;
1346 use std::time::Duration;
1347
1348 type TestStore = MeshStoreCore<
1349 MemoryStore,
1350 crate::mock::MockRelayTransport,
1351 crate::mock::MockConnectionFactory,
1352 >;
1353
1354 struct TestNode {
1355 store: Arc<TestStore>,
1356 local_store: Arc<MemoryStore>,
1357 transport: Arc<crate::mock::MockRelayTransport>,
1358 }
1359
1360 fn mock_network_lock() -> &'static tokio::sync::Mutex<()> {
1361 static LOCK: OnceLock<tokio::sync::Mutex<()>> = OnceLock::new();
1362 LOCK.get_or_init(|| tokio::sync::Mutex::new(()))
1363 }
1364
1365 fn make_test_store(local_store: Arc<MemoryStore>, node_id: &str) -> TestStore {
1366 make_test_store_with_routing(local_store, node_id, MeshRoutingConfig::default())
1367 }
1368
1369 fn make_test_store_with_routing(
1370 local_store: Arc<MemoryStore>,
1371 node_id: &str,
1372 routing: MeshRoutingConfig,
1373 ) -> TestStore {
1374 let relay = crate::mock::MockRelay::new();
1375 let transport = Arc::new(relay.create_transport(node_id.to_string()));
1376 let conn_factory = Arc::new(crate::mock::MockConnectionFactory::new(
1377 node_id.to_string(),
1378 0,
1379 ));
1380 let signaling = Arc::new(crate::signaling::MeshRouter::new(
1381 node_id.to_string(),
1382 transport,
1383 conn_factory,
1384 crate::types::PoolSettings::default(),
1385 false,
1386 ));
1387
1388 TestStore::new_with_routing(
1389 local_store,
1390 signaling,
1391 Duration::from_millis(200),
1392 false,
1393 routing,
1394 )
1395 }
1396
1397 fn make_shared_test_node(
1398 relay: Arc<crate::mock::MockRelay>,
1399 node_id: &str,
1400 routing: MeshRoutingConfig,
1401 ) -> TestNode {
1402 let transport = Arc::new(relay.create_transport(node_id.to_string()));
1403 let conn_factory = Arc::new(crate::mock::MockConnectionFactory::new(
1404 node_id.to_string(),
1405 0,
1406 ));
1407 let signaling = Arc::new(crate::signaling::MeshRouter::new(
1408 node_id.to_string(),
1409 transport.clone(),
1410 conn_factory,
1411 crate::types::PoolSettings::default(),
1412 false,
1413 ));
1414 let local_store = Arc::new(MemoryStore::new());
1415 let store = Arc::new(TestStore::new_with_routing(
1416 local_store.clone(),
1417 signaling,
1418 Duration::from_millis(120),
1419 false,
1420 routing,
1421 ));
1422
1423 TestNode {
1424 store,
1425 local_store,
1426 transport,
1427 }
1428 }
1429
1430 async fn pump_test_signaling(nodes: &[&TestNode]) -> usize {
1431 let mut processed = 0usize;
1432 for node in nodes {
1433 while let Some(msg) = node.transport.try_recv() {
1434 node.store
1435 .process_signaling(msg)
1436 .await
1437 .expect("process signaling");
1438 processed += 1;
1439 }
1440 }
1441 processed
1442 }
1443
1444 async fn pump_test_data(nodes: &[&TestNode]) -> usize {
1445 let mut processed = 0usize;
1446 for node in nodes {
1447 let peer_ids = node.store.signaling().peer_ids().await;
1448 for peer_id in peer_ids {
1449 let Some(channel) = node.store.signaling().get_channel(&peer_id).await else {
1450 continue;
1451 };
1452 while let Some(data) = channel.try_recv() {
1453 node.store.handle_data_message(&peer_id, &data).await;
1454 processed += 1;
1455 }
1456 }
1457 }
1458 processed
1459 }
1460
1461 async fn pump_test_network(nodes: &[&TestNode], max_steps: usize) {
1462 for _ in 0..max_steps {
1463 let signaling = pump_test_signaling(nodes).await;
1464 let data = pump_test_data(nodes).await;
1465 if signaling + data == 0 {
1466 tokio::task::yield_now().await;
1467 }
1468 }
1469 }
1470
1471 async fn run_get_with_pumps(
1472 requester: Arc<TestStore>,
1473 hash: Hash,
1474 nodes: &[&TestNode],
1475 ) -> Option<Vec<u8>> {
1476 let task = tokio::spawn(async move { requester.get(&hash).await.ok().flatten() });
1477 let started = Instant::now();
1478
1479 loop {
1480 if task.is_finished() {
1481 return task.await.expect("request task join");
1482 }
1483
1484 if started.elapsed() > Duration::from_secs(1) {
1485 task.abort();
1486 return None;
1487 }
1488
1489 pump_test_network(nodes, 4).await;
1490 }
1491 }
1492
1493 async fn run_bad_peer_series(strategy: SelectionStrategy) -> usize {
1494 let _guard = mock_network_lock().lock().await;
1495 crate::mock::clear_channel_registry().await;
1496
1497 let relay = crate::mock::MockRelay::new();
1498 let requester = make_shared_test_node(
1499 relay.clone(),
1500 "requester-reject",
1501 MeshRoutingConfig {
1502 selection_strategy: strategy,
1503 fairness_enabled: false,
1504 dispatch: RequestDispatchConfig {
1505 initial_fanout: 1,
1506 hedge_fanout: 1,
1507 max_fanout: 1,
1508 hedge_interval_ms: 5,
1509 },
1510 ..Default::default()
1511 },
1512 );
1513 let bad = make_shared_test_node(
1514 relay.clone(),
1515 "a-bad",
1516 MeshRoutingConfig {
1517 response_behavior: ResponseBehaviorConfig {
1518 drop_response_prob: 1.0,
1519 ..Default::default()
1520 },
1521 ..Default::default()
1522 },
1523 );
1524 let honest = make_shared_test_node(relay, "b-honest", MeshRoutingConfig::default());
1525 let nodes = [&requester, &bad, &honest];
1526
1527 for node in &nodes {
1528 node.transport
1529 .connect(&[])
1530 .await
1531 .expect("connect transport");
1532 node.store.start().await.expect("start store");
1533 }
1534 pump_test_network(&nodes, 24).await;
1535
1536 let mut successes = 0usize;
1537 for round in 0..6 {
1538 let payload = format!("payload-{round}").into_bytes();
1539 let hash = hashtree_core::sha256(&payload);
1540 let _ = bad.local_store.put(hash, payload.clone()).await;
1541 let _ = honest.local_store.put(hash, payload.clone()).await;
1542
1543 let result = run_get_with_pumps(requester.store.clone(), hash, &nodes).await;
1544 if result.as_ref() == Some(&payload) {
1545 successes += 1;
1546 }
1547 }
1548
1549 crate::mock::clear_channel_registry().await;
1550 successes
1551 }
1552
1553 #[test]
1554 fn test_hedged_wave_plan_flood_all() {
1555 let plan = build_hedged_wave_plan(7, RequestDispatchConfig::default());
1556 assert_eq!(plan, vec![7]);
1557 }
1558
1559 #[test]
1560 fn test_hedged_wave_plan_staged() {
1561 let plan = build_hedged_wave_plan(
1562 10,
1563 RequestDispatchConfig {
1564 initial_fanout: 2,
1565 hedge_fanout: 3,
1566 max_fanout: 8,
1567 hedge_interval_ms: 25,
1568 },
1569 );
1570 assert_eq!(plan, vec![2, 3, 3]);
1571 }
1572
1573 #[tokio::test]
1574 async fn test_run_hedged_waves_uses_zero_interval_as_immediate_hedge() {
1575 let waits = Arc::new(std::sync::Mutex::new(Vec::new()));
1576 let waits_clone = waits.clone();
1577
1578 let result = run_hedged_waves(
1579 3,
1580 RequestDispatchConfig {
1581 initial_fanout: 1,
1582 hedge_fanout: 1,
1583 max_fanout: 3,
1584 hedge_interval_ms: 0,
1585 },
1586 Duration::from_millis(25),
1587 |_range| async { 1 },
1588 move |wait| {
1589 let waits = waits_clone.clone();
1590 async move {
1591 waits.lock().expect("wait lock").push(wait);
1592 HedgedWaveAction::<()>::Continue
1593 }
1594 },
1595 )
1596 .await;
1597
1598 assert!(result.is_none());
1599 let waits = waits.lock().expect("wait lock");
1600 assert_eq!(waits.len(), 1);
1601 assert!(waits[0] <= Duration::from_millis(25));
1602 }
1603
1604 #[test]
1605 fn test_response_behavior_normalization_clamps_probs() {
1606 let raw = ResponseBehaviorConfig {
1607 drop_response_prob: -1.5,
1608 corrupt_response_prob: 9.0,
1609 extra_delay_ms: 12,
1610 };
1611 let normalized = raw.normalized();
1612 assert_eq!(normalized.drop_response_prob, 0.0);
1613 assert_eq!(normalized.corrupt_response_prob, 1.0);
1614 assert_eq!(normalized.extra_delay_ms, 12);
1615 }
1616
1617 #[test]
1618 fn test_actor_draw_is_deterministic_per_peer_hash_and_salt() {
1619 let hash = hashtree_core::sha256(b"deterministic");
1620 let a = TestStore::deterministic_actor_draw_for("peer-a", &hash, 7);
1621 let b = TestStore::deterministic_actor_draw_for("peer-a", &hash, 7);
1622 assert!((a - b).abs() < f64::EPSILON);
1623 }
1624
1625 #[tokio::test]
1626 async fn test_load_peer_metadata_returns_false_when_missing() {
1627 let local_store = Arc::new(MemoryStore::new());
1628 let store = make_test_store(local_store, "0");
1629 assert!(!store.load_peer_metadata().await.expect("load result"));
1630 }
1631
1632 #[tokio::test]
1633 async fn test_persist_and_load_peer_metadata_with_existing_store_adapter() {
1634 let local_store = Arc::new(MemoryStore::new());
1635 let writer = make_test_store(local_store.clone(), "0");
1636 {
1637 let mut selector = writer.peer_selector.write().await;
1638 selector.add_peer("npub1stable");
1639 selector.record_request("npub1stable", 64);
1640 selector.record_success("npub1stable", 35, 1024);
1641 selector.record_cashu_payment("npub1stable", 120);
1642 selector.record_cashu_receipt("npub1stable", 40);
1643 selector.record_cashu_payment_default("npub1stable");
1644 }
1645
1646 let snapshot_hash = writer
1647 .persist_peer_metadata()
1648 .await
1649 .expect("persist peer metadata");
1650 assert!(local_store
1651 .get(&snapshot_hash)
1652 .await
1653 .expect("snapshot lookup")
1654 .is_some());
1655
1656 let reader = make_test_store(local_store, "1");
1657 assert!(reader
1658 .load_peer_metadata()
1659 .await
1660 .expect("load peer metadata snapshot"));
1661
1662 let mut selector = reader.peer_selector.write().await;
1663 selector.add_peer("npub1stable");
1664 let stats = selector
1665 .get_stats("npub1stable")
1666 .expect("restored peer stats");
1667 assert_eq!(stats.requests_sent, 1);
1668 assert_eq!(stats.successes, 1);
1669 assert_eq!(stats.cashu_paid_sat, 120);
1670 assert_eq!(stats.cashu_received_sat, 40);
1671 assert_eq!(stats.cashu_payment_receipts, 1);
1672 assert_eq!(stats.cashu_payment_defaults, 1);
1673 }
1674
1675 #[tokio::test]
1676 async fn test_should_refuse_requests_from_peer_after_payment_defaults() {
1677 let local_store = Arc::new(MemoryStore::new());
1678 let store = make_test_store_with_routing(
1679 local_store,
1680 "0",
1681 MeshRoutingConfig {
1682 cashu_payment_default_block_threshold: 1,
1683 ..Default::default()
1684 },
1685 );
1686 store.record_cashu_payment_default_from_peer("peer-a").await;
1687
1688 let selector = store.peer_selector.read().await;
1689 assert!(store.should_refuse_requests_from_peer(&selector, "peer-a"));
1690 assert!(!store.should_refuse_requests_from_peer(&selector, "peer-b"));
1691 }
1692
1693 #[tokio::test]
1694 async fn test_take_valid_quote_consumes_once_and_rejects_expired_quotes() {
1695 let local_store = Arc::new(MemoryStore::new());
1696 let store = make_test_store(local_store, "0");
1697 let hash = hashtree_core::sha256(b"quote-test");
1698 let hash_key = hash_to_key(&hash);
1699
1700 {
1701 let mut issued = store.issued_quotes.write().await;
1702 issued.insert(
1703 ("peer-a".to_string(), hash_key.clone(), 11),
1704 IssuedQuote {
1705 expires_at: Instant::now() + Duration::from_secs(1),
1706 payment_sat: 5,
1707 mint_url: Some("https://mint-a.example".to_string()),
1708 },
1709 );
1710 issued.insert(
1711 ("peer-a".to_string(), hash_key.clone(), 12),
1712 IssuedQuote {
1713 expires_at: Instant::now() - Duration::from_millis(1),
1714 payment_sat: 5,
1715 mint_url: Some("https://mint-a.example".to_string()),
1716 },
1717 );
1718 }
1719
1720 assert!(store.take_valid_quote("peer-a", &hash_key, 11).await);
1721 assert!(!store.take_valid_quote("peer-a", &hash_key, 11).await);
1722 assert!(!store.take_valid_quote("peer-a", &hash_key, 12).await);
1723 }
1724
1725 async fn run_quote_with_pumps(
1726 requester: Arc<TestStore>,
1727 hash: Hash,
1728 payment_sat: u64,
1729 quote_ttl: Duration,
1730 peer_ids: Vec<String>,
1731 nodes: &[&TestNode],
1732 ) -> Option<NegotiatedQuote> {
1733 let task = tokio::spawn(async move {
1734 requester
1735 .request_quote_from_peers(&hash, payment_sat, quote_ttl, &peer_ids)
1736 .await
1737 });
1738 let started = Instant::now();
1739
1740 loop {
1741 if task.is_finished() {
1742 return task.await.expect("quote task join");
1743 }
1744 if started.elapsed() > Duration::from_secs(1) {
1745 task.abort();
1746 return None;
1747 }
1748 pump_test_network(nodes, 4).await;
1749 }
1750 }
1751
1752 #[tokio::test]
1753 async fn test_request_quote_from_peers_rejects_unaccepted_mint() {
1754 let _guard = mock_network_lock().lock().await;
1755 crate::mock::clear_channel_registry().await;
1756
1757 let relay = crate::mock::MockRelay::new();
1758 let requester = make_shared_test_node(
1759 relay.clone(),
1760 "requester-reject",
1761 MeshRoutingConfig {
1762 cashu_accepted_mints: vec!["https://mint-a.example".to_string()],
1763 cashu_default_mint: Some("https://mint-a.example".to_string()),
1764 dispatch: RequestDispatchConfig {
1765 initial_fanout: 1,
1766 hedge_fanout: 1,
1767 max_fanout: 1,
1768 hedge_interval_ms: 5,
1769 },
1770 ..Default::default()
1771 },
1772 );
1773 let provider = make_shared_test_node(
1774 relay,
1775 "provider-reject",
1776 MeshRoutingConfig {
1777 cashu_accepted_mints: vec!["https://mint-b.example".to_string()],
1778 ..Default::default()
1779 },
1780 );
1781 let nodes = [&requester, &provider];
1782
1783 requester.transport.connect(&[]).await.expect("connect");
1784 provider.transport.connect(&[]).await.expect("connect");
1785 requester.store.start().await.expect("start");
1786 provider.store.start().await.expect("start");
1787 pump_test_network(&nodes, 24).await;
1788
1789 let payload = b"quoted-data".to_vec();
1790 let hash = hashtree_core::sha256(&payload);
1791 provider.local_store.put(hash, payload).await.expect("put");
1792
1793 let quote = run_quote_with_pumps(
1794 requester.store.clone(),
1795 hash,
1796 9,
1797 Duration::from_millis(80),
1798 vec!["provider-reject".to_string()],
1799 &nodes,
1800 )
1801 .await;
1802 assert!(
1803 quote.is_none(),
1804 "expected quote to be rejected on mint mismatch"
1805 );
1806 }
1807
1808 #[tokio::test]
1809 async fn test_request_quote_from_peers_accepts_small_peer_suggested_mint_under_cap() {
1810 let _guard = mock_network_lock().lock().await;
1811 crate::mock::clear_channel_registry().await;
1812
1813 let relay = crate::mock::MockRelay::new();
1814 let requester = make_shared_test_node(
1815 relay.clone(),
1816 "requester-suggested",
1817 MeshRoutingConfig {
1818 cashu_accepted_mints: vec!["https://mint-a.example".to_string()],
1819 cashu_default_mint: Some("https://mint-a.example".to_string()),
1820 cashu_peer_suggested_mint_base_cap_sat: 3,
1821 cashu_peer_suggested_mint_max_cap_sat: 3,
1822 dispatch: RequestDispatchConfig {
1823 initial_fanout: 1,
1824 hedge_fanout: 1,
1825 max_fanout: 1,
1826 hedge_interval_ms: 5,
1827 },
1828 ..Default::default()
1829 },
1830 );
1831 let provider = make_shared_test_node(
1832 relay,
1833 "provider-suggested",
1834 MeshRoutingConfig {
1835 cashu_accepted_mints: vec!["https://mint-b.example".to_string()],
1836 cashu_default_mint: Some("https://mint-b.example".to_string()),
1837 ..Default::default()
1838 },
1839 );
1840 let nodes = [&requester, &provider];
1841
1842 requester.transport.connect(&[]).await.expect("connect");
1843 provider.transport.connect(&[]).await.expect("connect");
1844 requester.store.start().await.expect("start");
1845 provider.store.start().await.expect("start");
1846 pump_test_network(&nodes, 24).await;
1847
1848 let payload = b"quoted-data".to_vec();
1849 let hash = hashtree_core::sha256(&payload);
1850 provider.local_store.put(hash, payload).await.expect("put");
1851
1852 let quote = run_quote_with_pumps(
1853 requester.store.clone(),
1854 hash,
1855 3,
1856 Duration::from_millis(80),
1857 vec!["provider-suggested".to_string()],
1858 &nodes,
1859 )
1860 .await
1861 .expect("expected bounded peer-suggested mint quote");
1862 assert_eq!(quote.mint_url.as_deref(), Some("https://mint-b.example"));
1863 }
1864
1865 #[tokio::test]
1866 async fn test_request_quote_from_peers_scales_peer_suggested_mint_cap_with_reputation() {
1867 let _guard = mock_network_lock().lock().await;
1868 crate::mock::clear_channel_registry().await;
1869
1870 let relay = crate::mock::MockRelay::new();
1871 let requester = make_shared_test_node(
1872 relay.clone(),
1873 "requester-reputation",
1874 MeshRoutingConfig {
1875 cashu_accepted_mints: vec!["https://mint-a.example".to_string()],
1876 cashu_default_mint: Some("https://mint-a.example".to_string()),
1877 cashu_peer_suggested_mint_base_cap_sat: 1,
1878 cashu_peer_suggested_mint_success_step_sat: 1,
1879 cashu_peer_suggested_mint_receipt_step_sat: 2,
1880 cashu_peer_suggested_mint_max_cap_sat: 5,
1881 dispatch: RequestDispatchConfig {
1882 initial_fanout: 1,
1883 hedge_fanout: 1,
1884 max_fanout: 1,
1885 hedge_interval_ms: 5,
1886 },
1887 ..Default::default()
1888 },
1889 );
1890 let provider = make_shared_test_node(
1891 relay,
1892 "provider-reputation",
1893 MeshRoutingConfig {
1894 cashu_accepted_mints: vec!["https://mint-b.example".to_string()],
1895 cashu_default_mint: Some("https://mint-b.example".to_string()),
1896 ..Default::default()
1897 },
1898 );
1899 let nodes = [&requester, &provider];
1900
1901 requester.transport.connect(&[]).await.expect("connect");
1902 provider.transport.connect(&[]).await.expect("connect");
1903 requester.store.start().await.expect("start");
1904 provider.store.start().await.expect("start");
1905 pump_test_network(&nodes, 24).await;
1906
1907 let payload = b"quoted-data".to_vec();
1908 let hash = hashtree_core::sha256(&payload);
1909 provider.local_store.put(hash, payload).await.expect("put");
1910
1911 let quote = run_quote_with_pumps(
1912 requester.store.clone(),
1913 hash,
1914 4,
1915 Duration::from_millis(80),
1916 vec!["provider-reputation".to_string()],
1917 &nodes,
1918 )
1919 .await;
1920 assert!(
1921 quote.is_none(),
1922 "new peer should not get a 4 sat untrusted-mint quote"
1923 );
1924
1925 {
1926 let mut selector = requester.store.peer_selector.write().await;
1927 selector.add_peer("provider-reputation");
1928 selector.record_success("provider-reputation", 20, 1024);
1929 selector.record_cashu_receipt("provider-reputation", 2);
1930 }
1931
1932 let quote = run_quote_with_pumps(
1933 requester.store.clone(),
1934 hash,
1935 4,
1936 Duration::from_millis(80),
1937 vec!["provider-reputation".to_string()],
1938 &nodes,
1939 )
1940 .await
1941 .expect("reputable peer should get larger bounded quote");
1942 assert_eq!(quote.mint_url.as_deref(), Some("https://mint-b.example"));
1943
1944 requester
1945 .store
1946 .record_cashu_payment_default_from_peer("provider-reputation")
1947 .await;
1948 let quote = run_quote_with_pumps(
1949 requester.store.clone(),
1950 hash,
1951 4,
1952 Duration::from_millis(80),
1953 vec!["provider-reputation".to_string()],
1954 &nodes,
1955 )
1956 .await;
1957 assert!(
1958 quote.is_none(),
1959 "peer-suggested mint exposure should drop to zero after defaults exceed receipts"
1960 );
1961 }
1962
1963 #[tokio::test]
1964 async fn test_request_quote_from_peers_returns_matching_mint() {
1965 let _guard = mock_network_lock().lock().await;
1966 crate::mock::clear_channel_registry().await;
1967
1968 let relay = crate::mock::MockRelay::new();
1969 let requester = make_shared_test_node(
1970 relay.clone(),
1971 "requester-match",
1972 MeshRoutingConfig {
1973 cashu_accepted_mints: vec!["https://mint-a.example".to_string()],
1974 cashu_default_mint: Some("https://mint-a.example".to_string()),
1975 dispatch: RequestDispatchConfig {
1976 initial_fanout: 1,
1977 hedge_fanout: 1,
1978 max_fanout: 1,
1979 hedge_interval_ms: 5,
1980 },
1981 ..Default::default()
1982 },
1983 );
1984 let provider = make_shared_test_node(
1985 relay,
1986 "provider-match",
1987 MeshRoutingConfig {
1988 cashu_accepted_mints: vec!["https://mint-a.example".to_string()],
1989 ..Default::default()
1990 },
1991 );
1992 let nodes = [&requester, &provider];
1993
1994 requester.transport.connect(&[]).await.expect("connect");
1995 provider.transport.connect(&[]).await.expect("connect");
1996 requester.store.start().await.expect("start");
1997 provider.store.start().await.expect("start");
1998 pump_test_network(&nodes, 24).await;
1999
2000 let payload = b"quoted-data".to_vec();
2001 let hash = hashtree_core::sha256(&payload);
2002 provider.local_store.put(hash, payload).await.expect("put");
2003
2004 let quote = run_quote_with_pumps(
2005 requester.store.clone(),
2006 hash,
2007 9,
2008 Duration::from_millis(80),
2009 vec!["provider-match".to_string()],
2010 &nodes,
2011 )
2012 .await
2013 .expect("expected quote");
2014 assert_eq!(quote.mint_url.as_deref(), Some("https://mint-a.example"));
2015 }
2016
2017 #[tokio::test]
2018 async fn test_tit_for_tat_store_path_recovers_after_bad_peer_observation() {
2019 let tit_for_tat_successes = run_bad_peer_series(SelectionStrategy::TitForTat).await;
2020
2021 assert!(
2022 tit_for_tat_successes >= 5,
2023 "expected tit-for-tat path to recover after the first consistently bad peer observation (successes={tit_for_tat_successes})"
2024 );
2025 }
2026}
2027
2028pub type SimMeshStore<S> =
2030 MeshStoreCore<S, crate::mock::MockRelayTransport, crate::mock::MockConnectionFactory>;
2031
2032pub type ProductionMeshStore<S> =
2034 MeshStoreCore<S, crate::nostr::NostrRelayTransport, crate::real_factory::WebRtcPeerLinkFactory>;