1use async_trait::async_trait;
8use std::collections::hash_map::DefaultHasher;
9use std::collections::{HashMap, HashSet};
10use std::future::Future;
11use std::hash::{Hash as _, Hasher};
12use std::ops::Range;
13use std::sync::Arc;
14use std::time::{Duration, Instant};
15use tokio::sync::{oneshot, Mutex, RwLock};
16
17use hashtree_core::{Hash, Store, StoreError};
18
19use crate::peer_selector::{PeerMetadataSnapshot, PeerSelector, SelectionStrategy};
20use crate::protocol::{
21 create_quote_request, create_quote_response_available, create_quote_response_unavailable,
22 create_request, create_request_with_quote, create_response, encode_quote_request,
23 encode_quote_response, encode_request, encode_response, hash_to_key, parse_message,
24 DataMessage, DataQuoteRequest, DataQuoteResponse,
25};
26use crate::signaling::MeshRouter;
27use crate::transport::{PeerLinkFactory, SignalingTransport, TransportError};
28use crate::types::{PeerHTLConfig, SignalingMessage, MAX_HTL};
29
/// Fixed key whose SHA-256 digest (see `peer_metadata_pointer_slot_hash`) addresses the
/// local-store slot that holds the hex-encoded hash of the latest persisted
/// peer-metadata snapshot.
const PEER_METADATA_POINTER_SLOT_KEY: &[u8] = b"hashtree-webrtc/peer-metadata/latest/v1";
33
/// Bookkeeping for one in-flight data request, stored in `pending_requests` under the
/// request's hash key.
struct PendingRequest {
    /// Completed with the payload once a verified response for the hash arrives.
    response_tx: oneshot::Sender<Option<Vec<u8>>>,
    /// When the request was registered; used to compute the round-trip time on success.
    started_at: Instant,
    /// Peers the request was sent to; each gets a timeout recorded if no response arrives.
    queried_peers: Vec<String>,
}
40
/// Bookkeeping for one in-flight quote negotiation, stored in `pending_quotes` under the
/// hash key of the content being quoted.
struct PendingQuoteRequest {
    /// Completed with the first quote response that passes the acceptance checks.
    response_tx: oneshot::Sender<Option<NegotiatedQuote>>,
    /// Mint URL that was sent with the quote request, if any.
    preferred_mint_url: Option<String>,
    /// Payment offered in the request; responses asking for more are rejected.
    offered_payment_sat: u64,
}
46
/// An accepted quote: which peer issued it and the id to present with the data request.
#[derive(Debug, Clone)]
struct NegotiatedQuote {
    /// Peer that sent the accepted quote response.
    peer_id: String,
    /// Quote id taken from the response; sent back with the follow-up data request.
    quote_id: u64,
    /// Mint URL named in the quote response, if any (stored but not read in this file).
    #[allow(dead_code)]
    mint_url: Option<String>,
}
54
/// A quote this node has issued to a peer, stored in `issued_quotes`.
struct IssuedQuote {
    /// Instant after which the quote is no longer honoured by `take_valid_quote`.
    expires_at: Instant,
    /// Payment amount the quote was issued for (stored but not read in this file).
    #[allow(dead_code)]
    payment_sat: u64,
    /// Mint URL chosen for the quote, if any (stored but not read in this file).
    #[allow(dead_code)]
    mint_url: Option<String>,
}
62
/// Counters returned by `drain_available_data_messages` for one drain pass.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct DataPumpStats {
    /// Number of raw messages received, whether or not they parsed.
    pub processed: usize,
    /// Number of parsed data requests.
    pub request_messages: usize,
    /// Number of parsed data responses.
    pub response_messages: usize,
    /// Number of parsed quote requests.
    pub quote_request_messages: u64,
    /// Number of parsed quote responses.
    pub quote_response_messages: u64,
    /// Total byte length of all received messages.
    pub processed_bytes: u64,
}
73
/// Controls how a request is spread over peers in successive "waves".
#[derive(Debug, Clone, Copy)]
pub struct RequestDispatchConfig {
    /// Number of peers contacted in the first wave (0 is normalized to 1).
    pub initial_fanout: usize,
    /// Number of peers contacted in each later wave (0 is normalized to 1).
    pub hedge_fanout: usize,
    /// Upper bound on the total number of peers contacted; 0 is normalized to
    /// "all available peers".
    pub max_fanout: usize,
    /// Time to wait between waves, in milliseconds; 0 sends the next wave without waiting.
    pub hedge_interval_ms: u64,
}
90
91impl Default for RequestDispatchConfig {
92 fn default() -> Self {
93 Self {
94 initial_fanout: usize::MAX,
95 hedge_fanout: usize::MAX,
96 max_fanout: usize::MAX,
97 hedge_interval_ms: 0,
98 }
99 }
100}
101
102pub fn normalize_dispatch_config(
104 dispatch: RequestDispatchConfig,
105 available_peers: usize,
106) -> RequestDispatchConfig {
107 let mut cfg = dispatch;
108 let cap = if cfg.max_fanout == 0 {
109 available_peers
110 } else {
111 cfg.max_fanout.min(available_peers)
112 };
113 cfg.max_fanout = cap;
114 cfg.initial_fanout = if cfg.initial_fanout == 0 {
115 1
116 } else {
117 cfg.initial_fanout.min(cap.max(1))
118 };
119 cfg.hedge_fanout = if cfg.hedge_fanout == 0 {
120 1
121 } else {
122 cfg.hedge_fanout.min(cap.max(1))
123 };
124 cfg
125}
126
127pub fn build_hedged_wave_plan(peer_count: usize, dispatch: RequestDispatchConfig) -> Vec<usize> {
129 if peer_count == 0 {
130 return Vec::new();
131 }
132 let cap = dispatch.max_fanout.min(peer_count);
133 if cap == 0 {
134 return Vec::new();
135 }
136
137 let mut plan = Vec::new();
138 let mut sent = 0usize;
139 let first = dispatch.initial_fanout.min(cap).max(1);
140 plan.push(first);
141 sent += first;
142
143 while sent < cap {
144 let next = dispatch.hedge_fanout.min(cap - sent).max(1);
145 plan.push(next);
146 sent += next;
147 }
148 plan
149}
150
/// Outcome reported by the wait step of `run_hedged_waves` after each wave.
#[derive(Debug)]
pub enum HedgedWaveAction<T> {
    /// Nothing conclusive yet; proceed to the next wave (if any).
    Continue,
    /// A result was obtained; `run_hedged_waves` returns it immediately.
    Success(T),
    /// Stop sending further waves; `run_hedged_waves` returns `None`.
    Abort,
}
158
/// Drives a hedged request: sends to peers in successive waves and waits between waves
/// for a result, until one arrives, the deadline passes, or the waiter aborts.
///
/// * `peer_count` - number of candidate peers; waves address them by index range.
/// * `dispatch` - wave sizing and pacing; normalized against `peer_count` before use.
/// * `request_timeout` - overall budget, measured from the start of this call.
/// * `send_wave` - sends to the peers in the given index range and returns how many
///   sends succeeded.
/// * `wait_wave` - waits up to the given duration and reports how to proceed.
///
/// Returns `Some(value)` when `wait_wave` reports `Success(value)`, otherwise `None`.
pub async fn run_hedged_waves<T, SendWave, SendWaveFut, WaitWave, WaitWaveFut>(
    peer_count: usize,
    dispatch: RequestDispatchConfig,
    request_timeout: Duration,
    mut send_wave: SendWave,
    mut wait_wave: WaitWave,
) -> Option<T>
where
    SendWave: FnMut(Range<usize>) -> SendWaveFut,
    SendWaveFut: Future<Output = usize>,
    WaitWave: FnMut(Duration) -> WaitWaveFut,
    WaitWaveFut: Future<Output = HedgedWaveAction<T>>,
{
    let dispatch = normalize_dispatch_config(dispatch, peer_count);
    let wave_plan = build_hedged_wave_plan(peer_count, dispatch);
    if wave_plan.is_empty() {
        return None;
    }

    let deadline = Instant::now() + request_timeout;
    let mut sent_total = 0usize;
    let mut next_peer_idx = 0usize;

    for (wave_idx, wave_size) in wave_plan.iter().copied().enumerate() {
        // Index range of the peers addressed by this wave.
        let from = next_peer_idx;
        let to = (next_peer_idx + wave_size).min(peer_count);
        next_peer_idx = to;

        if from == to {
            continue;
        }

        sent_total += send_wave(from..to).await;
        if sent_total == 0 {
            // Nothing has been sent in any wave so far, so there is nothing to wait for:
            // move straight to the next wave, or stop once the peers are exhausted.
            if next_peer_idx >= peer_count {
                break;
            }
            continue;
        }

        let now = Instant::now();
        if now >= deadline {
            break;
        }
        let remaining = deadline.saturating_duration_since(now);
        let is_last_wave = wave_idx + 1 == wave_plan.len() || next_peer_idx >= peer_count;
        // The last wave waits out the remaining budget; earlier waves wait one hedge
        // interval (capped by the budget), or not at all when the interval is zero.
        let wait = if is_last_wave {
            remaining
        } else if dispatch.hedge_interval_ms == 0 {
            Duration::ZERO
        } else {
            Duration::from_millis(dispatch.hedge_interval_ms).min(remaining)
        };

        if wait.is_zero() {
            continue;
        }

        match wait_wave(wait).await {
            HedgedWaveAction::Continue => {}
            HedgedWaveAction::Success(value) => return Some(value),
            HedgedWaveAction::Abort => break,
        }
    }

    None
}
230
231pub async fn sync_selector_peers(selector: &RwLock<PeerSelector>, current_peer_ids: &[String]) {
233 let mut selector = selector.write().await;
234 let current: HashSet<&str> = current_peer_ids.iter().map(String::as_str).collect();
235 let known: Vec<String> = selector.all_stats().map(|s| s.peer_id.clone()).collect();
236 for peer_id in known {
237 if !current.contains(peer_id.as_str()) {
238 selector.remove_peer(&peer_id);
239 }
240 }
241 for peer_id in current_peer_ids {
242 selector.add_peer(peer_id.clone());
243 }
244}
245
/// Knobs that alter how this node answers data requests (drop, corrupt, or delay).
#[derive(Debug, Clone, Copy)]
pub struct ResponseBehaviorConfig {
    /// Probability of silently not answering a request; clamped to `[0.0, 1.0]` on use.
    pub drop_response_prob: f64,
    /// Probability of flipping a bit in the response payload; clamped to `[0.0, 1.0]` on use.
    pub corrupt_response_prob: f64,
    /// Extra delay, in milliseconds, applied before a response is sent.
    pub extra_delay_ms: u64,
}
258
259impl Default for ResponseBehaviorConfig {
260 fn default() -> Self {
261 Self {
262 drop_response_prob: 0.0,
263 corrupt_response_prob: 0.0,
264 extra_delay_ms: 0,
265 }
266 }
267}
268
269impl ResponseBehaviorConfig {
270 fn normalized(self) -> Self {
271 Self {
272 drop_response_prob: self.drop_response_prob.clamp(0.0, 1.0),
273 corrupt_response_prob: self.corrupt_response_prob.clamp(0.0, 1.0),
274 extra_delay_ms: self.extra_delay_ms,
275 }
276 }
277}
278
/// Routing, payment and dispatch settings for a `MeshStoreCore`.
#[derive(Debug, Clone)]
pub struct MeshRoutingConfig {
    /// Strategy handed to the peer selector at construction.
    pub selection_strategy: SelectionStrategy,
    /// Passed to the peer selector's fairness switch at construction.
    pub fairness_enabled: bool,
    /// Passed to the peer selector as its Cashu payment weight at construction.
    pub cashu_payment_weight: f64,
    /// Threshold passed to the selector when deciding whether to refuse requests from a
    /// peer because of payment defaults.
    pub cashu_payment_default_block_threshold: u64,
    /// Mints this node accepts; an empty list accepts any mint.
    pub cashu_accepted_mints: Vec<String>,
    /// Preferred mint, requested from peers and offered in quotes when allowed.
    pub cashu_default_mint: Option<String>,
    /// Base payment cap (sat) for quotes naming an untrusted, peer-suggested mint;
    /// 0 yields a cap of 0.
    pub cashu_peer_suggested_mint_base_cap_sat: u64,
    /// Cap increase (sat) per recorded success of the peer.
    pub cashu_peer_suggested_mint_success_step_sat: u64,
    /// Cap increase (sat) per recorded payment receipt from the peer.
    pub cashu_peer_suggested_mint_receipt_step_sat: u64,
    /// Upper limit (sat) for the peer-suggested-mint cap; 0 means no upper limit.
    pub cashu_peer_suggested_mint_max_cap_sat: u64,
    /// Wave sizing and pacing used when requesting from peers.
    pub dispatch: RequestDispatchConfig,
    /// Drop/corrupt/delay behaviour applied when answering requests.
    pub response_behavior: ResponseBehaviorConfig,
}
304
305impl Default for MeshRoutingConfig {
306 fn default() -> Self {
307 Self {
308 selection_strategy: SelectionStrategy::Weighted,
309 fairness_enabled: true,
310 cashu_payment_weight: 0.0,
311 cashu_payment_default_block_threshold: 0,
312 cashu_accepted_mints: Vec::new(),
313 cashu_default_mint: None,
314 cashu_peer_suggested_mint_base_cap_sat: 0,
315 cashu_peer_suggested_mint_success_step_sat: 0,
316 cashu_peer_suggested_mint_receipt_step_sat: 0,
317 cashu_peer_suggested_mint_max_cap_sat: 0,
318 dispatch: RequestDispatchConfig::default(),
319 response_behavior: ResponseBehaviorConfig::default(),
320 }
321 }
322}
323
/// Store that serves content from a local store and, on a miss, requests it from
/// connected mesh peers.
pub struct MeshStoreCore<S, R, F>
where
    S: Store + Send + Sync + 'static,
    R: SignalingTransport + Send + Sync + 'static,
    F: PeerLinkFactory + Send + Sync + 'static,
{
    /// Local backing store; consulted first on reads and used to keep fetched data.
    local_store: Arc<S>,
    /// Mesh router used for hello/signaling traffic and for obtaining per-peer channels.
    signaling: Arc<MeshRouter<R, F>>,
    /// Per-peer HTL configuration, assigned randomly the first time a peer is seen.
    htl_configs: RwLock<HashMap<String, PeerHTLConfig>>,
    /// In-flight data requests, keyed by hash key.
    pending_requests: RwLock<HashMap<String, PendingRequest>>,
    /// In-flight quote negotiations, keyed by hash key.
    pending_quotes: RwLock<HashMap<String, PendingQuoteRequest>>,
    /// Quotes issued to peers, keyed by (peer id, hash key, quote id).
    issued_quotes: RwLock<HashMap<(String, String, u64), IssuedQuote>>,
    /// Next quote id to hand out; starts at 1.
    next_quote_id: RwLock<u64>,
    /// Tracks per-peer statistics and produces the peer ordering for requests.
    peer_selector: RwLock<PeerSelector>,
    /// Routing, payment and dispatch configuration.
    routing: MeshRoutingConfig,
    /// Overall time budget for one peer request.
    request_timeout: Duration,
    /// When true, diagnostic messages are printed to stdout.
    debug: bool,
    /// Set by `start` and cleared by `stop`.
    running: RwLock<bool>,
}
361
362impl<S, R, F> MeshStoreCore<S, R, F>
363where
364 S: Store + Send + Sync + 'static,
365 R: SignalingTransport + Send + Sync + 'static,
366 F: PeerLinkFactory + Send + Sync + 'static,
367{
368 pub fn new(
370 local_store: Arc<S>,
371 signaling: Arc<MeshRouter<R, F>>,
372 request_timeout: Duration,
373 debug: bool,
374 ) -> Self {
375 Self::new_with_routing(
376 local_store,
377 signaling,
378 request_timeout,
379 debug,
380 Default::default(),
381 )
382 }
383
384 pub fn new_with_routing(
386 local_store: Arc<S>,
387 signaling: Arc<MeshRouter<R, F>>,
388 request_timeout: Duration,
389 debug: bool,
390 routing: MeshRoutingConfig,
391 ) -> Self {
392 let mut selector = PeerSelector::with_strategy(routing.selection_strategy);
393 selector.set_fairness(routing.fairness_enabled);
394 selector.set_cashu_payment_weight(routing.cashu_payment_weight);
395 Self {
396 local_store,
397 signaling,
398 htl_configs: RwLock::new(HashMap::new()),
399 pending_requests: RwLock::new(HashMap::new()),
400 pending_quotes: RwLock::new(HashMap::new()),
401 issued_quotes: RwLock::new(HashMap::new()),
402 next_quote_id: RwLock::new(1),
403 peer_selector: RwLock::new(selector),
404 routing,
405 request_timeout,
406 debug,
407 running: RwLock::new(false),
408 }
409 }
410
411 pub async fn start(&self) -> Result<(), TransportError> {
413 *self.running.write().await = true;
414
415 self.signaling.send_hello(vec![]).await?;
417
418 Ok(())
419 }
420
421 pub async fn stop(&self) {
423 *self.running.write().await = false;
424 }
425
426 pub async fn process_signaling(&self, msg: SignalingMessage) -> Result<(), TransportError> {
428 let peer_id = msg.peer_id().to_string();
430 {
431 let mut configs = self.htl_configs.write().await;
432 if !configs.contains_key(&peer_id) {
433 configs.insert(peer_id.clone(), PeerHTLConfig::random());
434 }
435 }
436 self.peer_selector.write().await.add_peer(peer_id);
437
438 self.signaling.handle_message(msg).await
439 }
440
    /// Returns the mesh router this store was constructed with.
    pub fn signaling(&self) -> &Arc<MeshRouter<R, F>> {
        &self.signaling
    }
445
    /// Returns the configured response behaviour with probabilities clamped to `[0.0, 1.0]`.
    fn response_behavior(&self) -> ResponseBehaviorConfig {
        self.routing.response_behavior.normalized()
    }
449
450 fn deterministic_actor_draw_for(peer_id: &str, hash: &Hash, salt: u64) -> f64 {
451 let mut hasher = DefaultHasher::new();
452 peer_id.hash(&mut hasher);
453 hash.hash(&mut hasher);
454 salt.hash(&mut hasher);
455 let v = hasher.finish();
456 (v as f64) / (u64::MAX as f64)
457 }
458
    /// Deterministic draw for this node: uses the router's own peer id together with
    /// `hash` and `salt`.
    fn deterministic_actor_draw(&self, hash: &Hash, salt: u64) -> f64 {
        Self::deterministic_actor_draw_for(self.signaling.peer_id(), hash, salt)
    }
462
    /// Returns the SHA-256 of `PEER_METADATA_POINTER_SLOT_KEY`, used as the store key of
    /// the "latest peer-metadata snapshot" pointer.
    fn peer_metadata_pointer_slot_hash() -> Hash {
        hashtree_core::sha256(PEER_METADATA_POINTER_SLOT_KEY)
    }
466
467 fn decode_hash_hex(hash_hex: &str) -> Result<Hash, StoreError> {
468 let bytes = hex::decode(hash_hex)
469 .map_err(|e| StoreError::Other(format!("Invalid hash hex: {e}")))?;
470 if bytes.len() != 32 {
471 return Err(StoreError::Other(format!(
472 "Invalid hash length {}, expected 32 bytes",
473 bytes.len()
474 )));
475 }
476 let mut hash = [0u8; 32];
477 hash.copy_from_slice(&bytes);
478 Ok(hash)
479 }
480
481 fn should_drop_response(&self, hash: &Hash) -> bool {
482 let p = self.response_behavior().drop_response_prob;
483 if p <= 0.0 {
484 return false;
485 }
486 self.deterministic_actor_draw(hash, 0xD0_D0_D0_D0_D0_D0_D0_D0) < p
487 }
488
489 fn should_corrupt_response(&self, hash: &Hash) -> bool {
490 let p = self.response_behavior().corrupt_response_prob;
491 if p <= 0.0 {
492 return false;
493 }
494 self.deterministic_actor_draw(hash, 0xC0_C0_C0_C0_C0_C0_C0_C0) < p
495 }
496
497 async fn ordered_connected_peers(&self) -> Vec<String> {
498 let current_peer_ids = self.signaling.peer_ids().await;
499 if current_peer_ids.is_empty() {
500 return Vec::new();
501 }
502
503 sync_selector_peers(&self.peer_selector, ¤t_peer_ids).await;
504 let current_set: HashSet<&str> = current_peer_ids.iter().map(String::as_str).collect();
505 let mut ordered_peer_ids = self.peer_selector.write().await.select_peers();
506 ordered_peer_ids.retain(|peer_id| current_set.contains(peer_id.as_str()));
507 if ordered_peer_ids.is_empty() {
508 let mut fallback = current_peer_ids;
509 fallback.sort();
510 return fallback;
511 }
512 ordered_peer_ids
513 }
514
515 fn requested_quote_mint(&self) -> Option<&str> {
516 if let Some(default_mint) = self.routing.cashu_default_mint.as_deref() {
517 if self.routing.cashu_accepted_mints.is_empty()
518 || self
519 .routing
520 .cashu_accepted_mints
521 .iter()
522 .any(|mint| mint == default_mint)
523 {
524 return Some(default_mint);
525 }
526 }
527
528 self.routing
529 .cashu_accepted_mints
530 .first()
531 .map(String::as_str)
532 }
533
534 fn choose_quote_mint(&self, requested_mint: Option<&str>) -> Option<String> {
535 if let Some(requested_mint) = requested_mint {
536 if self.accepts_quote_mint(Some(requested_mint)) {
537 return Some(requested_mint.to_string());
538 }
539 }
540 if let Some(default_mint) = self.routing.cashu_default_mint.as_ref() {
541 return Some(default_mint.clone());
542 }
543 if let Some(first_mint) = self.routing.cashu_accepted_mints.first() {
544 return Some(first_mint.clone());
545 }
546 requested_mint.map(str::to_string)
547 }
548
549 fn accepts_quote_mint(&self, mint_url: Option<&str>) -> bool {
550 if self.routing.cashu_accepted_mints.is_empty() {
551 return true;
552 }
553
554 let Some(mint_url) = mint_url else {
555 return false;
556 };
557 self.routing
558 .cashu_accepted_mints
559 .iter()
560 .any(|mint| mint == mint_url)
561 }
562
563 fn trusts_quote_mint(&self, mint_url: Option<&str>) -> bool {
564 let Some(mint_url) = mint_url else {
565 return self.routing.cashu_default_mint.is_none()
566 && self.routing.cashu_accepted_mints.is_empty();
567 };
568 self.routing.cashu_default_mint.as_deref() == Some(mint_url)
569 || self
570 .routing
571 .cashu_accepted_mints
572 .iter()
573 .any(|mint| mint == mint_url)
574 }
575
576 async fn peer_suggested_mint_cap_sat(&self, peer_id: &str) -> u64 {
577 let base = self.routing.cashu_peer_suggested_mint_base_cap_sat;
578 if base == 0 {
579 return 0;
580 }
581
582 let selector = self.peer_selector.read().await;
583 let Some(stats) = selector.get_stats(peer_id) else {
584 let max_cap = self.routing.cashu_peer_suggested_mint_max_cap_sat;
585 return if max_cap > 0 { base.min(max_cap) } else { base };
586 };
587
588 if stats.cashu_payment_defaults > 0
589 && stats.cashu_payment_defaults >= stats.cashu_payment_receipts
590 {
591 return 0;
592 }
593
594 let success_bonus = stats
595 .successes
596 .saturating_mul(self.routing.cashu_peer_suggested_mint_success_step_sat);
597 let receipt_bonus = stats
598 .cashu_payment_receipts
599 .saturating_mul(self.routing.cashu_peer_suggested_mint_receipt_step_sat);
600 let mut cap = base
601 .saturating_add(success_bonus)
602 .saturating_add(receipt_bonus);
603 let max_cap = self.routing.cashu_peer_suggested_mint_max_cap_sat;
604 if max_cap > 0 {
605 cap = cap.min(max_cap);
606 }
607 cap
608 }
609
610 async fn should_accept_quote_response(
611 &self,
612 from_peer: &str,
613 preferred_mint_url: Option<&str>,
614 offered_payment_sat: u64,
615 res: &DataQuoteResponse,
616 ) -> bool {
617 let Some(payment_sat) = res.p else {
618 return false;
619 };
620 if payment_sat > offered_payment_sat {
621 return false;
622 }
623
624 let response_mint = res.m.as_deref();
625 if response_mint == preferred_mint_url {
626 return true;
627 }
628 if self.trusts_quote_mint(response_mint) {
629 return true;
630 }
631 if response_mint.is_none() {
632 return false;
633 }
634
635 payment_sat <= self.peer_suggested_mint_cap_sat(from_peer).await
636 }
637
638 async fn issue_quote(
639 &self,
640 peer_id: &str,
641 hash_key: &str,
642 payment_sat: u64,
643 ttl_ms: u32,
644 mint_url: Option<&str>,
645 ) -> u64 {
646 let quote_id = {
647 let mut next = self.next_quote_id.write().await;
648 let quote_id = *next;
649 *next = next.saturating_add(1);
650 quote_id
651 };
652
653 let expires_at = Instant::now() + Duration::from_millis(ttl_ms as u64);
654 self.issued_quotes.write().await.insert(
655 (peer_id.to_string(), hash_key.to_string(), quote_id),
656 IssuedQuote {
657 expires_at,
658 payment_sat,
659 mint_url: mint_url.map(str::to_string),
660 },
661 );
662 quote_id
663 }
664
665 async fn take_valid_quote(&self, peer_id: &str, hash_key: &str, quote_id: u64) -> bool {
666 let key = (peer_id.to_string(), hash_key.to_string(), quote_id);
667 let Some(quote) = self.issued_quotes.write().await.remove(&key) else {
668 return false;
669 };
670 quote.expires_at > Instant::now()
671 }
672
673 async fn send_request_to_peer(
674 &self,
675 peer_id: &str,
676 hash: &Hash,
677 quote_id: Option<u64>,
678 ) -> bool {
679 let channel = match self.signaling.get_channel(peer_id).await {
680 Some(c) => c,
681 None => return false,
682 };
683
684 let htl_config = {
685 let configs = self.htl_configs.read().await;
686 configs
687 .get(peer_id)
688 .cloned()
689 .unwrap_or_else(PeerHTLConfig::random)
690 };
691
692 let send_htl = htl_config.decrement(MAX_HTL);
693 let req = match quote_id {
694 Some(quote_id) => create_request_with_quote(hash, send_htl, quote_id),
695 None => create_request(hash, send_htl),
696 };
697 let request_bytes = encode_request(&req);
698
699 {
700 let mut selector = self.peer_selector.write().await;
701 selector.record_request(peer_id, request_bytes.len() as u64);
702 }
703
704 match channel.send(request_bytes).await {
705 Ok(()) => true,
706 Err(_) => {
707 self.peer_selector.write().await.record_failure(peer_id);
708 false
709 }
710 }
711 }
712
713 async fn send_quote_request_to_peer(
714 &self,
715 peer_id: &str,
716 hash: &Hash,
717 payment_sat: u64,
718 ttl_ms: u32,
719 mint_url: Option<&str>,
720 ) -> bool {
721 let channel = match self.signaling.get_channel(peer_id).await {
722 Some(c) => c,
723 None => return false,
724 };
725
726 let req = create_quote_request(hash, ttl_ms, payment_sat, mint_url);
727 let request_bytes = encode_quote_request(&req);
728
729 match channel.send(request_bytes).await {
730 Ok(()) => true,
731 Err(_) => false,
732 }
733 }
734
    /// Returns the router's current peer count.
    pub async fn peer_count(&self) -> usize {
        self.signaling.peer_count().await
    }
739
    /// Returns the router's answer to whether more peers are needed.
    pub async fn needs_peers(&self) -> bool {
        self.signaling.needs_peers().await
    }
744
    /// Sends a hello message with an empty list through the router.
    ///
    /// # Errors
    /// Propagates the transport error from the router.
    pub async fn send_hello(&self) -> Result<(), TransportError> {
        self.signaling.send_hello(vec![]).await
    }
749
750 pub async fn drain_available_data_messages(&self) -> DataPumpStats {
755 let mut stats = DataPumpStats::default();
756 let peer_ids = self.signaling.peer_ids().await;
757 for peer_id in peer_ids {
758 let Some(channel) = self.signaling.get_channel(&peer_id).await else {
759 continue;
760 };
761
762 while let Some(data) = channel.try_recv() {
763 stats.processed += 1;
764 stats.processed_bytes += data.len() as u64;
765 if let Some(msg) = parse_message(&data) {
766 match msg {
767 DataMessage::Request(_) => stats.request_messages += 1,
768 DataMessage::Response(_) => stats.response_messages += 1,
769 DataMessage::QuoteRequest(_) => stats.quote_request_messages += 1,
770 DataMessage::QuoteResponse(_) => stats.quote_response_messages += 1,
771 DataMessage::Payment(_)
772 | DataMessage::PaymentAck(_)
773 | DataMessage::Chunk(_) => {}
774 }
775 }
776 self.handle_data_message(&peer_id, &data).await;
777 }
778 }
779 stats
780 }
781
782 pub async fn record_cashu_payment_for_peer(&self, peer_id: &str, amount_sat: u64) {
784 self.peer_selector
785 .write()
786 .await
787 .record_cashu_payment(peer_id, amount_sat);
788 }
789
790 pub async fn record_cashu_receipt_from_peer(&self, peer_id: &str, amount_sat: u64) {
792 self.peer_selector
793 .write()
794 .await
795 .record_cashu_receipt(peer_id, amount_sat);
796 }
797
798 pub async fn record_cashu_payment_default_from_peer(&self, peer_id: &str) {
800 self.peer_selector
801 .write()
802 .await
803 .record_cashu_payment_default(peer_id);
804 }
805
    /// Returns the peer selector's summary, taken under a read lock.
    pub async fn selector_summary(&self) -> crate::peer_selector::SelectorSummary {
        self.peer_selector.read().await.summary()
    }
810
    /// Returns whether the selector considers `peer_id` blocked for payment defaults,
    /// judged against the configured `cashu_payment_default_block_threshold`.
    fn should_refuse_requests_from_peer(&self, selector: &PeerSelector, peer_id: &str) -> bool {
        selector.is_peer_blocked_for_payment_defaults(
            peer_id,
            self.routing.cashu_payment_default_block_threshold,
        )
    }
817
    /// Exports the peer selector's metadata snapshot, taken under a read lock.
    pub async fn peer_metadata_snapshot(&self) -> PeerMetadataSnapshot {
        self.peer_selector
            .read()
            .await
            .export_peer_metadata_snapshot()
    }
825
826 pub async fn persist_peer_metadata(&self) -> Result<Hash, StoreError> {
831 let snapshot = self
832 .peer_selector
833 .read()
834 .await
835 .export_peer_metadata_snapshot();
836 let bytes = serde_json::to_vec(&snapshot).map_err(|e| {
837 StoreError::Other(format!("Failed to encode peer metadata snapshot: {e}"))
838 })?;
839 let snapshot_hash = hashtree_core::sha256(&bytes);
840 let _ = self.local_store.put(snapshot_hash, bytes).await?;
841
842 let pointer_slot = Self::peer_metadata_pointer_slot_hash();
843 let pointer_bytes = hex::encode(snapshot_hash).into_bytes();
844 let _ = self.local_store.delete(&pointer_slot).await?;
845 let _ = self.local_store.put(pointer_slot, pointer_bytes).await?;
846
847 Ok(snapshot_hash)
848 }
849
850 pub async fn load_peer_metadata(&self) -> Result<bool, StoreError> {
852 let pointer_slot = Self::peer_metadata_pointer_slot_hash();
853 let Some(pointer_bytes) = self.local_store.get(&pointer_slot).await? else {
854 return Ok(false);
855 };
856 let pointer_hex = std::str::from_utf8(&pointer_bytes).map_err(|e| {
857 StoreError::Other(format!("Peer metadata pointer is not valid UTF-8: {e}"))
858 })?;
859 let snapshot_hash = Self::decode_hash_hex(pointer_hex.trim())?;
860
861 let Some(snapshot_bytes) = self.local_store.get(&snapshot_hash).await? else {
862 return Ok(false);
863 };
864 let snapshot: PeerMetadataSnapshot =
865 serde_json::from_slice(&snapshot_bytes).map_err(|e| {
866 StoreError::Other(format!("Failed to decode peer metadata snapshot: {e}"))
867 })?;
868 self.peer_selector
869 .write()
870 .await
871 .import_peer_metadata_snapshot(&snapshot);
872 Ok(true)
873 }
874
875 pub async fn get_with_quote(
880 &self,
881 hash: &Hash,
882 payment_sat: u64,
883 quote_ttl: Duration,
884 ) -> Result<Option<Vec<u8>>, StoreError> {
885 if let Some(data) = self.local_store.get(hash).await? {
886 return Ok(Some(data));
887 }
888 Ok(self
889 .request_from_peers_with_quote(hash, payment_sat, quote_ttl)
890 .await)
891 }
892
893 async fn request_from_peers_with_quote(
894 &self,
895 hash: &Hash,
896 payment_sat: u64,
897 quote_ttl: Duration,
898 ) -> Option<Vec<u8>> {
899 let ordered_peer_ids = self.ordered_connected_peers().await;
900 if ordered_peer_ids.is_empty() {
901 return None;
902 }
903
904 if let Some(quote) = self
905 .request_quote_from_peers(hash, payment_sat, quote_ttl, &ordered_peer_ids)
906 .await
907 {
908 if let Some(data) = self
909 .request_from_single_peer(hash, "e.peer_id, Some(quote.quote_id))
910 .await
911 {
912 return Some(data);
913 }
914 }
915
916 self.request_from_ordered_peers(hash, &ordered_peer_ids)
917 .await
918 }
919
    /// Asks the given peers, in hedged waves, for a quote on `hash` and returns the first
    /// acceptable one.
    ///
    /// Returns `None` when there are no peers, when `quote_ttl` is shorter than one
    /// millisecond, or when no acceptable quote arrives within the request timeout.
    /// The pending-quote entry registered for the hash is removed before returning.
    async fn request_quote_from_peers(
        &self,
        hash: &Hash,
        payment_sat: u64,
        quote_ttl: Duration,
        ordered_peer_ids: &[String],
    ) -> Option<NegotiatedQuote> {
        if ordered_peer_ids.is_empty() {
            return None;
        }
        // Clamp to `u32::MAX` first, so the narrowing cast cannot truncate.
        let ttl_ms = quote_ttl.as_millis().min(u32::MAX as u128) as u32;
        if ttl_ms == 0 {
            return None;
        }
        let requested_mint = self.requested_quote_mint().map(str::to_string);

        let hash_key = hash_to_key(hash);
        let (tx, rx) = oneshot::channel();
        // Register the negotiation so `handle_quote_response_message` can complete it.
        self.pending_quotes.write().await.insert(
            hash_key.clone(),
            PendingQuoteRequest {
                response_tx: tx,
                preferred_mint_url: requested_mint.clone(),
                offered_payment_sat: payment_sat,
            },
        );

        // The wait closure is called once per wave, so the receiver is shared.
        let rx = Arc::new(Mutex::new(rx));
        let result = run_hedged_waves(
            ordered_peer_ids.len(),
            self.routing.dispatch,
            self.request_timeout,
            |range| {
                let wave_peer_ids = ordered_peer_ids[range].to_vec();
                let requested_mint = requested_mint.clone();
                let hash = *hash;
                async move {
                    let mut sent = 0usize;
                    for peer_id in wave_peer_ids {
                        if self
                            .send_quote_request_to_peer(
                                &peer_id,
                                &hash,
                                payment_sat,
                                ttl_ms,
                                requested_mint.as_deref(),
                            )
                            .await
                        {
                            sent += 1;
                        }
                    }
                    sent
                }
            },
            |wait| {
                let rx = rx.clone();
                async move {
                    let mut rx = rx.lock().await;
                    match tokio::time::timeout(wait, &mut *rx).await {
                        Ok(Ok(Some(quote))) => HedgedWaveAction::Success(quote),
                        // Channel resolved without a quote, or the sender was dropped.
                        Ok(Ok(None)) | Ok(Err(_)) => HedgedWaveAction::Abort,
                        // Wait elapsed with no answer yet.
                        Err(_) => HedgedWaveAction::Continue,
                    }
                }
            },
        )
        .await;
        let _ = self.pending_quotes.write().await.remove(&hash_key);
        result
    }
991
992 async fn request_from_single_peer(
993 &self,
994 hash: &Hash,
995 peer_id: &str,
996 quote_id: Option<u64>,
997 ) -> Option<Vec<u8>> {
998 let hash_key = hash_to_key(hash);
999 let (tx, rx) = oneshot::channel();
1000 self.pending_requests.write().await.insert(
1001 hash_key.clone(),
1002 PendingRequest {
1003 response_tx: tx,
1004 started_at: Instant::now(),
1005 queried_peers: vec![peer_id.to_string()],
1006 },
1007 );
1008
1009 let mut rx = rx;
1010 if !self.send_request_to_peer(peer_id, hash, quote_id).await {
1011 let _ = self.pending_requests.write().await.remove(&hash_key);
1012 return None;
1013 }
1014
1015 if let Ok(Ok(Some(data))) = tokio::time::timeout(self.request_timeout, &mut rx).await {
1016 if hashtree_core::sha256(&data) == *hash {
1017 let _ = self.local_store.put(*hash, data.clone()).await;
1018 return Some(data);
1019 }
1020 }
1021
1022 if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
1023 for peer_id in pending.queried_peers {
1024 self.peer_selector.write().await.record_timeout(&peer_id);
1025 }
1026 }
1027 None
1028 }
1029
1030 async fn request_from_ordered_peers(
1031 &self,
1032 hash: &Hash,
1033 ordered_peer_ids: &[String],
1034 ) -> Option<Vec<u8>> {
1035 let hash_key = hash_to_key(hash);
1036 let (tx, rx) = oneshot::channel();
1037 self.pending_requests.write().await.insert(
1038 hash_key.clone(),
1039 PendingRequest {
1040 response_tx: tx,
1041 started_at: Instant::now(),
1042 queried_peers: Vec::new(),
1043 },
1044 );
1045
1046 let rx = Arc::new(Mutex::new(rx));
1047 let result = run_hedged_waves(
1048 ordered_peer_ids.len(),
1049 self.routing.dispatch,
1050 self.request_timeout,
1051 |range| {
1052 let wave_peer_ids = ordered_peer_ids[range].to_vec();
1053 let hash = *hash;
1054 let hash_key = hash_key.clone();
1055 async move {
1056 let mut sent = 0usize;
1057 for peer_id in wave_peer_ids {
1058 if self.send_request_to_peer(&peer_id, &hash, None).await {
1059 sent += 1;
1060 if let Some(pending) =
1061 self.pending_requests.write().await.get_mut(&hash_key)
1062 {
1063 pending.queried_peers.push(peer_id);
1064 }
1065 }
1066 }
1067 sent
1068 }
1069 },
1070 |wait| {
1071 let rx = rx.clone();
1072 async move {
1073 let mut rx = rx.lock().await;
1074 match tokio::time::timeout(wait, &mut *rx).await {
1075 Ok(Ok(Some(data))) if hashtree_core::sha256(&data) == *hash => {
1076 HedgedWaveAction::Success(data)
1077 }
1078 Ok(Ok(Some(_))) => HedgedWaveAction::Continue,
1079 Ok(Ok(None)) | Ok(Err(_)) => HedgedWaveAction::Abort,
1080 Err(_) => HedgedWaveAction::Continue,
1081 }
1082 }
1083 },
1084 )
1085 .await;
1086
1087 let Some(data) = result else {
1088 if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
1089 for peer_id in pending.queried_peers {
1090 self.peer_selector.write().await.record_timeout(&peer_id);
1091 }
1092 }
1093 return None;
1094 };
1095
1096 let _ = self.local_store.put(*hash, data.clone()).await;
1097 Some(data)
1098 }
1099
1100 async fn request_from_peers(&self, hash: &Hash) -> Option<Vec<u8>> {
1102 let ordered_peer_ids = self.ordered_connected_peers().await;
1103 if ordered_peer_ids.is_empty() {
1104 return None;
1105 }
1106 self.request_from_ordered_peers(hash, &ordered_peer_ids)
1107 .await
1108 }
1109
1110 async fn complete_pending_response(&self, from_peer: &str, hash_key: String, payload: Vec<u8>) {
1111 if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
1112 let rtt_ms = pending.started_at.elapsed().as_millis() as u64;
1113 self.peer_selector.write().await.record_success(
1114 from_peer,
1115 rtt_ms,
1116 payload.len() as u64,
1117 );
1118 let _ = pending.response_tx.send(Some(payload));
1119 }
1120 }
1121
1122 async fn handle_quote_response_message(&self, from_peer: &str, res: DataQuoteResponse) {
1123 if !res.a {
1124 return;
1125 }
1126
1127 let Some(quote_id) = res.q else {
1128 return;
1129 };
1130
1131 let hash_key = hash_to_key(&res.h);
1132 let (preferred_mint_url, offered_payment_sat) = {
1133 let pending_quotes = self.pending_quotes.read().await;
1134 let Some(pending) = pending_quotes.get(&hash_key) else {
1135 return;
1136 };
1137 (
1138 pending.preferred_mint_url.clone(),
1139 pending.offered_payment_sat,
1140 )
1141 };
1142 if !self
1143 .should_accept_quote_response(
1144 from_peer,
1145 preferred_mint_url.as_deref(),
1146 offered_payment_sat,
1147 &res,
1148 )
1149 .await
1150 {
1151 return;
1152 }
1153 let mut pending_quotes = self.pending_quotes.write().await;
1154 if let Some(pending) = pending_quotes.remove(&hash_key) {
1155 let _ = pending.response_tx.send(Some(NegotiatedQuote {
1156 peer_id: from_peer.to_string(),
1157 quote_id,
1158 mint_url: res.m,
1159 }));
1160 }
1161 }
1162
1163 async fn handle_response_message(&self, from_peer: &str, res: crate::protocol::DataResponse) {
1164 let hash_key = hash_to_key(&res.h);
1165 let hash = match crate::protocol::bytes_to_hash(&res.h) {
1166 Some(h) => h,
1167 None => return,
1168 };
1169
1170 if hashtree_core::sha256(&res.d) != hash {
1172 self.peer_selector.write().await.record_failure(from_peer);
1173 if self.debug {
1174 println!("[MeshStoreCore] Ignoring invalid response payload for {hash_key}");
1175 }
1176 return;
1177 }
1178
1179 self.complete_pending_response(from_peer, hash_key, res.d)
1180 .await;
1181 }
1182
1183 async fn handle_quote_request_message(&self, from_peer: &str, req: DataQuoteRequest) {
1184 let hash = match crate::protocol::bytes_to_hash(&req.h) {
1185 Some(h) => h,
1186 None => return,
1187 };
1188 let hash_key = hash_to_key(&hash);
1189
1190 {
1191 let selector = self.peer_selector.read().await;
1192 if self.should_refuse_requests_from_peer(&selector, from_peer) {
1193 if self.debug {
1194 println!(
1195 "[MeshStoreCore] Refusing quote request from delinquent peer {}",
1196 from_peer
1197 );
1198 }
1199 return;
1200 }
1201 }
1202
1203 let chosen_mint = self.choose_quote_mint(req.m.as_deref());
1204 let can_serve = self.local_store.has(&hash).await.ok().unwrap_or(false)
1205 && !self.should_drop_response(&hash)
1206 && !self.should_corrupt_response(&hash);
1207
1208 let res = if can_serve {
1209 let quote_id = self
1210 .issue_quote(from_peer, &hash_key, req.p, req.t, chosen_mint.as_deref())
1211 .await;
1212 create_quote_response_available(&hash, quote_id, req.p, req.t, chosen_mint.as_deref())
1213 } else {
1214 create_quote_response_unavailable(&hash)
1215 };
1216 let response_bytes = encode_quote_response(&res);
1217 if let Some(channel) = self.signaling.get_channel(from_peer).await {
1218 let _ = channel.send(response_bytes).await;
1219 }
1220 }
1221
1222 async fn handle_request_message(&self, from_peer: &str, req: crate::protocol::DataRequest) {
1223 let hash = match crate::protocol::bytes_to_hash(&req.h) {
1224 Some(h) => h,
1225 None => return,
1226 };
1227 let hash_key = hash_to_key(&hash);
1228
1229 {
1230 let selector = self.peer_selector.read().await;
1231 if self.should_refuse_requests_from_peer(&selector, from_peer) {
1232 if self.debug {
1233 println!(
1234 "[MeshStoreCore] Refusing request from delinquent peer {}",
1235 from_peer
1236 );
1237 }
1238 return;
1239 }
1240 }
1241
1242 if let Some(quote_id) = req.q {
1243 if !self.take_valid_quote(from_peer, &hash_key, quote_id).await {
1244 if self.debug {
1245 println!(
1246 "[MeshStoreCore] Refusing request with invalid or expired quote {} from {}",
1247 quote_id, from_peer
1248 );
1249 }
1250 return;
1251 }
1252 }
1253
1254 if let Ok(Some(mut data)) = self.local_store.get(&hash).await {
1256 if self.should_drop_response(&hash) {
1257 if self.debug {
1258 println!(
1259 "[MeshStoreCore] Dropping response for {} due to actor profile",
1260 hash_to_key(&hash)
1261 );
1262 }
1263 return;
1264 }
1265
1266 let behavior = self.response_behavior();
1267 if behavior.extra_delay_ms > 0 {
1268 tokio::time::sleep(Duration::from_millis(behavior.extra_delay_ms)).await;
1269 }
1270
1271 if self.should_corrupt_response(&hash) {
1272 if data.is_empty() {
1273 data.push(0x80);
1274 } else {
1275 data[0] ^= 0x80;
1276 }
1277 }
1278
1279 let res = create_response(&hash, data);
1281 let response_bytes = encode_response(&res);
1282 if let Some(channel) = self.signaling.get_channel(from_peer).await {
1283 let _ = channel.send(response_bytes).await;
1284 }
1285 }
1286 }
1288
1289 pub async fn handle_data_message(&self, from_peer: &str, data: &[u8]) {
1291 let parsed = match parse_message(data) {
1292 Some(m) => m,
1293 None => return,
1294 };
1295
1296 match parsed {
1297 DataMessage::Request(req) => {
1298 self.handle_request_message(from_peer, req).await;
1299 }
1300 DataMessage::Response(res) => {
1301 self.handle_response_message(from_peer, res).await;
1302 }
1303 DataMessage::QuoteRequest(req) => {
1304 self.handle_quote_request_message(from_peer, req).await;
1305 }
1306 DataMessage::QuoteResponse(res) => {
1307 self.handle_quote_response_message(from_peer, res).await;
1308 }
1309 DataMessage::Payment(_) | DataMessage::PaymentAck(_) | DataMessage::Chunk(_) => {}
1310 }
1311 }
1312}
1313
1314#[async_trait]
1315impl<S, R, F> Store for MeshStoreCore<S, R, F>
1316where
1317 S: Store + Send + Sync + 'static,
1318 R: SignalingTransport + Send + Sync + 'static,
1319 F: PeerLinkFactory + Send + Sync + 'static,
1320{
1321 async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
1322 self.local_store.put(hash, data).await
1323 }
1324
1325 async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
1326 if let Some(data) = self.local_store.get(hash).await? {
1328 return Ok(Some(data));
1329 }
1330
1331 Ok(self.request_from_peers(hash).await)
1333 }
1334
1335 async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
1336 self.local_store.has(hash).await
1337 }
1338
1339 async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
1340 self.local_store.delete(hash).await
1341 }
1342}
1343
1344#[cfg(test)]
1345mod tests;
1346
/// `MeshStoreCore` wired to the mock relay transport and mock connection factory.
pub type SimMeshStore<S> =
    MeshStoreCore<S, crate::mock::MockRelayTransport, crate::mock::MockConnectionFactory>;
1350
/// `MeshStoreCore` wired to the Nostr relay transport and the WebRTC peer-link factory.
pub type ProductionMeshStore<S> =
    MeshStoreCore<S, crate::nostr::NostrRelayTransport, crate::real_factory::WebRtcPeerLinkFactory>;