Skip to main content

exo_api/
p2p.rs

1// Copyright 2026 Exochain Foundation
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at:
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14//
15// SPDX-License-Identifier: Apache-2.0
16
17//! Peer-to-peer mesh networking.
18use std::collections::{BTreeMap, BTreeSet};
19
20use exo_core::{Did, Hash256, PublicKey, Signature, Timestamp};
21use serde::{Deserialize, Serialize};
22
23use crate::error::{ApiError, Result};
24
/// Domain-separation tag placed in the `domain` field of every signed P2P message payload.
const P2P_MESSAGE_SIGNING_DOMAIN: &str = "exo.p2p.message.v1";
/// Largest `Message::payload` length (64 KiB) accepted by signing and verification.
pub const MAX_P2P_MESSAGE_PAYLOAD_BYTES: usize = 64 * 1024;
/// Largest bootstrap address list that `discover_peers` processes in a single call.
const MAX_BOOTSTRAP_DISCOVERY_PEERS: usize = 4_096;
/// Hard upper bound on the number of peers `select_diverse_peers` returns.
const MAX_DIVERSE_SELECTION_PEERS: usize = 4_096;
29
30/// Unique identifier for a peer in the P2P mesh, wrapping a DID.
31#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
32pub struct PeerId(pub Did);
33
34impl PeerId {
35    #[must_use]
36    pub fn as_str(&self) -> &str {
37        self.0.as_str()
38    }
39}
40
41impl std::fmt::Display for PeerId {
42    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
43        f.write_str(self.as_str())
44    }
45}
46
/// Metadata describing a known peer (addresses, key hash, reputation).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PeerInfo {
    /// Identity of the peer; `PeerRegistry::register` stores the entry under this ID.
    pub id: PeerId,
    /// Addresses recorded for the peer. `discover_peers` stores the single
    /// bootstrap address the peer ID was derived from.
    pub addresses: Vec<String>,
    /// Hash associated with the peer's public key.
    /// NOTE(review): `discover_peers` fills this with a digest of the bootstrap
    /// address rather than of a key — confirm that is intended.
    pub public_key_hash: Hash256,
    /// When the peer was last seen; `discover_peers` initialises it to `Timestamp::ZERO`.
    pub last_seen: Timestamp,
    /// Reputation score; peers created by `discover_peers` start at 50.
    pub reputation_score: u32,
}
56
57/// In-memory registry of known peers in the P2P mesh.
58#[derive(Debug, Clone, Default)]
59pub struct PeerRegistry {
60    pub peers: BTreeMap<PeerId, PeerInfo>,
61}
62
63impl PeerRegistry {
64    /// Create an empty peer registry.
65    #[must_use]
66    pub fn new() -> Self {
67        Self {
68            peers: BTreeMap::new(),
69        }
70    }
71    /// Register a peer, replacing any existing entry with the same ID.
72    pub fn register(&mut self, info: PeerInfo) {
73        self.peers.insert(info.id.clone(), info);
74    }
75    /// Look up a peer by its ID.
76    #[must_use]
77    pub fn get(&self, id: &PeerId) -> Option<&PeerInfo> {
78        self.peers.get(id)
79    }
80    /// Return the number of registered peers.
81    #[must_use]
82    pub fn len(&self) -> usize {
83        self.peers.len()
84    }
85    /// Return `true` if no peers are registered.
86    #[must_use]
87    pub fn is_empty(&self) -> bool {
88        self.peers.is_empty()
89    }
90}
91
/// A signed peer-to-peer message with optional recipient.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Message {
    /// Sender identity; bound into the signed payload and part of the replay key.
    pub from: PeerId,
    /// Intended recipient. `send` only consults the registry when this is `Some`.
    pub to: Option<PeerId>,
    /// Body bytes; must not exceed `MAX_P2P_MESSAGE_PAYLOAD_BYTES` to pass validation.
    pub payload: Vec<u8>,
    /// Signature over the bytes produced by `message_signing_payload`; this
    /// field itself is not part of the signed data.
    pub signature: Signature,
    /// Sender-chosen nonce; `MessageReplayGuard` rejects a repeated `(from, nonce)` pair.
    pub nonce: u64,
}
101
102#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
103struct MessageReplayKey {
104    from: PeerId,
105    nonce: u64,
106}
107
108impl MessageReplayKey {
109    fn from_message(msg: &Message) -> Self {
110        Self {
111            from: msg.from.clone(),
112            nonce: msg.nonce,
113        }
114    }
115}
116
117/// Stateful acceptance guard for signed peer-to-peer messages.
118#[derive(Debug, Default)]
119pub struct MessageReplayGuard {
120    seen: BTreeSet<MessageReplayKey>,
121}
122
123impl MessageReplayGuard {
124    /// Maximum accepted messages tracked in a replay window.
125    pub const MAX_TRACKED_MESSAGES: usize = 4096;
126
127    /// Create an empty replay guard.
128    #[must_use]
129    pub fn new() -> Self {
130        Self {
131            seen: BTreeSet::new(),
132        }
133    }
134
135    /// Return the number of messages currently tracked in the replay window.
136    #[must_use]
137    pub fn len(&self) -> usize {
138        self.seen.len()
139    }
140
141    /// Return `true` when no message keys are tracked.
142    #[must_use]
143    pub fn is_empty(&self) -> bool {
144        self.seen.is_empty()
145    }
146
147    /// Reset the replay window.
148    pub fn reset(&mut self) {
149        self.seen.clear();
150    }
151
152    /// Verify a message signature and record its replay key after success.
153    pub fn verify_and_record(
154        &mut self,
155        msg: &Message,
156        sender_public_key: &PublicKey,
157    ) -> Result<()> {
158        validate_message_structure(msg)?;
159        let key = MessageReplayKey::from_message(msg);
160        if self.seen.contains(&key) {
161            return Err(ApiError::ReplayDetected {
162                peer_id: msg.from.to_string(),
163                nonce: msg.nonce,
164            });
165        }
166        if self.seen.len() >= Self::MAX_TRACKED_MESSAGES {
167            return Err(ApiError::RateLimited {
168                peer_id: msg.from.to_string(),
169            });
170        }
171        verify_message_signature(msg, sender_public_key)?;
172        self.seen.insert(key);
173        Ok(())
174    }
175}
176
177/// Rate-limit tracking per peer.
178#[derive(Debug, Default)]
179pub struct RateLimiter {
180    counts: BTreeMap<PeerId, u32>,
181}
182impl RateLimiter {
183    /// Maximum number of distinct peers tracked in one rate-limit window.
184    ///
185    /// Distinct peer IDs are attacker-controlled at the network boundary. Keep
186    /// this bounded so a stream of one-shot IDs cannot grow memory without
187    /// limit before the next window reset.
188    pub const MAX_TRACKED_PEERS: usize = 4096;
189    const MAX_PER_WINDOW: u32 = 100;
190
191    /// Create a new rate limiter with zero counts.
192    #[must_use]
193    pub fn new() -> Self {
194        Self {
195            counts: BTreeMap::new(),
196        }
197    }
198    /// Increment the request count for a peer, returning an error if the limit is exceeded.
199    pub fn check_and_increment(&mut self, peer: &PeerId) -> Result<()> {
200        if !self.counts.contains_key(peer) && self.counts.len() >= Self::MAX_TRACKED_PEERS {
201            return Err(ApiError::RateLimited {
202                peer_id: peer.to_string(),
203            });
204        }
205
206        let c = self.counts.entry(peer.clone()).or_insert(0);
207        if *c >= Self::MAX_PER_WINDOW {
208            return Err(ApiError::RateLimited {
209                peer_id: peer.to_string(),
210            });
211        }
212        *c += 1;
213        Ok(())
214    }
215    /// Reset all peer counters for the next rate-limit window.
216    pub fn reset(&mut self) {
217        self.counts.clear();
218    }
219}
220
221/// Send a message, verifying the recipient exists in the registry (if addressed).
222pub fn send(registry: &PeerRegistry, msg: &Message) -> Result<()> {
223    if let Some(ref to) = msg.to {
224        if !registry.peers.contains_key(to) {
225            return Err(ApiError::PeerNotFound(to.to_string()));
226        }
227    }
228    Ok(())
229}
230
/// Borrowed view of the message fields covered by the sender's signature.
///
/// `message_signing_payload` serialises this to CBOR; the message's own
/// `signature` field is not included.
#[derive(Serialize)]
struct MessageSigningPayload<'a> {
    /// Domain-separation tag; always `P2P_MESSAGE_SIGNING_DOMAIN`.
    domain: &'static str,
    /// Sender DID string.
    from: &'a str,
    /// Recipient DID string, or `None` when the message has no recipient.
    to: Option<&'a str>,
    /// Message body bytes.
    payload: &'a [u8],
    /// Sender-chosen nonce.
    nonce: u64,
}
239
240fn validate_message_payload_size(msg: &Message) -> Result<()> {
241    if msg.payload.len() > MAX_P2P_MESSAGE_PAYLOAD_BYTES {
242        return Err(ApiError::InvalidSchema {
243            reason: format!(
244                "P2P message payload length {} exceeds maximum {}",
245                msg.payload.len(),
246                MAX_P2P_MESSAGE_PAYLOAD_BYTES
247            ),
248        });
249    }
250    Ok(())
251}
252
253/// Canonical CBOR payload signed by a peer-to-peer message sender.
254///
255/// The domain tag prevents cross-protocol replay, and the payload binds sender,
256/// optional recipient, body bytes, and nonce. The signature field is excluded
257/// so callers can use this function before and after signing.
258pub fn message_signing_payload(msg: &Message) -> Result<Vec<u8>> {
259    validate_message_payload_size(msg)?;
260    let payload = MessageSigningPayload {
261        domain: P2P_MESSAGE_SIGNING_DOMAIN,
262        from: msg.from.0.as_str(),
263        to: msg.to.as_ref().map(|peer| peer.0.as_str()),
264        payload: &msg.payload,
265        nonce: msg.nonce,
266    };
267    let mut encoded = Vec::new();
268    ciborium::into_writer(&payload, &mut encoded)
269        .map_err(|e| ApiError::SerializationError(e.to_string()))?;
270    Ok(encoded)
271}
272
273/// Validate structural integrity of a peer-to-peer message.
274///
275/// Validates:
276/// 1. Signature is not empty / all-zero (rejects [`Signature::Empty`] and zero-filled Ed25519).
277/// 2. Sender DID (`msg.from`) is well-formed.
278///
279/// This helper is intentionally not named `verify`: it does not authenticate
280/// the signature. Use [`verify_message`] for Ed25519 verification.
281pub fn validate_message_structure(msg: &Message) -> Result<()> {
282    validate_message_payload_size(msg)?;
283    // Reject empty / all-zero signatures.
284    if msg.signature.is_empty() {
285        return Err(ApiError::VerificationFailed {
286            reason: "empty or zero signature".into(),
287        });
288    }
289    // Reject malformed sender DID (syntactic check — Does it parse?).
290    if msg.from.0.to_string().is_empty() {
291        return Err(ApiError::VerificationFailed {
292            reason: "sender DID is empty".into(),
293        });
294    }
295    Ok(())
296}
297
298fn verify_message_signature(msg: &Message, sender_public_key: &PublicKey) -> Result<()> {
299    let payload = message_signing_payload(msg)?;
300    if exo_core::crypto::verify(&payload, &msg.signature, sender_public_key) {
301        Ok(())
302    } else {
303        Err(ApiError::VerificationFailed {
304            reason: "invalid Ed25519 signature".into(),
305        })
306    }
307}
308
309/// Verify a peer-to-peer message Ed25519 signature against the sender public key.
310pub fn verify_message(msg: &Message, sender_public_key: &PublicKey) -> Result<()> {
311    validate_message_structure(msg)?;
312    verify_message_signature(msg, sender_public_key)
313}
314
315/// Discover new peers from bootstrap addresses and register them.
316pub fn discover_peers(registry: &mut PeerRegistry, bootstrap: &[String]) -> Result<Vec<PeerId>> {
317    if bootstrap.len() > MAX_BOOTSTRAP_DISCOVERY_PEERS {
318        return Err(ApiError::InvalidSchema {
319            reason: format!(
320                "bootstrap peer list length {} exceeds maximum {}",
321                bootstrap.len(),
322                MAX_BOOTSTRAP_DISCOVERY_PEERS
323            ),
324        });
325    }
326
327    let mut discovered = Vec::with_capacity(bootstrap.len());
328    for addr in bootstrap {
329        let did = Did::new(&format!(
330            "did:exo:peer-{}",
331            blake3::hash(addr.as_bytes()).to_hex()
332        ))
333        .map_err(|e| ApiError::InvalidSchema {
334            reason: e.to_string(),
335        })?;
336        let pid = PeerId(did);
337        if !registry.peers.contains_key(&pid) {
338            registry.register(PeerInfo {
339                id: pid.clone(),
340                addresses: vec![addr.clone()],
341                public_key_hash: Hash256::digest(addr.as_bytes()),
342                last_seen: Timestamp::ZERO,
343                reputation_score: 50,
344            });
345            discovered.push(pid);
346        }
347    }
348    Ok(discovered)
349}
350
351// ---------------------------------------------------------------------------
352// ASN Diversity Enforcement (T-06: Eclipse Attack mitigation)
353// ---------------------------------------------------------------------------
354
/// Autonomous System Number wrapper.
///
/// The value `0` doubles as this module's bucket for peers whose ASN is
/// unknown (see `UNKNOWN_ASN`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct Asn(pub u32);
358
/// Extended peer metadata including ASN information.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PeerMetadata {
    /// Base peer record; `info.id` is the ID reported by staleness and rotation helpers.
    pub info: PeerInfo,
    /// Autonomous system the peer belongs to, or `None` when unknown. Peers
    /// with `None` are all grouped together for diversity purposes.
    pub asn: Option<Asn>,
    /// When the peer was first seen. Not read by the functions in this module.
    pub first_seen: Timestamp,
    /// When the peer was last seen. This field (not `info.last_seen`) is the
    /// one `identify_stale_peers` compares against `now`.
    pub last_seen: Timestamp,
}
367
/// Tunable limits that define what counts as an ASN-diverse peer set.
#[derive(Debug, Clone)]
pub struct AsnPolicy {
    /// A healthy peer set spans at least this many distinct ASNs.
    pub min_unique_asns: usize,
    /// No single ASN may contribute more than this many peers.
    pub max_peers_per_asn: usize,
    /// Age in milliseconds beyond which a peer becomes a rotation candidate.
    pub rotation_interval_ms: u64,
}

