1use std::collections::{BTreeMap, BTreeSet};
19
20use exo_core::{Did, Hash256, PublicKey, Signature, Timestamp};
21use serde::{Deserialize, Serialize};
22
23use crate::error::{ApiError, Result};
24
25const P2P_MESSAGE_SIGNING_DOMAIN: &str = "exo.p2p.message.v1";
26pub const MAX_P2P_MESSAGE_PAYLOAD_BYTES: usize = 64 * 1024;
27const MAX_BOOTSTRAP_DISCOVERY_PEERS: usize = 4_096;
28const MAX_DIVERSE_SELECTION_PEERS: usize = 4_096;
29
30#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
32pub struct PeerId(pub Did);
33
34impl PeerId {
35 #[must_use]
36 pub fn as_str(&self) -> &str {
37 self.0.as_str()
38 }
39}
40
41impl std::fmt::Display for PeerId {
42 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
43 f.write_str(self.as_str())
44 }
45}
46
47#[derive(Debug, Clone, Serialize, Deserialize)]
49pub struct PeerInfo {
50 pub id: PeerId,
51 pub addresses: Vec<String>,
52 pub public_key_hash: Hash256,
53 pub last_seen: Timestamp,
54 pub reputation_score: u32,
55}
56
57#[derive(Debug, Clone, Default)]
59pub struct PeerRegistry {
60 pub peers: BTreeMap<PeerId, PeerInfo>,
61}
62
63impl PeerRegistry {
64 #[must_use]
66 pub fn new() -> Self {
67 Self {
68 peers: BTreeMap::new(),
69 }
70 }
71 pub fn register(&mut self, info: PeerInfo) {
73 self.peers.insert(info.id.clone(), info);
74 }
75 #[must_use]
77 pub fn get(&self, id: &PeerId) -> Option<&PeerInfo> {
78 self.peers.get(id)
79 }
80 #[must_use]
82 pub fn len(&self) -> usize {
83 self.peers.len()
84 }
85 #[must_use]
87 pub fn is_empty(&self) -> bool {
88 self.peers.is_empty()
89 }
90}
91
92#[derive(Debug, Clone, Serialize, Deserialize)]
94pub struct Message {
95 pub from: PeerId,
96 pub to: Option<PeerId>,
97 pub payload: Vec<u8>,
98 pub signature: Signature,
99 pub nonce: u64,
100}
101
102#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
103struct MessageReplayKey {
104 from: PeerId,
105 nonce: u64,
106}
107
108impl MessageReplayKey {
109 fn from_message(msg: &Message) -> Self {
110 Self {
111 from: msg.from.clone(),
112 nonce: msg.nonce,
113 }
114 }
115}
116
117#[derive(Debug, Default)]
119pub struct MessageReplayGuard {
120 seen: BTreeSet<MessageReplayKey>,
121}
122
123impl MessageReplayGuard {
124 pub const MAX_TRACKED_MESSAGES: usize = 4096;
126
127 #[must_use]
129 pub fn new() -> Self {
130 Self {
131 seen: BTreeSet::new(),
132 }
133 }
134
135 #[must_use]
137 pub fn len(&self) -> usize {
138 self.seen.len()
139 }
140
141 #[must_use]
143 pub fn is_empty(&self) -> bool {
144 self.seen.is_empty()
145 }
146
147 pub fn reset(&mut self) {
149 self.seen.clear();
150 }
151
152 pub fn verify_and_record(
154 &mut self,
155 msg: &Message,
156 sender_public_key: &PublicKey,
157 ) -> Result<()> {
158 validate_message_structure(msg)?;
159 let key = MessageReplayKey::from_message(msg);
160 if self.seen.contains(&key) {
161 return Err(ApiError::ReplayDetected {
162 peer_id: msg.from.to_string(),
163 nonce: msg.nonce,
164 });
165 }
166 if self.seen.len() >= Self::MAX_TRACKED_MESSAGES {
167 return Err(ApiError::RateLimited {
168 peer_id: msg.from.to_string(),
169 });
170 }
171 verify_message_signature(msg, sender_public_key)?;
172 self.seen.insert(key);
173 Ok(())
174 }
175}
176
177#[derive(Debug, Default)]
179pub struct RateLimiter {
180 counts: BTreeMap<PeerId, u32>,
181}
182impl RateLimiter {
183 pub const MAX_TRACKED_PEERS: usize = 4096;
189 const MAX_PER_WINDOW: u32 = 100;
190
191 #[must_use]
193 pub fn new() -> Self {
194 Self {
195 counts: BTreeMap::new(),
196 }
197 }
198 pub fn check_and_increment(&mut self, peer: &PeerId) -> Result<()> {
200 if !self.counts.contains_key(peer) && self.counts.len() >= Self::MAX_TRACKED_PEERS {
201 return Err(ApiError::RateLimited {
202 peer_id: peer.to_string(),
203 });
204 }
205
206 let c = self.counts.entry(peer.clone()).or_insert(0);
207 if *c >= Self::MAX_PER_WINDOW {
208 return Err(ApiError::RateLimited {
209 peer_id: peer.to_string(),
210 });
211 }
212 *c += 1;
213 Ok(())
214 }
215 pub fn reset(&mut self) {
217 self.counts.clear();
218 }
219}
220
221pub fn send(registry: &PeerRegistry, msg: &Message) -> Result<()> {
223 if let Some(ref to) = msg.to {
224 if !registry.peers.contains_key(to) {
225 return Err(ApiError::PeerNotFound(to.to_string()));
226 }
227 }
228 Ok(())
229}
230
231#[derive(Serialize)]
232struct MessageSigningPayload<'a> {
233 domain: &'static str,
234 from: &'a str,
235 to: Option<&'a str>,
236 payload: &'a [u8],
237 nonce: u64,
238}
239
240fn validate_message_payload_size(msg: &Message) -> Result<()> {
241 if msg.payload.len() > MAX_P2P_MESSAGE_PAYLOAD_BYTES {
242 return Err(ApiError::InvalidSchema {
243 reason: format!(
244 "P2P message payload length {} exceeds maximum {}",
245 msg.payload.len(),
246 MAX_P2P_MESSAGE_PAYLOAD_BYTES
247 ),
248 });
249 }
250 Ok(())
251}
252
253pub fn message_signing_payload(msg: &Message) -> Result<Vec<u8>> {
259 validate_message_payload_size(msg)?;
260 let payload = MessageSigningPayload {
261 domain: P2P_MESSAGE_SIGNING_DOMAIN,
262 from: msg.from.0.as_str(),
263 to: msg.to.as_ref().map(|peer| peer.0.as_str()),
264 payload: &msg.payload,
265 nonce: msg.nonce,
266 };
267 let mut encoded = Vec::new();
268 ciborium::into_writer(&payload, &mut encoded)
269 .map_err(|e| ApiError::SerializationError(e.to_string()))?;
270 Ok(encoded)
271}
272
273pub fn validate_message_structure(msg: &Message) -> Result<()> {
282 validate_message_payload_size(msg)?;
283 if msg.signature.is_empty() {
285 return Err(ApiError::VerificationFailed {
286 reason: "empty or zero signature".into(),
287 });
288 }
289 if msg.from.0.to_string().is_empty() {
291 return Err(ApiError::VerificationFailed {
292 reason: "sender DID is empty".into(),
293 });
294 }
295 Ok(())
296}
297
298fn verify_message_signature(msg: &Message, sender_public_key: &PublicKey) -> Result<()> {
299 let payload = message_signing_payload(msg)?;
300 if exo_core::crypto::verify(&payload, &msg.signature, sender_public_key) {
301 Ok(())
302 } else {
303 Err(ApiError::VerificationFailed {
304 reason: "invalid Ed25519 signature".into(),
305 })
306 }
307}
308
309pub fn verify_message(msg: &Message, sender_public_key: &PublicKey) -> Result<()> {
311 validate_message_structure(msg)?;
312 verify_message_signature(msg, sender_public_key)
313}
314
315pub fn discover_peers(registry: &mut PeerRegistry, bootstrap: &[String]) -> Result<Vec<PeerId>> {
317 if bootstrap.len() > MAX_BOOTSTRAP_DISCOVERY_PEERS {
318 return Err(ApiError::InvalidSchema {
319 reason: format!(
320 "bootstrap peer list length {} exceeds maximum {}",
321 bootstrap.len(),
322 MAX_BOOTSTRAP_DISCOVERY_PEERS
323 ),
324 });
325 }
326
327 let mut discovered = Vec::with_capacity(bootstrap.len());
328 for addr in bootstrap {
329 let did = Did::new(&format!(
330 "did:exo:peer-{}",
331 blake3::hash(addr.as_bytes()).to_hex()
332 ))
333 .map_err(|e| ApiError::InvalidSchema {
334 reason: e.to_string(),
335 })?;
336 let pid = PeerId(did);
337 if !registry.peers.contains_key(&pid) {
338 registry.register(PeerInfo {
339 id: pid.clone(),
340 addresses: vec![addr.clone()],
341 public_key_hash: Hash256::digest(addr.as_bytes()),
342 last_seen: Timestamp::ZERO,
343 reputation_score: 50,
344 });
345 discovered.push(pid);
346 }
347 }
348 Ok(discovered)
349}
350
351#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
357pub struct Asn(pub u32);
358
359#[derive(Debug, Clone, Serialize, Deserialize)]
361pub struct PeerMetadata {
362 pub info: PeerInfo,
363 pub asn: Option<Asn>,
364 pub first_seen: Timestamp,
365 pub last_seen: Timestamp,
366}
367
368#[derive(Debug, Clone)]
370pub struct AsnPolicy {
371 pub min_unique_asns: usize,
373 pub max_peers_per_asn: usize,
375 pub rotation_interval_ms: u64,
377}
378
379impl Default for AsnPolicy {
380 fn default() -> Self {
381 Self {
382 min_unique_asns: 3,
383 max_peers_per_asn: 5,
384 rotation_interval_ms: 3_600_000, }
386 }
387}
388
389#[derive(Debug, Clone, PartialEq, Eq)]
391pub enum DiversityResult {
392 Sufficient,
393 Insufficient {
394 unique_asns: usize,
395 required: usize,
396 },
397 OverrepresentedAsn {
398 asn: Asn,
399 peers: usize,
400 maximum: usize,
401 },
402}
403
404const UNKNOWN_ASN: Asn = Asn(0);
406
407fn effective_asn(peer: &PeerMetadata) -> Asn {
410 peer.asn.unwrap_or(UNKNOWN_ASN)
411}
412
413#[must_use]
415pub fn check_asn_diversity(peers: &[PeerMetadata], policy: &AsnPolicy) -> DiversityResult {
416 let mut by_asn: BTreeMap<Asn, usize> = BTreeMap::new();
417 for peer in peers {
418 *by_asn.entry(effective_asn(peer)).or_default() += 1;
419 }
420
421 for (asn, peer_count) in &by_asn {
422 if *peer_count > policy.max_peers_per_asn {
423 return DiversityResult::OverrepresentedAsn {
424 asn: *asn,
425 peers: *peer_count,
426 maximum: policy.max_peers_per_asn,
427 };
428 }
429 }
430
431 if by_asn.len() >= policy.min_unique_asns {
432 DiversityResult::Sufficient
433 } else {
434 DiversityResult::Insufficient {
435 unique_asns: by_asn.len(),
436 required: policy.min_unique_asns,
437 }
438 }
439}
440
441#[must_use]
446pub fn select_diverse_peers(
447 candidates: &[PeerMetadata],
448 policy: &AsnPolicy,
449 max_peers: usize,
450) -> Vec<PeerMetadata> {
451 let selection_limit = max_peers
452 .min(candidates.len())
453 .min(MAX_DIVERSE_SELECTION_PEERS);
454 if candidates.is_empty() || selection_limit == 0 {
455 return Vec::new();
456 }
457
458 let mut by_asn: BTreeMap<Asn, Vec<&PeerMetadata>> = BTreeMap::new();
460 for c in candidates {
461 by_asn.entry(effective_asn(c)).or_default().push(c);
462 }
463
464 let mut selected: Vec<PeerMetadata> = Vec::with_capacity(selection_limit);
465 let mut round = 0usize;
466
467 loop {
468 if selected.len() >= selection_limit {
469 break;
470 }
471 let mut added_this_round = false;
472 for bucket in by_asn.values() {
473 if selected.len() >= selection_limit {
474 break;
475 }
476 if round >= policy.max_peers_per_asn {
477 continue;
478 }
479 if let Some(peer) = bucket.get(round) {
480 selected.push((*peer).clone());
481 added_this_round = true;
482 }
483 }
484 if !added_this_round {
485 break;
486 }
487 round += 1;
488 }
489
490 selected
491}
492
493#[must_use]
495pub fn identify_stale_peers(
496 peers: &[PeerMetadata],
497 now: &Timestamp,
498 max_age_ms: u64,
499) -> Vec<PeerId> {
500 peers
501 .iter()
502 .filter(|p| {
503 now.physical_ms
505 .checked_sub(p.last_seen.physical_ms)
506 .is_none_or(|age| age > max_age_ms)
507 })
508 .map(|p| p.info.id.clone())
509 .collect()
510}
511
512pub fn rotate_peers(
516 current: &mut Vec<PeerMetadata>,
517 candidates: &[PeerMetadata],
518 policy: &AsnPolicy,
519 now: &Timestamp,
520) -> Vec<PeerId> {
521 let stale_ids: BTreeSet<PeerId> =
522 identify_stale_peers(current, now, policy.rotation_interval_ms)
523 .into_iter()
524 .collect();
525
526 let evicted: Vec<PeerId> = stale_ids.iter().cloned().collect();
527
528 current.retain(|p| !stale_ids.contains(&p.info.id));
530
531 let current_ids: BTreeSet<PeerId> = current.iter().map(|p| p.info.id.clone()).collect();
533 let fresh_candidates: Vec<PeerMetadata> = candidates
534 .iter()
535 .filter(|c| !current_ids.contains(&c.info.id))
536 .cloned()
537 .collect();
538
539 let replacements = select_diverse_peers(&fresh_candidates, policy, evicted.len());
541 current.extend(replacements);
542
543 evicted
544}
545
546#[cfg(test)]
547#[allow(clippy::expect_used, clippy::unwrap_used)]
548mod tests {
549 use super::*;
550 fn pid(n: &str) -> PeerId {
551 PeerId(Did::new(&format!("did:exo:{n}")).unwrap())
552 }
553 fn info(n: &str) -> PeerInfo {
554 PeerInfo {
555 id: pid(n),
556 addresses: vec!["addr".into()],
557 public_key_hash: Hash256::ZERO,
558 last_seen: Timestamp::ZERO,
559 reputation_score: 50,
560 }
561 }
562 fn msg(from: &str, to: Option<&str>) -> Message {
563 let mut sig = [0u8; 64];
564 sig[0] = 1;
565 Message {
566 from: pid(from),
567 to: to.map(pid),
568 payload: b"hello".to_vec(),
569 signature: Signature::from_bytes(sig),
570 nonce: 1,
571 }
572 }
573 fn keypair(seed: u8) -> (PublicKey, exo_core::SecretKey) {
574 let keypair = exo_core::crypto::KeyPair::from_secret_bytes([seed; 32]).expect("keypair");
575 (*keypair.public_key(), keypair.secret_key().clone())
576 }
577 fn signed_msg(from: &str, to: Option<&str>, seed: u8) -> (Message, PublicKey) {
578 let (public_key, secret_key) = keypair(seed);
579 let mut message = Message {
580 from: pid(from),
581 to: to.map(pid),
582 payload: b"hello".to_vec(),
583 signature: Signature::Empty,
584 nonce: 1,
585 };
586 sign_message(&mut message, &secret_key);
587 (message, public_key)
588 }
589 fn sign_message(message: &mut Message, secret_key: &exo_core::SecretKey) {
590 let payload = message_signing_payload(message).expect("signing payload");
591 message.signature = exo_core::crypto::sign(&payload, secret_key);
592 }
593
594 #[test]
595 fn registry_empty() {
596 let r = PeerRegistry::new();
597 assert!(r.is_empty());
598 assert_eq!(r.len(), 0);
599 }
600 #[test]
601 fn registry_register() {
602 let mut r = PeerRegistry::new();
603 r.register(info("a"));
604 assert_eq!(r.len(), 1);
605 assert!(r.get(&pid("a")).is_some());
606 }
607 #[test]
608 fn registry_default() {
609 assert!(PeerRegistry::default().is_empty());
610 }
611 #[test]
612 fn send_known_peer() {
613 let mut r = PeerRegistry::new();
614 r.register(info("b"));
615 assert!(send(&r, &msg("a", Some("b"))).is_ok());
616 }
617 #[test]
618 fn send_unknown_peer() {
619 let r = PeerRegistry::new();
620 let err = send(&r, &msg("a", Some("b"))).unwrap_err();
621 assert!(matches!(
622 err,
623 ApiError::PeerNotFound(peer_id) if peer_id == "did:exo:b"
624 ));
625 }
626 #[test]
627 fn send_broadcast() {
628 let r = PeerRegistry::new();
629 assert!(send(&r, &msg("a", None)).is_ok());
630 }
631 #[test]
632 fn message_signing_payload_is_domain_separated_and_deterministic() {
633 let (message, _) = signed_msg("a", Some("b"), 7);
634 let first = message_signing_payload(&message).expect("payload");
635 let second = message_signing_payload(&message).expect("payload");
636 assert_eq!(first, second);
637
638 #[derive(Deserialize)]
639 struct DecodedPayload {
640 domain: String,
641 }
642 let decoded: DecodedPayload = ciborium::from_reader(&first[..]).expect("decode");
643 assert_eq!(decoded.domain, "exo.p2p.message.v1");
644 }
645 #[test]
646 fn verify_message_accepts_correct_signature() {
647 let (message, public_key) = signed_msg("a", None, 7);
648 assert!(verify_message(&message, &public_key).is_ok());
649 }
650 #[test]
651 fn verify_message_rejects_oversized_payload_before_signature_work() {
652 let (public_key, _) = keypair(7);
653 let oversized = Message {
654 from: pid("a"),
655 to: None,
656 payload: vec![0xA5; MAX_P2P_MESSAGE_PAYLOAD_BYTES + 1],
657 signature: Signature::from_bytes([1u8; 64]),
658 nonce: 42,
659 };
660
661 let err = verify_message(&oversized, &public_key).unwrap_err();
662
663 assert!(matches!(
664 err,
665 ApiError::InvalidSchema { reason } if reason.contains("payload")
666 && reason.contains(&MAX_P2P_MESSAGE_PAYLOAD_BYTES.to_string())
667 ));
668 }
669 #[test]
670 fn message_signing_payload_rejects_oversized_payload_before_cbor_serialization() {
671 let mut message = msg("a", None);
672 message.payload = vec![0xA5; MAX_P2P_MESSAGE_PAYLOAD_BYTES + 1];
673
674 let err = message_signing_payload(&message).unwrap_err();
675
676 assert!(matches!(
677 err,
678 ApiError::InvalidSchema { reason } if reason.contains("payload")
679 && reason.contains(&MAX_P2P_MESSAGE_PAYLOAD_BYTES.to_string())
680 ));
681 }
682 #[test]
683 fn message_replay_guard_rejects_duplicate_signed_message_without_recording_tamper() {
684 let (message, public_key) = signed_msg("a", Some("b"), 7);
685 let mut guard = MessageReplayGuard::new();
686
687 assert!(guard.verify_and_record(&message, &public_key).is_ok());
688 let replay = guard.verify_and_record(&message, &public_key).unwrap_err();
689
690 assert!(matches!(
691 replay,
692 ApiError::ReplayDetected { peer_id, nonce }
693 if peer_id == "did:exo:a" && nonce == message.nonce
694 ));
695
696 let mut tampered = message.clone();
697 tampered.nonce += 1;
698 tampered.payload = b"tampered".to_vec();
699 let tamper_err = guard.verify_and_record(&tampered, &public_key).unwrap_err();
700 assert!(matches!(
701 tamper_err,
702 ApiError::VerificationFailed { reason } if reason == "invalid Ed25519 signature"
703 ));
704 assert_eq!(
705 guard.len(),
706 1,
707 "failed verifications must not grow replay state"
708 );
709 }
710 #[test]
711 fn message_replay_guard_rejects_reused_sender_nonce_with_new_valid_payload() {
712 let (public_key, secret_key) = keypair(7);
713 let mut first = Message {
714 from: pid("a"),
715 to: Some(pid("b")),
716 payload: b"hello".to_vec(),
717 signature: Signature::Empty,
718 nonce: 7,
719 };
720 sign_message(&mut first, &secret_key);
721 let mut second = first.clone();
722 second.payload = b"new signed payload".to_vec();
723 second.signature = Signature::Empty;
724 sign_message(&mut second, &secret_key);
725 let mut guard = MessageReplayGuard::new();
726
727 assert!(guard.verify_and_record(&first, &public_key).is_ok());
728 let replay = guard.verify_and_record(&second, &public_key).unwrap_err();
729
730 assert!(matches!(
731 replay,
732 ApiError::ReplayDetected { peer_id, nonce } if peer_id == "did:exo:a" && nonce == 7
733 ));
734 assert_eq!(guard.len(), 1);
735 }
736 #[test]
737 fn message_replay_guard_bounds_tracked_state_and_can_reset() {
738 let (message, public_key) = signed_msg("a", Some("b"), 7);
739 let mut guard = MessageReplayGuard::new();
740 for nonce in 0..MessageReplayGuard::MAX_TRACKED_MESSAGES {
741 guard.seen.insert(MessageReplayKey {
742 from: pid("filled"),
743 nonce: u64::try_from(nonce).unwrap(),
744 });
745 }
746
747 let err = guard.verify_and_record(&message, &public_key).unwrap_err();
748
749 assert!(matches!(
750 err,
751 ApiError::RateLimited { peer_id } if peer_id == "did:exo:a"
752 ));
753 assert_eq!(guard.len(), MessageReplayGuard::MAX_TRACKED_MESSAGES);
754
755 guard.reset();
756 assert!(guard.is_empty());
757 assert!(guard.verify_and_record(&message, &public_key).is_ok());
758 }
759 #[test]
760 fn verify_message_rejects_empty_and_zero_signatures() {
761 let (public_key, _) = keypair(7);
762 let m = Message {
763 from: pid("a"),
764 to: None,
765 payload: vec![],
766 signature: Signature::Empty,
767 nonce: 0,
768 };
769 assert!(verify_message(&m, &public_key).is_err());
770
771 let m = Message {
772 from: pid("a"),
773 to: None,
774 payload: vec![],
775 signature: Signature::from_bytes([0u8; 64]),
776 nonce: 0,
777 };
778 assert!(verify_message(&m, &public_key).is_err());
779 }
780 #[test]
781 fn verify_message_rejects_fake_non_empty_signature() {
782 let (public_key, _) = keypair(7);
783 assert!(verify_message(&msg("a", None), &public_key).is_err());
784 }
785 #[test]
786 fn verify_message_rejects_wrong_key() {
787 let (message, _) = signed_msg("a", Some("b"), 7);
788 let (wrong_public_key, _) = keypair(8);
789 assert!(verify_message(&message, &wrong_public_key).is_err());
790 }
791 #[test]
792 fn verify_message_rejects_tampering() {
793 let (message, public_key) = signed_msg("a", Some("b"), 7);
794
795 let mut tampered_payload = message.clone();
796 tampered_payload.payload = b"goodbye".to_vec();
797 assert!(verify_message(&tampered_payload, &public_key).is_err());
798
799 let mut tampered_recipient = message.clone();
800 tampered_recipient.to = Some(pid("c"));
801 assert!(verify_message(&tampered_recipient, &public_key).is_err());
802
803 let mut tampered_nonce = message.clone();
804 tampered_nonce.nonce = 2;
805 assert!(verify_message(&tampered_nonce, &public_key).is_err());
806
807 let mut tampered_sender = message.clone();
808 tampered_sender.from = pid("z");
809 assert!(verify_message(&tampered_sender, &public_key).is_err());
810 }
811 #[test]
812 fn validate_message_structure_keeps_non_crypto_checks_separate() {
813 assert!(validate_message_structure(&msg("a", None)).is_ok());
814 }
815 #[test]
816 fn discover() {
817 let mut r = PeerRegistry::new();
818 let d = discover_peers(&mut r, &["addr1".into(), "addr2".into()]).unwrap();
819 assert_eq!(d.len(), 2);
820 assert_eq!(r.len(), 2);
821 }
822 #[test]
823 fn discover_no_dupes() {
824 let mut r = PeerRegistry::new();
825 discover_peers(&mut r, &["a".into()]).unwrap();
826 let d = discover_peers(&mut r, &["a".into()]).unwrap();
827 assert!(d.is_empty());
828 }
829 #[test]
830 fn discover_peers_rejects_oversized_bootstrap_before_mutating_registry() {
831 const MAX_BOOTSTRAP_PEERS: usize = 4_096;
832 let bootstrap: Vec<String> = (0..(MAX_BOOTSTRAP_PEERS + 1))
833 .map(|i| format!("addr-{i}"))
834 .collect();
835 let mut registry = PeerRegistry::new();
836
837 let err = discover_peers(&mut registry, &bootstrap).unwrap_err();
838
839 assert!(matches!(err, ApiError::InvalidSchema { .. }));
840 assert_eq!(registry.len(), 0);
841 }
842 #[test]
843 fn rate_limiter() {
844 let mut rl = RateLimiter::new();
845 for _ in 0..100 {
846 rl.check_and_increment(&pid("a")).unwrap();
847 }
848 let err = rl.check_and_increment(&pid("a")).unwrap_err();
849 assert!(matches!(
850 err,
851 ApiError::RateLimited { peer_id } if peer_id == "did:exo:a"
852 ));
853 }
854 #[test]
855 fn rate_limiter_reset() {
856 let mut rl = RateLimiter::new();
857 for _ in 0..100 {
858 rl.check_and_increment(&pid("a")).unwrap();
859 }
860 rl.reset();
861 assert!(rl.check_and_increment(&pid("a")).is_ok());
862 }
863 #[test]
864 fn rate_limiter_bounds_distinct_peer_tracking() {
865 let mut rl = RateLimiter::new();
866 for i in 0..RateLimiter::MAX_TRACKED_PEERS {
867 rl.check_and_increment(&pid(&format!("peer-{i}"))).unwrap();
868 }
869
870 assert_eq!(rl.counts.len(), RateLimiter::MAX_TRACKED_PEERS);
871 let err = rl.check_and_increment(&pid("overflow-peer")).unwrap_err();
872 assert!(matches!(
873 err,
874 ApiError::RateLimited { peer_id } if peer_id == "did:exo:overflow-peer"
875 ));
876 assert_eq!(
877 rl.counts.len(),
878 RateLimiter::MAX_TRACKED_PEERS,
879 "refused peers must not grow the limiter state"
880 );
881 assert!(rl.check_and_increment(&pid("peer-0")).is_ok());
882 }
883 #[test]
884 fn p2p_error_peer_labels_do_not_depend_on_debug_formatting() {
885 let source = include_str!("p2p.rs");
886 let production = source
887 .split("#[cfg(test)]")
888 .next()
889 .expect("production section");
890 assert!(
891 !production.contains("format!(\"{:?}\", peer)"),
892 "P2P rate-limit errors must use stable peer labels"
893 );
894 assert!(
895 !production.contains("format!(\"{to:?}\")"),
896 "P2P peer lookup errors must use stable peer labels"
897 );
898 }
899 #[test]
900 fn peer_id_ord() {
901 assert!(pid("a") < pid("b"));
902 }
903 #[test]
904 fn message_serde() {
905 let m = msg("a", Some("b"));
906 let j = serde_json::to_string(&m).unwrap();
907 assert!(!j.is_empty());
908 }
909
910 fn peer_meta(name: &str, asn: Option<u32>, last_seen_ms: u64) -> PeerMetadata {
913 PeerMetadata {
914 info: info(name),
915 asn: asn.map(Asn),
916 first_seen: Timestamp::ZERO,
917 last_seen: Timestamp::new(last_seen_ms, 0),
918 }
919 }
920
921 #[test]
923 fn reject_single_asn_peer_set() {
924 let peers: Vec<PeerMetadata> = (0..5)
925 .map(|i| peer_meta(&format!("p{i}"), Some(64512), 1000))
926 .collect();
927 let policy = AsnPolicy::default();
928 assert_eq!(
929 check_asn_diversity(&peers, &policy),
930 DiversityResult::Insufficient {
931 unique_asns: 1,
932 required: 3
933 }
934 );
935 }
936
937 #[test]
939 fn accept_diverse_peer_set() {
940 let peers = vec![
941 peer_meta("a", Some(100), 1000),
942 peer_meta("b", Some(200), 1000),
943 peer_meta("c", Some(300), 1000),
944 ];
945 let policy = AsnPolicy::default();
946 assert_eq!(
947 check_asn_diversity(&peers, &policy),
948 DiversityResult::Sufficient
949 );
950 }
951
952 #[test]
953 fn reject_peer_set_exceeding_max_peers_per_asn_even_when_unique_asns_sufficient() {
954 let mut peers: Vec<PeerMetadata> = (0..6)
955 .map(|i| peer_meta(&format!("attacker-{i}"), Some(64512), 1000))
956 .collect();
957 peers.push(peer_meta("honest-a", Some(64513), 1000));
958 peers.push(peer_meta("honest-b", Some(64514), 1000));
959 let policy = AsnPolicy::default();
960
961 assert_eq!(
962 check_asn_diversity(&peers, &policy),
963 DiversityResult::OverrepresentedAsn {
964 asn: Asn(64512),
965 peers: 6,
966 maximum: 5
967 }
968 );
969 }
970
971 #[test]
973 fn max_peers_per_asn_enforced() {
974 let candidates: Vec<PeerMetadata> = (0..10)
975 .map(|i| peer_meta(&format!("p{i}"), Some(64512), 1000))
976 .collect();
977 let policy = AsnPolicy {
978 max_peers_per_asn: 5,
979 ..AsnPolicy::default()
980 };
981 let selected = select_diverse_peers(&candidates, &policy, 20);
982 assert_eq!(selected.len(), 5);
983 }
984
985 #[test]
987 fn select_diverse_round_robin() {
988 let mut candidates = Vec::new();
989 for asn in [10, 20, 30, 40] {
990 for i in 0..4 {
991 candidates.push(peer_meta(&format!("asn{asn}-p{i}"), Some(asn), 1000));
992 }
993 }
994 let policy = AsnPolicy::default();
995 let selected = select_diverse_peers(&candidates, &policy, 8);
996 assert_eq!(selected.len(), 8);
997 let mut counts: BTreeMap<u32, usize> = BTreeMap::new();
999 for p in &selected {
1000 *counts.entry(p.asn.unwrap().0).or_default() += 1;
1001 }
1002 for &c in counts.values() {
1003 assert_eq!(c, 2);
1004 }
1005 }
1006
1007 #[test]
1008 fn select_diverse_peers_clamps_untrusted_max_peers_before_allocating() {
1009 const EXPECTED_SELECTION_LIMIT: usize = 4_096;
1010 let candidates: Vec<PeerMetadata> = (0..(EXPECTED_SELECTION_LIMIT + 1))
1011 .map(|i| {
1012 peer_meta(
1013 &format!("peer-{i}"),
1014 Some(u32::try_from(i + 1).unwrap()),
1015 1000,
1016 )
1017 })
1018 .collect();
1019 let policy = AsnPolicy {
1020 max_peers_per_asn: 1,
1021 ..AsnPolicy::default()
1022 };
1023
1024 let selected = select_diverse_peers(&candidates, &policy, usize::MAX);
1025
1026 assert_eq!(selected.len(), EXPECTED_SELECTION_LIMIT);
1027 assert_eq!(selected.first().unwrap().info.id, candidates[0].info.id);
1028 assert_eq!(
1029 selected.last().unwrap().info.id,
1030 candidates[EXPECTED_SELECTION_LIMIT - 1].info.id
1031 );
1032 }
1033
1034 #[test]
1036 fn identify_stale_peers_test() {
1037 let now = Timestamp::new(10_000_000, 0);
1038 let max_age_ms = 7_200_000; let peers = vec![
1040 peer_meta("fresh", Some(1), 9_000_000), peer_meta("stale1", Some(2), 1_000_000), peer_meta("stale2", Some(3), 2_000_000), ];
1044 let stale = identify_stale_peers(&peers, &now, max_age_ms);
1045 assert_eq!(stale.len(), 2);
1046 assert!(stale.contains(&pid("stale1")));
1047 assert!(stale.contains(&pid("stale2")));
1048 }
1049
1050 #[test]
1051 fn identify_stale_peers_treats_future_last_seen_as_stale() {
1052 let now = Timestamp::new(10_000_000, 0);
1053 let peers = vec![peer_meta("future", Some(1), 10_001_000)];
1054
1055 let stale = identify_stale_peers(&peers, &now, 7_200_000);
1056
1057 assert_eq!(stale, vec![pid("future")]);
1058 }
1059
1060 #[test]
1062 fn rotate_replaces_stale_with_diverse() {
1063 let now = Timestamp::new(10_000_000, 0);
1064 let policy = AsnPolicy {
1065 rotation_interval_ms: 3_600_000,
1066 ..AsnPolicy::default()
1067 };
1068 let mut current = vec![
1069 peer_meta("keep", Some(100), 9_000_000), peer_meta("old1", Some(100), 1_000_000), peer_meta("old2", Some(100), 2_000_000), ];
1073 let candidates = vec![
1074 peer_meta("new1", Some(200), 9_500_000),
1075 peer_meta("new2", Some(300), 9_500_000),
1076 ];
1077 let evicted = rotate_peers(&mut current, &candidates, &policy, &now);
1078 assert_eq!(evicted.len(), 2);
1079 assert!(evicted.contains(&pid("old1")));
1080 assert!(evicted.contains(&pid("old2")));
1081 assert!(current.len() >= 2 && current.len() <= 3);
1083 let ids: Vec<PeerId> = current.iter().map(|p| p.info.id.clone()).collect();
1084 assert!(ids.contains(&pid("keep")));
1085 }
1086
1087 #[test]
1089 fn empty_candidates_no_crash() {
1090 let selected = select_diverse_peers(&[], &AsnPolicy::default(), 10);
1091 assert!(selected.is_empty());
1092 let stale = identify_stale_peers(&[], &Timestamp::ZERO, 1000);
1093 assert!(stale.is_empty());
1094 }
1095
1096 #[test]
1098 fn no_asn_peers_treated_as_same_group() {
1099 let peers = vec![
1100 peer_meta("a", None, 1000),
1101 peer_meta("b", None, 1000),
1102 peer_meta("c", None, 1000),
1103 ];
1104 let policy = AsnPolicy::default();
1105 assert_eq!(
1107 check_asn_diversity(&peers, &policy),
1108 DiversityResult::Insufficient {
1109 unique_asns: 1,
1110 required: 3
1111 }
1112 );
1113 let policy2 = AsnPolicy {
1115 max_peers_per_asn: 2,
1116 ..AsnPolicy::default()
1117 };
1118 let selected = select_diverse_peers(&peers, &policy2, 10);
1119 assert_eq!(selected.len(), 2);
1120 }
1121
1122 #[test]
1124 fn policy_default_values() {
1125 let p = AsnPolicy::default();
1126 assert_eq!(p.min_unique_asns, 3);
1127 assert_eq!(p.max_peers_per_asn, 5);
1128 assert_eq!(p.rotation_interval_ms, 3_600_000);
1129 }
1130}