use async_trait::async_trait;
use futures::{stream::FuturesUnordered, FutureExt, StreamExt};
use std::collections::hash_map::DefaultHasher;
use std::collections::{HashMap, HashSet};
use std::future::Future;
use std::hash::{Hash as _, Hasher};
use std::ops::Range;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{oneshot, Mutex, RwLock};
use tokio::time::Instant;
use hashtree_core::{Hash, Store, StoreError};
use crate::peer_selector::{PeerMetadataSnapshot, PeerSelector, SelectionStrategy};
use crate::protocol::{
create_quote_request, create_quote_response_available, create_quote_response_unavailable,
create_request, create_request_with_quote, create_response, encode_quote_request,
encode_quote_response, encode_request, encode_response, hash_to_key, parse_message,
DataMessage, DataQuoteRequest, DataQuoteResponse,
};
use crate::signaling::MeshRouter;
use crate::transport::{PeerLinkFactory, SignalingTransport, TransportError};
use crate::types::{should_forward_htl, PeerHTLConfig, SignalingMessage, TimedSeenSet, MAX_HTL};
const PEER_METADATA_POINTER_SLOT_KEY: &[u8] = b"hashtree-webrtc/peer-metadata/latest/v1";
const RECENT_FORWARD_MISS_CAPACITY: usize = 4096;
const MIN_RECENT_FORWARD_MISS_TTL_MS: u64 = 250;
struct PendingRequest {
response_tx: oneshot::Sender<Option<Vec<u8>>>,
started_at: Instant,
queried_peers: Vec<String>,
}
struct PendingQuoteRequest {
response_tx: oneshot::Sender<Option<NegotiatedQuote>>,
preferred_mint_url: Option<String>,
offered_payment_sat: u64,
}
struct PendingForwardRequest {
requester_ids: HashSet<String>,
}
#[derive(Debug, Clone, Default)]
struct PeerWireStats {
bytes_sent: u64,
bytes_received: u64,
bandwidth_debt: f64,
}
struct PendingResponseSend {
job_id: u64,
peer_id: String,
bytes: Vec<u8>,
ready_at: Instant,
queue_sequence: u64,
}
#[async_trait]
pub trait MeshReadSource: Send + Sync {
fn id(&self) -> &str;
fn is_available(&self) -> bool {
true
}
async fn get(&self, hash: &Hash) -> Option<Vec<u8>>;
}
#[derive(Debug, Clone)]
struct NegotiatedQuote {
peer_id: String,
quote_id: u64,
#[allow(dead_code)]
mint_url: Option<String>,
}
struct IssuedQuote {
expires_at: Instant,
#[allow(dead_code)]
payment_sat: u64,
#[allow(dead_code)]
mint_url: Option<String>,
}
#[derive(Debug, Clone, Default)]
struct AdaptiveSourceStats {
requests: u64,
successes: u64,
misses: u64,
failures: u64,
timeouts: u64,
srtt_ms: f64,
rttvar_ms: f64,
backoff_level: u32,
backed_off_until: Option<Instant>,
last_success_at: Option<Instant>,
last_failure_at: Option<Instant>,
}
#[derive(Debug, Clone)]
enum RouteFetchOutcome {
Hit(Vec<u8>),
Miss,
Timeout,
}
struct InflightSourceFetch {
waiters: Vec<oneshot::Sender<RouteFetchOutcome>>,
}
enum SourceFetchOutcome {
Hit {
source_id: String,
data: Vec<u8>,
elapsed_ms: u64,
},
Miss {
source_id: String,
},
Failure {
source_id: String,
},
}
const INITIAL_SOURCE_BACKOFF_MS: u64 = 250;
const MAX_SOURCE_BACKOFF_MS: u64 = 10_000;
const SOURCE_SCORE_TIE_DELTA: f64 = 0.15;
const RECENT_SOURCE_SUCCESS_WINDOW: Duration = Duration::from_secs(60);
const ACTIVE_PEER_REQUEST_RANK_PENALTY: usize = 3;
fn source_reliability_score(stats: &AdaptiveSourceStats) -> f64 {
(stats.successes as f64 + 1.0) / (stats.requests as f64 + 2.0)
}
fn source_latency_score(stats: &AdaptiveSourceStats) -> f64 {
if stats.srtt_ms <= 0.0 {
return 0.5;
}
(500.0 / (stats.srtt_ms + 50.0)).min(1.0)
}
fn source_has_history(stats: &AdaptiveSourceStats) -> bool {
stats.requests > 0
|| stats.successes > 0
|| stats.misses > 0
|| stats.failures > 0
|| stats.timeouts > 0
}
fn adaptive_source_score(stats: &AdaptiveSourceStats, now: Instant) -> f64 {
if let Some(backed_off_until) = stats.backed_off_until {
if backed_off_until > now {
return f64::NEG_INFINITY;
}
}
let miss_penalty = if stats.requests > 0 {
(stats.misses as f64 / stats.requests as f64) * 0.15
} else {
0.0
};
let failure_penalty = if stats.requests > 0 {
((stats.failures + stats.timeouts) as f64 / stats.requests as f64) * 0.3
} else {
0.0
};
let recency_bonus = if stats
.last_success_at
.is_some_and(|last| now.duration_since(last) < RECENT_SOURCE_SUCCESS_WINDOW)
{
0.1
} else {
0.0
};
0.6 * source_reliability_score(stats) + 0.3 * source_latency_score(stats) + recency_bonus
- miss_penalty
- failure_penalty
}
fn peer_endpoint_has_history(stats: &crate::peer_selector::PeerStats) -> bool {
stats.requests_sent > 0 || stats.successes > 0 || stats.failures > 0 || stats.timeouts > 0
}
fn peer_endpoint_score(stats: &crate::peer_selector::PeerStats, now: Instant) -> f64 {
if stats.backed_off_until.is_some_and(|until| until > now) {
return f64::NEG_INFINITY;
}
let miss_penalty = 0.0;
let failure_penalty = if stats.requests_sent > 0 {
((stats.failures + stats.timeouts) as f64 / stats.requests_sent as f64) * 0.3
} else {
0.0
};
let recency_bonus = if stats
.last_success
.is_some_and(|last| now.duration_since(last) < RECENT_SOURCE_SUCCESS_WINDOW)
{
0.1
} else {
0.0
};
0.6 * stats.success_rate()
+ 0.3
* source_latency_score(&AdaptiveSourceStats {
srtt_ms: stats.srtt_ms,
..AdaptiveSourceStats::default()
})
+ recency_bonus
- miss_penalty
- failure_penalty
}
#[derive(Clone)]
enum ReadRoute {
Peers(Vec<String>),
Sources,
}
impl ReadRoute {
fn id(&self) -> &'static str {
match self {
Self::Peers(_) => "peers",
Self::Sources => "sources",
}
}
}
struct RankedReadRoute {
route: ReadRoute,
best_endpoint_id: String,
score: f64,
has_history: bool,
}
fn ranked_route_kind(route: &ReadRoute) -> u8 {
match route {
ReadRoute::Sources => 0,
ReadRoute::Peers(_) => 1,
}
}
#[derive(Debug, Clone)]
struct MeshReadContext {
exclude_peer_id: Option<String>,
request_htl: u8,
}
impl Default for MeshReadContext {
fn default() -> Self {
Self {
exclude_peer_id: None,
request_htl: MAX_HTL,
}
}
}
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct DataPumpStats {
pub processed: usize,
pub request_messages: usize,
pub response_messages: usize,
pub quote_request_messages: u64,
pub quote_response_messages: u64,
pub processed_bytes: u64,
}
#[derive(Debug, Clone, Copy)]
pub struct RequestDispatchConfig {
pub initial_fanout: usize,
pub hedge_fanout: usize,
pub max_fanout: usize,
pub hedge_interval_ms: u64,
}
impl Default for RequestDispatchConfig {
fn default() -> Self {
Self {
initial_fanout: usize::MAX,
hedge_fanout: usize::MAX,
max_fanout: usize::MAX,
hedge_interval_ms: 0,
}
}
}
pub fn normalize_dispatch_config(
dispatch: RequestDispatchConfig,
available_peers: usize,
) -> RequestDispatchConfig {
let mut cfg = dispatch;
let cap = if cfg.max_fanout == 0 {
available_peers
} else {
cfg.max_fanout.min(available_peers)
};
cfg.max_fanout = cap;
cfg.initial_fanout = if cfg.initial_fanout == 0 {
1
} else {
cfg.initial_fanout.min(cap.max(1))
};
cfg.hedge_fanout = if cfg.hedge_fanout == 0 {
1
} else {
cfg.hedge_fanout.min(cap.max(1))
};
cfg
}
pub fn build_hedged_wave_plan(peer_count: usize, dispatch: RequestDispatchConfig) -> Vec<usize> {
if peer_count == 0 {
return Vec::new();
}
let cap = dispatch.max_fanout.min(peer_count);
if cap == 0 {
return Vec::new();
}
let mut plan = Vec::new();
let mut sent = 0usize;
let first = dispatch.initial_fanout.min(cap).max(1);
plan.push(first);
sent += first;
while sent < cap {
let next = dispatch.hedge_fanout.min(cap - sent).max(1);
plan.push(next);
sent += next;
}
plan
}
#[derive(Debug)]
pub enum HedgedWaveAction<T> {
Continue,
Success(T),
Abort,
}
pub async fn run_hedged_waves<T, SendWave, SendWaveFut, WaitWave, WaitWaveFut>(
peer_count: usize,
dispatch: RequestDispatchConfig,
request_timeout: Duration,
mut send_wave: SendWave,
mut wait_wave: WaitWave,
) -> Option<T>
where
SendWave: FnMut(Range<usize>) -> SendWaveFut,
SendWaveFut: Future<Output = usize>,
WaitWave: FnMut(Duration) -> WaitWaveFut,
WaitWaveFut: Future<Output = HedgedWaveAction<T>>,
{
let dispatch = normalize_dispatch_config(dispatch, peer_count);
let wave_plan = build_hedged_wave_plan(peer_count, dispatch);
if wave_plan.is_empty() {
return None;
}
let deadline = Instant::now() + request_timeout;
let mut sent_total = 0usize;
let mut next_peer_idx = 0usize;
for (wave_idx, wave_size) in wave_plan.iter().copied().enumerate() {
let from = next_peer_idx;
let to = (next_peer_idx + wave_size).min(peer_count);
next_peer_idx = to;
if from == to {
continue;
}
sent_total += send_wave(from..to).await;
if sent_total == 0 {
if next_peer_idx >= peer_count {
break;
}
continue;
}
let now = Instant::now();
if now >= deadline {
break;
}
let remaining = deadline.saturating_duration_since(now);
let is_last_wave = wave_idx + 1 == wave_plan.len() || next_peer_idx >= peer_count;
let wait = if is_last_wave {
remaining
} else if dispatch.hedge_interval_ms == 0 {
Duration::ZERO
} else {
Duration::from_millis(dispatch.hedge_interval_ms).min(remaining)
};
if wait.is_zero() {
continue;
}
match wait_wave(wait).await {
HedgedWaveAction::Continue => {}
HedgedWaveAction::Success(value) => return Some(value),
HedgedWaveAction::Abort => break,
}
}
None
}
pub async fn sync_selector_peers(selector: &RwLock<PeerSelector>, current_peer_ids: &[String]) {
let mut selector = selector.write().await;
let current: HashSet<&str> = current_peer_ids.iter().map(String::as_str).collect();
let known: Vec<String> = selector.all_stats().map(|s| s.peer_id.clone()).collect();
for peer_id in known {
if !current.contains(peer_id.as_str()) {
selector.remove_peer(&peer_id);
}
}
for peer_id in current_peer_ids {
selector.add_peer(peer_id.clone());
}
}
#[derive(Debug, Clone, Copy)]
pub struct ResponseBehaviorConfig {
pub drop_response_prob: f64,
pub corrupt_response_prob: f64,
pub extra_delay_ms: u64,
pub first_byte_delay_ms: u64,
pub bytes_per_second: u64,
pub stall_response_prob: f64,
pub stall_delay_ms: u64,
}
impl Default for ResponseBehaviorConfig {
fn default() -> Self {
Self {
drop_response_prob: 0.0,
corrupt_response_prob: 0.0,
extra_delay_ms: 0,
first_byte_delay_ms: 0,
bytes_per_second: 0,
stall_response_prob: 0.0,
stall_delay_ms: 0,
}
}
}
impl ResponseBehaviorConfig {
fn normalized(self) -> Self {
Self {
drop_response_prob: self.drop_response_prob.clamp(0.0, 1.0),
corrupt_response_prob: self.corrupt_response_prob.clamp(0.0, 1.0),
extra_delay_ms: self.extra_delay_ms,
first_byte_delay_ms: self.first_byte_delay_ms,
bytes_per_second: self.bytes_per_second,
stall_response_prob: self.stall_response_prob.clamp(0.0, 1.0),
stall_delay_ms: self.stall_delay_ms,
}
}
}
#[derive(Debug, Clone)]
pub struct MeshRoutingConfig {
pub selection_strategy: SelectionStrategy,
pub fairness_enabled: bool,
pub cashu_payment_weight: f64,
pub cashu_payment_default_block_threshold: u64,
pub cashu_accepted_mints: Vec<String>,
pub cashu_default_mint: Option<String>,
pub cashu_peer_suggested_mint_base_cap_sat: u64,
pub cashu_peer_suggested_mint_success_step_sat: u64,
pub cashu_peer_suggested_mint_receipt_step_sat: u64,
pub cashu_peer_suggested_mint_max_cap_sat: u64,
pub dispatch: RequestDispatchConfig,
pub response_behavior: ResponseBehaviorConfig,
}
impl Default for MeshRoutingConfig {
fn default() -> Self {
Self {
selection_strategy: SelectionStrategy::Weighted,
fairness_enabled: true,
cashu_payment_weight: 0.0,
cashu_payment_default_block_threshold: 0,
cashu_accepted_mints: Vec::new(),
cashu_default_mint: None,
cashu_peer_suggested_mint_base_cap_sat: 0,
cashu_peer_suggested_mint_success_step_sat: 0,
cashu_peer_suggested_mint_receipt_step_sat: 0,
cashu_peer_suggested_mint_max_cap_sat: 0,
dispatch: RequestDispatchConfig::default(),
response_behavior: ResponseBehaviorConfig::default(),
}
}
}
pub struct MeshStoreCore<S, R, F>
where
S: Store + Send + Sync + 'static,
R: SignalingTransport + Send + Sync + 'static,
F: PeerLinkFactory + Send + Sync + 'static,
{
local_store: Arc<S>,
signaling: Arc<MeshRouter<R, F>>,
htl_configs: RwLock<HashMap<String, PeerHTLConfig>>,
pending_requests: RwLock<HashMap<String, PendingRequest>>,
pending_quotes: RwLock<HashMap<String, PendingQuoteRequest>>,
pending_forward_requests: RwLock<HashMap<String, PendingForwardRequest>>,
recent_forward_misses: Mutex<TimedSeenSet>,
issued_quotes: RwLock<HashMap<(String, String, u64), IssuedQuote>>,
next_quote_id: RwLock<u64>,
read_sources: RwLock<HashMap<String, Arc<dyn MeshReadSource>>>,
read_source_stats: RwLock<HashMap<String, AdaptiveSourceStats>>,
inflight_source_fetches: Mutex<HashMap<String, InflightSourceFetch>>,
peer_selector: RwLock<PeerSelector>,
peer_active_requests: RwLock<HashMap<String, usize>>,
peer_wire_stats: RwLock<HashMap<String, PeerWireStats>>,
pending_response_sends: Mutex<Vec<PendingResponseSend>>,
response_scheduler_running: AtomicBool,
next_response_job_id: AtomicU64,
routing: MeshRoutingConfig,
request_timeout: Duration,
debug: bool,
running: RwLock<bool>,
}
impl<S, R, F> MeshStoreCore<S, R, F>
where
S: Store + Send + Sync + 'static,
R: SignalingTransport + Send + Sync + 'static,
F: PeerLinkFactory + Send + Sync + 'static,
{
pub fn new(
local_store: Arc<S>,
signaling: Arc<MeshRouter<R, F>>,
request_timeout: Duration,
debug: bool,
) -> Self {
Self::new_with_routing(
local_store,
signaling,
request_timeout,
debug,
Default::default(),
)
}
pub fn new_with_routing(
local_store: Arc<S>,
signaling: Arc<MeshRouter<R, F>>,
request_timeout: Duration,
debug: bool,
routing: MeshRoutingConfig,
) -> Self {
let mut selector = PeerSelector::with_strategy(routing.selection_strategy);
selector.set_fairness(routing.fairness_enabled);
selector.set_cashu_payment_weight(routing.cashu_payment_weight);
Self {
local_store,
signaling,
htl_configs: RwLock::new(HashMap::new()),
pending_requests: RwLock::new(HashMap::new()),
pending_quotes: RwLock::new(HashMap::new()),
pending_forward_requests: RwLock::new(HashMap::new()),
recent_forward_misses: Mutex::new(TimedSeenSet::new(
RECENT_FORWARD_MISS_CAPACITY,
Self::recent_forward_miss_ttl(request_timeout),
)),
issued_quotes: RwLock::new(HashMap::new()),
next_quote_id: RwLock::new(1),
read_sources: RwLock::new(HashMap::new()),
read_source_stats: RwLock::new(HashMap::new()),
inflight_source_fetches: Mutex::new(HashMap::new()),
peer_selector: RwLock::new(selector),
peer_active_requests: RwLock::new(HashMap::new()),
peer_wire_stats: RwLock::new(HashMap::new()),
pending_response_sends: Mutex::new(Vec::new()),
response_scheduler_running: AtomicBool::new(false),
next_response_job_id: AtomicU64::new(1),
routing,
request_timeout,
debug,
running: RwLock::new(false),
}
}
fn recent_forward_miss_ttl(request_timeout: Duration) -> Duration {
let ttl_ms = request_timeout
.as_millis()
.saturating_mul(2)
.max(MIN_RECENT_FORWARD_MISS_TTL_MS as u128)
.min(u64::MAX as u128) as u64;
Duration::from_millis(ttl_ms)
}
pub async fn start(&self) -> Result<(), TransportError> {
*self.running.write().await = true;
self.signaling.send_hello(vec![]).await?;
Ok(())
}
pub async fn stop(&self) {
*self.running.write().await = false;
}
pub async fn process_signaling(&self, msg: SignalingMessage) -> Result<(), TransportError> {
let peer_id = msg.peer_id().to_string();
{
let mut configs = self.htl_configs.write().await;
if !configs.contains_key(&peer_id) {
configs.insert(peer_id.clone(), PeerHTLConfig::random());
}
}
self.peer_selector.write().await.add_peer(peer_id);
self.signaling.handle_message(msg).await
}
pub fn signaling(&self) -> &Arc<MeshRouter<R, F>> {
&self.signaling
}
fn response_behavior(&self) -> ResponseBehaviorConfig {
self.routing.response_behavior.normalized()
}
async fn record_peer_wire_sent(&self, peer_id: &str, bytes: u64) {
if bytes == 0 {
return;
}
let mut stats = self.peer_wire_stats.write().await;
let entry = stats.entry(peer_id.to_string()).or_default();
entry.bytes_sent = entry.bytes_sent.saturating_add(bytes);
}
async fn record_peer_wire_received(&self, peer_id: &str, bytes: u64) {
if bytes == 0 {
return;
}
let mut stats = self.peer_wire_stats.write().await;
let entry = stats.entry(peer_id.to_string()).or_default();
entry.bytes_received = entry.bytes_received.saturating_add(bytes);
}
fn peer_upload_weight(stats: &PeerWireStats) -> f64 {
let raw_ratio = (stats.bytes_received.saturating_add(1024) as f64)
/ (stats.bytes_sent.saturating_add(1024) as f64);
let bounded_ratio = raw_ratio / (1.0 + raw_ratio);
0.5 + 1.5 * bounded_ratio
}
fn choose_ready_response_job(
ready_jobs: &[(u64, String, usize, Instant, u64)],
stats: &HashMap<String, PeerWireStats>,
) -> Option<(u64, f64)> {
ready_jobs
.iter()
.map(|job| {
let peer_stats = stats.get(&job.1).cloned().unwrap_or_default();
let finish = peer_stats.bandwidth_debt
+ (job.2 as f64 / Self::peer_upload_weight(&peer_stats));
(job.0, job.1.as_str(), job.4, finish)
})
.min_by(|left, right| {
left.3
.partial_cmp(&right.3)
.unwrap_or(std::cmp::Ordering::Equal)
.then_with(|| left.2.cmp(&right.2))
.then_with(|| left.1.cmp(right.1))
})
.map(|choice| (choice.0, choice.3))
}
async fn enqueue_response_send(
self: &Arc<Self>,
peer_id: String,
bytes: Vec<u8>,
ready_at: Instant,
) {
let job_id = self.next_response_job_id.fetch_add(1, Ordering::Relaxed);
{
let mut queue = self.pending_response_sends.lock().await;
queue.push(PendingResponseSend {
job_id,
peer_id,
bytes,
ready_at,
queue_sequence: job_id,
});
}
if self
.response_scheduler_running
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
.is_ok()
{
let this = Arc::clone(self);
tokio::spawn(async move {
this.run_response_scheduler().await;
});
}
}
async fn run_response_scheduler(self: Arc<Self>) {
loop {
let snapshot = {
let queue = self.pending_response_sends.lock().await;
if queue.is_empty() {
self.response_scheduler_running
.store(false, Ordering::Release);
return;
}
queue
.iter()
.map(|job| {
(
job.job_id,
job.peer_id.clone(),
job.bytes.len(),
job.ready_at,
job.queue_sequence,
)
})
.collect::<Vec<_>>()
};
let now = Instant::now();
let mut earliest_ready_at: Option<Instant> = None;
let mut ready_jobs = Vec::new();
for job in &snapshot {
if job.3 <= now {
ready_jobs.push(job.clone());
} else {
earliest_ready_at = Some(match earliest_ready_at {
Some(current) => current.min(job.3),
None => job.3,
});
}
}
if ready_jobs.is_empty() {
if let Some(ready_at) = earliest_ready_at {
tokio::time::sleep(ready_at.saturating_duration_since(Instant::now())).await;
continue;
}
self.response_scheduler_running
.store(false, Ordering::Release);
return;
}
let (selected_job_id, selected_finish) = {
let stats = self.peer_wire_stats.read().await;
Self::choose_ready_response_job(&ready_jobs, &stats).expect("ready response job")
};
let selected = {
let mut queue = self.pending_response_sends.lock().await;
let Some(index) = queue.iter().position(|job| job.job_id == selected_job_id) else {
continue;
};
queue.swap_remove(index)
};
let sent = if let Some(channel) = self.signaling.get_channel(&selected.peer_id).await {
channel.send(selected.bytes.clone()).await.is_ok()
} else {
false
};
let queued_peers = {
let queue = self.pending_response_sends.lock().await;
queue
.iter()
.map(|job| job.peer_id.clone())
.collect::<HashSet<_>>()
};
let mut stats = self.peer_wire_stats.write().await;
let entry = stats.entry(selected.peer_id.clone()).or_default();
if sent {
entry.bytes_sent = entry.bytes_sent.saturating_add(selected.bytes.len() as u64);
entry.bandwidth_debt = selected_finish;
}
if queued_peers.is_empty() {
for peer_stats in stats.values_mut() {
peer_stats.bandwidth_debt = 0.0;
}
} else {
let floor = queued_peers
.iter()
.filter_map(|peer_id| stats.get(peer_id).map(|peer| peer.bandwidth_debt))
.fold(f64::INFINITY, f64::min);
if floor.is_finite() && floor > 0.0 {
for peer_id in queued_peers {
if let Some(peer_stats) = stats.get_mut(&peer_id) {
peer_stats.bandwidth_debt =
(peer_stats.bandwidth_debt - floor).max(0.0);
}
}
}
}
}
}
fn deterministic_actor_draw_for(peer_id: &str, hash: &Hash, salt: u64) -> f64 {
let mut hasher = DefaultHasher::new();
peer_id.hash(&mut hasher);
hash.hash(&mut hasher);
salt.hash(&mut hasher);
let v = hasher.finish();
(v as f64) / (u64::MAX as f64)
}
fn deterministic_actor_draw(&self, hash: &Hash, salt: u64) -> f64 {
Self::deterministic_actor_draw_for(self.signaling.peer_id(), hash, salt)
}
fn peer_metadata_pointer_slot_hash() -> Hash {
hashtree_core::sha256(PEER_METADATA_POINTER_SLOT_KEY)
}
fn decode_hash_hex(hash_hex: &str) -> Result<Hash, StoreError> {
let bytes = hex::decode(hash_hex)
.map_err(|e| StoreError::Other(format!("Invalid hash hex: {e}")))?;
if bytes.len() != 32 {
return Err(StoreError::Other(format!(
"Invalid hash length {}, expected 32 bytes",
bytes.len()
)));
}
let mut hash = [0u8; 32];
hash.copy_from_slice(&bytes);
Ok(hash)
}
fn should_drop_response(&self, hash: &Hash) -> bool {
let p = self.response_behavior().drop_response_prob;
if p <= 0.0 {
return false;
}
self.deterministic_actor_draw(hash, 0xD0_D0_D0_D0_D0_D0_D0_D0) < p
}
fn should_corrupt_response(&self, hash: &Hash) -> bool {
let p = self.response_behavior().corrupt_response_prob;
if p <= 0.0 {
return false;
}
self.deterministic_actor_draw(hash, 0xC0_C0_C0_C0_C0_C0_C0_C0) < p
}
fn should_stall_response(&self, hash: &Hash) -> bool {
let p = self.response_behavior().stall_response_prob;
if p <= 0.0 {
return false;
}
self.deterministic_actor_draw(hash, 0x5A_11_5A_11_5A_11_5A_11) < p
}
fn response_send_delay(&self, hash: &Hash, payload_len: usize) -> Duration {
let behavior = self.response_behavior();
let mut total_ms = behavior
.extra_delay_ms
.saturating_add(behavior.first_byte_delay_ms);
if behavior.bytes_per_second > 0 && payload_len > 0 {
let throughput_ms = ((payload_len as u128) * 1000)
.div_ceil(behavior.bytes_per_second as u128)
.min(u64::MAX as u128) as u64;
total_ms = total_ms.saturating_add(throughput_ms);
}
if behavior.stall_delay_ms > 0 && self.should_stall_response(hash) {
total_ms = total_ms.saturating_add(behavior.stall_delay_ms);
}
Duration::from_millis(total_ms)
}
async fn ordered_connected_peers(&self, exclude_peer_id: Option<&str>) -> Vec<String> {
let current_peer_ids = self.signaling.peer_ids().await;
if current_peer_ids.is_empty() {
return Vec::new();
}
sync_selector_peers(&self.peer_selector, ¤t_peer_ids).await;
let hash_get_peer_ids: HashSet<String> = self
.signaling
.hash_get_peer_ids()
.await
.into_iter()
.collect();
let mut candidate_peer_ids: Vec<String> = current_peer_ids
.into_iter()
.filter(|peer_id| hash_get_peer_ids.contains(peer_id))
.filter(|peer_id| exclude_peer_id.is_none_or(|exclude| peer_id != exclude))
.collect();
if candidate_peer_ids.is_empty() {
return Vec::new();
}
let current_set: HashSet<&str> = candidate_peer_ids.iter().map(String::as_str).collect();
let mut selector = self.peer_selector.write().await;
let mut selector_order = selector.select_peers();
selector_order.retain(|peer_id| current_set.contains(peer_id.as_str()));
if selector_order.is_empty() {
let mut fallback = candidate_peer_ids;
fallback.sort();
return fallback;
}
let backed_off: HashMap<String, bool> = candidate_peer_ids
.iter()
.map(|peer_id| (peer_id.clone(), selector.is_peer_backed_off(peer_id)))
.collect();
drop(selector);
let rank: HashMap<&str, usize> = selector_order
.iter()
.enumerate()
.map(|(idx, peer_id)| (peer_id.as_str(), idx))
.collect();
let active = self.peer_active_requests.read().await;
candidate_peer_ids.sort_by(|left, right| {
let left_backed_off = backed_off.get(left).copied().unwrap_or(false);
let right_backed_off = backed_off.get(right).copied().unwrap_or(false);
if left_backed_off != right_backed_off {
return if left_backed_off {
std::cmp::Ordering::Greater
} else {
std::cmp::Ordering::Less
};
}
let left_rank = rank.get(left.as_str()).copied().unwrap_or(usize::MAX / 2);
let right_rank = rank.get(right.as_str()).copied().unwrap_or(usize::MAX / 2);
let left_load = active.get(left).copied().unwrap_or(0);
let right_load = active.get(right).copied().unwrap_or(0);
(left_rank + left_load.saturating_mul(ACTIVE_PEER_REQUEST_RANK_PENALTY))
.cmp(&(right_rank + right_load.saturating_mul(ACTIVE_PEER_REQUEST_RANK_PENALTY)))
.then_with(|| left.cmp(right))
});
candidate_peer_ids
}
async fn reserve_peer_request(&self, peer_id: &str) {
let mut active = self.peer_active_requests.write().await;
*active.entry(peer_id.to_string()).or_insert(0) += 1;
}
async fn release_peer_request(&self, peer_id: &str) {
let mut active = self.peer_active_requests.write().await;
let Some(count) = active.get_mut(peer_id) else {
return;
};
if *count <= 1 {
active.remove(peer_id);
} else {
*count -= 1;
}
}
async fn release_queried_peer_requests(&self, peer_ids: &[String]) {
for peer_id in peer_ids {
self.release_peer_request(peer_id).await;
}
}
fn requested_quote_mint(&self) -> Option<&str> {
if let Some(default_mint) = self.routing.cashu_default_mint.as_deref() {
if self.routing.cashu_accepted_mints.is_empty()
|| self
.routing
.cashu_accepted_mints
.iter()
.any(|mint| mint == default_mint)
{
return Some(default_mint);
}
}
self.routing
.cashu_accepted_mints
.first()
.map(String::as_str)
}
fn choose_quote_mint(&self, requested_mint: Option<&str>) -> Option<String> {
if let Some(requested_mint) = requested_mint {
if self.accepts_quote_mint(Some(requested_mint)) {
return Some(requested_mint.to_string());
}
}
if let Some(default_mint) = self.routing.cashu_default_mint.as_ref() {
return Some(default_mint.clone());
}
if let Some(first_mint) = self.routing.cashu_accepted_mints.first() {
return Some(first_mint.clone());
}
requested_mint.map(str::to_string)
}
fn accepts_quote_mint(&self, mint_url: Option<&str>) -> bool {
if self.routing.cashu_accepted_mints.is_empty() {
return true;
}
let Some(mint_url) = mint_url else {
return false;
};
self.routing
.cashu_accepted_mints
.iter()
.any(|mint| mint == mint_url)
}
fn trusts_quote_mint(&self, mint_url: Option<&str>) -> bool {
let Some(mint_url) = mint_url else {
return self.routing.cashu_default_mint.is_none()
&& self.routing.cashu_accepted_mints.is_empty();
};
self.routing.cashu_default_mint.as_deref() == Some(mint_url)
|| self
.routing
.cashu_accepted_mints
.iter()
.any(|mint| mint == mint_url)
}
async fn peer_suggested_mint_cap_sat(&self, peer_id: &str) -> u64 {
let base = self.routing.cashu_peer_suggested_mint_base_cap_sat;
if base == 0 {
return 0;
}
let selector = self.peer_selector.read().await;
let Some(stats) = selector.get_stats(peer_id) else {
let max_cap = self.routing.cashu_peer_suggested_mint_max_cap_sat;
return if max_cap > 0 { base.min(max_cap) } else { base };
};
if stats.cashu_payment_defaults > 0
&& stats.cashu_payment_defaults >= stats.cashu_payment_receipts
{
return 0;
}
let success_bonus = stats
.successes
.saturating_mul(self.routing.cashu_peer_suggested_mint_success_step_sat);
let receipt_bonus = stats
.cashu_payment_receipts
.saturating_mul(self.routing.cashu_peer_suggested_mint_receipt_step_sat);
let mut cap = base
.saturating_add(success_bonus)
.saturating_add(receipt_bonus);
let max_cap = self.routing.cashu_peer_suggested_mint_max_cap_sat;
if max_cap > 0 {
cap = cap.min(max_cap);
}
cap
}
async fn should_accept_quote_response(
&self,
from_peer: &str,
preferred_mint_url: Option<&str>,
offered_payment_sat: u64,
res: &DataQuoteResponse,
) -> bool {
let Some(payment_sat) = res.p else {
return false;
};
if payment_sat > offered_payment_sat {
return false;
}
let response_mint = res.m.as_deref();
if response_mint == preferred_mint_url {
return true;
}
if self.trusts_quote_mint(response_mint) {
return true;
}
if response_mint.is_none() {
return false;
}
payment_sat <= self.peer_suggested_mint_cap_sat(from_peer).await
}
async fn issue_quote(
&self,
peer_id: &str,
hash_key: &str,
payment_sat: u64,
ttl_ms: u32,
mint_url: Option<&str>,
) -> u64 {
let quote_id = {
let mut next = self.next_quote_id.write().await;
let quote_id = *next;
*next = next.saturating_add(1);
quote_id
};
let expires_at = Instant::now() + Duration::from_millis(ttl_ms as u64);
self.issued_quotes.write().await.insert(
(peer_id.to_string(), hash_key.to_string(), quote_id),
IssuedQuote {
expires_at,
payment_sat,
mint_url: mint_url.map(str::to_string),
},
);
quote_id
}
async fn take_valid_quote(&self, peer_id: &str, hash_key: &str, quote_id: u64) -> bool {
let key = (peer_id.to_string(), hash_key.to_string(), quote_id);
let Some(quote) = self.issued_quotes.write().await.remove(&key) else {
return false;
};
quote.expires_at > Instant::now()
}
async fn send_request_to_peer(
&self,
peer_id: &str,
hash: &Hash,
request_htl: u8,
quote_id: Option<u64>,
) -> bool {
if !should_forward_htl(request_htl) {
return false;
}
let channel = match self.signaling.get_channel(peer_id).await {
Some(c) => c,
None => return false,
};
let htl_config = {
let configs = self.htl_configs.read().await;
configs
.get(peer_id)
.cloned()
.unwrap_or_else(PeerHTLConfig::random)
};
let send_htl = htl_config.decrement(request_htl);
let req = match quote_id {
Some(quote_id) => create_request_with_quote(hash, send_htl, quote_id),
None => create_request(hash, send_htl),
};
let request_bytes = encode_request(&req);
let request_len = request_bytes.len() as u64;
{
let mut selector = self.peer_selector.write().await;
selector.record_request(peer_id, request_len);
}
match channel.send(request_bytes).await {
Ok(()) => {
self.record_peer_wire_sent(peer_id, request_len).await;
true
}
Err(_) => {
self.peer_selector.write().await.record_failure(peer_id);
false
}
}
}
async fn send_quote_request_to_peer(
&self,
peer_id: &str,
hash: &Hash,
payment_sat: u64,
ttl_ms: u32,
mint_url: Option<&str>,
) -> bool {
let channel = match self.signaling.get_channel(peer_id).await {
Some(c) => c,
None => return false,
};
let req = create_quote_request(hash, ttl_ms, payment_sat, mint_url);
let request_bytes = encode_quote_request(&req);
let request_len = request_bytes.len() as u64;
match channel.send(request_bytes).await {
Ok(()) => {
self.record_peer_wire_sent(peer_id, request_len).await;
true
}
Err(_) => false,
}
}
pub async fn set_read_sources(&self, sources: Vec<Arc<dyn MeshReadSource>>) {
let mut by_id = HashMap::new();
let mut stats = self.read_source_stats.write().await;
for source in sources {
let source_id = source.id().to_string();
by_id.insert(source_id.clone(), source);
stats
.entry(source_id)
.or_insert_with(AdaptiveSourceStats::default);
}
*self.read_sources.write().await = by_id;
}
async fn record_read_source_request(&self, source_id: &str) {
let mut stats = self.read_source_stats.write().await;
stats
.entry(source_id.to_string())
.or_insert_with(AdaptiveSourceStats::default)
.requests += 1;
}
async fn record_read_source_miss(&self, source_id: &str) {
let mut stats = self.read_source_stats.write().await;
stats
.entry(source_id.to_string())
.or_insert_with(AdaptiveSourceStats::default)
.misses += 1;
}
async fn record_read_source_success(&self, source_id: &str, elapsed_ms: u64) {
let now = Instant::now();
let mut stats = self.read_source_stats.write().await;
let stats = stats
.entry(source_id.to_string())
.or_insert_with(AdaptiveSourceStats::default);
stats.successes += 1;
stats.last_success_at = Some(now);
stats.backoff_level = 0;
stats.backed_off_until = None;
if stats.srtt_ms <= 0.0 {
stats.srtt_ms = elapsed_ms as f64;
stats.rttvar_ms = elapsed_ms as f64 / 2.0;
return;
}
let elapsed = elapsed_ms as f64;
stats.rttvar_ms = 0.75 * stats.rttvar_ms + 0.25 * (stats.srtt_ms - elapsed).abs();
stats.srtt_ms = 0.875 * stats.srtt_ms + 0.125 * elapsed;
}
async fn record_read_source_failure(&self, source_id: &str) {
let now = Instant::now();
let mut stats = self.read_source_stats.write().await;
let stats = stats
.entry(source_id.to_string())
.or_insert_with(AdaptiveSourceStats::default);
stats.failures += 1;
stats.last_failure_at = Some(now);
Self::apply_source_backoff(stats, now);
}
async fn record_read_source_timeout(&self, source_id: &str) {
let now = Instant::now();
let mut stats = self.read_source_stats.write().await;
let stats = stats
.entry(source_id.to_string())
.or_insert_with(AdaptiveSourceStats::default);
stats.timeouts += 1;
stats.last_failure_at = Some(now);
Self::apply_source_backoff(stats, now);
}
fn apply_source_backoff(stats: &mut AdaptiveSourceStats, now: Instant) {
stats.backoff_level = stats.backoff_level.saturating_add(1);
let backoff_ms = (INITIAL_SOURCE_BACKOFF_MS
.saturating_mul(2u64.saturating_pow(stats.backoff_level.saturating_sub(1))))
.min(MAX_SOURCE_BACKOFF_MS);
stats.backed_off_until = Some(now + Duration::from_millis(backoff_ms));
}
async fn ordered_read_sources(&self) -> Vec<Arc<dyn MeshReadSource>> {
let sources = self.read_sources.read().await;
if sources.is_empty() {
return Vec::new();
}
let mut available: Vec<Arc<dyn MeshReadSource>> = sources
.values()
.filter(|source| source.is_available())
.cloned()
.collect();
if available.is_empty() {
return Vec::new();
}
let now = Instant::now();
let stats = self.read_source_stats.read().await;
let mut healthy: Vec<Arc<dyn MeshReadSource>> = available
.iter()
.filter(|source| {
stats
.get(source.id())
.and_then(|s| s.backed_off_until)
.is_none_or(|until| until <= now)
})
.cloned()
.collect();
if !healthy.is_empty() {
available = std::mem::take(&mut healthy);
}
available.sort_by(|left, right| {
let left_stats = stats.get(left.id()).cloned().unwrap_or_default();
let right_stats = stats.get(right.id()).cloned().unwrap_or_default();
adaptive_source_score(&right_stats, now)
.partial_cmp(&adaptive_source_score(&left_stats, now))
.unwrap_or(std::cmp::Ordering::Equal)
.then_with(|| left.id().cmp(right.id()))
});
available
}
async fn should_probe_multiple_read_sources(
&self,
ordered_sources: &[Arc<dyn MeshReadSource>],
) -> bool {
if ordered_sources.len() <= 1 {
return false;
}
let stats = self.read_source_stats.read().await;
let best = stats
.get(ordered_sources[0].id())
.cloned()
.unwrap_or_default();
let second = stats
.get(ordered_sources[1].id())
.cloned()
.unwrap_or_default();
if !source_has_history(&best) || !source_has_history(&second) {
return false;
}
let now = Instant::now();
adaptive_source_score(&best, now) - adaptive_source_score(&second, now)
< SOURCE_SCORE_TIE_DELTA
}
async fn source_dispatch_for(&self, source_count: usize) -> RequestDispatchConfig {
if source_count == 0 {
return self.routing.dispatch;
}
let ordered_sources = self.ordered_read_sources().await;
let probe_multiple = self
.should_probe_multiple_read_sources(&ordered_sources)
.await;
let initial_fanout = if probe_multiple {
source_count.min(2)
} else {
1
};
RequestDispatchConfig {
initial_fanout,
hedge_fanout: self.routing.dispatch.hedge_fanout,
max_fanout: self.routing.dispatch.max_fanout.min(source_count),
hedge_interval_ms: self.routing.dispatch.hedge_interval_ms,
}
}
pub async fn peer_count(&self) -> usize {
self.signaling.peer_count().await
}
pub async fn needs_peers(&self) -> bool {
self.signaling.needs_peers().await
}
pub async fn send_hello(&self) -> Result<(), TransportError> {
self.signaling.send_hello(vec![]).await
}
pub async fn drain_available_data_messages(self: &Arc<Self>) -> DataPumpStats {
let mut stats = DataPumpStats::default();
let peer_ids = self.signaling.peer_ids().await;
for peer_id in peer_ids {
let Some(channel) = self.signaling.get_channel(&peer_id).await else {
continue;
};
while let Some(data) = channel.try_recv() {
stats.processed += 1;
stats.processed_bytes += data.len() as u64;
if let Some(msg) = parse_message(&data) {
match msg {
DataMessage::Request(_) => stats.request_messages += 1,
DataMessage::Response(_) => stats.response_messages += 1,
DataMessage::QuoteRequest(_) => stats.quote_request_messages += 1,
DataMessage::QuoteResponse(_) => stats.quote_response_messages += 1,
DataMessage::Payment(_)
| DataMessage::PaymentAck(_)
| DataMessage::Chunk(_)
| DataMessage::PeerHints(_) => {}
}
}
self.handle_data_message(&peer_id, &data).await;
}
}
stats
}
pub async fn record_cashu_payment_for_peer(&self, peer_id: &str, amount_sat: u64) {
self.peer_selector
.write()
.await
.record_cashu_payment(peer_id, amount_sat);
}
pub async fn record_cashu_receipt_from_peer(&self, peer_id: &str, amount_sat: u64) {
self.peer_selector
.write()
.await
.record_cashu_receipt(peer_id, amount_sat);
}
pub async fn record_cashu_payment_default_from_peer(&self, peer_id: &str) {
self.peer_selector
.write()
.await
.record_cashu_payment_default(peer_id);
}
pub async fn selector_summary(&self) -> crate::peer_selector::SelectorSummary {
self.peer_selector.read().await.summary()
}
fn should_refuse_requests_from_peer(&self, selector: &PeerSelector, peer_id: &str) -> bool {
selector.is_peer_blocked_for_payment_defaults(
peer_id,
self.routing.cashu_payment_default_block_threshold,
)
}
pub async fn peer_metadata_snapshot(&self) -> PeerMetadataSnapshot {
self.peer_selector
.read()
.await
.export_peer_metadata_snapshot()
}
pub async fn persist_peer_metadata(&self) -> Result<Hash, StoreError> {
let snapshot = self
.peer_selector
.read()
.await
.export_peer_metadata_snapshot();
let bytes = serde_json::to_vec(&snapshot).map_err(|e| {
StoreError::Other(format!("Failed to encode peer metadata snapshot: {e}"))
})?;
let snapshot_hash = hashtree_core::sha256(&bytes);
let _ = self.local_store.put(snapshot_hash, bytes).await?;
let pointer_slot = Self::peer_metadata_pointer_slot_hash();
let pointer_bytes = hex::encode(snapshot_hash).into_bytes();
let _ = self.local_store.delete(&pointer_slot).await?;
let _ = self.local_store.put(pointer_slot, pointer_bytes).await?;
Ok(snapshot_hash)
}
pub async fn load_peer_metadata(&self) -> Result<bool, StoreError> {
let pointer_slot = Self::peer_metadata_pointer_slot_hash();
let Some(pointer_bytes) = self.local_store.get(&pointer_slot).await? else {
return Ok(false);
};
let pointer_hex = std::str::from_utf8(&pointer_bytes).map_err(|e| {
StoreError::Other(format!("Peer metadata pointer is not valid UTF-8: {e}"))
})?;
let snapshot_hash = Self::decode_hash_hex(pointer_hex.trim())?;
let Some(snapshot_bytes) = self.local_store.get(&snapshot_hash).await? else {
return Ok(false);
};
let snapshot: PeerMetadataSnapshot =
serde_json::from_slice(&snapshot_bytes).map_err(|e| {
StoreError::Other(format!("Failed to decode peer metadata snapshot: {e}"))
})?;
self.peer_selector
.write()
.await
.import_peer_metadata_snapshot(&snapshot);
Ok(true)
}
pub async fn get_with_quote(
&self,
hash: &Hash,
payment_sat: u64,
quote_ttl: Duration,
) -> Result<Option<Vec<u8>>, StoreError> {
if let Some(data) = self.local_store.get(hash).await? {
return Ok(Some(data));
}
Ok(self
.request_from_peers_with_quote(hash, payment_sat, quote_ttl)
.await)
}
async fn request_from_peers_with_quote(
&self,
hash: &Hash,
payment_sat: u64,
quote_ttl: Duration,
) -> Option<Vec<u8>> {
let ordered_peer_ids = self.ordered_connected_peers(None).await;
if ordered_peer_ids.is_empty() {
return None;
}
if let Some(quote) = self
.request_quote_from_peers(hash, payment_sat, quote_ttl, &ordered_peer_ids)
.await
{
if let Some(data) = self
.request_from_single_peer(hash, "e.peer_id, MAX_HTL, Some(quote.quote_id))
.await
{
return Some(data);
}
}
self.request_from_mesh(hash).await
}
async fn request_quote_from_peers(
&self,
hash: &Hash,
payment_sat: u64,
quote_ttl: Duration,
ordered_peer_ids: &[String],
) -> Option<NegotiatedQuote> {
if ordered_peer_ids.is_empty() {
return None;
}
let ttl_ms = quote_ttl.as_millis().min(u32::MAX as u128) as u32;
if ttl_ms == 0 {
return None;
}
let requested_mint = self.requested_quote_mint().map(str::to_string);
let hash_key = hash_to_key(hash);
let (tx, rx) = oneshot::channel();
self.pending_quotes.write().await.insert(
hash_key.clone(),
PendingQuoteRequest {
response_tx: tx,
preferred_mint_url: requested_mint.clone(),
offered_payment_sat: payment_sat,
},
);
let rx = Arc::new(Mutex::new(rx));
let result = run_hedged_waves(
ordered_peer_ids.len(),
self.routing.dispatch,
self.request_timeout,
|range| {
let wave_peer_ids = ordered_peer_ids[range].to_vec();
let requested_mint = requested_mint.clone();
let hash = *hash;
async move {
let mut sent = 0usize;
for peer_id in wave_peer_ids {
if self
.send_quote_request_to_peer(
&peer_id,
&hash,
payment_sat,
ttl_ms,
requested_mint.as_deref(),
)
.await
{
sent += 1;
}
}
sent
}
},
|wait| {
let rx = rx.clone();
async move {
let mut rx = rx.lock().await;
match tokio::time::timeout(wait, &mut *rx).await {
Ok(Ok(Some(quote))) => HedgedWaveAction::Success(quote),
Ok(Ok(None)) | Ok(Err(_)) => HedgedWaveAction::Abort,
Err(_) => HedgedWaveAction::Continue,
}
}
},
)
.await;
let _ = self.pending_quotes.write().await.remove(&hash_key);
result
}
async fn request_from_single_peer(
&self,
hash: &Hash,
peer_id: &str,
request_htl: u8,
quote_id: Option<u64>,
) -> Option<Vec<u8>> {
let hash_key = hash_to_key(hash);
let (tx, rx) = oneshot::channel();
self.pending_requests.write().await.insert(
hash_key.clone(),
PendingRequest {
response_tx: tx,
started_at: Instant::now(),
queried_peers: vec![peer_id.to_string()],
},
);
let mut rx = rx;
if !self
.send_request_to_peer(peer_id, hash, request_htl, quote_id)
.await
{
let _ = self.pending_requests.write().await.remove(&hash_key);
return None;
}
self.reserve_peer_request(peer_id).await;
if let Ok(Ok(Some(data))) = tokio::time::timeout(self.request_timeout, &mut rx).await {
if hashtree_core::sha256(&data) == *hash {
let _ = self.local_store.put(*hash, data.clone()).await;
return Some(data);
}
}
if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
self.release_queried_peer_requests(&pending.queried_peers)
.await;
for peer_id in pending.queried_peers {
self.peer_selector.write().await.record_timeout(&peer_id);
}
}
let _ = self.take_forward_requesters(&hash_key).await;
None
}
async fn request_from_ordered_peers(
&self,
hash: &Hash,
ordered_peer_ids: &[String],
request_htl: u8,
) -> RouteFetchOutcome {
let hash_key = hash_to_key(hash);
let (tx, rx) = oneshot::channel();
self.pending_requests.write().await.insert(
hash_key.clone(),
PendingRequest {
response_tx: tx,
started_at: Instant::now(),
queried_peers: Vec::new(),
},
);
let rx = Arc::new(Mutex::new(rx));
let result = run_hedged_waves(
ordered_peer_ids.len(),
self.routing.dispatch,
self.request_timeout,
|range| {
let wave_peer_ids = ordered_peer_ids[range].to_vec();
let hash = *hash;
let hash_key = hash_key.clone();
async move {
let mut sent = 0usize;
for peer_id in wave_peer_ids {
if self
.send_request_to_peer(&peer_id, &hash, request_htl, None)
.await
{
sent += 1;
self.reserve_peer_request(&peer_id).await;
if let Some(pending) =
self.pending_requests.write().await.get_mut(&hash_key)
{
pending.queried_peers.push(peer_id);
}
}
}
sent
}
},
|wait| {
let rx = rx.clone();
async move {
let mut rx = rx.lock().await;
match tokio::time::timeout(wait, &mut *rx).await {
Ok(Ok(Some(data))) if hashtree_core::sha256(&data) == *hash => {
HedgedWaveAction::Success(data)
}
Ok(Ok(Some(_))) => HedgedWaveAction::Continue,
Ok(Ok(None)) | Ok(Err(_)) => HedgedWaveAction::Abort,
Err(_) => HedgedWaveAction::Continue,
}
}
},
)
.await;
let Some(data) = result else {
if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
self.release_queried_peer_requests(&pending.queried_peers)
.await;
for peer_id in pending.queried_peers {
self.peer_selector.write().await.record_timeout(&peer_id);
}
}
let _ = self.take_forward_requesters(&hash_key).await;
return RouteFetchOutcome::Timeout;
};
let _ = self.local_store.put(*hash, data.clone()).await;
RouteFetchOutcome::Hit(data)
}
async fn request_from_read_sources_inner(&self, hash: &Hash) -> RouteFetchOutcome {
let ordered_sources = self.ordered_read_sources().await;
if ordered_sources.is_empty() {
return RouteFetchOutcome::Miss;
}
let dispatch = normalize_dispatch_config(
self.source_dispatch_for(ordered_sources.len()).await,
ordered_sources.len(),
);
let wave_plan = build_hedged_wave_plan(ordered_sources.len(), dispatch);
if wave_plan.is_empty() {
return RouteFetchOutcome::Miss;
}
let deadline = Instant::now() + self.request_timeout;
let mut pending = FuturesUnordered::new();
let mut pending_source_ids = HashSet::new();
let mut saw_timeout = false;
let mut next_source_idx = 0usize;
for (wave_idx, wave_size) in wave_plan.iter().copied().enumerate() {
let from = next_source_idx;
let to = (next_source_idx + wave_size).min(ordered_sources.len());
next_source_idx = to;
for source in ordered_sources[from..to].iter().cloned() {
let source_id = source.id().to_string();
self.record_read_source_request(&source_id).await;
pending_source_ids.insert(source_id.clone());
let hash = *hash;
pending.push(tokio::spawn(async move {
let started_at = Instant::now();
let result = std::panic::AssertUnwindSafe(source.get(&hash))
.catch_unwind()
.await;
match result {
Ok(Some(data)) => SourceFetchOutcome::Hit {
source_id,
data,
elapsed_ms: started_at.elapsed().as_millis().max(1) as u64,
},
Ok(None) => SourceFetchOutcome::Miss { source_id },
Err(_) => SourceFetchOutcome::Failure { source_id },
}
}));
}
let is_last_wave =
wave_idx + 1 == wave_plan.len() || next_source_idx >= ordered_sources.len();
let window_end = if is_last_wave {
deadline
} else {
(Instant::now() + Duration::from_millis(dispatch.hedge_interval_ms)).min(deadline)
};
while Instant::now() < window_end {
let remaining = window_end.saturating_duration_since(Instant::now());
let Some(result) = tokio::time::timeout(remaining, pending.next())
.await
.ok()
.flatten()
else {
break;
};
let Ok(outcome) = result else {
continue;
};
match outcome {
SourceFetchOutcome::Hit {
source_id,
data,
elapsed_ms,
} => {
pending_source_ids.remove(&source_id);
self.record_read_source_success(&source_id, elapsed_ms)
.await;
return RouteFetchOutcome::Hit(data);
}
SourceFetchOutcome::Miss { source_id } => {
pending_source_ids.remove(&source_id);
self.record_read_source_miss(&source_id).await;
}
SourceFetchOutcome::Failure { source_id } => {
pending_source_ids.remove(&source_id);
self.record_read_source_failure(&source_id).await;
}
}
}
if Instant::now() >= deadline {
break;
}
}
for source_id in pending_source_ids {
saw_timeout = true;
self.record_read_source_timeout(&source_id).await;
}
if saw_timeout {
RouteFetchOutcome::Timeout
} else {
RouteFetchOutcome::Miss
}
}
async fn request_from_read_sources(&self, hash: &Hash) -> RouteFetchOutcome {
let hash_key = hash_to_key(hash);
let existing_wait = {
let mut inflight = self.inflight_source_fetches.lock().await;
if let Some(existing) = inflight.get_mut(&hash_key) {
let (tx, rx) = oneshot::channel();
existing.waiters.push(tx);
Some(rx)
} else {
inflight.insert(
hash_key.clone(),
InflightSourceFetch {
waiters: Vec::new(),
},
);
None
}
};
if let Some(wait) = existing_wait {
return wait.await.unwrap_or(RouteFetchOutcome::Timeout);
}
let result = self.request_from_read_sources_inner(hash).await;
if let RouteFetchOutcome::Hit(hit) = &result {
let _ = self.local_store.put(*hash, hit.clone()).await;
}
self.complete_inflight_source_fetch(&hash_key, result.clone())
.await;
result
}
async fn complete_inflight_source_fetch(&self, hash_key: &str, result: RouteFetchOutcome) {
let waiters = self
.inflight_source_fetches
.lock()
.await
.remove(hash_key)
.map(|inflight| inflight.waiters)
.unwrap_or_default();
for waiter in waiters {
let _ = waiter.send(result.clone());
}
}
async fn cancel_pending_peer_route(&self, hash: &Hash) {
let hash_key = hash_to_key(hash);
if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
self.release_queried_peer_requests(&pending.queried_peers)
.await;
}
}
async fn cancel_losing_route(&self, hash: &Hash, route: &ReadRoute, winner_data: &[u8]) {
match route {
ReadRoute::Peers(_) => self.cancel_pending_peer_route(hash).await,
ReadRoute::Sources => {
let hash_key = hash_to_key(hash);
self.complete_inflight_source_fetch(
&hash_key,
RouteFetchOutcome::Hit(winner_data.to_vec()),
)
.await;
}
}
}
async fn ranked_read_routes(&self, context: &MeshReadContext) -> Vec<RankedReadRoute> {
let mut routes = Vec::new();
let ordered_peers = if should_forward_htl(context.request_htl) {
self.ordered_connected_peers(context.exclude_peer_id.as_deref())
.await
} else {
Vec::new()
};
if !ordered_peers.is_empty() {
let best_peer_id = ordered_peers[0].clone();
let selector = self.peer_selector.read().await;
let best_peer = selector.get_stats(&best_peer_id).cloned();
let now = Instant::now();
let (score, has_history) = match best_peer.as_ref() {
Some(stats) => (
peer_endpoint_score(stats, now),
peer_endpoint_has_history(stats),
),
None => (0.0, false),
};
routes.push(RankedReadRoute {
route: ReadRoute::Peers(ordered_peers),
best_endpoint_id: format!("peer:{best_peer_id}"),
score,
has_history,
});
}
let ordered_sources = self.ordered_read_sources().await;
if let Some(best_source) = ordered_sources.first() {
let stats = self.read_source_stats.read().await;
let best_source_stats = stats.get(best_source.id()).cloned().unwrap_or_default();
let now = Instant::now();
routes.push(RankedReadRoute {
route: ReadRoute::Sources,
best_endpoint_id: format!("source:{}", best_source.id()),
score: adaptive_source_score(&best_source_stats, now),
has_history: source_has_history(&best_source_stats),
});
}
if routes.len() <= 1 {
return routes;
}
routes.sort_by(|left, right| {
right
.score
.partial_cmp(&left.score)
.unwrap_or(std::cmp::Ordering::Equal)
.then_with(|| ranked_route_kind(&left.route).cmp(&ranked_route_kind(&right.route)))
.then_with(|| left.best_endpoint_id.cmp(&right.best_endpoint_id))
.then_with(|| left.route.id().cmp(right.route.id()))
});
routes
}
fn should_probe_multiple_routes(&self, routes: &[RankedReadRoute]) -> bool {
if routes.len() <= 1 {
return false;
}
if !routes[0].has_history || !routes[1].has_history {
return false;
}
(routes[0].score - routes[1].score) < SOURCE_SCORE_TIE_DELTA
}
async fn run_read_route(
&self,
hash: &Hash,
route: &ReadRoute,
context: &MeshReadContext,
) -> RouteFetchOutcome {
match route {
ReadRoute::Peers(peer_ids) => {
self.request_from_ordered_peers(hash, peer_ids, context.request_htl)
.await
}
ReadRoute::Sources => self.request_from_read_sources(hash).await,
}
}
async fn request_from_mesh_with_context(
&self,
hash: &Hash,
context: &MeshReadContext,
) -> Option<Vec<u8>> {
let routes = self.ranked_read_routes(context).await;
match routes.as_slice() {
[] => None,
[ranked] => match self.run_read_route(hash, &ranked.route, context).await {
RouteFetchOutcome::Hit(data) => Some(data),
RouteFetchOutcome::Miss | RouteFetchOutcome::Timeout => None,
},
[first, second, ..] => {
if self.should_probe_multiple_routes(&routes) {
let first_fut = self.run_read_route(hash, &first.route, context);
let second_fut = self.run_read_route(hash, &second.route, context);
tokio::pin!(first_fut);
tokio::pin!(second_fut);
let mut first_done = false;
let mut second_done = false;
loop {
tokio::select! {
result = &mut first_fut, if !first_done => {
first_done = true;
if let RouteFetchOutcome::Hit(data) = result {
if !second_done {
self.cancel_losing_route(hash, &second.route, &data).await;
}
return Some(data);
}
}
result = &mut second_fut, if !second_done => {
second_done = true;
if let RouteFetchOutcome::Hit(data) = result {
if !first_done {
self.cancel_losing_route(hash, &first.route, &data).await;
}
return Some(data);
}
}
else => break,
}
if first_done && second_done {
break;
}
}
None
} else {
match self.run_read_route(hash, &first.route, context).await {
RouteFetchOutcome::Hit(data) => return Some(data),
RouteFetchOutcome::Miss | RouteFetchOutcome::Timeout => {}
}
for ranked in routes.iter().skip(1) {
match self.run_read_route(hash, &ranked.route, context).await {
RouteFetchOutcome::Hit(data) => return Some(data),
RouteFetchOutcome::Miss | RouteFetchOutcome::Timeout => {}
}
}
None
}
}
}
}
async fn request_from_mesh(&self, hash: &Hash) -> Option<Vec<u8>> {
self.request_from_mesh_with_context(hash, &MeshReadContext::default())
.await
}
async fn begin_forward_request(&self, hash_key: &str, requester_id: &str) -> bool {
let mut pending = self.pending_forward_requests.write().await;
if let Some(existing) = pending.get_mut(hash_key) {
existing.requester_ids.insert(requester_id.to_string());
return false;
}
let mut requester_ids = HashSet::new();
requester_ids.insert(requester_id.to_string());
pending.insert(
hash_key.to_string(),
PendingForwardRequest { requester_ids },
);
true
}
async fn was_recent_forward_miss(&self, hash_key: &str) -> bool {
self.recent_forward_misses.lock().await.contains(hash_key)
}
async fn mark_recent_forward_miss(&self, hash_key: &str) {
let _ = self
.recent_forward_misses
.lock()
.await
.insert_if_new(hash_key.to_string());
}
async fn take_forward_requesters(&self, hash_key: &str) -> Vec<String> {
self.pending_forward_requests
.write()
.await
.remove(hash_key)
.map(|pending| pending.requester_ids.into_iter().collect())
.unwrap_or_default()
}
async fn complete_pending_response(
self: &Arc<Self>,
from_peer: &str,
hash: &Hash,
hash_key: String,
payload: Vec<u8>,
) {
if let Some(pending) = self.pending_requests.write().await.remove(&hash_key) {
self.release_queried_peer_requests(&pending.queried_peers)
.await;
let rtt_ms = pending.started_at.elapsed().as_millis() as u64;
self.peer_selector.write().await.record_success(
from_peer,
rtt_ms,
payload.len() as u64,
);
let forward_requesters = self.take_forward_requesters(&hash_key).await;
let response_bytes = if forward_requesters.is_empty() {
None
} else {
Some(encode_response(&create_response(hash, payload.clone())))
};
let _ = pending.response_tx.send(Some(payload));
if let Some(response_bytes) = response_bytes {
for requester_id in forward_requesters {
Arc::clone(&self)
.enqueue_response_send(requester_id, response_bytes.clone(), Instant::now())
.await;
}
}
}
}
async fn handle_quote_response_message(&self, from_peer: &str, res: DataQuoteResponse) {
if !res.a {
return;
}
let Some(quote_id) = res.q else {
return;
};
let hash_key = hash_to_key(&res.h);
let (preferred_mint_url, offered_payment_sat) = {
let pending_quotes = self.pending_quotes.read().await;
let Some(pending) = pending_quotes.get(&hash_key) else {
return;
};
(
pending.preferred_mint_url.clone(),
pending.offered_payment_sat,
)
};
if !self
.should_accept_quote_response(
from_peer,
preferred_mint_url.as_deref(),
offered_payment_sat,
&res,
)
.await
{
return;
}
let mut pending_quotes = self.pending_quotes.write().await;
if let Some(pending) = pending_quotes.remove(&hash_key) {
let _ = pending.response_tx.send(Some(NegotiatedQuote {
peer_id: from_peer.to_string(),
quote_id,
mint_url: res.m,
}));
}
}
async fn handle_response_message(
self: &Arc<Self>,
from_peer: &str,
res: crate::protocol::DataResponse,
) {
let hash_key = hash_to_key(&res.h);
let hash = match crate::protocol::bytes_to_hash(&res.h) {
Some(h) => h,
None => return,
};
if hashtree_core::sha256(&res.d) != hash {
self.peer_selector.write().await.record_failure(from_peer);
if self.debug {
println!("[MeshStoreCore] Ignoring invalid response payload for {hash_key}");
}
return;
}
self.complete_pending_response(from_peer, &hash, hash_key, res.d)
.await;
}
async fn handle_quote_request_message(&self, from_peer: &str, req: DataQuoteRequest) {
let hash = match crate::protocol::bytes_to_hash(&req.h) {
Some(h) => h,
None => return,
};
let hash_key = hash_to_key(&hash);
{
let selector = self.peer_selector.read().await;
if self.should_refuse_requests_from_peer(&selector, from_peer) {
if self.debug {
println!(
"[MeshStoreCore] Refusing quote request from delinquent peer {}",
from_peer
);
}
return;
}
}
let chosen_mint = self.choose_quote_mint(req.m.as_deref());
let can_serve = self.local_store.has(&hash).await.ok().unwrap_or(false)
&& !self.should_drop_response(&hash)
&& !self.should_corrupt_response(&hash);
let res = if can_serve {
let quote_id = self
.issue_quote(from_peer, &hash_key, req.p, req.t, chosen_mint.as_deref())
.await;
create_quote_response_available(&hash, quote_id, req.p, req.t, chosen_mint.as_deref())
} else {
create_quote_response_unavailable(&hash)
};
let response_bytes = encode_quote_response(&res);
if let Some(channel) = self.signaling.get_channel(from_peer).await {
if channel.send(response_bytes.clone()).await.is_ok() {
self.record_peer_wire_sent(from_peer, response_bytes.len() as u64)
.await;
}
}
}
async fn handle_request_message(
self: &Arc<Self>,
from_peer: &str,
req: crate::protocol::DataRequest,
) {
let hash = match crate::protocol::bytes_to_hash(&req.h) {
Some(h) => h,
None => return,
};
let hash_key = hash_to_key(&hash);
if let Some(quote_id) = req.q {
if !self.take_valid_quote(from_peer, &hash_key, quote_id).await {
if self.debug {
println!(
"[MeshStoreCore] Refusing request with invalid or expired quote {} from {}",
quote_id, from_peer
);
}
return;
}
}
let allow_peer_forwarding = {
let selector = self.peer_selector.read().await;
!self.should_refuse_requests_from_peer(&selector, from_peer)
};
if let Ok(Some(mut data)) = self.local_store.get(&hash).await {
if self.should_drop_response(&hash) {
if self.debug {
println!(
"[MeshStoreCore] Dropping response for {} due to actor profile",
hash_to_key(&hash)
);
}
return;
}
let response_delay = self.response_send_delay(&hash, data.len());
if self.should_corrupt_response(&hash) {
if data.is_empty() {
data.push(0x80);
} else {
data[0] ^= 0x80;
}
}
let res = create_response(&hash, data);
let response_bytes = encode_response(&res);
let ready_at = Instant::now() + response_delay;
Arc::clone(self)
.enqueue_response_send(from_peer.to_string(), response_bytes, ready_at)
.await;
return;
}
if self.pending_requests.read().await.contains_key(&hash_key) {
let _ = self.begin_forward_request(&hash_key, from_peer).await;
return;
}
if self.was_recent_forward_miss(&hash_key).await {
if self.debug {
println!(
"[MeshStoreCore] Suppressing recently missed forwarded request for {}",
hash_key
);
}
return;
}
if !self.begin_forward_request(&hash_key, from_peer).await {
return;
}
let from_peer = from_peer.to_string();
let this = Arc::clone(self);
let request_htl = req.htl;
tokio::spawn(async move {
let result = if allow_peer_forwarding {
let context = MeshReadContext {
exclude_peer_id: Some(from_peer.clone()),
request_htl,
};
this.request_from_mesh_with_context(&hash, &context).await
} else {
if this.debug {
println!(
"[MeshStoreCore] Serving request from delinquent peer {} via read sources only",
from_peer
);
}
match this.request_from_read_sources(&hash).await {
RouteFetchOutcome::Hit(data) => Some(data),
RouteFetchOutcome::Miss | RouteFetchOutcome::Timeout => None,
}
};
let requester_ids = this.take_forward_requesters(&hash_key).await;
if let Some(data) = result {
let ready_at = Instant::now() + this.response_send_delay(&hash, data.len());
let res = create_response(&hash, data);
let response_bytes = encode_response(&res);
for requester_id in requester_ids {
Arc::clone(&this)
.enqueue_response_send(requester_id, response_bytes.clone(), ready_at)
.await;
}
} else {
this.mark_recent_forward_miss(&hash_key).await;
}
});
}
pub async fn handle_data_message(self: &Arc<Self>, from_peer: &str, data: &[u8]) {
self.record_peer_wire_received(from_peer, data.len() as u64)
.await;
let parsed = match parse_message(data) {
Some(m) => m,
None => return,
};
match parsed {
DataMessage::Request(req) => {
self.handle_request_message(from_peer, req).await;
}
DataMessage::Response(res) => {
self.handle_response_message(from_peer, res).await;
}
DataMessage::QuoteRequest(req) => {
self.handle_quote_request_message(from_peer, req).await;
}
DataMessage::QuoteResponse(res) => {
self.handle_quote_response_message(from_peer, res).await;
}
DataMessage::Payment(_)
| DataMessage::PaymentAck(_)
| DataMessage::Chunk(_)
| DataMessage::PeerHints(_) => {}
}
}
}
#[async_trait]
impl<S, R, F> Store for MeshStoreCore<S, R, F>
where
S: Store + Send + Sync + 'static,
R: SignalingTransport + Send + Sync + 'static,
F: PeerLinkFactory + Send + Sync + 'static,
{
async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
self.local_store.put(hash, data).await
}
async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
if let Some(data) = self.local_store.get(hash).await? {
return Ok(Some(data));
}
Ok(self.request_from_mesh(hash).await)
}
async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
self.local_store.has(hash).await
}
async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
self.local_store.delete(hash).await
}
}
#[cfg(test)]
mod tests;
pub type SimMeshStore<S> =
MeshStoreCore<S, crate::mock::MockRelayTransport, crate::mock::MockConnectionFactory>;
pub type ProductionMeshStore<S> =
MeshStoreCore<S, crate::nostr::NostrRelayTransport, crate::real_factory::WebRtcPeerLinkFactory>;