use std::sync::Arc;
use std::time::{Duration, Instant};
use parking_lot::RwLock;
use saorsa_pqc::api::sig::MlDsaSecretKey;
use crate::ant_protocol::XorName;
use crate::replication::commitment::{
commitment_hash, sign_commitment, CommitmentError, MerkleTree, StorageCommitment,
};
#[derive(Debug, Clone)]
pub struct PeerCommitmentRecord {
last_commitment: Option<StorageCommitment>,
cached_hash: Option<[u8; 32]>,
pub commitment_capable: bool,
pub received_at: Instant,
pub last_sig_verify_at: Instant,
}
impl PeerCommitmentRecord {
#[must_use]
pub fn from_verified(commitment: StorageCommitment, now: Instant) -> Self {
let cached_hash = commitment_hash(&commitment);
Self {
last_commitment: Some(commitment),
cached_hash,
commitment_capable: true,
received_at: now,
last_sig_verify_at: now,
}
}
#[must_use]
pub fn capable_but_no_commitment(now: Instant) -> Self {
Self {
last_commitment: None,
cached_hash: None,
commitment_capable: true,
received_at: now,
last_sig_verify_at: now,
}
}
#[must_use]
pub fn last_commitment(&self) -> Option<&StorageCommitment> {
self.last_commitment.as_ref()
}
#[must_use]
pub fn commitment_hash(&self) -> Option<[u8; 32]> {
self.cached_hash
}
pub fn set_commitment(&mut self, commitment: StorageCommitment, now: Instant) {
self.cached_hash = commitment_hash(&commitment);
self.last_commitment = Some(commitment);
self.received_at = now;
}
pub fn clear_commitment(&mut self) {
self.last_commitment = None;
self.cached_hash = None;
}
}
pub struct BuiltCommitment {
commitment: StorageCommitment,
cached_hash: [u8; 32],
tree: MerkleTree,
}
impl BuiltCommitment {
pub fn build(
entries: Vec<(XorName, [u8; 32])>,
sender_peer_id: &[u8; 32],
secret_key: &MlDsaSecretKey,
sender_public_key: &[u8],
) -> Result<Self, CommitmentError> {
let tree = MerkleTree::build(entries)?;
Self::build_from_tree(tree, sender_peer_id, secret_key, sender_public_key)
}
pub fn build_from_tree(
tree: MerkleTree,
sender_peer_id: &[u8; 32],
secret_key: &MlDsaSecretKey,
sender_public_key: &[u8],
) -> Result<Self, CommitmentError> {
let root = tree.root();
let key_count = tree.key_count();
let signature = sign_commitment(
secret_key,
&root,
key_count,
sender_peer_id,
sender_public_key,
)?;
let commitment = StorageCommitment {
root,
key_count,
sender_peer_id: *sender_peer_id,
sender_public_key: sender_public_key.to_vec(),
signature,
};
let cached_hash = commitment_hash(&commitment).ok_or_else(|| {
CommitmentError::SignatureFailed("commitment serialization failed".to_string())
})?;
Ok(Self {
commitment,
cached_hash,
tree,
})
}
#[must_use]
pub fn commitment(&self) -> &StorageCommitment {
&self.commitment
}
#[must_use]
pub fn hash(&self) -> [u8; 32] {
self.cached_hash
}
#[must_use]
pub fn tree(&self) -> &MerkleTree {
&self.tree
}
#[must_use]
pub fn proof_for(&self, key: &XorName) -> Option<(Vec<[u8; 32]>, u32)> {
let idx = self.tree.key_index(key)?;
let path = self.tree.path_for(key)?;
let leaf_index = u32::try_from(idx).unwrap_or(u32::MAX);
Some((path, leaf_index))
}
#[must_use]
pub fn contains_key(&self, key: &XorName) -> bool {
self.tree.contains_key(key)
}
}
const RETAINED_GOSSIPED_COMMITMENTS: usize = 2;
pub(crate) const GOSSIP_ANSWERABILITY_TTL: Duration = Duration::from_secs(3 * 3600);
pub struct ResponderCommitmentState {
inner: RwLock<Inner>,
}
#[derive(Clone, Copy)]
struct GossipedAt {
hash: [u8; 32],
last_gossiped_at: Instant,
}
struct Inner {
slots: Vec<Arc<BuiltCommitment>>,
has_current: bool,
recently_gossiped: Vec<GossipedAt>,
}
impl Default for ResponderCommitmentState {
fn default() -> Self {
Self::new()
}
}
impl ResponderCommitmentState {
#[must_use]
pub fn new() -> Self {
Self {
inner: RwLock::new(Inner {
slots: Vec::with_capacity(RETAINED_GOSSIPED_COMMITMENTS + 1),
has_current: false,
recently_gossiped: Vec::with_capacity(RETAINED_GOSSIPED_COMMITMENTS),
}),
}
}
pub fn rotate(&self, new_current: BuiltCommitment) {
let new_current = Arc::new(new_current);
let mut guard = self.inner.write();
guard.slots.insert(0, new_current);
guard.has_current = true;
prune_slots(&mut guard, Instant::now());
}
pub fn retire_current(&self) {
let mut guard = self.inner.write();
guard.has_current = false;
prune_slots(&mut guard, Instant::now());
}
pub fn mark_gossiped(&self, hash: [u8; 32]) {
let now = Instant::now();
let mut guard = self.inner.write();
mark_gossiped_locked(&mut guard, hash, now);
}
#[must_use]
pub fn current_for_gossip(&self) -> Option<Arc<BuiltCommitment>> {
let now = Instant::now();
let mut guard = self.inner.write();
if !guard.has_current {
return None;
}
let current = guard.slots.first().map(Arc::clone)?;
mark_gossiped_locked(&mut guard, current.cached_hash, now);
Some(current)
}
pub fn age_out(&self) {
let mut guard = self.inner.write();
prune_slots(&mut guard, Instant::now());
}
#[must_use]
pub fn lookup_by_hash(&self, hash: &[u8; 32]) -> Option<Arc<BuiltCommitment>> {
let guard = self.inner.read();
for c in &guard.slots {
if &c.cached_hash == hash {
return Some(Arc::clone(c));
}
}
None
}
#[must_use]
pub fn is_held(&self, key: &XorName) -> bool {
self.inner.read().slots.iter().any(|c| c.contains_key(key))
}
#[must_use]
pub fn current(&self) -> Option<Arc<BuiltCommitment>> {
let guard = self.inner.read();
if guard.has_current {
guard.slots.first().map(Arc::clone)
} else {
None
}
}
#[must_use]
pub fn retained_slot_count(&self) -> usize {
self.inner.read().slots.len()
}
pub fn clear_all(&self) {
let mut guard = self.inner.write();
guard.slots.clear();
guard.has_current = false;
guard.recently_gossiped.clear();
}
}
fn mark_gossiped_locked(inner: &mut Inner, hash: [u8; 32], now: Instant) {
inner.recently_gossiped.retain(|g| g.hash != hash);
inner.recently_gossiped.insert(
0,
GossipedAt {
hash,
last_gossiped_at: now,
},
);
inner
.recently_gossiped
.truncate(RETAINED_GOSSIPED_COMMITMENTS);
prune_slots(inner, now);
}
fn prune_slots(inner: &mut Inner, now: Instant) {
inner
.recently_gossiped
.retain(|g| now.duration_since(g.last_gossiped_at) < GOSSIP_ANSWERABILITY_TTL);
let live: Vec<[u8; 32]> = inner.recently_gossiped.iter().map(|g| g.hash).collect();
let has_current = inner.has_current;
let mut idx = 0usize;
inner.slots.retain(|c| {
let keep = (has_current && idx == 0) || live.contains(&c.cached_hash);
idx += 1;
keep
});
if inner.slots.is_empty() {
inner.has_current = false;
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
use super::*;
use crate::replication::commitment::{commitment_hash, leaf_hash, verify_path};
use saorsa_pqc::api::sig::ml_dsa_65;
fn key(byte: u8) -> XorName {
let mut k = [0u8; 32];
k[0] = byte;
k
}
fn bh(byte: u8) -> [u8; 32] {
[byte ^ 0x5A; 32]
}
fn keypair() -> (saorsa_pqc::api::sig::MlDsaPublicKey, MlDsaSecretKey) {
ml_dsa_65().generate_keypair().unwrap()
}
#[test]
fn built_commitment_hash_matches_global_hash() {
let (pk, sk) = keypair();
let pk_bytes = pk.to_bytes();
let entries: Vec<_> = (1..=5u8).map(|i| (key(i), bh(i))).collect();
let built = BuiltCommitment::build(entries, &[0xAB; 32], &sk, &pk_bytes).unwrap();
let expected = commitment_hash(built.commitment()).unwrap();
assert_eq!(built.hash(), expected);
}
#[test]
fn built_commitment_proof_verifies_under_its_own_root() {
let (pk, sk) = keypair();
let pk_bytes = pk.to_bytes();
let entries: Vec<_> = (1..=8u8).map(|i| (key(i), bh(i))).collect();
let built = BuiltCommitment::build(entries.clone(), &[1; 32], &sk, &pk_bytes).unwrap();
let root = built.commitment().root;
let key_count = built.commitment().key_count;
for (k, _) in &entries {
let (path, leaf_index) = built.proof_for(k).expect("present");
let bh_k = entries.iter().find(|(kk, _)| kk == k).unwrap().1;
let lh = leaf_hash(k, &bh_k);
assert!(
verify_path(&lh, &path, leaf_index as usize, key_count, &root),
"path verify failed for key {k:?}"
);
}
}
#[test]
fn proof_for_absent_key_is_none() {
let (pk, sk) = keypair();
let pk_bytes = pk.to_bytes();
let built = BuiltCommitment::build(
vec![(key(1), bh(1)), (key(2), bh(2))],
&[0; 32],
&sk,
&pk_bytes,
)
.unwrap();
assert!(built.proof_for(&key(99)).is_none());
}
#[test]
fn empty_state_returns_none() {
let state = ResponderCommitmentState::new();
assert!(state.current().is_none());
assert!(state.lookup_by_hash(&[0; 32]).is_none());
}
#[test]
fn clear_all_drops_every_slot() {
let (pk, sk) = keypair();
let pk_bytes = pk.to_bytes();
let state = ResponderCommitmentState::new();
let peer_id = *blake3::hash(&pk.to_bytes()).as_bytes();
let c1 = BuiltCommitment::build(vec![(key(1), bh(1))], &peer_id, &sk, &pk_bytes).unwrap();
let h1 = c1.hash();
state.rotate(c1);
state.mark_gossiped(h1); let c2 = BuiltCommitment::build(vec![(key(2), bh(2))], &peer_id, &sk, &pk_bytes).unwrap();
let h2 = c2.hash();
state.rotate(c2);
state.mark_gossiped(h2);
assert!(state.current().is_some());
assert!(state.lookup_by_hash(&h1).is_some());
state.clear_all();
assert!(state.current().is_none());
assert!(state.lookup_by_hash(&h1).is_none());
}
#[test]
fn lookup_arc_outlives_subsequent_rotation() {
let (pk, sk) = keypair();
let pk_bytes = pk.to_bytes();
let state = ResponderCommitmentState::new();
let c1 = BuiltCommitment::build(vec![(key(1), bh(1))], &[0; 32], &sk, &pk_bytes).unwrap();
let h1 = c1.hash();
state.rotate(c1);
let in_flight = state.lookup_by_hash(&h1).unwrap();
let c2 = BuiltCommitment::build(vec![(key(2), bh(2))], &[0; 32], &sk, &pk_bytes).unwrap();
state.rotate(c2);
assert!(state.lookup_by_hash(&h1).is_none());
assert_eq!(in_flight.hash(), h1);
assert!(in_flight.proof_for(&key(1)).is_some());
}
#[test]
fn gossiped_commitment_stays_answerable_across_rotations() {
let (pk, sk) = keypair();
let pk_bytes = pk.to_bytes();
let state = ResponderCommitmentState::new();
let c1 = BuiltCommitment::build(vec![(key(1), bh(1))], &[0; 32], &sk, &pk_bytes).unwrap();
let h1 = c1.hash();
state.rotate(c1);
state.mark_gossiped(h1);
let c2 = BuiltCommitment::build(vec![(key(2), bh(2))], &[0; 32], &sk, &pk_bytes).unwrap();
let h2 = c2.hash();
state.rotate(c2);
state.mark_gossiped(h2);
assert!(
state.lookup_by_hash(&h1).is_some(),
"c1 must stay answerable"
);
assert!(state.lookup_by_hash(&h2).is_some());
let c3 = BuiltCommitment::build(vec![(key(3), bh(3))], &[0; 32], &sk, &pk_bytes).unwrap();
let h3 = c3.hash();
state.rotate(c3);
state.mark_gossiped(h3);
assert!(
state.lookup_by_hash(&h1).is_none(),
"c1 aged out of gossip window"
);
assert!(state.lookup_by_hash(&h2).is_some());
assert!(state.lookup_by_hash(&h3).is_some());
}
#[test]
fn current_plus_last_two_gossiped_are_simultaneously_answerable() {
let (pk, sk) = keypair();
let pk_bytes = pk.to_bytes();
let state = ResponderCommitmentState::new();
let c1 = BuiltCommitment::build(vec![(key(1), bh(1))], &[0; 32], &sk, &pk_bytes).unwrap();
let h1 = c1.hash();
state.rotate(c1);
state.mark_gossiped(h1);
let c2 = BuiltCommitment::build(vec![(key(2), bh(2))], &[0; 32], &sk, &pk_bytes).unwrap();
let h2 = c2.hash();
state.rotate(c2);
state.mark_gossiped(h2);
assert!(
state.lookup_by_hash(&h1).is_some(),
"the commitment published just before the newest one must stay answerable"
);
assert!(
state.lookup_by_hash(&h2).is_some(),
"current must be answerable"
);
assert_ne!(h1, h2, "the two retained commitments must be distinct");
let c3 = BuiltCommitment::build(vec![(key(3), bh(3))], &[0; 32], &sk, &pk_bytes).unwrap();
let h3 = c3.hash();
state.rotate(c3);
state.mark_gossiped(h3);
assert_ne!(h2, h3);
assert_ne!(h1, h3);
assert!(
state.lookup_by_hash(&h3).is_some(),
"current (c3) answerable"
);
assert!(
state.lookup_by_hash(&h2).is_some(),
"c2 (published just before newest) answerable — the race-absorbing slot"
);
assert!(
state.lookup_by_hash(&h1).is_none(),
"c1 is the 3rd-oldest gossiped root and MUST be dropped — depth is exactly 2"
);
}
#[test]
fn is_held_tracks_keys_across_the_retention_window_and_ages_them_out() {
let (pk, sk) = keypair();
let pk_bytes = pk.to_bytes();
let state = ResponderCommitmentState::new();
let c1 = BuiltCommitment::build(vec![(key(1), bh(1))], &[0; 32], &sk, &pk_bytes).unwrap();
let h1 = c1.hash();
state.rotate(c1);
state.mark_gossiped(h1);
assert!(
state.is_held(&key(1)),
"freshly committed+gossiped key is held"
);
assert!(!state.is_held(&key(99)), "never-committed key is not held");
let c2 = BuiltCommitment::build(vec![(key(2), bh(2))], &[0; 32], &sk, &pk_bytes).unwrap();
let h2 = c2.hash();
state.rotate(c2);
state.mark_gossiped(h2);
assert!(
state.is_held(&key(1)),
"key dropped from the newest commitment is still held via the previous gossiped slot"
);
assert!(state.is_held(&key(2)), "newly committed key is held");
let c3 = BuiltCommitment::build(vec![(key(3), bh(3))], &[0; 32], &sk, &pk_bytes).unwrap();
let h3 = c3.hash();
state.rotate(c3);
state.mark_gossiped(h3);
assert!(
!state.is_held(&key(1)),
"key whose commitments all aged out of the retention window is no longer held"
);
assert!(
state.is_held(&key(2)),
"key(2) still held via the previous gossiped slot"
);
assert!(state.is_held(&key(3)), "current key held");
}
fn built(keys: &[u8]) -> BuiltCommitment {
let (pk, sk) = keypair();
let entries: Vec<_> = keys.iter().map(|&b| (key(b), bh(b))).collect();
BuiltCommitment::build(entries, &[0; 32], &sk, &pk.to_bytes()).unwrap()
}
#[test]
fn stale_gossip_record_expires_by_ttl_even_without_new_distinct_gossip() {
let c_current = Arc::new(built(&[1])); let c_stale = Arc::new(built(&[2])); let h_current = c_current.hash();
let h_stale = c_stale.hash();
let base = Instant::now();
let now = base + GOSSIP_ANSWERABILITY_TTL + Duration::from_secs(1);
let mut inner = Inner {
slots: vec![Arc::clone(&c_current), Arc::clone(&c_stale)],
has_current: true,
recently_gossiped: vec![
GossipedAt {
hash: h_current,
last_gossiped_at: now,
},
GossipedAt {
hash: h_stale,
last_gossiped_at: base,
},
],
};
prune_slots(&mut inner, now);
assert!(
inner.recently_gossiped.iter().all(|g| g.hash != h_stale),
"stale gossip record past its TTL must expire"
);
assert_eq!(inner.slots.len(), 1, "the stale slot must be dropped");
assert_eq!(inner.slots[0].hash(), h_current, "current slot retained");
assert!(
inner.slots.iter().all(|c| c.proof_for(&key(2)).is_none()),
"stale key is no longer held once its commitment ages out"
);
assert!(
inner.slots.iter().any(|c| c.proof_for(&key(1)).is_some()),
"current key still held"
);
}
#[test]
fn recent_gossip_record_stays_answerable_within_ttl() {
let c_current = Arc::new(built(&[1]));
let c_prev = Arc::new(built(&[2]));
let h_current = c_current.hash();
let h_prev = c_prev.hash();
let base = Instant::now();
let now = base + GOSSIP_ANSWERABILITY_TTL / 2;
let mut inner = Inner {
slots: vec![Arc::clone(&c_current), Arc::clone(&c_prev)],
has_current: true,
recently_gossiped: vec![
GossipedAt {
hash: h_current,
last_gossiped_at: now,
},
GossipedAt {
hash: h_prev,
last_gossiped_at: base,
},
],
};
prune_slots(&mut inner, now);
assert_eq!(
inner.slots.len(),
2,
"a commitment gossiped within the TTL must stay answerable (the 'two, not one' race window)"
);
assert!(
inner.slots.iter().any(|c| c.hash() == h_prev),
"the recently-gossiped previous commitment must not be dropped early"
);
}
#[test]
fn retire_current_hides_current_but_keeps_recent_pin_answerable() {
let state = ResponderCommitmentState::new();
let c1 = built(&[1]);
let h1 = c1.hash();
state.rotate(c1);
state.mark_gossiped(h1);
assert!(state.current().is_some(), "fresh current is advertised");
state.retire_current();
assert!(
state.current().is_none(),
"retired current must not be advertised (stops the gossip loop re-stamping it)"
);
assert!(
state.lookup_by_hash(&h1).is_some(),
"retired current stays answerable for an in-flight pin within its TTL"
);
assert!(
state.is_held(&key(1)),
"its keys are still held while answerable, so the pruner still vetoes them"
);
}
#[test]
fn retired_current_ages_out_by_gossip_ttl() {
let c1 = Arc::new(built(&[1]));
let h1 = c1.hash();
let base = Instant::now();
let now = base + GOSSIP_ANSWERABILITY_TTL + Duration::from_secs(1);
let mut inner = Inner {
slots: vec![Arc::clone(&c1)],
has_current: false, recently_gossiped: vec![GossipedAt {
hash: h1,
last_gossiped_at: base,
}],
};
prune_slots(&mut inner, now);
assert!(
inner.slots.is_empty(),
"retired current past its TTL is dropped"
);
assert!(!inner.has_current);
assert!(
inner.slots.iter().all(|c| c.proof_for(&key(1)).is_none()),
"its key is no longer held -> pruner reclaims it"
);
}
#[test]
fn retired_current_stays_answerable_within_ttl() {
let c1 = Arc::new(built(&[1]));
let h1 = c1.hash();
let base = Instant::now();
let now = base + GOSSIP_ANSWERABILITY_TTL / 2;
let mut inner = Inner {
slots: vec![Arc::clone(&c1)],
has_current: false, recently_gossiped: vec![GossipedAt {
hash: h1,
last_gossiped_at: base,
}],
};
prune_slots(&mut inner, now);
assert_eq!(
inner.slots.len(),
1,
"retired-but-recent current stays answerable"
);
assert_eq!(inner.slots[0].hash(), h1);
}
#[test]
fn re_acquire_after_retire_advertises_fresh_current_without_resurrecting_stale() {
let state = ResponderCommitmentState::new();
let c1 = built(&[1]);
let h1 = c1.hash();
state.rotate(c1);
state.mark_gossiped(h1); state.retire_current();
assert!(state.current().is_none());
let c2 = built(&[2]);
let h2 = c2.hash();
state.rotate(c2);
state.mark_gossiped(h2);
let cur = state
.current()
.expect("fresh current advertised after re-acquire");
assert_eq!(
cur.hash(),
h2,
"the FRESH commitment is current, not the retired one"
);
assert!(
state.lookup_by_hash(&h1).is_some(),
"the retired-but-recently-gossiped commitment is still answerable as a retained slot"
);
assert!(
state.is_held(&key(1)),
"retired key still held within its TTL"
);
assert!(state.is_held(&key(2)), "fresh current key held");
}
#[test]
fn retire_current_drops_ungossiped_current() {
let state = ResponderCommitmentState::new();
let c1 = built(&[1]);
let h1 = c1.hash();
state.rotate(c1);
state.retire_current();
assert!(state.current().is_none(), "no current after retire");
assert!(
state.lookup_by_hash(&h1).is_none(),
"an ungossiped retired current is not answerable (nothing to retain)"
);
assert!(!state.is_held(&key(1)));
}
#[test]
fn ungossiped_rebuild_does_not_evict_gossiped_commitment() {
let (pk, sk) = keypair();
let pk_bytes = pk.to_bytes();
let state = ResponderCommitmentState::new();
let c1 = BuiltCommitment::build(vec![(key(1), bh(1))], &[0; 32], &sk, &pk_bytes).unwrap();
let h1 = c1.hash();
state.rotate(c1);
state.mark_gossiped(h1);
for i in 2..=6u8 {
let c =
BuiltCommitment::build(vec![(key(i), bh(i))], &[0; 32], &sk, &pk_bytes).unwrap();
state.rotate(c);
}
assert!(
state.lookup_by_hash(&h1).is_some(),
"gossiped commitment must survive ungossiped rebuilds"
);
}
}