impl Default for AsnPolicy {
    /// Defaults: at least 3 distinct ASNs, at most 5 peers per ASN, hourly rotation.
    fn default() -> Self {
        const ONE_HOUR_MS: u64 = 60 * 60 * 1_000;
        AsnPolicy {
            rotation_interval_ms: ONE_HOUR_MS,
            max_peers_per_asn: 5,
            min_unique_asns: 3,
        }
    }
}
388
/// Result of an ASN diversity check.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DiversityResult {
    /// No ASN exceeds the per-ASN cap and enough distinct ASNs are present.
    Sufficient,
    /// Fewer distinct ASNs are present than the policy requires.
    Insufficient {
        /// Number of distinct ASN groups found (unknown-ASN peers form one group).
        unique_asns: usize,
        /// The policy's `min_unique_asns`.
        required: usize,
    },
    /// One ASN contributes more peers than the policy allows. This is checked
    /// before the distinct-ASN count, so it is reported in preference to
    /// `Insufficient`.
    OverrepresentedAsn {
        /// The offending ASN (the lowest-numbered one when several exceed the cap).
        asn: Asn,
        /// Number of peers found in that ASN.
        peers: usize,
        /// The policy's `max_peers_per_asn`.
        maximum: usize,
    },
}
403
404/// Sentinel ASN value used to group peers with no known ASN.
405const UNKNOWN_ASN: Asn = Asn(0);
406
407/// Resolve a peer's ASN for grouping purposes.
408/// Peers with `asn: None` are all placed in the same group ([`UNKNOWN_ASN`]).
409fn effective_asn(peer: &PeerMetadata) -> Asn {
410    peer.asn.unwrap_or(UNKNOWN_ASN)
411}
412
413/// Check whether the peer set meets the ASN diversity threshold.
414#[must_use]
415pub fn check_asn_diversity(peers: &[PeerMetadata], policy: &AsnPolicy) -> DiversityResult {
416    let mut by_asn: BTreeMap<Asn, usize> = BTreeMap::new();
417    for peer in peers {
418        *by_asn.entry(effective_asn(peer)).or_default() += 1;
419    }
420
421    for (asn, peer_count) in &by_asn {
422        if *peer_count > policy.max_peers_per_asn {
423            return DiversityResult::OverrepresentedAsn {
424                asn: *asn,
425                peers: *peer_count,
426                maximum: policy.max_peers_per_asn,
427            };
428        }
429    }
430
431    if by_asn.len() >= policy.min_unique_asns {
432        DiversityResult::Sufficient
433    } else {
434        DiversityResult::Insufficient {
435            unique_asns: by_asn.len(),
436            required: policy.min_unique_asns,
437        }
438    }
439}
440
441/// Select peers that maximise ASN diversity.
442///
443/// Strategy: round-robin across unique ASNs, then fill within each ASN up to
444/// `max_peers_per_asn`, stopping when `max_peers` is reached.
445#[must_use]
446pub fn select_diverse_peers(
447    candidates: &[PeerMetadata],
448    policy: &AsnPolicy,
449    max_peers: usize,
450) -> Vec<PeerMetadata> {
451    let selection_limit = max_peers
452        .min(candidates.len())
453        .min(MAX_DIVERSE_SELECTION_PEERS);
454    if candidates.is_empty() || selection_limit == 0 {
455        return Vec::new();
456    }
457
458    // Group candidates by effective ASN, preserving order within each group.
459    let mut by_asn: BTreeMap<Asn, Vec<&PeerMetadata>> = BTreeMap::new();
460    for c in candidates {
461        by_asn.entry(effective_asn(c)).or_default().push(c);
462    }
463
464    let mut selected: Vec<PeerMetadata> = Vec::with_capacity(selection_limit);
465    let mut round = 0usize;
466
467    loop {
468        if selected.len() >= selection_limit {
469            break;
470        }
471        let mut added_this_round = false;
472        for bucket in by_asn.values() {
473            if selected.len() >= selection_limit {
474                break;
475            }
476            if round >= policy.max_peers_per_asn {
477                continue;
478            }
479            if let Some(peer) = bucket.get(round) {
480                selected.push((*peer).clone());
481                added_this_round = true;
482            }
483        }
484        if !added_this_round {
485            break;
486        }
487        round += 1;
488    }
489
490    selected
491}
492
493/// Return the IDs of peers that have not been seen within `max_age_ms` of `now`.
494#[must_use]
495pub fn identify_stale_peers(
496    peers: &[PeerMetadata],
497    now: &Timestamp,
498    max_age_ms: u64,
499) -> Vec<PeerId> {
500    peers
501        .iter()
502        .filter(|p| {
503            // A peer is stale when its last_seen is older than now - max_age_ms.
504            now.physical_ms
505                .checked_sub(p.last_seen.physical_ms)
506                .is_none_or(|age| age > max_age_ms)
507        })
508        .map(|p| p.info.id.clone())
509        .collect()
510}
511
512/// Evict stale peers from `current` and replace them with diverse candidates.
513///
514/// Returns the [`PeerId`]s of the evicted peers.
515pub fn rotate_peers(
516    current: &mut Vec<PeerMetadata>,
517    candidates: &[PeerMetadata],
518    policy: &AsnPolicy,
519    now: &Timestamp,
520) -> Vec<PeerId> {
521    let stale_ids: BTreeSet<PeerId> =
522        identify_stale_peers(current, now, policy.rotation_interval_ms)
523            .into_iter()
524            .collect();
525
526    let evicted: Vec<PeerId> = stale_ids.iter().cloned().collect();
527
528    // Remove stale peers.
529    current.retain(|p| !stale_ids.contains(&p.info.id));
530
531    // Filter candidates that are not already in current.
532    let current_ids: BTreeSet<PeerId> = current.iter().map(|p| p.info.id.clone()).collect();
533    let fresh_candidates: Vec<PeerMetadata> = candidates
534        .iter()
535        .filter(|c| !current_ids.contains(&c.info.id))
536        .cloned()
537        .collect();
538
539    // Select diverse replacements up to the number of evicted peers.
540    let replacements = select_diverse_peers(&fresh_candidates, policy, evicted.len());
541    current.extend(replacements);
542
543    evicted
544}
545
546#[cfg(test)]
547#[allow(clippy::expect_used, clippy::unwrap_used)]
548mod tests {
549    use super::*;
550    fn pid(n: &str) -> PeerId {
551        PeerId(Did::new(&format!("did:exo:{n}")).unwrap())
552    }
553    fn info(n: &str) -> PeerInfo {
554        PeerInfo {
555            id: pid(n),
556            addresses: vec!["addr".into()],
557            public_key_hash: Hash256::ZERO,
558            last_seen: Timestamp::ZERO,
559            reputation_score: 50,
560        }
561    }
562    fn msg(from: &str, to: Option<&str>) -> Message {
563        let mut sig = [0u8; 64];
564        sig[0] = 1;
565        Message {
566            from: pid(from),
567            to: to.map(pid),
568            payload: b"hello".to_vec(),
569            signature: Signature::from_bytes(sig),
570            nonce: 1,
571        }
572    }
573    fn keypair(seed: u8) -> (PublicKey, exo_core::SecretKey) {
574        let keypair = exo_core::crypto::KeyPair::from_secret_bytes([seed; 32]).expect("keypair");
575        (*keypair.public_key(), keypair.secret_key().clone())
576    }
577    fn signed_msg(from: &str, to: Option<&str>, seed: u8) -> (Message, PublicKey) {
578        let (public_key, secret_key) = keypair(seed);
579        let mut message = Message {
580            from: pid(from),
581            to: to.map(pid),
582            payload: b"hello".to_vec(),
583            signature: Signature::Empty,
584            nonce: 1,
585        };
586        sign_message(&mut message, &secret_key);
587        (message, public_key)
588    }
589    fn sign_message(message: &mut Message, secret_key: &exo_core::SecretKey) {
590        let payload = message_signing_payload(message).expect("signing payload");
591        message.signature = exo_core::crypto::sign(&payload, secret_key);
592    }
593
594    #[test]
595    fn registry_empty() {
596        let r = PeerRegistry::new();
597        assert!(r.is_empty());
598        assert_eq!(r.len(), 0);
599    }
600    #[test]
601    fn registry_register() {
602        let mut r = PeerRegistry::new();
603        r.register(info("a"));
604        assert_eq!(r.len(), 1);
605        assert!(r.get(&pid("a")).is_some());
606    }
607    #[test]
608    fn registry_default() {
609        assert!(PeerRegistry::default().is_empty());
610    }
611    #[test]
612    fn send_known_peer() {
613        let mut r = PeerRegistry::new();
614        r.register(info("b"));
615        assert!(send(&r, &msg("a", Some("b"))).is_ok());
616    }
617    #[test]
618    fn send_unknown_peer() {
619        let r = PeerRegistry::new();
620        let err = send(&r, &msg("a", Some("b"))).unwrap_err();
621        assert!(matches!(
622            err,
623            ApiError::PeerNotFound(peer_id) if peer_id == "did:exo:b"
624        ));
625    }
626    #[test]
627    fn send_broadcast() {
628        let r = PeerRegistry::new();
629        assert!(send(&r, &msg("a", None)).is_ok());
630    }
631    #[test]
632    fn message_signing_payload_is_domain_separated_and_deterministic() {
633        let (message, _) = signed_msg("a", Some("b"), 7);
634        let first = message_signing_payload(&message).expect("payload");
635        let second = message_signing_payload(&message).expect("payload");
636        assert_eq!(first, second);
637
638        #[derive(Deserialize)]
639        struct DecodedPayload {
640            domain: String,
641        }
642        let decoded: DecodedPayload = ciborium::from_reader(&first[..]).expect("decode");
643        assert_eq!(decoded.domain, "exo.p2p.message.v1");
644    }
645    #[test]
646    fn verify_message_accepts_correct_signature() {
647        let (message, public_key) = signed_msg("a", None, 7);
648        assert!(verify_message(&message, &public_key).is_ok());
649    }
650    #[test]
651    fn verify_message_rejects_oversized_payload_before_signature_work() {
652        let (public_key, _) = keypair(7);
653        let oversized = Message {
654            from: pid("a"),
655            to: None,
656            payload: vec![0xA5; MAX_P2P_MESSAGE_PAYLOAD_BYTES + 1],
657            signature: Signature::from_bytes([1u8; 64]),
658            nonce: 42,
659        };
660
661        let err = verify_message(&oversized, &public_key).unwrap_err();
662
663        assert!(matches!(
664            err,
665            ApiError::InvalidSchema { reason } if reason.contains("payload")
666                && reason.contains(&MAX_P2P_MESSAGE_PAYLOAD_BYTES.to_string())
667        ));
668    }
669    #[test]
670    fn message_signing_payload_rejects_oversized_payload_before_cbor_serialization() {
671        let mut message = msg("a", None);
672        message.payload = vec![0xA5; MAX_P2P_MESSAGE_PAYLOAD_BYTES + 1];
673
674        let err = message_signing_payload(&message).unwrap_err();
675
676        assert!(matches!(
677            err,
678            ApiError::InvalidSchema { reason } if reason.contains("payload")
679                && reason.contains(&MAX_P2P_MESSAGE_PAYLOAD_BYTES.to_string())
680        ));
681    }
682    #[test]
683    fn message_replay_guard_rejects_duplicate_signed_message_without_recording_tamper() {
684        let (message, public_key) = signed_msg("a", Some("b"), 7);
685        let mut guard = MessageReplayGuard::new();
686
687        assert!(guard.verify_and_record(&message, &public_key).is_ok());
688        let replay = guard.verify_and_record(&message, &public_key).unwrap_err();
689
690        assert!(matches!(
691            replay,
692            ApiError::ReplayDetected { peer_id, nonce }
693                if peer_id == "did:exo:a" && nonce == message.nonce
694        ));
695
696        let mut tampered = message.clone();
697        tampered.nonce += 1;
698        tampered.payload = b"tampered".to_vec();
699        let tamper_err = guard.verify_and_record(&tampered, &public_key).unwrap_err();
700        assert!(matches!(
701            tamper_err,
702            ApiError::VerificationFailed { reason } if reason == "invalid Ed25519 signature"
703        ));
704        assert_eq!(
705            guard.len(),
706            1,
707            "failed verifications must not grow replay state"
708        );
709    }
710    #[test]
711    fn message_replay_guard_rejects_reused_sender_nonce_with_new_valid_payload() {
712        let (public_key, secret_key) = keypair(7);
713        let mut first = Message {
714            from: pid("a"),
715            to: Some(pid("b")),
716            payload: b"hello".to_vec(),
717            signature: Signature::Empty,
718            nonce: 7,
719        };
720        sign_message(&mut first, &secret_key);
721        let mut second = first.clone();
722        second.payload = b"new signed payload".to_vec();
723        second.signature = Signature::Empty;
724        sign_message(&mut second, &secret_key);
725        let mut guard = MessageReplayGuard::new();
726
727        assert!(guard.verify_and_record(&first, &public_key).is_ok());
728        let replay = guard.verify_and_record(&second, &public_key).unwrap_err();
729
730        assert!(matches!(
731            replay,
732            ApiError::ReplayDetected { peer_id, nonce } if peer_id == "did:exo:a" && nonce == 7
733        ));
734        assert_eq!(guard.len(), 1);
735    }
736    #[test]
737    fn message_replay_guard_bounds_tracked_state_and_can_reset() {
738        let (message, public_key) = signed_msg("a", Some("b"), 7);
739        let mut guard = MessageReplayGuard::new();
740        for nonce in 0..MessageReplayGuard::MAX_TRACKED_MESSAGES {
741            guard.seen.insert(MessageReplayKey {
742                from: pid("filled"),
743                nonce: u64::try_from(nonce).unwrap(),
744            });
745        }
746
747        let err = guard.verify_and_record(&message, &public_key).unwrap_err();
748
749        assert!(matches!(
750            err,
751            ApiError::RateLimited { peer_id } if peer_id == "did:exo:a"
752        ));
753        assert_eq!(guard.len(), MessageReplayGuard::MAX_TRACKED_MESSAGES);
754
755        guard.reset();
756        assert!(guard.is_empty());
757        assert!(guard.verify_and_record(&message, &public_key).is_ok());
758    }
759    #[test]
760    fn verify_message_rejects_empty_and_zero_signatures() {
761        let (public_key, _) = keypair(7);
762        let m = Message {
763            from: pid("a"),
764            to: None,
765            payload: vec![],
766            signature: Signature::Empty,
767            nonce: 0,
768        };
769        assert!(verify_message(&m, &public_key).is_err());
770
771        let m = Message {
772            from: pid("a"),
773            to: None,
774            payload: vec![],
775            signature: Signature::from_bytes([0u8; 64]),
776            nonce: 0,
777        };
778        assert!(verify_message(&m, &public_key).is_err());
779    }
780    #[test]
781    fn verify_message_rejects_fake_non_empty_signature() {
782        let (public_key, _) = keypair(7);
783        assert!(verify_message(&msg("a", None), &public_key).is_err());
784    }
785    #[test]
786    fn verify_message_rejects_wrong_key() {
787        let (message, _) = signed_msg("a", Some("b"), 7);
788        let (wrong_public_key, _) = keypair(8);
789        assert!(verify_message(&message, &wrong_public_key).is_err());
790    }
791    #[test]
792    fn verify_message_rejects_tampering() {
793        let (message, public_key) = signed_msg("a", Some("b"), 7);
794
795        let mut tampered_payload = message.clone();
796        tampered_payload.payload = b"goodbye".to_vec();
797        assert!(verify_message(&tampered_payload, &public_key).is_err());
798
799        let mut tampered_recipient = message.clone();
800        tampered_recipient.to = Some(pid("c"));
801        assert!(verify_message(&tampered_recipient, &public_key).is_err());
802
803        let mut tampered_nonce = message.clone();
804        tampered_nonce.nonce = 2;
805        assert!(verify_message(&tampered_nonce, &public_key).is_err());
806
807        let mut tampered_sender = message.clone();
808        tampered_sender.from = pid("z");
809        assert!(verify_message(&tampered_sender, &public_key).is_err());
810    }
811    #[test]
812    fn validate_message_structure_keeps_non_crypto_checks_separate() {
813        assert!(validate_message_structure(&msg("a", None)).is_ok());
814    }
815    #[test]
816    fn discover() {
817        let mut r = PeerRegistry::new();
818        let d = discover_peers(&mut r, &["addr1".into(), "addr2".into()]).unwrap();
819        assert_eq!(d.len(), 2);
820        assert_eq!(r.len(), 2);
821    }
822    #[test]
823    fn discover_no_dupes() {
824        let mut r = PeerRegistry::new();
825        discover_peers(&mut r, &["a".into()]).unwrap();
826        let d = discover_peers(&mut r, &["a".into()]).unwrap();
827        assert!(d.is_empty());
828    }
829    #[test]
830    fn discover_peers_rejects_oversized_bootstrap_before_mutating_registry() {
831        const MAX_BOOTSTRAP_PEERS: usize = 4_096;
832        let bootstrap: Vec<String> = (0..(MAX_BOOTSTRAP_PEERS + 1))
833            .map(|i| format!("addr-{i}"))
834            .collect();
835        let mut registry = PeerRegistry::new();
836
837        let err = discover_peers(&mut registry, &bootstrap).unwrap_err();
838
839        assert!(matches!(err, ApiError::InvalidSchema { .. }));
840        assert_eq!(registry.len(), 0);
841    }
842    #[test]
843    fn rate_limiter() {
844        let mut rl = RateLimiter::new();
845        for _ in 0..100 {
846            rl.check_and_increment(&pid("a")).unwrap();
847        }
848        let err = rl.check_and_increment(&pid("a")).unwrap_err();
849        assert!(matches!(
850            err,
851            ApiError::RateLimited { peer_id } if peer_id == "did:exo:a"
852        ));
853    }
854    #[test]
855    fn rate_limiter_reset() {
856        let mut rl = RateLimiter::new();
857        for _ in 0..100 {
858            rl.check_and_increment(&pid("a")).unwrap();
859        }
860        rl.reset();
861        assert!(rl.check_and_increment(&pid("a")).is_ok());
862    }
863    #[test]
864    fn rate_limiter_bounds_distinct_peer_tracking() {
865        let mut rl = RateLimiter::new();
866        for i in 0..RateLimiter::MAX_TRACKED_PEERS {
867            rl.check_and_increment(&pid(&format!("peer-{i}"))).unwrap();
868        }
869
870        assert_eq!(rl.counts.len(), RateLimiter::MAX_TRACKED_PEERS);
871        let err = rl.check_and_increment(&pid("overflow-peer")).unwrap_err();
872        assert!(matches!(
873            err,
874            ApiError::RateLimited { peer_id } if peer_id == "did:exo:overflow-peer"
875        ));
876        assert_eq!(
877            rl.counts.len(),
878            RateLimiter::MAX_TRACKED_PEERS,
879            "refused peers must not grow the limiter state"
880        );
881        assert!(rl.check_and_increment(&pid("peer-0")).is_ok());
882    }
883    #[test]
884    fn p2p_error_peer_labels_do_not_depend_on_debug_formatting() {
885        let source = include_str!("p2p.rs");
886        let production = source
887            .split("#[cfg(test)]")
888            .next()
889            .expect("production section");
890        assert!(
891            !production.contains("format!(\"{:?}\", peer)"),
892            "P2P rate-limit errors must use stable peer labels"
893        );
894        assert!(
895            !production.contains("format!(\"{to:?}\")"),
896            "P2P peer lookup errors must use stable peer labels"
897        );
898    }
899    #[test]
900    fn peer_id_ord() {
901        assert!(pid("a") < pid("b"));
902    }
// Smoke test: the `msg("a", Some("b"))` fixture serializes to a non-empty JSON string.
#[test]
fn message_serde() {
    let message = msg("a", Some("b"));
    let encoded = serde_json::to_string(&message).unwrap();
    let produced_output = !encoded.is_empty();
    assert!(produced_output);
}
909
910    // -- ASN diversity helpers -----------------------------------------------
911
912    fn peer_meta(name: &str, asn: Option<u32>, last_seen_ms: u64) -> PeerMetadata {
913        PeerMetadata {
914            info: info(name),
915            asn: asn.map(Asn),
916            first_seen: Timestamp::ZERO,
917            last_seen: Timestamp::new(last_seen_ms, 0),
918        }
919    }
920
// 1. All peers from a single ASN → Insufficient
// Five peers that all report ASN 64512 count as one unique ASN, which is below the three
// required by the default policy.
#[test]
fn reject_single_asn_peer_set() {
    let policy = AsnPolicy::default();

    let mut peers: Vec<PeerMetadata> = Vec::with_capacity(5);
    for i in 0..5 {
        peers.push(peer_meta(&format!("p{i}"), Some(64512), 1000));
    }

    let expected = DiversityResult::Insufficient {
        unique_asns: 1,
        required: 3,
    };
    assert_eq!(check_asn_diversity(&peers, &policy), expected);
}
936
// 2. Peers from 3+ ASNs → Sufficient
// Three peers, each on its own ASN, satisfy the default policy.
#[test]
fn accept_diverse_peer_set() {
    let peers: Vec<PeerMetadata> = [("a", 100), ("b", 200), ("c", 300)]
        .iter()
        .map(|&(name, asn)| peer_meta(name, Some(asn), 1000))
        .collect();

    let verdict = check_asn_diversity(&peers, &AsnPolicy::default());
    assert_eq!(verdict, DiversityResult::Sufficient);
}
951
// Having enough unique ASNs is not sufficient on its own: six peers on ASN 64512 exceed
// the default per-ASN maximum of five, so the set is reported as over-represented even
// though three distinct ASNs are present.
#[test]
fn reject_peer_set_exceeding_max_peers_per_asn_even_when_unique_asns_sufficient() {
    let attackers = (0..6).map(|i| peer_meta(&format!("attacker-{i}"), Some(64512), 1000));
    let honest = [
        peer_meta("honest-a", Some(64513), 1000),
        peer_meta("honest-b", Some(64514), 1000),
    ];
    let peers: Vec<PeerMetadata> = attackers.chain(honest).collect();

    let expected = DiversityResult::OverrepresentedAsn {
        asn: Asn(64512),
        peers: 6,
        maximum: 5,
    };
    assert_eq!(check_asn_diversity(&peers, &AsnPolicy::default()), expected);
}
970
// 3. 10 peers from same ASN, max_peers_per_asn=5 → only 5 selected
// The requested count (20) is larger than the pool, so the per-ASN cap is the limit that
// decides the result.
#[test]
fn max_peers_per_asn_enforced() {
    let policy = AsnPolicy {
        max_peers_per_asn: 5,
        ..AsnPolicy::default()
    };

    let mut candidates: Vec<PeerMetadata> = Vec::with_capacity(10);
    for i in 0..10 {
        candidates.push(peer_meta(&format!("p{i}"), Some(64512), 1000));
    }

    let picked = select_diverse_peers(&candidates, &policy, 20);
    assert_eq!(picked.len(), 5);
}
984
// 4. Candidates from 4 ASNs, max 8 → 2 per ASN (round-robin)
// Sixteen candidates (four per ASN) compete for eight slots; the selection must be spread
// evenly instead of exhausting one ASN first.
#[test]
fn select_diverse_round_robin() {
    // Candidates are generated ASN by ASN: asn10-p0..p3, asn20-p0..p3, and so on.
    let candidates: Vec<PeerMetadata> = [10, 20, 30, 40]
        .iter()
        .flat_map(|&asn| {
            (0..4).map(move |i| peer_meta(&format!("asn{asn}-p{i}"), Some(asn), 1000))
        })
        .collect();

    let selected = select_diverse_peers(&candidates, &AsnPolicy::default(), 8);
    assert_eq!(selected.len(), 8);

    // Tally the selected peers per ASN — each ASN should contribute exactly 2.
    let per_asn = selected
        .iter()
        .fold(BTreeMap::<u32, usize>::new(), |mut tally, peer| {
            *tally.entry(peer.asn.unwrap().0).or_default() += 1;
            tally
        });
    for &count in per_asn.values() {
        assert_eq!(count, 2);
    }
}
1006
// A caller-supplied `max_peers` of `usize::MAX` must be clamped: with 4_097 candidates on
// 4_097 distinct ASNs, exactly 4_096 are returned, and they are the first 4_096 candidates
// in input order.
#[test]
fn select_diverse_peers_clamps_untrusted_max_peers_before_allocating() {
    // Spelled out locally so the test pins the numeric value of the limit.
    const EXPECTED_SELECTION_LIMIT: usize = 4_096;

    let policy = AsnPolicy {
        max_peers_per_asn: 1,
        ..AsnPolicy::default()
    };

    // One candidate more than the limit, each on its own ASN (index + 1).
    let mut candidates: Vec<PeerMetadata> = Vec::with_capacity(EXPECTED_SELECTION_LIMIT + 1);
    for index in 0..=EXPECTED_SELECTION_LIMIT {
        let asn = u32::try_from(index + 1).unwrap();
        candidates.push(peer_meta(&format!("peer-{index}"), Some(asn), 1000));
    }

    let selected = select_diverse_peers(&candidates, &policy, usize::MAX);
    assert_eq!(selected.len(), EXPECTED_SELECTION_LIMIT);

    let first_picked = &selected.first().unwrap().info.id;
    let last_picked = &selected.last().unwrap().info.id;
    assert_eq!(first_picked, &candidates[0].info.id);
    assert_eq!(
        last_picked,
        &candidates[EXPECTED_SELECTION_LIMIT - 1].info.id
    );
}
1033
// 5. Stale peers identified correctly
// With `now` at 10_000_000 ms and a maximum age of 2 hours (7_200_000 ms), only peers last
// seen more than 7_200_000 ms before `now` are reported.
#[test]
fn identify_stale_peers_test() {
    let now = Timestamp::new(10_000_000, 0);
    let max_age_ms = 7_200_000; // 2 hours
    let peers = vec![
        peer_meta("fresh", Some(1), 9_000_000),  // seen 1000 s ago, not stale
        peer_meta("stale1", Some(2), 1_000_000), // seen 9000 s ago, stale
        peer_meta("stale2", Some(3), 2_000_000), // seen 8000 s ago, stale
    ];

    let stale = identify_stale_peers(&peers, &now, max_age_ms);

    assert_eq!(stale.len(), 2);
    assert!(stale.contains(&pid("stale1")));
    assert!(stale.contains(&pid("stale2")));
    // State the negative case explicitly rather than leaving it implied by the length.
    assert!(
        !stale.contains(&pid("fresh")),
        "a peer seen within max_age_ms must not be reported as stale"
    );
}
1049
// A peer whose `last_seen` lies after `now` (here by 1000 ms) is reported as stale.
#[test]
fn identify_stale_peers_treats_future_last_seen_as_stale() {
    let now = Timestamp::new(10_000_000, 0);
    let peers = [peer_meta("future", Some(1), 10_001_000)];

    let reported = identify_stale_peers(&peers, &now, 7_200_000);

    let expected = vec![pid("future")];
    assert_eq!(reported, expected);
}
1059
// 6. Stale peers evicted and replaced with diverse candidates
// With a one-hour rotation interval and `now` at 10_000_000 ms, "old1" and "old2" are past
// the interval while "keep" is not. Two fresh candidates on other ASNs are offered as
// replacements.
#[test]
fn rotate_replaces_stale_with_diverse() {
    let now = Timestamp::new(10_000_000, 0);
    let policy = AsnPolicy {
        rotation_interval_ms: 3_600_000,
        ..AsnPolicy::default()
    };
    let mut current = vec![
        peer_meta("keep", Some(100), 9_000_000), // fresh
        peer_meta("old1", Some(100), 1_000_000), // stale
        peer_meta("old2", Some(100), 2_000_000), // stale
    ];
    let candidates = vec![
        peer_meta("new1", Some(200), 9_500_000),
        peer_meta("new2", Some(300), 9_500_000),
    ];

    let evicted = rotate_peers(&mut current, &candidates, &policy, &now);

    assert_eq!(evicted.len(), 2);
    assert!(evicted.contains(&pid("old1")));
    assert!(evicted.contains(&pid("old2")));

    // current should now have: keep + up to 2 replacements
    assert!((2..=3).contains(&current.len()));
    let ids: Vec<PeerId> = current.iter().map(|p| p.info.id.clone()).collect();
    assert!(ids.contains(&pid("keep")));

    // Being listed as evicted must also mean being removed from the active set.
    assert!(
        !ids.contains(&pid("old1")),
        "evicted peers must be removed from the active set"
    );
    assert!(
        !ids.contains(&pid("old2")),
        "evicted peers must be removed from the active set"
    );
}
1086
// 7. Empty candidates → no crash
// Both helpers accept an empty slice and return an empty result.
#[test]
fn empty_candidates_no_crash() {
    let policy = AsnPolicy::default();
    assert!(select_diverse_peers(&[], &policy, 10).is_empty());
    assert!(identify_stale_peers(&[], &Timestamp::ZERO, 1000).is_empty());
}
1095
// 8. Peers with asn: None grouped together
// Peers without an ASN form a single group, both for the diversity check and for the
// per-ASN cap applied during selection.
#[test]
fn no_asn_peers_treated_as_same_group() {
    let peers: Vec<PeerMetadata> = ["a", "b", "c"]
        .iter()
        .map(|&name| peer_meta(name, None, 1000))
        .collect();

    // All None → single effective ASN group → 1 unique ASN
    let verdict = check_asn_diversity(&peers, &AsnPolicy::default());
    let expected = DiversityResult::Insufficient {
        unique_asns: 1,
        required: 3,
    };
    assert_eq!(verdict, expected);

    // select_diverse_peers respects max_peers_per_asn for the None group
    let capped_policy = AsnPolicy {
        max_peers_per_asn: 2,
        ..AsnPolicy::default()
    };
    let picked = select_diverse_peers(&peers, &capped_policy, 10);
    assert_eq!(picked.len(), 2);
}
1121
// 9. Policy default values
// Pins the defaults: 3 unique ASNs required, at most 5 peers per ASN, and a rotation
// interval of 3_600_000 ms.
#[test]
fn policy_default_values() {
    let defaults = AsnPolicy::default();
    assert_eq!(defaults.min_unique_asns, 3);
    assert_eq!(defaults.max_peers_per_asn, 5);
    assert_eq!(defaults.rotation_interval_ms, 3_600_000);
}
1130}