#![allow(clippy::significant_drop_tightening)]
pub mod admission;
pub mod audit;
pub mod bootstrap;
pub mod commitment;
pub mod commitment_state;
pub mod config;
pub mod fresh;
pub mod neighbor_sync;
pub mod paid_list;
pub mod possession;
pub mod protocol;
pub mod pruning;
pub mod quorum;
pub mod recent_provers;
pub mod scheduling;
pub mod storage_commitment_audit;
pub mod subtree;
pub mod types;
use std::collections::{HashMap, HashSet};
use std::path::Path;
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::pin::Pin;
use crate::logging::{debug, error, info, warn};
use futures::stream::FuturesUnordered;
use futures::{Future, StreamExt};
use rand::Rng;
use tokio::sync::{mpsc, Notify, RwLock, Semaphore};
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use crate::ant_protocol::XorName;
use crate::error::{Error, Result};
use crate::payment::{PaymentVerifier, VerificationContext};
use crate::replication::audit::AuditTickResult;
use crate::replication::commitment::{commitment_hash, StorageCommitment};
use crate::replication::commitment_state::{PeerCommitmentRecord, ResponderCommitmentState};
use crate::replication::config::{
max_parallel_fetch, storage_admission_width, ReplicationConfig, MAX_AUDIT_RESPONSES_PER_PEER,
MAX_CONCURRENT_AUDIT_RESPONSES, MAX_CONCURRENT_REPLICATION_SENDS, REPLICATION_PROTOCOL_ID,
};
use crate::replication::paid_list::PaidList;
use crate::replication::protocol::{
FreshReplicationResponse, NeighborSyncResponse, ReplicationMessage, ReplicationMessageBody,
VerificationResponse,
};
use crate::replication::quorum::KeyVerificationOutcome;
use crate::replication::recent_provers::RecentProvers;
use crate::replication::scheduling::ReplicationQueues;
use crate::replication::types::{
AuditFailureReason, BootstrapClaimObservation, BootstrapState, FailureEvidence, HintPipeline,
NeighborSyncState, PeerSyncRecord, RepairProofs, VerificationEntry, VerificationState,
};
use crate::storage::LmdbStorage;
use saorsa_core::identity::{NodeIdentity, PeerId};
use saorsa_core::{DhtNetworkEvent, P2PEvent, P2PNode, TrustEvent};
/// Topic prefix of request/response envelopes: the message handler treats a
/// topic of `RR_PREFIX` followed by `REPLICATION_PROTOCOL_ID` as an enveloped
/// replication request.
const RR_PREFIX: &str = "/rr/";
/// Returns the payment verification context for fresh offers, which is always
/// `VerificationContext::ClientPut`.
// NOTE(review): "fresh offers" is taken from the function name; call sites are outside this view.
fn fresh_offer_payment_context() -> VerificationContext {
VerificationContext::ClientPut
}
/// Returns the payment verification context for paid notifications, which is
/// always `VerificationContext::PaidListAdmission`.
// NOTE(review): "paid notifications" is taken from the function name; call sites are outside this view.
fn paid_notify_payment_context() -> VerificationContext {
VerificationContext::PaidListAdmission
}
/// Boxed future tracked by the fetch worker. It resolves to the key that was
/// fetched and its outcome; the outcome is `None` when the spawned fetch task
/// could not be joined (for example because it panicked).
type FetchFuture = Pin<Box<dyn Future<Output = (XorName, Option<FetchOutcome>)> + Send>>;
/// Borrowed engine handles handed to `run_verification_cycle`.
///
/// The verification worker builds a fresh value on every polling tick from
/// the `Arc`s it captured when it was spawned; nothing here is owned.
struct VerificationCycleContext<'a> {
p2p_node: &'a Arc<P2PNode>,
paid_list: &'a Arc<PaidList>,
storage: &'a Arc<LmdbStorage>,
queues: &'a Arc<RwLock<ReplicationQueues>>,
config: &'a ReplicationConfig,
bootstrap_state: &'a Arc<RwLock<BootstrapState>>,
// Flag and notifier for bootstrap completion, shared with the engine.
is_bootstrapping: &'a Arc<RwLock<bool>>,
bootstrap_complete_notify: &'a Arc<Notify>,
// Per-peer commitment bookkeeping shared with the message handler.
last_commitment_by_peer: &'a Arc<RwLock<HashMap<PeerId, PeerCommitmentRecord>>>,
ever_capable_peers: &'a Arc<RwLock<HashSet<PeerId>>>,
recent_provers: &'a Arc<RwLock<RecentProvers>>,
}
/// Sleep, in milliseconds, before the fetch worker re-checks the queue when it
/// has no fetch in flight.
const FETCH_WORKER_POLL_MS: u64 = 100;
/// Delay, in milliseconds, between two verification cycles.
const VERIFICATION_WORKER_POLL_MS: u64 = 250;
// NOTE(review): presumably the cycle duration above which a slow-cycle log is
// emitted; the use site is outside this view.
const VERIFICATION_CYCLE_SLOW_LOG_MS: u128 = 500;
// NOTE(review): presumably the weight attached to replication trust events;
// the use site is outside this view.
const REPLICATION_TRUST_WEIGHT: f64 = 1.0;
/// Seconds between checks of whether bootstrap has drained, used by the audit
/// loop before it runs its first tick.
const BOOTSTRAP_DRAIN_CHECK_SECS: u64 = 5;
/// Seconds between periodic rebuilds of the local storage commitment.
const COMMITMENT_ROTATION_INTERVAL_SECS: u64 = 3600;
// NOTE(review): presumably the minimum spacing between commitment signature
// verifications for one peer; the use site is outside this view.
const COMMITMENT_SIG_VERIFY_MIN_INTERVAL: Duration = Duration::from_secs(60);
// NOTE(review): presumably the capacity bound of `last_commitment_by_peer`;
// enforcement is outside this view.
const MAX_LAST_COMMITMENT_BY_PEER: usize = 4096;
/// Four times `MAX_LAST_COMMITMENT_BY_PEER`.
// NOTE(review): presumably the capacity bound of `ever_capable_peers`; enforcement is outside this view.
const MAX_EVER_CAPABLE_PEERS: usize = 4 * MAX_LAST_COMMITMENT_BY_PEER;
/// Owner of all replication state and of the background tasks that drive it.
///
/// Created by `new` (no tasks running), activated by `start`, and stopped by
/// `shutdown`. Most fields are `Arc`-wrapped because each background task
/// receives its own clone.
pub struct ReplicationEngine {
// Validated in `new` before anything else is built.
config: Arc<ReplicationConfig>,
p2p_node: Arc<P2PNode>,
storage: Arc<LmdbStorage>,
// Opened from `root_dir` in `new`.
paid_list: Arc<PaidList>,
payment_verifier: Arc<PaymentVerifier>,
queues: Arc<RwLock<ReplicationQueues>>,
sync_state: Arc<RwLock<NeighborSyncState>>,
sync_history: Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
audit_on_gossip_cooldown: Arc<RwLock<HashMap<PeerId, Instant>>>,
sync_cycle_epoch: Arc<RwLock<u64>>,
repair_proofs: Arc<RwLock<RepairProofs>>,
bootstrap_state: Arc<RwLock<BootstrapState>>,
// Starts as `true`; see `is_bootstrapping()` and `wait_for_bootstrap_complete()`.
is_bootstrapping: Arc<RwLock<bool>>,
// Wakes the neighbor sync loop ahead of its timer.
sync_trigger: Arc<Notify>,
bootstrap_complete_notify: Arc<Notify>,
identity: Arc<NodeIdentity>,
commitment_state: Arc<ResponderCommitmentState>,
last_commitment_by_peer: Arc<RwLock<HashMap<PeerId, PeerCommitmentRecord>>>,
ever_capable_peers: Arc<RwLock<HashSet<PeerId>>>,
recent_provers: Arc<RwLock<RecentProvers>>,
sig_verify_attempts: Arc<RwLock<HashMap<PeerId, Instant>>>,
// Created with `MAX_CONCURRENT_REPLICATION_SENDS` permits.
send_semaphore: Arc<Semaphore>,
// Created with `MAX_CONCURRENT_AUDIT_RESPONSES` permits.
audit_responder_semaphore: Arc<Semaphore>,
// Per-peer count of admitted audit responders (see `admit_audit_responder`).
audit_responder_inflight: Arc<RwLock<HashMap<PeerId, u32>>>,
// Taken (left as `None`) by `start_fresh_write_drainer`.
fresh_write_rx: Option<mpsc::UnboundedReceiver<fresh::FreshWriteEvent>>,
possession_check_tx: mpsc::UnboundedSender<possession::PossessionCheckEvent>,
// Taken (left as `None`) by `start_possession_check_scheduler`.
possession_check_rx: Option<mpsc::UnboundedReceiver<possession::PossessionCheckEvent>>,
// Cancelled by `shutdown()`; every background task watches it.
shutdown: CancellationToken,
// Join handles pushed by the `start_*` methods and drained by `shutdown()`.
task_handles: Vec<JoinHandle<()>>,
}
impl ReplicationEngine {
#[allow(clippy::too_many_arguments)]
pub async fn new(
config: ReplicationConfig,
p2p_node: Arc<P2PNode>,
storage: Arc<LmdbStorage>,
payment_verifier: Arc<PaymentVerifier>,
identity: Arc<NodeIdentity>,
root_dir: &Path,
fresh_write_rx: mpsc::UnboundedReceiver<fresh::FreshWriteEvent>,
shutdown: CancellationToken,
) -> Result<Self> {
config.validate().map_err(Error::Config)?;
let paid_list = Arc::new(
PaidList::new(root_dir)
.await
.map_err(|e| Error::Storage(format!("Failed to open PaidList: {e}")))?,
);
let initial_neighbors = NeighborSyncState::new_cycle(Vec::new());
let config = Arc::new(config);
let (possession_check_tx, possession_check_rx) = mpsc::unbounded_channel();
Ok(Self {
config: Arc::clone(&config),
p2p_node,
storage,
paid_list,
payment_verifier,
queues: Arc::new(RwLock::new(ReplicationQueues::new())),
sync_state: Arc::new(RwLock::new(initial_neighbors)),
sync_history: Arc::new(RwLock::new(HashMap::new())),
audit_on_gossip_cooldown: Arc::new(RwLock::new(HashMap::new())),
sync_cycle_epoch: Arc::new(RwLock::new(0)),
repair_proofs: Arc::new(RwLock::new(RepairProofs::new())),
bootstrap_state: Arc::new(RwLock::new(BootstrapState::new())),
is_bootstrapping: Arc::new(RwLock::new(true)),
sync_trigger: Arc::new(Notify::new()),
bootstrap_complete_notify: Arc::new(Notify::new()),
identity,
commitment_state: Arc::new(ResponderCommitmentState::new()),
last_commitment_by_peer: Arc::new(RwLock::new(HashMap::new())),
ever_capable_peers: Arc::new(RwLock::new(HashSet::new())),
recent_provers: Arc::new(RwLock::new(RecentProvers::new())),
sig_verify_attempts: Arc::new(RwLock::new(HashMap::new())),
send_semaphore: Arc::new(Semaphore::new(MAX_CONCURRENT_REPLICATION_SENDS)),
audit_responder_semaphore: Arc::new(Semaphore::new(MAX_CONCURRENT_AUDIT_RESPONSES)),
audit_responder_inflight: Arc::new(RwLock::new(HashMap::new())),
fresh_write_rx: Some(fresh_write_rx),
possession_check_tx,
possession_check_rx: Some(possession_check_rx),
shutdown,
task_handles: Vec::new(),
})
}
/// Returns the shared handle to the paid list opened in `new`.
#[must_use]
pub fn paid_list(&self) -> &Arc<PaidList> {
&self.paid_list
}
/// Returns the shared handle to this node's own commitment state.
#[must_use]
pub fn commitment_state(&self) -> &Arc<ResponderCommitmentState> {
&self.commitment_state
}
/// Returns the shared map of per-peer commitment records.
#[must_use]
pub fn last_commitment_by_peer(&self) -> &Arc<RwLock<HashMap<PeerId, PeerCommitmentRecord>>> {
&self.last_commitment_by_peer
}
/// Returns the shared handle to the recent-provers tracker.
#[must_use]
pub fn recent_provers(&self) -> &Arc<RwLock<RecentProvers>> {
&self.recent_provers
}
/// Test helper: runs `rebuild_and_rotate_commitment` immediately with the
/// engine's own storage, identity, commitment state, node and config, instead
/// of waiting for the rotation loop.
///
/// # Errors
///
/// Propagates whatever `rebuild_and_rotate_commitment` returns.
#[cfg(any(test, feature = "test-utils"))]
pub async fn rebuild_commitment_now(&self) -> Result<()> {
rebuild_and_rotate_commitment(
&self.storage,
&self.identity,
&self.commitment_state,
&self.p2p_node,
&self.config,
)
.await
}
/// Test helper: records `commitment` for `peer` as if it had already been
/// verified (timestamped with the current instant) and adds the peer to the
/// ever-capable set.
#[cfg(any(feature = "test-utils", test))]
pub async fn inject_peer_commitment_for_test(
    &self,
    peer: &PeerId,
    commitment: StorageCommitment,
) {
    let observed_at = Instant::now();
    {
        let mut commitments = self.last_commitment_by_peer.write().await;
        let record = PeerCommitmentRecord::from_verified(commitment, observed_at);
        commitments.insert(*peer, record);
    }
    let mut capable = self.ever_capable_peers.write().await;
    capable.insert(*peer);
}
/// Test helper: runs a subtree audit against `peer` right away.
///
/// The audit is pinned to the hash and key count of the peer's last recorded
/// commitment. Returns `AuditTickResult::Idle` when no commitment is recorded
/// for the peer or its hash cannot be computed.
#[cfg(any(test, feature = "test-utils"))]
pub async fn audit_peer_now(&self, peer: &PeerId) -> audit::AuditTickResult {
    // Look up the pinned commitment, releasing the read lock before the audit runs.
    let commitments = self.last_commitment_by_peer.read().await;
    let target = commitments
        .get(peer)
        .and_then(PeerCommitmentRecord::last_commitment)
        .and_then(|commitment| {
            commitment_hash(commitment).map(|hash| (hash, commitment.key_count))
        });
    drop(commitments);

    match target {
        None => audit::AuditTickResult::Idle,
        Some((pin, key_count)) => {
            let credit = storage_commitment_audit::AuditCredit {
                recent_provers: &self.recent_provers,
            };
            storage_commitment_audit::run_subtree_audit(
                &self.p2p_node,
                &self.config,
                peer,
                pin,
                key_count,
                Some(&credit),
            )
            .await
        }
    }
}
/// Test helper: runs `possession::run_possession_check` for `key` against
/// `peers` immediately, skipping the random delay the scheduler task applies.
#[cfg(any(test, feature = "test-utils"))]
pub async fn run_possession_check_now(&self, key: XorName, peers: Vec<PeerId>) {
possession::run_possession_check(
key,
peers,
&self.p2p_node,
&self.storage,
&self.config,
&self.sync_state,
&self.shutdown,
)
.await;
}
/// Spawns every background task of the engine.
///
/// A call made while task handles are still registered is logged as an error
/// and ignored. `dht_events` is handed to the bootstrap sync task.
pub fn start(&mut self, dht_events: tokio::sync::broadcast::Receiver<DhtNetworkEvent>) {
if !self.task_handles.is_empty() {
error!("ReplicationEngine::start() called while already running — ignoring");
return;
}
info!("Starting replication engine");
// The spawn order fixes each task's index in `task_handles`, which is the
// index `shutdown()` prints in its warnings.
self.start_message_handler();
self.start_neighbor_sync_loop();
self.start_self_lookup_loop();
self.start_audit_loop();
self.start_commitment_rotation_loop();
self.start_fetch_worker();
self.start_verification_worker();
self.start_bootstrap_sync(dht_events);
self.start_fresh_write_drainer();
self.start_possession_check_scheduler();
info!(
"Replication engine started with {} background tasks",
self.task_handles.len()
);
}
/// Returns the current value of the bootstrapping flag (initially `true`).
pub async fn is_bootstrapping(&self) -> bool {
*self.is_bootstrapping.read().await
}
/// Waits until bootstrap has completed or `timeout` elapses.
///
/// Returns `true` when the engine is already out of bootstrap or the
/// completion notification arrives in time, and `false` on timeout.
pub async fn wait_for_bootstrap_complete(&self, timeout: Duration) -> bool {
// Register interest in the notification BEFORE reading the flag, so a
// completion that lands between the check and the wait is not lost.
// NOTE(review): this relies on `complete_bootstrap` clearing the flag before
// it notifies; that function is outside this view.
let notified = self.bootstrap_complete_notify.notified();
tokio::pin!(notified);
notified.as_mut().enable();
if !*self.is_bootstrapping.read().await {
return true;
}
tokio::time::timeout(timeout, notified).await.is_ok()
}
pub async fn shutdown(&mut self) {
self.shutdown.cancel();
for (i, mut handle) in self.task_handles.drain(..).enumerate() {
match tokio::time::timeout(std::time::Duration::from_secs(10), &mut handle).await {
Ok(Ok(())) => {}
Ok(Err(e)) if e.is_cancelled() => {}
Ok(Err(e)) => warn!("Replication task {i} panicked during shutdown: {e}"),
Err(_) => {
warn!("Replication task {i} did not stop within 10s, aborting");
handle.abort();
}
}
}
}
/// Wakes the neighbor sync loop so it runs a round without waiting for its
/// interval timer.
pub fn trigger_neighbor_sync(&self) {
self.sync_trigger.notify_one();
}
/// Runs fresh replication for `key` and, when it reports any peers, queues a
/// possession check against them.
///
/// The peer list is whatever `fresh::replicate_fresh` returns; an empty list
/// queues nothing.
pub async fn replicate_fresh(&self, key: &XorName, data: &[u8], proof_of_payment: &[u8]) {
    let recipients = fresh::replicate_fresh(
        key,
        data,
        proof_of_payment,
        &self.p2p_node,
        &self.paid_list,
        &self.config,
        &self.send_semaphore,
    )
    .await;

    if recipients.is_empty() {
        return;
    }

    let event = possession::PossessionCheckEvent {
        key: *key,
        peers: recipients,
    };
    // Best effort: a send error is deliberately ignored.
    let _ = self.possession_check_tx.send(event);
}
fn start_fresh_write_drainer(&mut self) {
let Some(mut rx) = self.fresh_write_rx.take() else {
return;
};
let p2p = Arc::clone(&self.p2p_node);
let paid_list = Arc::clone(&self.paid_list);
let config = Arc::clone(&self.config);
let send_semaphore = Arc::clone(&self.send_semaphore);
let possession_tx = self.possession_check_tx.clone();
let shutdown = self.shutdown.clone();
let handle = tokio::spawn(async move {
loop {
tokio::select! {
() = shutdown.cancelled() => break,
event = rx.recv() => {
let Some(event) = event else { break };
let peers = fresh::replicate_fresh(
&event.key,
&event.data,
&event.payment_proof,
&p2p,
&paid_list,
&config,
&send_semaphore,
)
.await;
if !peers.is_empty() {
let _ = possession_tx.send(possession::PossessionCheckEvent {
key: event.key,
peers,
});
}
}
}
}
debug!("Fresh-write drainer shut down");
});
self.task_handles.push(handle);
}
fn start_possession_check_scheduler(&mut self) {
let Some(mut rx) = self.possession_check_rx.take() else {
return;
};
let p2p = Arc::clone(&self.p2p_node);
let storage = Arc::clone(&self.storage);
let config = Arc::clone(&self.config);
let sync_state = Arc::clone(&self.sync_state);
let shutdown = self.shutdown.clone();
let handle = tokio::spawn(async move {
loop {
tokio::select! {
() = shutdown.cancelled() => break,
event = rx.recv() => {
let Some(event) = event else { break };
let p2p = Arc::clone(&p2p);
let storage = Arc::clone(&storage);
let config = Arc::clone(&config);
let sync_state = Arc::clone(&sync_state);
let shutdown = shutdown.clone();
let delay_min = config.possession_check_delay_min;
let delay_max = config.possession_check_delay_max;
tokio::spawn(async move {
let delay = possession::random_delay(delay_min, delay_max);
tokio::select! {
() = shutdown.cancelled() => {}
() = tokio::time::sleep(delay) => {
possession::run_possession_check(
event.key,
event.peers,
&p2p,
&storage,
&config,
&sync_state,
&shutdown,
)
.await;
}
}
});
}
}
}
debug!("Possession-check scheduler shut down");
});
self.task_handles.push(handle);
}
/// Spawns the task that dispatches inbound replication messages and reacts to
/// DHT topology events.
///
/// P2P messages are accepted on two topics:
/// - `REPLICATION_PROTOCOL_ID`: the payload is the message itself;
/// - `RR_PREFIX` + `REPLICATION_PROTOCOL_ID`: the payload is a request
///   envelope; envelopes flagged as responses are ignored, and the envelope's
///   message id is forwarded to the handler.
///
/// DHT events:
/// - `KClosestPeersChanged`: pending sync peers that left the scoped close set
///   are pruned, new entrants are queued for priority sync, and the neighbor
///   sync loop is woken;
/// - `PeerRemoved`: the peer's entries are dropped from the sync state, repair
///   proofs, commitment map, recent provers, signature-verify attempts and
///   gossip-audit cooldown.
#[allow(clippy::too_many_lines)]
fn start_message_handler(&mut self) {
    let mut p2p_events = self.p2p_node.subscribe_events();
    let mut dht_events = self.p2p_node.dht_manager().subscribe_events();
    let p2p = Arc::clone(&self.p2p_node);
    let storage = Arc::clone(&self.storage);
    let paid_list = Arc::clone(&self.paid_list);
    let payment_verifier = Arc::clone(&self.payment_verifier);
    let queues = Arc::clone(&self.queues);
    let config = Arc::clone(&self.config);
    let shutdown = self.shutdown.clone();
    let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
    let bootstrap_state = Arc::clone(&self.bootstrap_state);
    let sync_history = Arc::clone(&self.sync_history);
    let sync_cycle_epoch = Arc::clone(&self.sync_cycle_epoch);
    let repair_proofs = Arc::clone(&self.repair_proofs);
    let sync_trigger = Arc::clone(&self.sync_trigger);
    let my_commitment_state = Arc::clone(&self.commitment_state);
    let last_commitment_by_peer = Arc::clone(&self.last_commitment_by_peer);
    let ever_capable_peers = Arc::clone(&self.ever_capable_peers);
    let recent_provers = Arc::clone(&self.recent_provers);
    let sig_verify_attempts = Arc::clone(&self.sig_verify_attempts);
    let audit_on_gossip_cooldown = Arc::clone(&self.audit_on_gossip_cooldown);
    let sync_state = Arc::clone(&self.sync_state);
    let audit_responder_semaphore = Arc::clone(&self.audit_responder_semaphore);
    let audit_responder_inflight = Arc::clone(&self.audit_responder_inflight);
    let gossip_audit = GossipAuditTrigger {
        p2p_node: Arc::clone(&p2p),
        config: Arc::clone(&config),
        recent_provers: Arc::clone(&recent_provers),
        sync_state: Arc::clone(&sync_state),
        cooldown: Arc::clone(&audit_on_gossip_cooldown),
    };
    let handle = tokio::spawn(async move {
        loop {
            tokio::select! {
                () = shutdown.cancelled() => break,
                event = p2p_events.recv() => {
                    // NOTE(review): every receive error is skipped. If this is a
                    // tokio broadcast receiver, a closed channel keeps returning
                    // an error immediately, so this loop would spin until
                    // shutdown — confirm and consider breaking on `Closed`.
                    let Ok(event) = event else { continue };
                    if let P2PEvent::Message {
                        topic,
                        source: Some(source),
                        data,
                        ..
                    } = event {
                        let rr_info = if topic == REPLICATION_PROTOCOL_ID {
                            // The event is owned, so the payload is moved here
                            // instead of being copied for every message.
                            Some((data, None))
                        } else if topic.strip_prefix(RR_PREFIX) == Some(REPLICATION_PROTOCOL_ID) {
                            // Enveloped request: keep requests only and carry
                            // the envelope's message id along.
                            P2PNode::parse_request_envelope(&data)
                                .filter(|(_, is_resp, _)| !is_resp)
                                .map(|(msg_id, _, payload)| (payload, Some(msg_id)))
                        } else {
                            None
                        };
                        if let Some((payload, rr_message_id)) = rr_info {
                            match handle_replication_message(
                                &source,
                                &payload,
                                &p2p,
                                &storage,
                                &paid_list,
                                &payment_verifier,
                                &queues,
                                &config,
                                &is_bootstrapping,
                                &bootstrap_state,
                                &sync_history,
                                &sync_cycle_epoch,
                                &repair_proofs,
                                &last_commitment_by_peer,
                                &ever_capable_peers,
                                &sig_verify_attempts,
                                &my_commitment_state,
                                &gossip_audit,
                                &audit_responder_semaphore,
                                &audit_responder_inflight,
                                rr_message_id.as_deref(),
                            ).await {
                                Ok(()) => {}
                                Err(e) => {
                                    debug!(
                                        "Replication message from {source} error: {e}"
                                    );
                                }
                            }
                        }
                    }
                }
                dht_event = dht_events.recv() => {
                    let Ok(dht_event) = dht_event else { continue };
                    match dht_event {
                        DhtNetworkEvent::KClosestPeersChanged { old, new } => {
                            // Only the first `neighbor_sync_scope` peers of each
                            // list are considered.
                            let old_peers = old
                                .iter()
                                .take(config.neighbor_sync_scope)
                                .copied()
                                .collect::<HashSet<_>>();
                            let new_scoped = new
                                .iter()
                                .take(config.neighbor_sync_scope)
                                .copied()
                                .collect::<Vec<_>>();
                            let new_peers =
                                new_scoped.iter().copied().collect::<HashSet<_>>();
                            // Entrants keep the order of the new list.
                            let entrants = new_scoped
                                .iter()
                                .copied()
                                .filter(|peer| !old_peers.contains(peer))
                                .collect::<Vec<_>>();
                            let entrant_count = entrants.len();
                            let (priority_insertions, sync_removals) = {
                                let mut state = sync_state.write().await;
                                let sync_removals = state.retain_sync_peers(&new_peers);
                                let priority_insertions = state.queue_priority_peers(entrants);
                                (priority_insertions, sync_removals)
                            };
                            if priority_insertions > 0 {
                                debug!(
                                    "K-closest peers changed, queued {priority_insertions}/{entrant_count} new close peers for priority neighbor sync and pruned {sync_removals} departed pending sync entries"
                                );
                            } else {
                                debug!(
                                    "K-closest peers changed, no additional close peers queued, pruned {sync_removals} departed pending sync entries, triggering early neighbor sync"
                                );
                            }
                            sync_trigger.notify_one();
                        }
                        DhtNetworkEvent::PeerRemoved { peer_id } => {
                            // `ever_capable_peers` is intentionally left untouched here.
                            sync_state.write().await.remove_peer(&peer_id);
                            repair_proofs.write().await.remove_peer(&peer_id);
                            last_commitment_by_peer.write().await.remove(&peer_id);
                            recent_provers.write().await.forget_peer(&peer_id);
                            sig_verify_attempts.write().await.remove(&peer_id);
                            audit_on_gossip_cooldown.write().await.remove(&peer_id);
                        }
                        _ => {}
                    }
                }
            }
        }
        debug!("Replication message handler shut down");
    });
    self.task_handles.push(handle);
}
/// Spawns the periodic neighbor sync task.
///
/// Each iteration waits for a randomised interval or for `sync_trigger`,
/// whichever comes first, then runs one `run_neighbor_sync_round`. Shutdown
/// ends the wait and also abandons a round that is still in progress.
fn start_neighbor_sync_loop(&mut self) {
let p2p = Arc::clone(&self.p2p_node);
let storage = Arc::clone(&self.storage);
let paid_list = Arc::clone(&self.paid_list);
let queues = Arc::clone(&self.queues);
let config = Arc::clone(&self.config);
let shutdown = self.shutdown.clone();
let sync_state = Arc::clone(&self.sync_state);
let sync_history = Arc::clone(&self.sync_history);
let sync_cycle_epoch = Arc::clone(&self.sync_cycle_epoch);
let repair_proofs = Arc::clone(&self.repair_proofs);
let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
let bootstrap_state = Arc::clone(&self.bootstrap_state);
let sync_trigger = Arc::clone(&self.sync_trigger);
let commitment_state = Arc::clone(&self.commitment_state);
let last_commitment_by_peer = Arc::clone(&self.last_commitment_by_peer);
let ever_capable_peers = Arc::clone(&self.ever_capable_peers);
let sig_verify_attempts = Arc::clone(&self.sig_verify_attempts);
let gossip_audit = GossipAuditTrigger {
p2p_node: Arc::clone(&p2p),
config: Arc::clone(&config),
recent_provers: Arc::clone(&self.recent_provers),
sync_state: Arc::clone(&sync_state),
cooldown: Arc::clone(&self.audit_on_gossip_cooldown),
};
let handle = tokio::spawn(async move {
loop {
// A new random interval is drawn for every iteration.
let interval = config.random_neighbor_sync_interval();
// Wait phase: timer, explicit trigger, or shutdown.
tokio::select! {
() = shutdown.cancelled() => break,
() = tokio::time::sleep(interval) => {}
() = sync_trigger.notified() => {
debug!("Neighbor sync triggered by topology change");
}
}
// Work phase: the round future is dropped if shutdown fires mid-round.
tokio::select! {
() = shutdown.cancelled() => break,
() = run_neighbor_sync_round(
&p2p,
&storage,
&paid_list,
&queues,
&config,
&sync_state,
&sync_history,
&sync_cycle_epoch,
&repair_proofs,
&is_bootstrapping,
&bootstrap_state,
&commitment_state,
&last_commitment_by_peer,
&ever_capable_peers,
&sig_verify_attempts,
&gossip_audit,
) => {}
}
}
debug!("Neighbor sync loop shut down");
});
self.task_handles.push(handle);
}
/// Spawns the task that periodically triggers a DHT self-lookup.
///
/// The wait between lookups is a fresh random interval from the config. A
/// failed lookup is logged at debug level and does not stop the loop.
fn start_self_lookup_loop(&mut self) {
    let node = Arc::clone(&self.p2p_node);
    let config = Arc::clone(&self.config);
    let shutdown = self.shutdown.clone();

    let handle = tokio::spawn(async move {
        loop {
            let pause = config.random_self_lookup_interval();
            tokio::select! {
                () = shutdown.cancelled() => break,
                () = tokio::time::sleep(pause) => {}
            }
            match node.dht_manager().trigger_self_lookup().await {
                Ok(_) => {}
                Err(e) => {
                    debug!("Self-lookup failed: {e}");
                }
            }
        }
        debug!("Self-lookup loop shut down");
    });
    self.task_handles.push(handle);
}
fn start_audit_loop(&mut self) {
let p2p = Arc::clone(&self.p2p_node);
let storage = Arc::clone(&self.storage);
let config = Arc::clone(&self.config);
let shutdown = self.shutdown.clone();
let sync_history = Arc::clone(&self.sync_history);
let sync_cycle_epoch = Arc::clone(&self.sync_cycle_epoch);
let repair_proofs = Arc::clone(&self.repair_proofs);
let bootstrap_state = Arc::clone(&self.bootstrap_state);
let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
let sync_state = Arc::clone(&self.sync_state);
let handle = tokio::spawn(async move {
loop {
tokio::select! {
() = shutdown.cancelled() => return,
() = tokio::time::sleep(
std::time::Duration::from_secs(BOOTSTRAP_DRAIN_CHECK_SECS)
) => {
if bootstrap_state.read().await.is_drained() {
break;
}
}
}
}
{
let bootstrapping = *is_bootstrapping.read().await;
let result = {
let history = sync_history.read().await;
let current_sync_epoch = *sync_cycle_epoch.read().await;
audit::audit_tick_with_repair_proofs(
&p2p,
&storage,
&config,
&history,
&repair_proofs,
current_sync_epoch,
bootstrapping,
)
.await
};
handle_audit_result(&result, &p2p, &sync_state, &config).await;
}
loop {
let interval = config.random_audit_tick_interval();
tokio::select! {
() = shutdown.cancelled() => break,
() = tokio::time::sleep(interval) => {
let bootstrapping = *is_bootstrapping.read().await;
let result = {
let history = sync_history.read().await;
let current_sync_epoch = *sync_cycle_epoch.read().await;
audit::audit_tick_with_repair_proofs(
&p2p,
&storage,
&config,
&history,
&repair_proofs,
current_sync_epoch,
bootstrapping,
)
.await
};
handle_audit_result(&result, &p2p, &sync_state, &config).await;
}
}
}
debug!("Audit loop shut down");
});
self.task_handles.push(handle);
}
fn start_commitment_rotation_loop(&mut self) {
let storage = Arc::clone(&self.storage);
let identity = Arc::clone(&self.identity);
let commitment_state = Arc::clone(&self.commitment_state);
let shutdown = self.shutdown.clone();
let p2p = Arc::clone(&self.p2p_node);
let config = Arc::clone(&self.config);
let sync_trigger = Arc::clone(&self.sync_trigger);
let recent_provers = Arc::clone(&self.recent_provers);
let handle = tokio::spawn(async move {
if let Err(e) =
rebuild_and_rotate_commitment(&storage, &identity, &commitment_state, &p2p, &config)
.await
{
warn!("Initial commitment build failed: {e}");
} else {
sync_trigger.notify_one();
}
loop {
tokio::select! {
() = shutdown.cancelled() => break,
() = tokio::time::sleep(
std::time::Duration::from_secs(COMMITMENT_ROTATION_INTERVAL_SECS)
) => {
if let Err(e) = rebuild_and_rotate_commitment(
&storage,
&identity,
&commitment_state,
&p2p,
&config,
).await {
warn!("Commitment rotation failed: {e}");
}
let dropped = recent_provers.write().await.sweep_expired(
std::time::Instant::now()
);
if dropped > 0 {
debug!("recent_provers: swept {dropped} expired entries");
}
}
}
}
debug!("Commitment rotation loop shut down");
});
self.task_handles.push(handle);
}
/// Spawns the fetch worker.
///
/// The worker keeps up to `max_parallel_fetch()` fetches in flight. Each
/// fetch runs on its own spawned task. A failed fetch is retried against the
/// next source the queue offers; once a key is finished (stored, out of
/// sources, or its task panicked) the bootstrap bookkeeping is updated and
/// bootstrap is completed if everything has drained.
#[allow(clippy::too_many_lines, clippy::option_if_let_else)]
fn start_fetch_worker(&mut self) {
let p2p = Arc::clone(&self.p2p_node);
let storage = Arc::clone(&self.storage);
let queues = Arc::clone(&self.queues);
let config = Arc::clone(&self.config);
let shutdown = self.shutdown.clone();
let bootstrap_state = Arc::clone(&self.bootstrap_state);
let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
let bootstrap_complete_notify = Arc::clone(&self.bootstrap_complete_notify);
let concurrency = max_parallel_fetch();
info!("Fetch worker concurrency set to {concurrency} (hardware threads)");
let handle = tokio::spawn(async move {
// Fetches currently running, polled concurrently.
let mut in_flight = FuturesUnordered::<FetchFuture>::new();
loop {
// Top up the in-flight set while holding the queue write lock.
{
let mut q = queues.write().await;
while in_flight.len() < concurrency {
let Some(candidate) = q.dequeue_fetch() else {
break;
};
// NOTE(review): a candidate without sources is dropped here without the
// bootstrap bookkeeping that terminal fetches get below — confirm intended.
let Some(&source) = candidate.sources.first() else {
warn!(
"Fetch candidate {} has no sources — dropping",
hex::encode(candidate.key)
);
continue;
};
q.start_fetch(candidate.key, source, candidate.sources.clone());
let p2p = Arc::clone(&p2p);
let storage = Arc::clone(&storage);
let config = Arc::clone(&config);
let token = shutdown.clone();
let fetch_key = candidate.key;
in_flight.push(Box::pin(async move {
// Run the fetch on its own task so a panic surfaces as a join error
// rather than taking the worker down; shutdown resolves it as SourceFailed.
let handle = tokio::spawn(async move {
tokio::select! {
() = token.cancelled() => FetchOutcome {
key: fetch_key,
result: FetchResult::SourceFailed,
},
outcome = execute_single_fetch(
p2p, storage, config, fetch_key, source,
) => outcome,
}
});
match handle.await {
Ok(outcome) => (outcome.key, Some(outcome)),
Err(e) => {
error!(
"Fetch task for {} panicked: {e}",
hex::encode(fetch_key)
);
(fetch_key, None)
}
}
}));
}
}
// Nothing to wait on: look at the queue again after a short sleep.
if in_flight.is_empty() {
tokio::select! {
() = shutdown.cancelled() => break,
() = tokio::time::sleep(
std::time::Duration::from_millis(FETCH_WORKER_POLL_MS)
) => continue,
}
}
tokio::select! {
() = shutdown.cancelled() => break,
Some((key, maybe_outcome)) = in_flight.next() => {
let mut q = queues.write().await;
// `terminal` is true once no further attempt will be made for this key.
let terminal = if let Some(outcome) = maybe_outcome {
match outcome.result {
FetchResult::Stored => {
q.complete_fetch(&key);
true
}
FetchResult::IntegrityFailed | FetchResult::SourceFailed => {
// Retry against the next source if the queue offers one.
if let Some(next_peer) = q.retry_fetch(&key) {
let p2p = Arc::clone(&p2p);
let storage = Arc::clone(&storage);
let config = Arc::clone(&config);
let token = shutdown.clone();
let fetch_key = key;
in_flight.push(Box::pin(async move {
let handle = tokio::spawn(async move {
tokio::select! {
() = token.cancelled() => FetchOutcome {
key: fetch_key,
result: FetchResult::SourceFailed,
},
outcome = execute_single_fetch(
p2p, storage, config, fetch_key, next_peer,
) => outcome,
}
});
match handle.await {
Ok(outcome) => (outcome.key, Some(outcome)),
Err(e) => {
error!(
"Fetch task for {} panicked: {e}",
hex::encode(fetch_key)
);
(fetch_key, None)
}
}
}));
false
} else {
q.complete_fetch(&key);
true
}
}
}
} else {
// The fetch task could not be joined; give up on this key.
q.complete_fetch(&key);
true
};
if terminal {
// Release the queue write lock before touching bootstrap state; the queue
// is re-locked read-only for the drain check.
drop(q); if !bootstrap_state.read().await.is_drained() {
bootstrap_state.write().await.remove_key(&key);
let q = queues.read().await;
if bootstrap::check_bootstrap_drained(
&bootstrap_state,
&q,
)
.await
{
complete_bootstrap(
&is_bootstrapping,
&bootstrap_complete_notify,
).await;
}
}
}
}
}
}
// After shutdown, wait for the outstanding fetch futures to resolve.
while in_flight.next().await.is_some() {}
debug!("Fetch worker shut down");
});
self.task_handles.push(handle);
}
fn start_verification_worker(&mut self) {
let p2p = Arc::clone(&self.p2p_node);
let storage = Arc::clone(&self.storage);
let queues = Arc::clone(&self.queues);
let paid_list = Arc::clone(&self.paid_list);
let config = Arc::clone(&self.config);
let shutdown = self.shutdown.clone();
let bootstrap_state = Arc::clone(&self.bootstrap_state);
let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
let bootstrap_complete_notify = Arc::clone(&self.bootstrap_complete_notify);
let last_commitment_by_peer = Arc::clone(&self.last_commitment_by_peer);
let ever_capable_peers = Arc::clone(&self.ever_capable_peers);
let recent_provers = Arc::clone(&self.recent_provers);
let handle = tokio::spawn(async move {
loop {
tokio::select! {
() = shutdown.cancelled() => break,
() = tokio::time::sleep(
std::time::Duration::from_millis(VERIFICATION_WORKER_POLL_MS)
) => {
let ctx = VerificationCycleContext {
p2p_node: &p2p,
paid_list: &paid_list,
storage: &storage,
queues: &queues,
config: &config,
bootstrap_state: &bootstrap_state,
is_bootstrapping: &is_bootstrapping,
bootstrap_complete_notify: &bootstrap_complete_notify,
last_commitment_by_peer: &last_commitment_by_peer,
ever_capable_peers: &ever_capable_peers,
recent_provers: &recent_provers,
};
run_verification_cycle(ctx).await;
}
}
}
debug!("Verification worker shut down");
});
self.task_handles.push(handle);
}
/// Spawns the one-shot bootstrap sync task.
///
/// After the bootstrap gate opens, the task snapshots the close neighbors and
/// syncs with them in batches. Hints returned by peers are admitted and
/// queued, and the discovered keys are tracked in the bootstrap state.
/// Bootstrap is completed at the end if everything has drained, or
/// immediately when there are no neighbors.
#[allow(clippy::too_many_lines)]
fn start_bootstrap_sync(
&mut self,
dht_events: tokio::sync::broadcast::Receiver<DhtNetworkEvent>,
) {
let p2p = Arc::clone(&self.p2p_node);
let storage = Arc::clone(&self.storage);
let paid_list = Arc::clone(&self.paid_list);
let queues = Arc::clone(&self.queues);
let config = Arc::clone(&self.config);
let shutdown = self.shutdown.clone();
let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
let bootstrap_state = Arc::clone(&self.bootstrap_state);
let bootstrap_complete_notify = Arc::clone(&self.bootstrap_complete_notify);
let sync_cycle_epoch = Arc::clone(&self.sync_cycle_epoch);
let repair_proofs = Arc::clone(&self.repair_proofs);
let my_commitment_state = Arc::clone(&self.commitment_state);
let last_commitment_by_peer = Arc::clone(&self.last_commitment_by_peer);
let ever_capable_peers = Arc::clone(&self.ever_capable_peers);
let sig_verify_attempts = Arc::clone(&self.sig_verify_attempts);
let handle = tokio::spawn(async move {
// Wait for the bootstrap gate; stop here if shutdown won the race.
let gate = bootstrap::wait_for_bootstrap_complete(
dht_events,
config.bootstrap_complete_timeout_secs,
&shutdown,
)
.await;
if gate == bootstrap::BootstrapGateResult::Shutdown {
return;
}
let self_id = *p2p.peer_id();
let neighbors =
neighbor_sync::snapshot_close_neighbors(&p2p, &self_id, config.neighbor_sync_scope)
.await;
// With no neighbors there is nothing to sync: finish bootstrap right away.
if neighbors.is_empty() {
info!("Bootstrap sync: no close neighbors found, marking drained");
bootstrap::mark_bootstrap_drained(&bootstrap_state).await;
complete_bootstrap(&is_bootstrapping, &bootstrap_complete_notify).await;
return;
}
let neighbor_count = neighbors.len();
info!("Bootstrap sync: syncing with {neighbor_count} close neighbors");
// NOTE(review): `chunks` panics on a chunk size of 0 — presumably
// `config.validate()` rejects `neighbor_sync_peer_count == 0`; confirm.
for batch in neighbors.chunks(config.neighbor_sync_peer_count) {
if shutdown.is_cancelled() {
break;
}
// Hints are built once per batch and handed out per peer below.
let mut hints_by_peer = neighbor_sync::build_sync_hints_for_peers(
batch,
&storage,
&paid_list,
&p2p,
config.close_group_size,
config.paid_list_close_group_size,
)
.await;
for peer in batch {
if shutdown.is_cancelled() {
break;
}
let bootstrapping = *is_bootstrapping.read().await;
// The request counts as pending for as long as it is in flight.
bootstrap::increment_pending_requests(&bootstrap_state, 1).await;
let hints = hints_by_peer.remove(peer).unwrap_or_default();
let outcome = neighbor_sync::sync_with_peer_with_hints(
peer,
&p2p,
&config,
bootstrapping,
hints,
my_commitment_state
.current_for_gossip()
.map(|b| b.commitment().clone()),
)
.await;
bootstrap::decrement_pending_requests(&bootstrap_state, 1).await;
if let Some(outcome) = outcome {
// The peer's commitment is ingested whether or not its hints are used.
ingest_peer_commitment(
peer,
outcome.response.commitment.as_ref(),
&p2p,
&last_commitment_by_peer,
&ever_capable_peers,
&sig_verify_attempts,
)
.await;
// Hints from a peer that reports itself as bootstrapping are skipped.
if !outcome.response.bootstrapping {
record_sent_replica_hints(
peer,
&outcome.sent_replica_hints,
&repair_proofs,
&sync_cycle_epoch,
)
.await;
let outcome = admit_and_queue_hints(
&self_id,
peer,
&outcome.response.replica_hints,
&outcome.response.paid_hints,
&p2p,
&config,
&storage,
&paid_list,
&queues,
)
.await;
if !outcome.discovered.is_empty() {
bootstrap::track_discovered_keys(
&bootstrap_state,
&outcome.discovered,
)
.await;
}
if outcome.capacity_rejected_count > 0 {
bootstrap::note_capacity_rejected(&bootstrap_state, *peer).await;
} else {
bootstrap::clear_capacity_rejected(&bootstrap_state, peer).await;
}
}
}
}
}
// Final drain check once all batches have been processed.
{
let q = queues.read().await;
if bootstrap::check_bootstrap_drained(&bootstrap_state, &q).await {
complete_bootstrap(&is_bootstrapping, &bootstrap_complete_notify).await;
}
}
info!("Bootstrap sync completed");
});
self.task_handles.push(handle);
}
}
/// Admission ticket for one audit response, returned by
/// `admit_audit_responder`.
///
/// It holds one permit of the global responder semaphore. Its `Drop` impl
/// decrements the per-peer in-flight counter for `peer`.
struct AuditResponderGuard {
// Held only so the semaphore permit is released when the guard drops.
_permit: tokio::sync::OwnedSemaphorePermit,
inflight: Arc<RwLock<HashMap<PeerId, u32>>>,
peer: PeerId,
}
impl Drop for AuditResponderGuard {
fn drop(&mut self) {
let peer = self.peer;
if let Ok(mut map) = self.inflight.try_write() {
if let Some(n) = map.get_mut(&peer) {
*n = n.saturating_sub(1);
if *n == 0 {
map.remove(&peer);
}
}
return;
}
if let Ok(handle) = tokio::runtime::Handle::try_current() {
let inflight = Arc::clone(&self.inflight);
handle.spawn(async move {
let mut map = inflight.write().await;
if let Some(n) = map.get_mut(&peer) {
*n = n.saturating_sub(1);
if *n == 0 {
map.remove(&peer);
}
}
});
}
}
}
/// Tries to admit one audit response for `source`.
///
/// Admission needs a free per-peer slot (fewer than
/// `MAX_AUDIT_RESPONSES_PER_PEER` already in flight for `source`) and a free
/// permit on the global `semaphore`. Returns `None` when either is missing;
/// a per-peer slot that was reserved is given back if the permit cannot be
/// taken. Nothing here waits for capacity.
async fn admit_audit_responder(
    semaphore: &Arc<Semaphore>,
    inflight: &Arc<RwLock<HashMap<PeerId, u32>>>,
    source: &PeerId,
) -> Option<AuditResponderGuard> {
    // Step 1: reserve the per-peer slot.
    {
        let mut counts = inflight.write().await;
        let in_use = counts.entry(*source).or_insert(0);
        if *in_use >= MAX_AUDIT_RESPONSES_PER_PEER {
            return None;
        }
        *in_use += 1;
    }

    // Step 2: take a global permit, rolling back step 1 on failure.
    match Arc::clone(semaphore).try_acquire_owned() {
        Ok(permit) => Some(AuditResponderGuard {
            _permit: permit,
            inflight: Arc::clone(inflight),
            peer: *source,
        }),
        Err(_) => {
            let mut counts = inflight.write().await;
            if let Some(in_use) = counts.get_mut(source) {
                *in_use = in_use.saturating_sub(1);
                if *in_use == 0 {
                    counts.remove(source);
                }
            }
            None
        }
    }
}
/// Decodes one inbound replication message and dispatches it by body type.
///
/// `data` is decoded into a `ReplicationMessage`; a decode failure is returned
/// as `Error::Protocol`. Request-type bodies are forwarded to their dedicated
/// handlers together with `msg.request_id` and `rr_message_id`. The three audit
/// challenge bodies are first admitted through `admit_audit_responder` and then
/// answered from a spawned task, so this function returns `Ok(())` for them
/// without waiting for the reply to be sent. Response-type bodies are accepted
/// and ignored here.
#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
async fn handle_replication_message(
source: &PeerId,
data: &[u8],
p2p_node: &Arc<P2PNode>,
storage: &Arc<LmdbStorage>,
paid_list: &Arc<PaidList>,
payment_verifier: &Arc<PaymentVerifier>,
queues: &Arc<RwLock<ReplicationQueues>>,
config: &ReplicationConfig,
is_bootstrapping: &Arc<RwLock<bool>>,
bootstrap_state: &Arc<RwLock<BootstrapState>>,
sync_history: &Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
sync_cycle_epoch: &Arc<RwLock<u64>>,
repair_proofs: &Arc<RwLock<RepairProofs>>,
last_commitment_by_peer: &Arc<RwLock<HashMap<PeerId, PeerCommitmentRecord>>>,
ever_capable_peers: &Arc<RwLock<HashSet<PeerId>>>,
sig_verify_attempts: &Arc<RwLock<HashMap<PeerId, Instant>>>,
my_commitment_state: &Arc<ResponderCommitmentState>,
gossip_audit: &GossipAuditTrigger,
audit_responder_semaphore: &Arc<Semaphore>,
audit_responder_inflight: &Arc<RwLock<HashMap<PeerId, u32>>>,
rr_message_id: Option<&str>,
) -> Result<()> {
let msg = ReplicationMessage::decode(data)
.map_err(|e| Error::Protocol(format!("Failed to decode replication message: {e}")))?;
match msg.body {
ReplicationMessageBody::FreshReplicationOffer(ref offer) => {
handle_fresh_offer(
source,
offer,
storage,
paid_list,
payment_verifier,
p2p_node,
config,
msg.request_id,
rr_message_id,
)
.await
}
ReplicationMessageBody::PaidNotify(ref notify) => {
handle_paid_notify(
source,
notify,
paid_list,
payment_verifier,
p2p_node,
config,
)
.await
}
ReplicationMessageBody::NeighborSyncRequest(ref request) => {
// Snapshot the local bootstrapping flag once for this request.
let bootstrapping = *is_bootstrapping.read().await;
// Ingest the commitment carried by the request (if any); when that
// yields a target, offer it to the gossip-audit trigger.
if let Some(target) = ingest_peer_commitment(
source,
request.commitment.as_ref(),
p2p_node,
last_commitment_by_peer,
ever_capable_peers,
sig_verify_attempts,
)
.await
{
maybe_trigger_gossip_audit(gossip_audit, source, target).await;
}
// The local commitment handed to the handler is a clone of whatever
// `current_for_gossip()` returns at this moment (possibly `None`).
handle_neighbor_sync_request(
source,
request,
p2p_node,
storage,
paid_list,
queues,
config,
bootstrapping,
bootstrap_state,
sync_history,
sync_cycle_epoch,
repair_proofs,
my_commitment_state
.current_for_gossip()
.map(|b| b.commitment().clone()),
msg.request_id,
rr_message_id,
)
.await
}
ReplicationMessageBody::VerificationRequest(ref request) => {
handle_verification_request(
source,
request,
storage,
paid_list,
p2p_node,
msg.request_id,
rr_message_id,
)
.await
}
ReplicationMessageBody::FetchRequest(ref request) => {
handle_fetch_request(
source,
request,
storage,
p2p_node,
msg.request_id,
rr_message_id,
)
.await
}
ReplicationMessageBody::AuditChallenge(challenge) => {
// Refuse (without replying) when admission fails.
let Some(guard) =
admit_audit_responder(audit_responder_semaphore, audit_responder_inflight, source)
.await
else {
warn!(
"Audit challenge reply not sent: kind=responsible response=dropped \
source={source} (audit-responder capacity reached)"
);
return Ok(());
};
// Take owned copies of everything the spawned task needs.
let bootstrapping = *is_bootstrapping.read().await;
let storage = Arc::clone(storage);
let p2p_node = Arc::clone(p2p_node);
let source = *source;
let request_id = msg.request_id;
let rr_message_id = rr_message_id.map(ToOwned::to_owned);
tokio::spawn(async move {
// The guard is moved into the task so it is dropped when the task ends.
let _guard = guard; if let Err(e) = handle_audit_challenge_msg(
&source,
&challenge,
&storage,
&p2p_node,
bootstrapping,
request_id,
rr_message_id.as_deref(),
)
.await
{
debug!("Audit challenge from {source} error: {e}");
}
});
Ok(())
}
ReplicationMessageBody::SubtreeAuditChallenge(challenge) => {
info!(
"Audit challenge received: kind=subtree source={source} request_response={}",
rr_message_id.is_some(),
);
// Refuse (without replying) when admission fails.
let Some(guard) =
admit_audit_responder(audit_responder_semaphore, audit_responder_inflight, source)
.await
else {
warn!(
"Audit challenge reply not sent: kind=subtree response=dropped \
source={source} (audit-responder capacity reached)"
);
return Ok(());
};
// Take owned copies of everything the spawned task needs.
let bootstrapping = *is_bootstrapping.read().await;
let storage = Arc::clone(storage);
let p2p_node = Arc::clone(p2p_node);
let my_commitment_state = Arc::clone(my_commitment_state);
let source = *source;
let request_id = msg.request_id;
let rr_message_id = rr_message_id.map(ToOwned::to_owned);
tokio::spawn(async move {
// The guard is moved into the task so it is dropped when the task ends.
let _guard = guard; let response = storage_commitment_audit::handle_subtree_challenge(
&challenge,
&storage,
p2p_node.peer_id(),
bootstrapping,
Some(&my_commitment_state),
)
.await;
// Capture the label before `response` is moved into the reply body.
let response_kind = subtree_audit_response_kind(&response);
let sent = send_replication_response_checked(
&source,
&p2p_node,
request_id,
ReplicationMessageBody::SubtreeAuditResponse(response),
rr_message_id.as_deref(),
)
.await;
if sent {
info!(
"Audit challenge reply sent: kind=subtree response={response_kind} \
source={source} request_response={}",
rr_message_id.is_some(),
);
} else {
warn!(
"Audit challenge reply not sent: kind=subtree response={response_kind} \
source={source} request_response={}",
rr_message_id.is_some(),
);
}
});
Ok(())
}
ReplicationMessageBody::SubtreeByteChallenge(challenge) => {
info!(
"Audit challenge received: kind=byte source={source} request_response={}",
rr_message_id.is_some(),
);
// Refuse (without replying) when admission fails.
let Some(guard) =
admit_audit_responder(audit_responder_semaphore, audit_responder_inflight, source)
.await
else {
warn!(
"Audit challenge reply not sent: kind=byte response=dropped \
source={source} (audit-responder capacity reached)"
);
return Ok(());
};
// Take owned copies of everything the spawned task needs.
let bootstrapping = *is_bootstrapping.read().await;
let storage = Arc::clone(storage);
let p2p_node = Arc::clone(p2p_node);
let my_commitment_state = Arc::clone(my_commitment_state);
let source = *source;
let request_id = msg.request_id;
let rr_message_id = rr_message_id.map(ToOwned::to_owned);
tokio::spawn(async move {
// The guard is moved into the task so it is dropped when the task ends.
let _guard = guard; let response = storage_commitment_audit::handle_subtree_byte_challenge(
&challenge,
&storage,
p2p_node.peer_id(),
bootstrapping,
Some(&my_commitment_state),
)
.await;
// Capture the label before `response` is moved into the reply body.
let response_kind = subtree_byte_response_kind(&response);
let sent = send_replication_response_checked(
&source,
&p2p_node,
request_id,
ReplicationMessageBody::SubtreeByteResponse(response),
rr_message_id.as_deref(),
)
.await;
if sent {
info!(
"Audit challenge reply sent: kind=byte response={response_kind} \
source={source} request_response={}",
rr_message_id.is_some(),
);
} else {
warn!(
"Audit challenge reply not sent: kind=byte response={response_kind} \
source={source} request_response={}",
rr_message_id.is_some(),
);
}
});
Ok(())
}
// Response bodies need no action in this handler.
ReplicationMessageBody::FreshReplicationResponse(_)
| ReplicationMessageBody::NeighborSyncResponse(_)
| ReplicationMessageBody::VerificationResponse(_)
| ReplicationMessageBody::FetchResponse(_)
| ReplicationMessageBody::AuditResponse(_)
| ReplicationMessageBody::SubtreeAuditResponse(_)
| ReplicationMessageBody::SubtreeByteResponse(_) => Ok(()),
}
}
#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
async fn handle_fresh_offer(
source: &PeerId,
offer: &protocol::FreshReplicationOffer,
storage: &Arc<LmdbStorage>,
paid_list: &Arc<PaidList>,
payment_verifier: &Arc<PaymentVerifier>,
p2p_node: &Arc<P2PNode>,
config: &ReplicationConfig,
request_id: u64,
rr_message_id: Option<&str>,
) -> Result<()> {
let self_id = *p2p_node.peer_id();
if offer.proof_of_payment.is_empty() {
send_replication_response(
source,
p2p_node,
request_id,
ReplicationMessageBody::FreshReplicationResponse(FreshReplicationResponse::Rejected {
key: offer.key,
reason: "Missing proof of payment".to_string(),
}),
rr_message_id,
)
.await;
return Ok(());
}
if offer.data.len() > crate::ant_protocol::MAX_CHUNK_SIZE {
warn!(
"Rejecting fresh offer for key {}: data size {} exceeds MAX_CHUNK_SIZE {}",
hex::encode(offer.key),
offer.data.len(),
crate::ant_protocol::MAX_CHUNK_SIZE,
);
p2p_node
.report_trust_event(
source,
TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
)
.await;
send_replication_response(
source,
p2p_node,
request_id,
ReplicationMessageBody::FreshReplicationResponse(FreshReplicationResponse::Rejected {
key: offer.key,
reason: format!(
"Data size {} exceeds maximum chunk size {}",
offer.data.len(),
crate::ant_protocol::MAX_CHUNK_SIZE,
),
}),
rr_message_id,
)
.await;
return Ok(());
}
let computed_key = crate::client::compute_address(&offer.data);
if computed_key != offer.key {
warn!(
"Rejecting fresh offer for key {}: content address mismatch, computed {}",
hex::encode(offer.key),
hex::encode(computed_key),
);
p2p_node
.report_trust_event(
source,
TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
)
.await;
send_replication_response(
source,
p2p_node,
request_id,
ReplicationMessageBody::FreshReplicationResponse(FreshReplicationResponse::Rejected {
key: offer.key,
reason: format!(
"Content address mismatch: expected {}, computed {}",
hex::encode(offer.key),
hex::encode(computed_key),
),
}),
rr_message_id,
)
.await;
return Ok(());
}
if !admission::is_responsible(
&self_id,
&offer.key,
p2p_node,
config.paid_list_close_group_size,
)
.await
{
send_replication_response(
source,
p2p_node,
request_id,
ReplicationMessageBody::FreshReplicationResponse(FreshReplicationResponse::Rejected {
key: offer.key,
reason: "Not in storage-admission range for this key".to_string(),
}),
rr_message_id,
)
.await;
return Ok(());
}
if let Err(e) = storage.check_capacity() {
info!(
target: "ant_node::storage::disk_precheck",
key = %hex::encode(offer.key),
"Rejecting fresh replication offer before payment verification: {e}"
);
send_replication_response(
source,
p2p_node,
request_id,
ReplicationMessageBody::FreshReplicationResponse(FreshReplicationResponse::Rejected {
key: offer.key,
reason: e.to_string(),
}),
rr_message_id,
)
.await;
return Ok(());
}
match payment_verifier
.verify_payment(
&offer.key,
Some(&offer.proof_of_payment),
fresh_offer_payment_context(),
)
.await
{
Ok(status) if status.can_store() => {
debug!(
"PoP validated for fresh offer key {}",
hex::encode(offer.key)
);
}
Ok(_) => {
send_replication_response(
source,
p2p_node,
request_id,
ReplicationMessageBody::FreshReplicationResponse(
FreshReplicationResponse::Rejected {
key: offer.key,
reason: "Payment verification failed: payment required".to_string(),
},
),
rr_message_id,
)
.await;
return Ok(());
}
Err(e) => {
warn!(
"PoP verification error for key {}: {e}",
hex::encode(offer.key)
);
send_replication_response(
source,
p2p_node,
request_id,
ReplicationMessageBody::FreshReplicationResponse(
FreshReplicationResponse::Rejected {
key: offer.key,
reason: format!("Payment verification error: {e}"),
},
),
rr_message_id,
)
.await;
return Ok(());
}
}
if let Err(e) = paid_list.insert(&offer.key).await {
warn!("Failed to add key to PaidForList: {e}");
}
match storage.put(&offer.key, &offer.data).await {
Ok(_) => {
send_replication_response(
source,
p2p_node,
request_id,
ReplicationMessageBody::FreshReplicationResponse(
FreshReplicationResponse::Accepted { key: offer.key },
),
rr_message_id,
)
.await;
}
Err(e) => {
send_replication_response(
source,
p2p_node,
request_id,
ReplicationMessageBody::FreshReplicationResponse(
FreshReplicationResponse::Rejected {
key: offer.key,
reason: e.to_string(),
},
),
rr_message_id,
)
.await;
}
}
Ok(())
}
/// Handles a paid-notify message by recording the key in the local paid list.
///
/// The notification is silently ignored (returning `Ok(())`) when it has no
/// proof of payment, when this node is not in the paid close group for the
/// key (sized by `config.paid_list_close_group_size`), or when the payment
/// verifier does not accept the proof. The last case is logged as a warning.
/// Otherwise the key is inserted into `paid_list`; an insert failure is
/// logged and not propagated.
///
/// No reply is sent to the sender on any path.
async fn handle_paid_notify(
    _source: &PeerId,
    notify: &protocol::PaidNotify,
    paid_list: &Arc<PaidList>,
    payment_verifier: &Arc<PaymentVerifier>,
    p2p_node: &Arc<P2PNode>,
    config: &ReplicationConfig,
) -> Result<()> {
    let self_id = *p2p_node.peer_id();

    if notify.proof_of_payment.is_empty() {
        return Ok(());
    }

    if !admission::is_in_paid_close_group(
        &self_id,
        &notify.key,
        p2p_node,
        config.paid_list_close_group_size,
    )
    .await
    {
        return Ok(());
    }

    match payment_verifier
        .verify_payment(
            &notify.key,
            Some(&notify.proof_of_payment),
            paid_notify_payment_context(),
        )
        .await
    {
        Ok(status) if status.can_store() => {
            debug!(
                "PoP validated for paid notify key {}",
                hex::encode(notify.key)
            );
        }
        Ok(_) => {
            warn!(
                "Paid notify rejected: payment required for key {}",
                hex::encode(notify.key)
            );
            return Ok(());
        }
        Err(e) => {
            warn!(
                "PoP verification error for paid notify key {}: {e}",
                hex::encode(notify.key)
            );
            return Ok(());
        }
    }

    if let Err(e) = paid_list.insert(&notify.key).await {
        warn!("Failed to add paid notify key to PaidForList: {e}");
    }
    Ok(())
}
/// Answers an inbound neighbor-sync request and processes the hints it carries.
///
/// The reply is built by `neighbor_sync::handle_sync_request_with_proofs` and
/// sent first. If that call reports the sender as not being in the routing
/// table, nothing else happens. Otherwise, in order:
///
/// - the sender's sync-history record is reset to "synced just now";
/// - if the reply was delivered and the requester does not claim to be
///   bootstrapping, the replica hints sent to it are recorded as repair proofs;
/// - the requester's own hints are admitted and queued;
/// - while this node is bootstrapping, the discovered keys and the
///   capacity-rejection state for the sender are tracked.
///
/// Always returns `Ok(())`.
#[allow(clippy::too_many_arguments)]
async fn handle_neighbor_sync_request(
    source: &PeerId,
    request: &protocol::NeighborSyncRequest,
    p2p_node: &Arc<P2PNode>,
    storage: &Arc<LmdbStorage>,
    paid_list: &Arc<PaidList>,
    queues: &Arc<RwLock<ReplicationQueues>>,
    config: &ReplicationConfig,
    is_bootstrapping: bool,
    bootstrap_state: &Arc<RwLock<BootstrapState>>,
    sync_history: &Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
    sync_cycle_epoch: &Arc<RwLock<u64>>,
    repair_proofs: &Arc<RwLock<RepairProofs>>,
    my_commitment: Option<StorageCommitment>,
    request_id: u64,
    rr_message_id: Option<&str>,
) -> Result<()> {
    let (reply, hints_sent_to_peer, peer_in_routing_table) =
        neighbor_sync::handle_sync_request_with_proofs(
            source,
            request,
            p2p_node,
            storage,
            paid_list,
            config,
            is_bootstrapping,
            my_commitment,
        )
        .await;

    let delivered = send_replication_response_checked(
        source,
        p2p_node,
        request_id,
        ReplicationMessageBody::NeighborSyncResponse(reply),
        rr_message_id,
    )
    .await;

    // Everything below is reserved for senders present in the routing table.
    if !peer_in_routing_table {
        return Ok(());
    }

    // The record has exactly these two fields, so overwriting it wholesale is
    // the same as creating a default entry and then resetting both fields.
    sync_history.write().await.insert(
        *source,
        PeerSyncRecord {
            last_sync: Some(Instant::now()),
            cycles_since_sync: 0,
        },
    );

    let requester_is_bootstrapping = request.bootstrapping;
    if delivered && !requester_is_bootstrapping {
        record_sent_replica_hints(source, &hints_sent_to_peer, repair_proofs, sync_cycle_epoch)
            .await;
    }

    let local_id = *p2p_node.peer_id();
    let admission_outcome = admit_and_queue_hints(
        &local_id,
        source,
        &request.replica_hints,
        &request.paid_hints,
        p2p_node,
        config,
        storage,
        paid_list,
        queues,
    )
    .await;

    // Bootstrap bookkeeping only applies while this node is bootstrapping.
    if !is_bootstrapping {
        return Ok(());
    }
    if !admission_outcome.discovered.is_empty() {
        bootstrap::track_discovered_keys(bootstrap_state, &admission_outcome.discovered).await;
    }
    if admission_outcome.capacity_rejected_count > 0 {
        bootstrap::note_capacity_rejected(bootstrap_state, *source).await;
    } else {
        bootstrap::clear_capacity_rejected(bootstrap_state, source).await;
    }
    Ok(())
}
/// Answers a verification request with per-key presence and paid-list status.
///
/// For every key in `request.keys`, in order, the reply states whether the
/// key exists in local storage. For keys whose position is listed in
/// `request.paid_list_check_indices`, `paid` additionally carries whether the
/// key is in the local paid list; for all other keys `paid` is `None`.
/// Out-of-range indices are logged and ignored. Storage and paid-list lookup
/// errors are reported as `false`.
///
/// Always returns `Ok(())`.
async fn handle_verification_request(
    source: &PeerId,
    request: &protocol::VerificationRequest,
    storage: &Arc<LmdbStorage>,
    paid_list: &Arc<PaidList>,
    p2p_node: &Arc<P2PNode>,
    request_id: u64,
    rr_message_id: Option<&str>,
) -> Result<()> {
    // Saturate rather than truncate. A wrapping `as u32` cast would yield a
    // bound smaller than the real key count once it exceeds `u32::MAX`, and
    // valid indices would be rejected. Saturating matches the
    // `u32::try_from(i)` conversion used for positions below.
    let keys_len = u32::try_from(request.keys.len()).unwrap_or(u32::MAX);

    let paid_check_set: HashSet<u32> = request
        .paid_list_check_indices
        .iter()
        .copied()
        .filter(|&idx| {
            let in_bounds = idx < keys_len;
            if !in_bounds {
                warn!(
                    "Verification request from {source}: paid_list_check_index {idx} out of bounds (keys.len() = {})",
                    request.keys.len(),
                );
            }
            in_bounds
        })
        .collect();

    let results: Vec<_> = request
        .keys
        .iter()
        .enumerate()
        .map(|(i, key)| {
            let present = storage.exists(key).unwrap_or(false);
            // Positions beyond `u32::MAX` map to `u32::MAX`, which the bounds
            // filter above never admits, so they are never paid-checked.
            let position = u32::try_from(i).unwrap_or(u32::MAX);
            let paid = paid_check_set
                .contains(&position)
                .then(|| paid_list.contains(key).unwrap_or(false));
            protocol::KeyVerificationResult {
                key: *key,
                present,
                paid,
            }
        })
        .collect();

    send_replication_response(
        source,
        p2p_node,
        request_id,
        ReplicationMessageBody::VerificationResponse(VerificationResponse { results }),
        rr_message_id,
    )
    .await;
    Ok(())
}
/// Serves a fetch request from local storage.
///
/// Looks up `request.key` and replies with `Success` (carrying the data),
/// `NotFound`, or `Error` (carrying the storage error's text). Always returns
/// `Ok(())`; the outcome is reported to the peer only.
async fn handle_fetch_request(
    source: &PeerId,
    request: &protocol::FetchRequest,
    storage: &Arc<LmdbStorage>,
    p2p_node: &Arc<P2PNode>,
    request_id: u64,
    rr_message_id: Option<&str>,
) -> Result<()> {
    let key = request.key;
    let lookup = storage.get(&key).await;

    let reply = match lookup {
        Err(e) => protocol::FetchResponse::Error {
            key,
            reason: format!("{e}"),
        },
        Ok(None) => protocol::FetchResponse::NotFound { key },
        Ok(Some(data)) => protocol::FetchResponse::Success { key, data },
    };

    send_replication_response(
        source,
        p2p_node,
        request_id,
        ReplicationMessageBody::FetchResponse(reply),
        rr_message_id,
    )
    .await;
    Ok(())
}
/// Answers a "responsible" audit challenge and logs whether the reply went out.
///
/// The response is produced by `audit::handle_audit_challenge`, which is given
/// the current stored-chunk count (0 when it cannot be read). The reply is
/// sent through `send_replication_response_checked`; success is logged at
/// info level and failure at warn level. Always returns `Ok(())`.
async fn handle_audit_challenge_msg(
    source: &PeerId,
    challenge: &protocol::AuditChallenge,
    storage: &Arc<LmdbStorage>,
    p2p_node: &Arc<P2PNode>,
    is_bootstrapping: bool,
    request_id: u64,
    rr_message_id: Option<&str>,
) -> Result<()> {
    let key_count = challenge.keys.len();
    let via_request_response = rr_message_id.is_some();

    #[allow(clippy::cast_possible_truncation)]
    let stored_chunks = storage.current_chunks().map_or(0, |c| c as usize);

    info!(
        "Audit challenge received: kind=responsible keys={} bootstrapping={} request_response={}",
        key_count,
        is_bootstrapping,
        via_request_response,
    );

    let reply = audit::handle_audit_challenge(
        challenge,
        storage,
        p2p_node.peer_id(),
        is_bootstrapping,
        stored_chunks,
    )
    .await;

    // Take the label now; `reply` is moved into the message body below.
    let reply_label = audit_response_kind(&reply);
    let delivered = send_replication_response_checked(
        source,
        p2p_node,
        request_id,
        ReplicationMessageBody::AuditResponse(reply),
        rr_message_id,
    )
    .await;

    if !delivered {
        warn!(
            "Audit challenge reply not sent: kind=responsible response={} keys={} request_response={}",
            reply_label,
            key_count,
            via_request_response,
        );
        return Ok(());
    }
    info!(
        "Audit challenge reply sent: kind=responsible response={} keys={} request_response={}",
        reply_label,
        key_count,
        via_request_response,
    );
    Ok(())
}
/// Maps an audit response to the short label used in log lines.
fn audit_response_kind(response: &protocol::AuditResponse) -> &'static str {
    type Reply = protocol::AuditResponse;

    match response {
        Reply::Rejected { .. } => "rejected",
        Reply::Bootstrapping { .. } => "bootstrapping",
        Reply::Digests { .. } => "digests",
    }
}
/// Maps a subtree audit response to the short label used in log lines.
fn subtree_audit_response_kind(response: &protocol::SubtreeAuditResponse) -> &'static str {
    type Reply = protocol::SubtreeAuditResponse;

    match response {
        Reply::Rejected { .. } => "rejected",
        Reply::Bootstrapping { .. } => "bootstrapping",
        Reply::Proof { .. } => "proof",
    }
}
/// Maps a subtree byte response to the short label used in log lines.
fn subtree_byte_response_kind(response: &protocol::SubtreeByteResponse) -> &'static str {
    type Reply = protocol::SubtreeByteResponse;

    match response {
        Reply::Rejected { .. } => "rejected",
        Reply::Bootstrapping { .. } => "bootstrapping",
        Reply::Items { .. } => "items",
    }
}
/// Fire-and-forget variant of `send_replication_response_checked`.
///
/// Sends the reply and deliberately discards the delivery flag, for callers
/// that have nothing to do differently when sending fails.
async fn send_replication_response(
    peer: &PeerId,
    p2p_node: &Arc<P2PNode>,
    request_id: u64,
    body: ReplicationMessageBody,
    rr_message_id: Option<&str>,
) {
    let _delivered =
        send_replication_response_checked(peer, p2p_node, request_id, body, rr_message_id).await;
}
/// Encodes a replication reply and sends it to `peer`, reporting success.
///
/// With `rr_message_id` present the reply goes out via `send_response` tied
/// to that message id; without it the reply is sent as a plain message.
/// Returns `false` when encoding fails (logged at warn level) or when the
/// send call returns an error (logged at debug level), and `true` otherwise.
async fn send_replication_response_checked(
    peer: &PeerId,
    p2p_node: &Arc<P2PNode>,
    request_id: u64,
    body: ReplicationMessageBody,
    rr_message_id: Option<&str>,
) -> bool {
    let envelope = ReplicationMessage { request_id, body };
    let payload = match envelope.encode() {
        Err(e) => {
            warn!("Failed to encode replication response: {e}");
            return false;
        }
        Ok(bytes) => bytes,
    };

    let outcome = match rr_message_id {
        Some(msg_id) => {
            p2p_node
                .send_response(peer, REPLICATION_PROTOCOL_ID, msg_id, payload)
                .await
        }
        None => {
            p2p_node
                .send_message(peer, REPLICATION_PROTOCOL_ID, payload, &[])
                .await
        }
    };

    match outcome {
        Ok(_) => true,
        Err(e) => {
            debug!("Failed to send replication response to {peer}: {e}");
            false
        }
    }
}
/// Records, for each replica hint sent to `peer`, a repair-proof entry stamped
/// with the current sync-cycle epoch.
///
/// Does nothing (and takes no locks) when `hints` is empty. A debug line is
/// logged for each hint that `record_replica_hint_sent` reports as recorded.
async fn record_sent_replica_hints(
    peer: &PeerId,
    hints: &[neighbor_sync::SentReplicaHint],
    repair_proofs: &Arc<RwLock<RepairProofs>>,
    sync_cycle_epoch: &Arc<RwLock<u64>>,
) {
    if hints.is_empty() {
        return;
    }

    // The epoch is read before the proofs write lock is taken.
    let hinted_at_epoch = *sync_cycle_epoch.read().await;
    let mut proofs = repair_proofs.write().await;

    for hint in hints {
        let newly_recorded =
            proofs.record_replica_hint_sent(*peer, hint.key, &hint.close_peers, hinted_at_epoch);
        if !newly_recorded {
            continue;
        }
        debug!(
            "Recorded repair hint proof for peer {peer} and key {}",
            hex::encode(hint.key)
        );
    }
}
/// Runs one round of outbound neighbor sync.
///
/// When the current sync cycle is complete, the round first performs the
/// cycle rollover: every sync-history record gets its `cycles_since_sync`
/// bumped, the sync-cycle epoch is incremented, a prune pass is run, and a new
/// cycle is started from a fresh neighbor snapshot while carrying over the
/// previous sync times, bootstrap-claim data and prune cursor.
///
/// A batch of peers is then selected and synced one at a time. A successful
/// sync is passed to `handle_sync_response`. A failed sync is reported to
/// `neighbor_sync::handle_sync_failure`; if that yields a replacement peer,
/// the replacement is tried once in the same way.
#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
async fn run_neighbor_sync_round(
p2p_node: &Arc<P2PNode>,
storage: &Arc<LmdbStorage>,
paid_list: &Arc<PaidList>,
queues: &Arc<RwLock<ReplicationQueues>>,
config: &ReplicationConfig,
sync_state: &Arc<RwLock<NeighborSyncState>>,
sync_history: &Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
sync_cycle_epoch: &Arc<RwLock<u64>>,
repair_proofs: &Arc<RwLock<RepairProofs>>,
is_bootstrapping: &Arc<RwLock<bool>>,
bootstrap_state: &Arc<RwLock<BootstrapState>>,
commitment_state: &Arc<ResponderCommitmentState>,
last_commitment_by_peer: &Arc<RwLock<HashMap<PeerId, PeerCommitmentRecord>>>,
ever_capable_peers: &Arc<RwLock<HashSet<PeerId>>>,
sig_verify_attempts: &Arc<RwLock<HashMap<PeerId, Instant>>>,
gossip_audit: &GossipAuditTrigger,
) {
let self_id = *p2p_node.peer_id();
// The bootstrapping flag is sampled once and reused for the whole round.
let bootstrapping = *is_bootstrapping.read().await;
let cycle_complete = sync_state.read().await.is_cycle_complete();
if cycle_complete {
{
// One more cycle has passed for every peer in the history.
let mut history = sync_history.write().await;
for record in history.values_mut() {
record.cycles_since_sync = record.cycles_since_sync.saturating_add(1);
}
}
// Advance the epoch and keep the new value for the prune pass.
let current_sync_epoch = {
let mut epoch = sync_cycle_epoch.write().await;
*epoch = epoch.saturating_add(1);
*epoch
};
// Remote prune audits are only allowed once this node is no longer
// bootstrapping and its bootstrap state reports drained.
let allow_remote_prune_audits = !bootstrapping && bootstrap_state.read().await.is_drained();
pruning::run_prune_pass_with_context(pruning::PrunePassContext {
self_id: &self_id,
storage,
paid_list,
p2p_node,
config,
sync_state,
repair_proofs,
current_sync_epoch,
#[cfg(any(test, feature = "test-utils"))]
repair_proof_now: None,
allow_remote_prune_audits,
commitment_state: Some(commitment_state),
})
.await;
let neighbors =
neighbor_sync::snapshot_close_neighbors(p2p_node, &self_id, config.neighbor_sync_scope)
.await;
let mut state = sync_state.write().await;
// Completion is re-checked under the write lock, since the earlier check
// was made under a read lock that has since been released.
if state.is_cycle_complete() {
// Carry these fields across the reset performed by `new_cycle`.
let old_sync_times = std::mem::take(&mut state.last_sync_times);
let old_bootstrap_claims = std::mem::take(&mut state.bootstrap_claims);
let old_bootstrap_claim_history = std::mem::take(&mut state.bootstrap_claim_history);
let old_prune_cursor = state.prune_cursor;
*state = NeighborSyncState::new_cycle(neighbors);
state.last_sync_times = old_sync_times;
state.bootstrap_claims = old_bootstrap_claims;
state.bootstrap_claim_history = old_bootstrap_claim_history;
state.prune_cursor = old_prune_cursor;
}
}
let batch = {
let mut state = sync_state.write().await;
neighbor_sync::select_sync_batch(
&mut state,
config.neighbor_sync_peer_count,
config.neighbor_sync_cooldown,
)
};
if batch.is_empty() {
return;
}
debug!("Neighbor sync: syncing with {} peers", batch.len());
// Snapshot of the local commitment, cloned into every outgoing sync.
let my_commitment = commitment_state
.current_for_gossip()
.map(|b| b.commitment().clone());
// Hints for the whole batch are built up front, keyed by peer.
let mut hints_by_peer = neighbor_sync::build_sync_hints_for_peers(
&batch,
storage,
paid_list,
p2p_node,
config.close_group_size,
config.paid_list_close_group_size,
)
.await;
for peer in &batch {
let hints = hints_by_peer.remove(peer).unwrap_or_default();
let outcome = neighbor_sync::sync_with_peer_with_hints(
peer,
p2p_node,
config,
bootstrapping,
hints,
my_commitment.clone(),
)
.await;
if let Some(outcome) = outcome {
handle_sync_response(
&self_id,
peer,
&outcome.response,
&outcome.sent_replica_hints,
p2p_node,
config,
bootstrapping,
bootstrap_state,
storage,
paid_list,
queues,
sync_state,
sync_history,
sync_cycle_epoch,
repair_proofs,
last_commitment_by_peer,
ever_capable_peers,
sig_verify_attempts,
gossip_audit,
)
.await;
} else {
// Sync failed: report it and see whether a replacement peer is offered.
let replacement = {
let mut state = sync_state.write().await;
neighbor_sync::handle_sync_failure(&mut state, peer, config.neighbor_sync_cooldown)
};
if let Some(replacement_peer) = replacement {
// The replacement was not part of the batch, so build its hints now.
let mut replacement_hints = neighbor_sync::build_sync_hints_for_peers(
std::slice::from_ref(&replacement_peer),
storage,
paid_list,
p2p_node,
config.close_group_size,
config.paid_list_close_group_size,
)
.await;
let hints = replacement_hints
.remove(&replacement_peer)
.unwrap_or_default();
let replacement_outcome = neighbor_sync::sync_with_peer_with_hints(
&replacement_peer,
p2p_node,
config,
bootstrapping,
hints,
my_commitment.clone(),
)
.await;
// A failed replacement is not retried or reported further here.
if let Some(outcome) = replacement_outcome {
handle_sync_response(
&self_id,
&replacement_peer,
&outcome.response,
&outcome.sent_replica_hints,
p2p_node,
config,
bootstrapping,
bootstrap_state,
storage,
paid_list,
queues,
sync_state,
sync_history,
sync_cycle_epoch,
repair_proofs,
last_commitment_by_peer,
ever_capable_peers,
sig_verify_attempts,
gossip_audit,
)
.await;
}
}
}
}
}
/// Processes a successful neighbor-sync response from `peer`.
///
/// In order: ingests the commitment carried by the response (possibly
/// triggering a gossip audit), records the successful sync in the sync state,
/// and resets the peer's sync-history record.
///
/// If the peer claims to be bootstrapping, the claim is observed against
/// `config.bootstrap_claim_grace_period` and a trust event is reported when
/// the observation is `PastGrace` or `Repeated`; the peer's hints are not
/// processed in that case. Otherwise any active bootstrap claim is cleared,
/// the replica hints sent to the peer are recorded as repair proofs, and the
/// peer's hints are admitted and queued. While this node is bootstrapping,
/// discovered keys and the peer's capacity-rejection state are tracked too.
#[allow(clippy::too_many_arguments)]
async fn handle_sync_response(
self_id: &PeerId,
peer: &PeerId,
resp: &NeighborSyncResponse,
sent_replica_hints: &[neighbor_sync::SentReplicaHint],
p2p_node: &Arc<P2PNode>,
config: &ReplicationConfig,
bootstrapping: bool,
bootstrap_state: &Arc<RwLock<BootstrapState>>,
storage: &Arc<LmdbStorage>,
paid_list: &Arc<PaidList>,
queues: &Arc<RwLock<ReplicationQueues>>,
sync_state: &Arc<RwLock<NeighborSyncState>>,
sync_history: &Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
sync_cycle_epoch: &Arc<RwLock<u64>>,
repair_proofs: &Arc<RwLock<RepairProofs>>,
last_commitment_by_peer: &Arc<RwLock<HashMap<PeerId, PeerCommitmentRecord>>>,
ever_capable_peers: &Arc<RwLock<HashSet<PeerId>>>,
sig_verify_attempts: &Arc<RwLock<HashMap<PeerId, Instant>>>,
gossip_audit: &GossipAuditTrigger,
) {
// Ingest the peer's commitment (if any); a returned target is offered to
// the gossip-audit trigger.
if let Some(target) = ingest_peer_commitment(
peer,
resp.commitment.as_ref(),
p2p_node,
last_commitment_by_peer,
ever_capable_peers,
sig_verify_attempts,
)
.await
{
maybe_trigger_gossip_audit(gossip_audit, peer, target).await;
}
{
let mut state = sync_state.write().await;
neighbor_sync::record_successful_sync(&mut state, peer);
}
{
// Create the history record if missing, then mark it as synced just now.
let mut history = sync_history.write().await;
let record = history.entry(*peer).or_insert(PeerSyncRecord {
last_sync: None,
cycles_since_sync: 0,
});
record.last_sync = Some(Instant::now());
record.cycles_since_sync = 0;
}
if resp.bootstrapping {
// Decide under the state lock, but report the trust event after the
// lock has been released.
let should_report = {
let now = Instant::now();
let mut state = sync_state.write().await;
match state.observe_bootstrap_claim(*peer, now, config.bootstrap_claim_grace_period) {
BootstrapClaimObservation::WithinGrace { .. } => false,
BootstrapClaimObservation::PastGrace { first_seen } => {
warn!(
"Peer {peer} has been claiming bootstrap for {:?}, \
exceeding grace period of {:?} — reporting abuse",
now.duration_since(first_seen),
config.bootstrap_claim_grace_period,
);
true
}
BootstrapClaimObservation::Repeated { first_seen } => {
warn!(
"Peer {peer} repeated bootstrap claim after previously stopping; \
first claim was {:?} ago — reporting abuse",
now.duration_since(first_seen),
);
true
}
}
};
if should_report {
p2p_node
.report_trust_event(
peer,
TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
)
.await;
}
} else {
{
// The peer no longer claims to be bootstrapping.
let mut state = sync_state.write().await;
state.clear_active_bootstrap_claim(peer);
}
record_sent_replica_hints(peer, sent_replica_hints, repair_proofs, sync_cycle_epoch).await;
let outcome = admit_and_queue_hints(
self_id,
peer,
&resp.replica_hints,
&resp.paid_hints,
p2p_node,
config,
storage,
paid_list,
queues,
)
.await;
// Bootstrap bookkeeping only applies while this node is bootstrapping.
if bootstrapping {
if !outcome.discovered.is_empty() {
bootstrap::track_discovered_keys(bootstrap_state, &outcome.discovered).await;
}
if outcome.capacity_rejected_count > 0 {
bootstrap::note_capacity_rejected(bootstrap_state, *peer).await;
} else {
bootstrap::clear_capacity_rejected(bootstrap_state, peer).await;
}
}
}
}
/// Result of admitting one peer's hints into the pending-verify queue.
struct AdmissionOutcome {
    /// Keys that were newly admitted to the queue by this call.
    discovered: HashSet<XorName>,
    /// Number of hints the queue refused because it was at capacity.
    capacity_rejected_count: usize,
}
#[allow(clippy::too_many_arguments)]
async fn admit_and_queue_hints(
self_id: &PeerId,
source_peer: &PeerId,
replica_hints: &[XorName],
paid_hints: &[XorName],
p2p_node: &Arc<P2PNode>,
config: &ReplicationConfig,
storage: &Arc<LmdbStorage>,
paid_list: &Arc<PaidList>,
queues: &Arc<RwLock<ReplicationQueues>>,
) -> AdmissionOutcome {
let pending_keys: HashSet<XorName> = {
let q = queues.read().await;
q.pending_keys().into_iter().collect()
};
let admitted = admission::admit_hints(
self_id,
replica_hints,
paid_hints,
p2p_node,
config,
storage,
paid_list,
&pending_keys,
)
.await;
let mut discovered = HashSet::new();
let mut capacity_rejected_count: usize = 0;
let mut q = queues.write().await;
let now = Instant::now();
for key in admitted.replica_keys {
if !storage.exists(&key).unwrap_or(false) {
let result = q.add_pending_verify(
key,
VerificationEntry {
state: VerificationState::PendingVerify,
pipeline: HintPipeline::Replica,
verified_sources: Vec::new(),
tried_sources: HashSet::new(),
created_at: now,
hint_sender: *source_peer,
},
);
match result {
crate::replication::scheduling::AdmissionResult::Admitted => {
discovered.insert(key);
}
crate::replication::scheduling::AdmissionResult::AlreadyPresent => {}
crate::replication::scheduling::AdmissionResult::CapacityRejected => {
capacity_rejected_count += 1;
}
}
}
}
for key in admitted.paid_only_keys {
let result = q.add_pending_verify(
key,
VerificationEntry {
state: VerificationState::PendingVerify,
pipeline: HintPipeline::PaidOnly,
verified_sources: Vec::new(),
tried_sources: HashSet::new(),
created_at: now,
hint_sender: *source_peer,
},
);
match result {
crate::replication::scheduling::AdmissionResult::Admitted => {
discovered.insert(key);
}
crate::replication::scheduling::AdmissionResult::AlreadyPresent => {}
crate::replication::scheduling::AdmissionResult::CapacityRejected => {
capacity_rejected_count += 1;
}
}
}
if capacity_rejected_count > 0 {
debug!(
"admit_and_queue_hints from {source_peer}: {capacity_rejected_count} hints \
rejected at queue capacity; source will need to re-hint after pending_verify drains"
);
}
AdmissionOutcome {
discovered,
capacity_rejected_count,
}
}
#[allow(clippy::too_many_lines)]
async fn run_verification_cycle(ctx: VerificationCycleContext<'_>) {
let cycle_started = Instant::now();
let VerificationCycleContext {
p2p_node,
paid_list,
storage,
queues,
config,
bootstrap_state,
is_bootstrapping,
bootstrap_complete_notify,
last_commitment_by_peer,
ever_capable_peers,
recent_provers,
} = ctx;
{
let mut q = queues.write().await;
q.evict_stale(config::PENDING_VERIFY_MAX_AGE);
}
let pending_keys = {
let q = queues.read().await;
q.pending_keys()
};
if pending_keys.is_empty() {
return;
}
let initial_pending_count = pending_keys.len();
let self_id = *p2p_node.peer_id();
let mut local_paid_presence_probe_keys = Vec::new();
let mut local_paid_paid_only_keys = Vec::new();
let mut keys_needing_network = Vec::new();
let mut terminal_keys: Vec<XorName> = Vec::new();
{
let mut q = queues.write().await;
for key in &pending_keys {
if paid_list.contains(key).unwrap_or(false) {
if let Some(pipeline) =
q.set_pending_state(key, VerificationState::PaidListVerified)
{
match pipeline {
HintPipeline::PaidOnly => {
local_paid_paid_only_keys.push(*key);
}
HintPipeline::Replica => {
local_paid_presence_probe_keys.push(*key);
}
}
}
} else {
keys_needing_network.push(*key);
}
}
}
if !local_paid_paid_only_keys.is_empty() {
let mut terminal_paid_only = Vec::new();
for key in local_paid_paid_only_keys {
if storage.exists(&key).unwrap_or(false) {
terminal_paid_only.push(key);
} else if admission::is_responsible(
&self_id,
&key,
p2p_node,
storage_admission_width(config.close_group_size),
)
.await
{
local_paid_presence_probe_keys.push(key);
} else {
terminal_paid_only.push(key);
}
}
if !terminal_paid_only.is_empty() {
let mut q = queues.write().await;
for key in terminal_paid_only {
q.remove_pending(&key);
terminal_keys.push(key);
}
}
}
let local_paid_probe_count = local_paid_presence_probe_keys.len();
let keys_needing_network_count = keys_needing_network.len();
if !local_paid_presence_probe_keys.is_empty() {
let targets = quorum::compute_presence_targets(
&local_paid_presence_probe_keys,
p2p_node,
config,
&self_id,
)
.await;
let evidence = quorum::run_verification_round(
&local_paid_presence_probe_keys,
&targets,
p2p_node,
config,
)
.await;
let mut q = queues.write().await;
for key in local_paid_presence_probe_keys {
if storage.exists(&key).unwrap_or(false) {
q.remove_pending(&key);
terminal_keys.push(key);
continue;
}
let sources = evidence.get(&key).map_or_else(Vec::new, |ev| {
quorum::present_sources_for_key(&key, ev, &targets)
});
if sources.is_empty() {
q.remove_pending(&key);
warn!(
"Locally paid key {} has no responding holders (possible data loss)",
hex::encode(key)
);
terminal_keys.push(key);
} else {
let distance = crate::client::xor_distance(&key, p2p_node.peer_id().as_bytes());
let _ = q.promote_pending_to_fetch(key, distance, sources);
}
}
}
if !keys_needing_network.is_empty() {
let targets =
quorum::compute_verification_targets(&keys_needing_network, p2p_node, config, &self_id)
.await;
let evidence =
quorum::run_verification_round(&keys_needing_network, &targets, p2p_node, config).await;
let commitment_by_peer_snapshot: HashMap<PeerId, [u8; 32]> = {
let map = last_commitment_by_peer.read().await;
map.iter()
.filter_map(|(p, rec)| rec.commitment_hash().map(|h| (*p, h)))
.collect()
};
let capable_peer_snapshot: HashSet<PeerId> = ever_capable_peers.read().await.clone();
let provers_snapshot = recent_provers.read().await.clone();
let mut locally_held: HashSet<XorName> = HashSet::new();
for key in &keys_needing_network {
if storage.exists(key).unwrap_or(false) {
locally_held.insert(*key);
}
}
let holder_credit = |peer: &PeerId, key: &XorName| -> bool {
if !locally_held.contains(key) {
return true;
}
if !capable_peer_snapshot.contains(peer) {
return true;
}
let Some(hash) = commitment_by_peer_snapshot.get(peer) else {
return false;
};
provers_snapshot.is_credited_holder(key, peer, hash)
};
let mut evaluated: Vec<(XorName, KeyVerificationOutcome, HintPipeline)> = Vec::new();
{
let q = queues.read().await;
for key in &keys_needing_network {
let Some(ev) = evidence.get(key) else {
continue;
};
let Some(entry) = q.get_pending(key) else {
continue;
};
let outcome = quorum::evaluate_key_evidence_with_holder_check(
key,
ev,
&targets,
config,
holder_credit,
);
evaluated.push((*key, outcome, entry.pipeline));
}
}
let mut paid_insert_keys: Vec<XorName> = Vec::new();
for (key, outcome, _) in &evaluated {
if matches!(
outcome,
KeyVerificationOutcome::QuorumVerified { .. }
| KeyVerificationOutcome::PaidListVerified { .. }
) {
paid_insert_keys.push(*key);
}
}
for key in &paid_insert_keys {
if let Err(e) = paid_list.insert(key).await {
warn!("Failed to add verified key to PaidForList: {e}");
}
}
let mut paid_only_fetch_keys: HashSet<XorName> = HashSet::new();
for (key, outcome, pipeline) in &evaluated {
if *pipeline == HintPipeline::PaidOnly
&& matches!(
outcome,
KeyVerificationOutcome::QuorumVerified { .. }
| KeyVerificationOutcome::PaidListVerified { .. }
)
&& !storage.exists(key).unwrap_or(false)
&& admission::is_responsible(
&self_id,
key,
p2p_node,
storage_admission_width(config.close_group_size),
)
.await
{
paid_only_fetch_keys.insert(*key);
}
}
let mut q = queues.write().await;
for (key, outcome, pipeline) in evaluated {
match outcome {
KeyVerificationOutcome::QuorumVerified { sources }
| KeyVerificationOutcome::PaidListVerified { sources } => {
let fetch_eligible =
pipeline == HintPipeline::Replica || paid_only_fetch_keys.contains(&key);
if fetch_eligible && !sources.is_empty() {
let distance =
crate::client::xor_distance(&key, p2p_node.peer_id().as_bytes());
let _ = q.promote_pending_to_fetch(key, distance, sources);
} else if fetch_eligible && sources.is_empty() {
warn!(
"Verified storage-admitted key {} has no holders (possible data loss)",
hex::encode(key)
);
q.remove_pending(&key);
terminal_keys.push(key);
} else {
q.remove_pending(&key);
terminal_keys.push(key);
}
}
KeyVerificationOutcome::QuorumFailed
| KeyVerificationOutcome::QuorumInconclusive => {
q.remove_pending(&key);
terminal_keys.push(key);
}
}
}
}
update_bootstrap_after_verification(
&terminal_keys,
bootstrap_state,
queues,
is_bootstrapping,
bootstrap_complete_notify,
)
.await;
let (pending_after, fetch_after, in_flight_after) = {
let q = queues.read().await;
(
q.pending_count(),
q.fetch_queue_count(),
q.in_flight_count(),
)
};
let terminal_key_count = terminal_keys.len();
let elapsed_ms = cycle_started.elapsed().as_millis();
if elapsed_ms >= VERIFICATION_CYCLE_SLOW_LOG_MS {
info!(
target: "ant_node::replication::verification",
"Slow replication verification cycle: pending_start={initial_pending_count}, local_paid_probe={local_paid_probe_count}, network_verify={keys_needing_network_count}, terminal={terminal_key_count}, pending_after={pending_after}, fetch_after={fetch_after}, in_flight_after={in_flight_after}, elapsed_ms={elapsed_ms}",
);
} else {
debug!(
target: "ant_node::replication::verification",
"Replication verification cycle: pending_start={initial_pending_count}, local_paid_probe={local_paid_probe_count}, network_verify={keys_needing_network_count}, terminal={terminal_key_count}, pending_after={pending_after}, fetch_after={fetch_after}, in_flight_after={in_flight_after}, elapsed_ms={elapsed_ms}",
);
}
}
/// Removes keys that reached a terminal verification state from the bootstrap
/// tracker and completes bootstrap once the tracker reports itself drained.
///
/// Does nothing when `terminal_keys` is empty or when `bootstrap_state` already
/// reports `is_drained()`. Otherwise every terminal key is removed under the
/// bootstrap write lock, and `bootstrap::check_bootstrap_drained` is consulted
/// (while holding a read lock on `queues`) to decide whether to call
/// `complete_bootstrap`.
async fn update_bootstrap_after_verification(
    terminal_keys: &[XorName],
    bootstrap_state: &Arc<RwLock<BootstrapState>>,
    queues: &Arc<RwLock<ReplicationQueues>>,
    is_bootstrapping: &Arc<RwLock<bool>>,
    bootstrap_complete_notify: &Arc<Notify>,
) {
    if terminal_keys.is_empty() {
        return;
    }
    // The read guard is a temporary and is released at the end of this statement.
    let already_drained = bootstrap_state.read().await.is_drained();
    if already_drained {
        return;
    }

    let mut tracker = bootstrap_state.write().await;
    for terminal in terminal_keys {
        tracker.remove_key(terminal);
    }
    // Release the write lock before the drain check touches the bootstrap state again.
    drop(tracker);

    let queue_view = queues.read().await;
    let drained = bootstrap::check_bootstrap_drained(bootstrap_state, &queue_view).await;
    if drained {
        complete_bootstrap(is_bootstrapping, bootstrap_complete_notify).await;
    }
}
/// Marks replication bootstrap as finished.
///
/// Clears the shared `is_bootstrapping` flag, wakes every task currently
/// waiting on `bootstrap_complete_notify`, and logs the transition.
async fn complete_bootstrap(
    is_bootstrapping: &Arc<RwLock<bool>>,
    bootstrap_complete_notify: &Arc<Notify>,
) {
    {
        let mut bootstrapping = is_bootstrapping.write().await;
        *bootstrapping = false;
    }
    bootstrap_complete_notify.notify_waiters();
    info!("Replication bootstrap complete");
}
/// Classification of a single fetch attempt, as produced by `execute_single_fetch`.
enum FetchResult {
/// The record passed every check and was written to local storage.
Stored,
/// The source answered `Success`, but the reply failed validation: wrong key
/// echoed back, payload larger than `MAX_CHUNK_SIZE`, or content address mismatch.
IntegrityFailed,
/// The attempt failed for any other reason: request encode error, transport
/// error, undecodable or unexpected reply, `NotFound`/`Error` reply, or a local
/// storage write failure.
SourceFailed,
}
/// Result of one fetch attempt, tagged with the key it was issued for.
struct FetchOutcome {
/// The key that was requested (not the key echoed by the responder).
key: XorName,
/// How the attempt ended.
result: FetchResult,
}
#[allow(clippy::too_many_lines)]
async fn execute_single_fetch(
p2p_node: Arc<P2PNode>,
storage: Arc<LmdbStorage>,
config: Arc<ReplicationConfig>,
key: XorName,
source: PeerId,
) -> FetchOutcome {
let request = protocol::FetchRequest { key };
let msg = ReplicationMessage {
request_id: rand::thread_rng().gen::<u64>(),
body: ReplicationMessageBody::FetchRequest(request),
};
let encoded = match msg.encode() {
Ok(data) => data,
Err(e) => {
warn!("Failed to encode fetch request: {e}");
return FetchOutcome {
key,
result: FetchResult::SourceFailed,
};
}
};
let result = p2p_node
.send_request(
&source,
REPLICATION_PROTOCOL_ID,
encoded,
config.fetch_request_timeout,
)
.await;
match result {
Ok(response) => {
let Ok(resp_msg) = ReplicationMessage::decode(&response.data) else {
p2p_node
.report_trust_event(
&source,
TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
)
.await;
return FetchOutcome {
key,
result: FetchResult::SourceFailed,
};
};
match resp_msg.body {
ReplicationMessageBody::FetchResponse(protocol::FetchResponse::Success {
key: resp_key,
data,
}) => {
if resp_key != key {
warn!(
"Fetch response key mismatch: requested {}, got {}",
hex::encode(key),
hex::encode(resp_key)
);
p2p_node
.report_trust_event(
&source,
TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
)
.await;
return FetchOutcome {
key,
result: FetchResult::IntegrityFailed,
};
}
if data.len() > crate::ant_protocol::MAX_CHUNK_SIZE {
warn!(
"Fetched record {} exceeds MAX_CHUNK_SIZE ({} > {})",
hex::encode(resp_key),
data.len(),
crate::ant_protocol::MAX_CHUNK_SIZE,
);
p2p_node
.report_trust_event(
&source,
TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
)
.await;
return FetchOutcome {
key,
result: FetchResult::IntegrityFailed,
};
}
let computed = crate::client::compute_address(&data);
if computed != resp_key {
warn!(
"Fetched record integrity check failed: expected {}, got {}",
hex::encode(resp_key),
hex::encode(computed)
);
p2p_node
.report_trust_event(
&source,
TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
)
.await;
return FetchOutcome {
key,
result: FetchResult::IntegrityFailed,
};
}
if let Err(e) = storage.put(&resp_key, &data).await {
warn!(
"Failed to store fetched record {}: {e}",
hex::encode(resp_key)
);
return FetchOutcome {
key,
result: FetchResult::SourceFailed,
};
}
FetchOutcome {
key,
result: FetchResult::Stored,
}
}
ReplicationMessageBody::FetchResponse(protocol::FetchResponse::NotFound {
..
}) => {
warn!(
"Fetch: verified source {source} returned NotFound for {}",
hex::encode(key)
);
p2p_node
.report_trust_event(
&source,
TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
)
.await;
FetchOutcome {
key,
result: FetchResult::SourceFailed,
}
}
ReplicationMessageBody::FetchResponse(protocol::FetchResponse::Error {
reason,
..
}) => {
warn!(
"Fetch: peer {source} returned error for {}: {reason}",
hex::encode(key)
);
p2p_node
.report_trust_event(
&source,
TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
)
.await;
FetchOutcome {
key,
result: FetchResult::SourceFailed,
}
}
_ => {
p2p_node
.report_trust_event(
&source,
TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
)
.await;
FetchOutcome {
key,
result: FetchResult::SourceFailed,
}
}
}
}
Err(e) => {
debug!("Fetch request to {source} failed: {e}");
FetchOutcome {
key,
result: FetchResult::SourceFailed,
}
}
}
}
/// Builds a short log label for the first confirmed-failed key.
///
/// Returns `"0x"` followed by the hex encoding of the first 8 bytes of the first
/// key, or just `"0x"` when the slice is empty. Later keys are ignored.
fn first_failed_key_label(confirmed_failed_keys: &[XorName]) -> String {
    let prefix = confirmed_failed_keys
        .first()
        .map(|key| hex::encode(&key[..8]))
        .unwrap_or_default();
    format!("0x{prefix}")
}
/// Applies the penalties for a failed subtree audit of `challenged_peer`.
///
/// A `Timeout` failure is only logged: no claim is cleared, no credit is revoked
/// and no trust event is reported. For every other reason the peer's active
/// bootstrap claim is cleared, `apply_audit_failure_credit_revocation` is run
/// against `recent_provers`, and an `ApplicationFailure` weighted by
/// `config::AUDIT_FAILURE_TRUST_WEIGHT` is reported.
///
/// `confirmed_failed_key_count` is accepted but not used by this function.
async fn handle_subtree_failed_audit(
    challenged_peer: &PeerId,
    confirmed_failed_key_count: usize,
    reason: &AuditFailureReason,
    p2p_node: &Arc<P2PNode>,
    sync_state: &Arc<RwLock<NeighborSyncState>>,
    recent_provers: &Arc<RwLock<RecentProvers>>,
) {
    let _ = confirmed_failed_key_count;

    let timed_out = matches!(reason, AuditFailureReason::Timeout);
    if timed_out {
        debug!(
            "Audit timeout for {challenged_peer} fully graced \
             (subtree audit does not evict on timeout)"
        );
        return;
    }

    // Each lock is taken and released on its own; neither is held across the trust report.
    sync_state
        .write()
        .await
        .clear_active_bootstrap_claim(challenged_peer);
    {
        let mut credits = recent_provers.write().await;
        apply_audit_failure_credit_revocation(&mut credits, challenged_peer, reason);
    }

    let penalty = TrustEvent::ApplicationFailure(config::AUDIT_FAILURE_TRUST_WEIGHT);
    p2p_node.report_trust_event(challenged_peer, penalty).await;
}
/// Applies the consequences of one subtree audit tick.
///
/// * `Passed` clears the peer's active bootstrap claim and reports an
///   application success.
/// * `Failed` carrying `FailureEvidence::AuditFailure` is logged and handed to
///   `handle_subtree_failed_audit`; any other evidence kind is ignored here.
/// * `BootstrapClaim` is recorded in `sync_state`; a claim observed past the
///   grace period, or a repeated claim, is reported as an application failure.
/// * `Idle` and `InsufficientKeys` do nothing.
async fn handle_subtree_audit_result(
    result: &AuditTickResult,
    p2p_node: &Arc<P2PNode>,
    sync_state: &Arc<RwLock<NeighborSyncState>>,
    recent_provers: &Arc<RwLock<RecentProvers>>,
    config: &ReplicationConfig,
) {
    match result {
        AuditTickResult::Idle | AuditTickResult::InsufficientKeys => {}
        AuditTickResult::Passed {
            challenged_peer,
            keys_checked,
        } => {
            debug!("Audit passed for {challenged_peer} ({keys_checked} keys)");
            sync_state
                .write()
                .await
                .clear_active_bootstrap_claim(challenged_peer);
            let reward = TrustEvent::ApplicationSuccess(REPLICATION_TRUST_WEIGHT);
            p2p_node.report_trust_event(challenged_peer, reward).await;
        }
        AuditTickResult::Failed { evidence } => {
            let FailureEvidence::AuditFailure {
                challenged_peer,
                confirmed_failed_keys,
                summary,
                reason,
                ..
            } = evidence
            else {
                return;
            };
            let first_failed_key = first_failed_key_label(confirmed_failed_keys);
            let failed_count = confirmed_failed_keys.len();
            error!(
                "Audit failure for {challenged_peer}: reason={reason:?}, confirmed_failed_keys={}, challenged_keys={}, absent_keys={}, digest_mismatch_keys={}, first_failed_key={first_failed_key}",
                failed_count,
                summary.challenged_keys,
                summary.absent_keys,
                summary.digest_mismatch_keys,
            );
            handle_subtree_failed_audit(
                challenged_peer,
                failed_count,
                reason,
                p2p_node,
                sync_state,
                recent_provers,
            )
            .await;
        }
        AuditTickResult::BootstrapClaim { peer } => {
            // The timestamp is taken before waiting on the lock, as the claim's observation time.
            let now = Instant::now();
            let abusive = {
                let mut state = sync_state.write().await;
                let observation =
                    state.observe_bootstrap_claim(*peer, now, config.bootstrap_claim_grace_period);
                match observation {
                    BootstrapClaimObservation::WithinGrace { .. } => {
                        debug!("Audit: peer {peer} claims bootstrapping (within grace period)");
                        false
                    }
                    BootstrapClaimObservation::PastGrace { first_seen } => {
                        warn!(
                            "Audit: peer {peer} claiming bootstrap past grace period \
                             ({:?} > {:?}), reporting abuse",
                            now.duration_since(first_seen),
                            config.bootstrap_claim_grace_period,
                        );
                        true
                    }
                    BootstrapClaimObservation::Repeated { first_seen } => {
                        warn!(
                            "Audit: peer {peer} repeated bootstrap claim after previously \
                             stopping; first claim was {:?} ago, reporting abuse",
                            now.duration_since(first_seen),
                        );
                        true
                    }
                }
            };
            if !abusive {
                return;
            }
            let penalty = TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT);
            p2p_node.report_trust_event(peer, penalty).await;
        }
    }
}
/// Whether an audit failure with this `reason` should clear the peer's active
/// bootstrap claim: true for every reason except `Timeout`.
fn audit_failure_clears_bootstrap_claim(reason: &AuditFailureReason) -> bool {
    let timed_out = matches!(reason, AuditFailureReason::Timeout);
    !timed_out
}
/// Applies the consequences of one audit tick.
///
/// * `Passed` clears the peer's active bootstrap claim and reports an
///   application success.
/// * `Failed` carrying `FailureEvidence::AuditFailure` is logged; the active
///   bootstrap claim is cleared unless the reason is `Timeout`, and an
///   `ApplicationFailure` weighted by `config::AUDIT_FAILURE_TRUST_WEIGHT` is
///   reported in either case. Any other evidence kind is ignored here.
/// * `BootstrapClaim` is recorded in `sync_state`; a claim observed past the
///   grace period, or a repeated claim, is reported as an application failure.
/// * `Idle` and `InsufficientKeys` do nothing.
async fn handle_audit_result(
    result: &AuditTickResult,
    p2p_node: &Arc<P2PNode>,
    sync_state: &Arc<RwLock<NeighborSyncState>>,
    config: &ReplicationConfig,
) {
    match result {
        AuditTickResult::Idle | AuditTickResult::InsufficientKeys => {}
        AuditTickResult::Passed {
            challenged_peer,
            keys_checked,
        } => {
            debug!("Audit passed for {challenged_peer} ({keys_checked} keys)");
            sync_state
                .write()
                .await
                .clear_active_bootstrap_claim(challenged_peer);
            let reward = TrustEvent::ApplicationSuccess(REPLICATION_TRUST_WEIGHT);
            p2p_node.report_trust_event(challenged_peer, reward).await;
        }
        AuditTickResult::Failed { evidence } => {
            let FailureEvidence::AuditFailure {
                challenged_peer,
                confirmed_failed_keys,
                summary,
                reason,
                ..
            } = evidence
            else {
                return;
            };
            let first_failed_key = first_failed_key_label(confirmed_failed_keys);
            error!(
                "Audit failure for {challenged_peer}: reason={reason:?}, confirmed_failed_keys={}, challenged_keys={}, absent_keys={}, digest_mismatch_keys={}, first_failed_key={first_failed_key}",
                confirmed_failed_keys.len(),
                summary.challenged_keys,
                summary.absent_keys,
                summary.digest_mismatch_keys,
            );
            let retain_claim = !audit_failure_clears_bootstrap_claim(reason);
            if retain_claim {
                debug!("Audit timeout for {challenged_peer}; retaining active bootstrap claim");
            } else {
                let mut state = sync_state.write().await;
                state.clear_active_bootstrap_claim(challenged_peer);
            }
            // The trust penalty is applied for every failure reason, timeouts included.
            let penalty = TrustEvent::ApplicationFailure(config::AUDIT_FAILURE_TRUST_WEIGHT);
            p2p_node.report_trust_event(challenged_peer, penalty).await;
        }
        AuditTickResult::BootstrapClaim { peer } => {
            // The timestamp is taken before waiting on the lock, as the claim's observation time.
            let now = Instant::now();
            let abusive = {
                let mut state = sync_state.write().await;
                let observation =
                    state.observe_bootstrap_claim(*peer, now, config.bootstrap_claim_grace_period);
                match observation {
                    BootstrapClaimObservation::WithinGrace { .. } => {
                        debug!("Audit: peer {peer} claims bootstrapping (within grace period)");
                        false
                    }
                    BootstrapClaimObservation::PastGrace { first_seen } => {
                        warn!(
                            "Audit: peer {peer} claiming bootstrap past grace period \
                             ({:?} > {:?}), reporting abuse",
                            now.duration_since(first_seen),
                            config.bootstrap_claim_grace_period,
                        );
                        true
                    }
                    BootstrapClaimObservation::Repeated { first_seen } => {
                        warn!(
                            "Audit: peer {peer} repeated bootstrap claim after previously \
                             stopping; first claim was {:?} ago, reporting abuse",
                            now.duration_since(first_seen),
                        );
                        true
                    }
                }
            };
            if !abusive {
                return;
            }
            let penalty = TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT);
            p2p_node.report_trust_event(peer, penalty).await;
        }
    }
}
/// Whether an audit failure with this `reason` should revoke the peer's holder
/// credit: true for every reason except `Timeout`.
fn audit_failure_revokes_holder_credit(reason: &AuditFailureReason) -> bool {
    let timed_out = matches!(reason, AuditFailureReason::Timeout);
    !timed_out
}
/// Forgets `challenged_peer` in `provers` when the failure `reason` is one that
/// revokes holder credit (see `audit_failure_revokes_holder_credit`); otherwise
/// leaves `provers` untouched.
fn apply_audit_failure_credit_revocation(
    provers: &mut RecentProvers,
    challenged_peer: &PeerId,
    reason: &AuditFailureReason,
) {
    if !audit_failure_revokes_holder_credit(reason) {
        return;
    }
    provers.forget_peer(challenged_peer);
}
/// Shared handles needed to launch a gossip-triggered subtree audit.
///
/// `maybe_trigger_gossip_audit` clones this value and moves the clone into the
/// spawned audit task, so every field is an `Arc`.
#[derive(Clone)]
struct GossipAuditTrigger {
/// Node handle used to run the audit and to report trust events.
p2p_node: Arc<P2PNode>,
/// Replication configuration passed to the audit and to result handling.
config: Arc<ReplicationConfig>,
/// Holder-credit store; lent to the audit as `AuditCredit` and updated on failure.
recent_provers: Arc<RwLock<RecentProvers>>,
/// Neighbor sync state holding bootstrap-claim bookkeeping.
sync_state: Arc<RwLock<NeighborSyncState>>,
/// Per-peer timestamp of the last time an audit window was consumed.
cooldown: Arc<RwLock<HashMap<PeerId, Instant>>>,
}
/// The commitment a subtree audit should be run against.
#[derive(Debug, Clone, Copy)]
struct AuditTarget {
/// Hash of the peer's commitment, passed to `run_subtree_audit` as the pin.
pin_hash: [u8; 32],
/// `key_count` taken from that commitment.
key_count: u32,
}
/// Decides whether `peer` may start a new audit window at `now`, and if so
/// records `now` as the start of that window.
///
/// Returns `false` (leaving `map` untouched) when the peer's recorded timestamp
/// is less than `config::AUDIT_ON_GOSSIP_COOLDOWN_SECS` old. Otherwise stores
/// `now` for the peer and returns `true`. When a previously unknown peer would
/// push the map past `MAX_LAST_COMMITMENT_BY_PEER` entries, the entry with the
/// oldest timestamp is evicted first.
fn cooldown_allows_audit(map: &mut HashMap<PeerId, Instant>, peer: &PeerId, now: Instant) -> bool {
    let window = Duration::from_secs(config::AUDIT_ON_GOSSIP_COOLDOWN_SECS);

    if let Some(&previous) = map.get(peer) {
        if now.saturating_duration_since(previous) < window {
            return false;
        }
    } else if map.len() >= MAX_LAST_COMMITMENT_BY_PEER {
        // Unknown peer and the map is full: make room by dropping the stalest entry.
        let stalest = map.iter().min_by_key(|(_, &ts)| ts).map(|(p, _)| *p);
        if let Some(victim) = stalest {
            map.remove(&victim);
        }
    }

    map.insert(*peer, now);
    true
}
/// Combines the per-peer cooldown with the lottery outcome.
///
/// The cooldown is always consulted (and therefore consumed when it allows),
/// regardless of `lottery_wins`; an audit launches only when the cooldown
/// allowed it and the lottery was won.
fn audit_launch_decision(
    map: &mut HashMap<PeerId, Instant>,
    peer: &PeerId,
    now: Instant,
    lottery_wins: bool,
) -> bool {
    let window_open = cooldown_allows_audit(map, peer, now);
    window_open && lottery_wins
}
/// Possibly spawns a background subtree audit of `peer` against `target`.
///
/// A lottery ticket is drawn with probability `config::AUDIT_ON_GOSSIP_PROBABILITY`
/// and combined with the per-peer cooldown via `audit_launch_decision`. When the
/// decision is positive, a detached tokio task runs `run_subtree_audit` and feeds
/// its result to `handle_subtree_audit_result`.
async fn maybe_trigger_gossip_audit(
    trigger: &GossipAuditTrigger,
    peer: &PeerId,
    target: AuditTarget,
) {
    let now = Instant::now();
    let lottery_wins = rand::thread_rng().gen_bool(config::AUDIT_ON_GOSSIP_PROBABILITY);

    // The cooldown lock is held only for the decision, never across the spawn.
    let launch = {
        let mut windows = trigger.cooldown.write().await;
        audit_launch_decision(&mut windows, peer, now, lottery_wins)
    };
    if !launch {
        return;
    }

    let ctx = trigger.clone();
    let audited_peer = *peer;
    tokio::spawn(async move {
        let credit = storage_commitment_audit::AuditCredit {
            recent_provers: &ctx.recent_provers,
        };
        let outcome = storage_commitment_audit::run_subtree_audit(
            &ctx.p2p_node,
            &ctx.config,
            &audited_peer,
            target.pin_hash,
            target.key_count,
            Some(&credit),
        )
        .await;
        handle_subtree_audit_result(
            &outcome,
            &ctx.p2p_node,
            &ctx.sync_state,
            &ctx.recent_provers,
            &ctx.config,
        )
        .await;
    });
}
/// Per-peer rate limit for commitment signature verification.
///
/// Returns `false` (recording nothing) when `source` was last admitted less than
/// `COMMITMENT_SIG_VERIFY_MIN_INTERVAL` before `now`. Otherwise records `now` for
/// `source` and returns `true`. When a previously unseen peer would push the map
/// past `MAX_LAST_COMMITMENT_BY_PEER` entries, the entry with the oldest
/// timestamp is evicted first.
async fn sig_verify_rate_limit_ok(
    sig_verify_attempts: &Arc<RwLock<HashMap<PeerId, Instant>>>,
    source: &PeerId,
    now: Instant,
) -> bool {
    let mut attempts = sig_verify_attempts.write().await;

    if let Some(&previous) = attempts.get(source) {
        if now.saturating_duration_since(previous) < COMMITMENT_SIG_VERIFY_MIN_INTERVAL {
            return false;
        }
    } else if attempts.len() >= MAX_LAST_COMMITMENT_BY_PEER {
        // Unseen peer and the map is full: drop the stalest entry to make room.
        let stalest = attempts.iter().min_by_key(|(_, &ts)| ts).map(|(p, _)| *p);
        if let Some(victim) = stalest {
            attempts.remove(&victim);
        }
    }

    attempts.insert(*source, now);
    true
}
/// Handles a peer that sent no commitment.
///
/// Looks up the peer's cached record. If the record is marked commitment-capable
/// and still holds a commitment:
///
/// * received less than `GOSSIP_ANSWERABILITY_TTL` ago: returns an `AuditTarget`
///   built from the cached commitment hash and key count;
/// * older than that: clears the cached commitment (after re-checking its age
///   under the write lock) and returns `None`.
///
/// Returns `None` in every other case.
async fn handle_commitment_downgrade(
    source: &PeerId,
    last_commitment_by_peer: &Arc<RwLock<HashMap<PeerId, PeerCommitmentRecord>>>,
) -> Option<AuditTarget> {
    let now = Instant::now();
    let ttl = crate::replication::commitment_state::GOSSIP_ANSWERABILITY_TTL;

    let cached = {
        let records = last_commitment_by_peer.read().await;
        records
            .get(source)
            .filter(|rec| rec.commitment_capable)
            .and_then(|rec| {
                let last = rec.last_commitment()?;
                let pin = rec.commitment_hash()?;
                let fresh = now.saturating_duration_since(rec.received_at) < ttl;
                Some((pin, last.key_count, fresh))
            })
    };

    let (pin_hash, key_count, fresh) = cached?;
    if fresh {
        warn!(
            "ingest_peer_commitment: commitment-capable peer {source} sent None \
             (downgrade attempt); auditing against its last cached commitment"
        );
        return Some(AuditTarget {
            pin_hash,
            key_count,
        });
    }

    // The read lock was released above, so the record may have been refreshed in
    // the meantime; re-check staleness before discarding anything.
    let mut records = last_commitment_by_peer.write().await;
    if let Some(rec) = records.get_mut(source) {
        let age = now.saturating_duration_since(rec.received_at);
        if age >= ttl {
            rec.clear_commitment();
            debug!(
                "ingest_peer_commitment: capable peer {source} sent None and its cached \
                 commitment aged past the answerability TTL; forgetting it"
            );
        }
    }
    None
}
async fn ingest_peer_commitment(
source: &PeerId,
commitment: Option<&StorageCommitment>,
p2p_node: &Arc<P2PNode>,
last_commitment_by_peer: &Arc<RwLock<HashMap<PeerId, PeerCommitmentRecord>>>,
ever_capable_peers: &Arc<RwLock<HashSet<PeerId>>>,
sig_verify_attempts: &Arc<RwLock<HashMap<PeerId, Instant>>>,
) -> Option<AuditTarget> {
let Some(c) = commitment else {
return handle_commitment_downgrade(source, last_commitment_by_peer).await;
};
if !p2p_node.dht_manager().is_in_routing_table(source).await {
debug!("ingest_peer_commitment: source {source} not in routing table (dropped)");
return None;
}
if &c.sender_peer_id != source.as_bytes() {
warn!(
"ingest_peer_commitment: sender_peer_id mismatch from {source} \
(dropped, possible relay attempt)"
);
return None;
}
let derived_peer_id = *blake3::hash(&c.sender_public_key).as_bytes();
if derived_peer_id != c.sender_peer_id {
warn!(
"ingest_peer_commitment: embedded pubkey does not hash to claimed peer_id for \
{source} (dropped, throwaway-key attack)"
);
return None;
}
let now = Instant::now();
if !sig_verify_rate_limit_ok(sig_verify_attempts, source, now).await {
debug!(
"ingest_peer_commitment: rate-limited sig verify from {source} \
(< {COMMITMENT_SIG_VERIFY_MIN_INTERVAL:?} since last attempt); dropped"
);
return None;
}
if !crate::replication::commitment::verify_commitment_signature(c) {
warn!(
"ingest_peer_commitment: signature did not verify under embedded key for {source} \
(dropped, forged commitment)"
);
return None;
}
let new_hash = commitment_hash(c);
let mut map = last_commitment_by_peer.write().await;
if map.len() >= MAX_LAST_COMMITMENT_BY_PEER && !map.contains_key(source) {
if let Some(victim) = map.keys().next().copied() {
map.remove(&victim);
warn!(
"ingest_peer_commitment: cache full ({MAX_LAST_COMMITMENT_BY_PEER}); \
evicted {victim} to admit {source}"
);
}
}
map.entry(*source)
.and_modify(|r| {
r.set_commitment(c.clone(), now);
r.last_sig_verify_at = now;
r.commitment_capable = true; })
.or_insert_with(|| PeerCommitmentRecord::from_verified(c.clone(), now));
drop(map);
{
let mut set = ever_capable_peers.write().await;
if set.contains(source) || set.len() < MAX_EVER_CAPABLE_PEERS {
set.insert(*source);
} else {
warn!(
"ingest_peer_commitment: ever_capable_peers at cap \
({MAX_EVER_CAPABLE_PEERS}); refusing to record {source} as sticky-capable"
);
}
}
new_hash.map(|pin_hash| AuditTarget {
pin_hash,
key_count: c.key_count,
})
}
/// Rebuilds this node's storage commitment from the keys it currently stores and
/// is responsible for, and rotates it into `state` when the key set changed.
///
/// * Storage empty: clears all retained slots (if any) and returns.
/// * Storage non-empty but no responsible keys: retires the current commitment.
/// * Merkle root equal to the current commitment's root: calls `age_out` and
///   skips re-signing.
/// * Otherwise: signs a new commitment with the node's ML-DSA-65 key and rotates it in.
///
/// At most `commitment::MAX_COMMITMENT_KEY_COUNT` keys are committed to; a larger
/// set is truncated with a warning.
///
/// # Errors
///
/// Returns `Error::Storage` when the stored keys cannot be read, and
/// `Error::Crypto` when building the tree, loading the secret key, or building
/// the commitment fails.
async fn rebuild_and_rotate_commitment(
    storage: &Arc<LmdbStorage>,
    identity: &Arc<NodeIdentity>,
    state: &Arc<ResponderCommitmentState>,
    p2p: &Arc<P2PNode>,
    config: &Arc<ReplicationConfig>,
) -> Result<()> {
    use saorsa_pqc::api::sig::{MlDsaSecretKey, MlDsaVariant};

    let stored_keys = storage
        .all_keys()
        .await
        .map_err(|e| Error::Storage(format!("commitment build: read keys: {e}")))?;
    let storage_empty = stored_keys.is_empty();

    let self_id = *p2p.peer_id();
    let mut responsible = Vec::with_capacity(stored_keys.len());
    for candidate in stored_keys {
        let ours =
            admission::is_responsible(&self_id, &candidate, p2p, config.close_group_size).await;
        if ours {
            responsible.push(candidate);
        }
    }

    if responsible.is_empty() {
        if !storage_empty {
            debug!(
                "Commitment rotation: no responsible keys to commit to; retiring current commitment \
                 (stays answerable until its gossip TTL lapses, bytes still on disk)"
            );
            state.retire_current();
            return Ok(());
        }
        if state.retained_slot_count() > 0 {
            debug!("Commitment rotation: storage empty, clearing retained slots");
            state.clear_all();
        }
        return Ok(());
    }

    let cap = commitment::MAX_COMMITMENT_KEY_COUNT as usize;
    let total = responsible.len();
    if total > cap {
        warn!(
            "Commitment rotation: key set ({}) exceeds MAX_COMMITMENT_KEY_COUNT ({}); \
             truncating — investigate as this likely means a misconfiguration",
            total,
            cap
        );
    }

    // Each leaf pairs a key with itself.
    let leaves: Vec<_> = responsible
        .into_iter()
        .take(cap)
        .map(|k| (k, k))
        .collect();
    let candidate_tree = commitment::MerkleTree::build(leaves)
        .map_err(|e| Error::Crypto(format!("commitment tree build: {e}")))?;
    let candidate_root = candidate_tree.root();

    if let Some(active) = state.current() {
        if active.commitment().root == candidate_root {
            debug!(
                "Commitment rotation: key set unchanged (root={}); skipping no-op re-sign",
                hex::encode(candidate_root)
            );
            state.age_out();
            return Ok(());
        }
    }

    let sk_bytes = identity.secret_key_bytes().to_vec();
    let sk = MlDsaSecretKey::from_bytes(MlDsaVariant::MlDsa65, &sk_bytes)
        .map_err(|e| Error::Crypto(format!("commitment build: load sk: {e}")))?;
    let pk_bytes = identity.public_key().as_bytes().to_vec();
    let peer_id_bytes = *p2p.peer_id().as_bytes();

    let built = commitment_state::BuiltCommitment::build_from_tree(
        candidate_tree,
        &peer_id_bytes,
        &sk,
        &pk_bytes,
    )
    .map_err(|e| Error::Crypto(format!("commitment build: {e}")))?;

    // Capture the log fields before `rotate` takes ownership of the commitment.
    let hash = hex::encode(built.hash());
    let key_count = built.commitment().key_count;
    state.rotate(built);
    info!("Storage commitment rotated: hash={hash} key_count={key_count}");
    Ok(())
}
/// Unit tests for the pure decision helpers in this module: payment contexts,
/// audit-failure policy predicates, the gossip-audit cooldown, and log labels.
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
    use super::{
        apply_audit_failure_credit_revocation, audit_failure_clears_bootstrap_claim,
        audit_failure_revokes_holder_credit, audit_launch_decision, config, cooldown_allows_audit,
        first_failed_key_label, fresh_offer_payment_context, paid_notify_payment_context,
    };
    use crate::payment::VerificationContext;
    use crate::replication::recent_provers::RecentProvers;
    use crate::replication::types::AuditFailureReason;
    use saorsa_core::identity::PeerId;
    use std::collections::HashMap;
    use std::time::{Duration, Instant};

    /// Builds a peer id whose first byte is `b` and whose remaining bytes are zero.
    fn test_peer(b: u8) -> PeerId {
        let mut bytes = [0u8; 32];
        bytes[0] = b;
        PeerId::from_bytes(bytes)
    }

    /// Builds a key whose first byte is `b` and whose remaining bytes are zero.
    fn test_key(b: u8) -> crate::ant_protocol::XorName {
        let mut k = [0u8; 32];
        k[0] = b;
        k
    }

    #[test]
    fn fresh_offer_runs_client_put_payment_checks() {
        assert_eq!(
            fresh_offer_payment_context(),
            VerificationContext::ClientPut
        );
    }

    #[test]
    fn paid_notify_uses_paid_list_admission_payment_checks() {
        assert_eq!(
            paid_notify_payment_context(),
            VerificationContext::PaidListAdmission
        );
    }

    #[test]
    fn audit_timeout_preserves_active_bootstrap_claim() {
        assert!(!audit_failure_clears_bootstrap_claim(
            &AuditFailureReason::Timeout
        ));
    }

    #[test]
    fn gossip_flood_yields_at_most_one_audit_per_cooldown_window() {
        let peer = test_peer(1);
        let mut map: HashMap<PeerId, Instant> = HashMap::new();
        let t0 = Instant::now();
        assert!(cooldown_allows_audit(&mut map, &peer, t0));
        let mut passed = 1;
        for _ in 0..100 {
            if cooldown_allows_audit(&mut map, &peer, t0) {
                passed += 1;
            }
        }
        assert_eq!(
            passed, 1,
            "a flood at one instant must trigger exactly one audit"
        );
    }

    #[test]
    fn losing_lottery_still_consumes_cooldown_window() {
        let peer = test_peer(3);
        let mut map: HashMap<PeerId, Instant> = HashMap::new();
        let t0 = Instant::now();
        assert!(
            !audit_launch_decision(&mut map, &peer, t0, false),
            "a losing ticket launches no audit"
        );
        let mut audits = 0;
        for _ in 0..99 {
            if audit_launch_decision(&mut map, &peer, t0, true) {
                audits += 1;
            }
        }
        assert_eq!(
            audits, 0,
            "a losing first ticket must consume the window so no later flooded \
             message in the same window can audit"
        );
        let after = t0 + Duration::from_secs(config::AUDIT_ON_GOSSIP_COOLDOWN_SECS + 1);
        assert!(
            audit_launch_decision(&mut map, &peer, after, true),
            "after the cooldown a winning ticket audits again"
        );
    }

    #[test]
    fn cooldown_lets_audit_through_after_the_window() {
        let peer = test_peer(2);
        let mut map: HashMap<PeerId, Instant> = HashMap::new();
        let t0 = Instant::now();
        assert!(cooldown_allows_audit(&mut map, &peer, t0));
        let within = t0 + Duration::from_secs(config::AUDIT_ON_GOSSIP_COOLDOWN_SECS - 1);
        assert!(!cooldown_allows_audit(&mut map, &peer, within));
        let after = t0 + Duration::from_secs(config::AUDIT_ON_GOSSIP_COOLDOWN_SECS + 1);
        assert!(cooldown_allows_audit(&mut map, &peer, after));
    }

    #[test]
    fn cooldown_is_per_peer_independent() {
        let mut map: HashMap<PeerId, Instant> = HashMap::new();
        let t0 = Instant::now();
        for i in 0..20u8 {
            assert!(
                cooldown_allows_audit(&mut map, &test_peer(i), t0),
                "peer {i} should be auditable independently"
            );
        }
    }

    #[test]
    fn audit_on_gossip_constants_match_adr() {
        assert_eq!(config::AUDIT_SPOTCHECK_COUNT, 5);
        assert!((config::AUDIT_ON_GOSSIP_PROBABILITY - 0.2).abs() < f64::EPSILON);
        assert_eq!(config::AUDIT_ON_GOSSIP_COOLDOWN_SECS, 30 * 60);
    }

    #[test]
    fn digest_mismatch_is_not_a_timeout_and_penalizes_immediately() {
        assert!(audit_failure_clears_bootstrap_claim(
            &AuditFailureReason::DigestMismatch
        ));
        assert!(audit_failure_revokes_holder_credit(
            &AuditFailureReason::DigestMismatch
        ));
    }

    #[test]
    fn confirmed_failures_revoke_credit_timeout_does_not() {
        for reason in [
            AuditFailureReason::MalformedResponse,
            AuditFailureReason::DigestMismatch,
            AuditFailureReason::KeyAbsent,
            AuditFailureReason::Rejected,
        ] {
            assert!(
                audit_failure_revokes_holder_credit(&reason),
                "confirmed failure {reason:?} must revoke holder credit"
            );
        }
        assert!(
            !audit_failure_revokes_holder_credit(&AuditFailureReason::Timeout),
            "Timeout must NOT revoke credit (single dropped packet != storage loss)"
        );
    }

    #[test]
    fn apply_revocation_strips_on_digest_mismatch_retains_on_timeout() {
        let peer = test_peer(0xAB);
        let key = test_key(1);
        let hash = [0xCD; 32];
        let mut provers = RecentProvers::new();
        provers.record_proof(key, peer, hash, Instant::now());
        assert!(
            provers.is_credited_holder(&key, &peer, &hash),
            "precondition: peer credited before failure"
        );
        apply_audit_failure_credit_revocation(
            &mut provers,
            &peer,
            &AuditFailureReason::DigestMismatch,
        );
        assert!(
            !provers.is_credited_holder(&key, &peer, &hash),
            "DigestMismatch must strip the peer's holder credit"
        );
        let mut provers_timeout = RecentProvers::new();
        provers_timeout.record_proof(key, peer, hash, Instant::now());
        apply_audit_failure_credit_revocation(
            &mut provers_timeout,
            &peer,
            &AuditFailureReason::Timeout,
        );
        assert!(
            provers_timeout.is_credited_holder(&key, &peer, &hash),
            "Timeout must retain holder credit (deliberate liveness cushion)"
        );
    }

    #[test]
    fn decoded_audit_failures_clear_active_bootstrap_claim() {
        for reason in [
            AuditFailureReason::MalformedResponse,
            AuditFailureReason::DigestMismatch,
            AuditFailureReason::KeyAbsent,
            AuditFailureReason::Rejected,
        ] {
            assert!(
                audit_failure_clears_bootstrap_claim(&reason),
                "decoded non-bootstrap failure {reason:?} should clear active claim"
            );
        }
    }

    #[test]
    fn first_failed_key_label_truncates_to_16_hex_chars() {
        let mut key = [0u8; 32];
        key[0] = 0x18;
        key[7] = 0xff;
        // Bytes past the 8-byte prefix must not leak into the label.
        for byte in &mut key[8..] {
            *byte = 0xAA;
        }
        let label = first_failed_key_label(&[key]);
        assert_eq!(label, "0x18000000000000ff");
        assert_eq!(label.len(), "0x".len() + 16);
    }

    #[test]
    fn first_failed_key_label_falls_back_when_empty() {
        assert_eq!(first_failed_key_label(&[]), "0x");
    }

    #[test]
    fn first_failed_key_label_uses_first_key_only() {
        let first = [0x11u8; 32];
        let second = [0x22u8; 32];
        assert_eq!(
            first_failed_key_label(&[first, second]),
            format!("0x{}", hex::encode(&first[..8]))
        );
    }
}