use std::collections::BTreeMap;
use std::time::SystemTime;
/// Bookkeeping record for a single registered peer.
#[derive(Clone, Debug)]
pub struct PeerInfo {
    /// Identifier the peer was registered under.
    pub peer_id: String,
    /// Address supplied at registration; stored verbatim.
    pub address: String,
    /// Wall-clock time, in milliseconds since the Unix epoch, of the most
    /// recently recorded sync. `0` means no sync has been recorded yet.
    pub last_seen_ms: u64,
}
/// Set of known peers plus the per-instance seed used when choosing which of
/// them to sync with.
#[derive(Default, Debug)]
pub struct PeerRegistry {
    /// Registered peers keyed by peer id. Being a `BTreeMap`, iteration is in
    /// ascending id order.
    peers: BTreeMap<String, PeerInfo>,
    /// Value XOR-ed with the current time to seed sync-target selection;
    /// `0` unless the registry was built from an instance id.
    instance_seed: u64,
}
/// Returns the current wall-clock time in milliseconds since the Unix epoch.
///
/// Returns `0` when the system clock reads earlier than the epoch. A
/// millisecond count that does not fit in a `u64` saturates at `u64::MAX`
/// instead of silently wrapping.
fn now_ms() -> u64 {
    match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) {
        // `as_millis` yields a `u128`; clamping first guarantees the cast
        // below can never truncate.
        Ok(elapsed) => elapsed.as_millis().min(u128::from(u64::MAX)) as u64,
        Err(_) => 0,
    }
}
impl PeerRegistry {
    /// Creates an empty registry whose instance seed is `0`.
    pub fn new() -> Self {
        Self::default()
    }

    /// Creates an empty registry whose instance seed is the 64-bit FNV-1a
    /// hash of `instance_id`'s bytes.
    pub fn with_instance_id(instance_id: &str) -> Self {
        const FNV_OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
        const FNV_PRIME: u64 = 0x0100_0000_01b3;

        // FNV-1a: XOR each byte in first, then multiply by the prime.
        let instance_seed = instance_id
            .bytes()
            .fold(FNV_OFFSET_BASIS, |hash, byte| {
                (hash ^ u64::from(byte)).wrapping_mul(FNV_PRIME)
            });

        Self {
            instance_seed,
            peers: BTreeMap::new(),
        }
    }

    /// Adds a peer, or replaces the existing entry with the same `peer_id`.
    ///
    /// The stored entry always starts with `last_seen_ms == 0`, so replacing
    /// an existing peer discards its previously recorded sync time.
    pub fn register(&mut self, peer_id: String, address: String) {
        let key = peer_id.clone();
        let info = PeerInfo {
            peer_id,
            address,
            last_seen_ms: 0,
        };
        self.peers.insert(key, info);
    }

    /// Removes the peer with the given id; returns `true` if it was present.
    pub fn unregister(&mut self, peer_id: &str) -> bool {
        let removed = self.peers.remove(peer_id);
        removed.is_some()
    }

    /// Returns references to all registered peers in ascending id order.
    pub fn list(&self) -> Vec<&PeerInfo> {
        let mut infos: Vec<&PeerInfo> = Vec::with_capacity(self.peers.len());
        infos.extend(self.peers.values());
        infos
    }

    /// Number of registered peers.
    pub fn len(&self) -> usize {
        self.peers.len()
    }

    /// Returns `true` when no peers are registered.
    pub fn is_empty(&self) -> bool {
        self.peers.is_empty()
    }

    /// Chooses the peer ids to sync with.
    ///
    /// With at most two peers, every id is returned in ascending order.
    /// Otherwise `ceil(ln(n) + 1)` distinct ids (capped at `n`) are drawn
    /// pseudo-randomly and returned in draw order. The generator is a 64-bit
    /// linear congruential generator seeded with the current time in
    /// milliseconds XOR-ed with the instance seed.
    pub fn select_sync_targets(&self) -> Vec<String> {
        const LCG_MULTIPLIER: u64 = 6_364_136_223_846_793_005;
        const LCG_INCREMENT: u64 = 1_442_695_040_888_963_407;

        let total = self.peers.len();
        if total <= 2 {
            // Covers the empty registry too: no keys, so an empty vector.
            return self.peers.keys().cloned().collect();
        }

        let wanted = (((total as f64).ln() + 1.0).ceil() as usize).min(total);
        let ids: Vec<&String> = self.peers.keys().collect();
        let mut rng_state = now_ms() ^ self.instance_seed;

        // Track chosen positions rather than ids: map keys are unique, so a
        // distinct position is a distinct id.
        let mut chosen: Vec<usize> = Vec::with_capacity(wanted);
        while chosen.len() < wanted {
            rng_state = rng_state
                .wrapping_mul(LCG_MULTIPLIER)
                .wrapping_add(LCG_INCREMENT);
            // Use the high bits of the state; they are the better-mixed ones.
            let slot = (rng_state >> 33) as usize % total;
            if !chosen.contains(&slot) {
                chosen.push(slot);
            }
        }

        chosen.into_iter().map(|slot| ids[slot].clone()).collect()
    }

    /// Stamps the given peer's `last_seen_ms` with the current time.
    ///
    /// Unknown peer ids are ignored.
    pub fn record_sync(&mut self, peer_id: &str) {
        if let Some(entry) = self.peers.get_mut(peer_id) {
            entry.last_seen_ms = now_ms();
        }
    }

    /// Looks up a peer by id.
    pub fn get(&self, peer_id: &str) -> Option<&PeerInfo> {
        self.peers.get(peer_id)
    }

    /// Checks whether every peer has synced at or after `latest_entry_ms`.
    ///
    /// Returns `(safe, reasons)`. `safe` is `true` when no peer's
    /// `last_seen_ms` is below `latest_entry_ms` (trivially so for an empty
    /// registry). `reasons` holds one message per lagging peer, in ascending
    /// id order.
    pub fn verify_compaction_safe(&self, latest_entry_ms: u64) -> (bool, Vec<String>) {
        let reasons: Vec<String> = self
            .peers
            .values()
            .filter(|peer| peer.last_seen_ms < latest_entry_ms)
            .map(|peer| match peer.last_seen_ms {
                0 => format!("peer '{}' has never synced", peer.peer_id),
                seen => format!(
                    "peer '{}' last synced at {}ms, but latest entry is at {}ms",
                    peer.peer_id, seen, latest_entry_ms
                ),
            })
            .collect();

        (reasons.is_empty(), reasons)
    }
}
/// Unit tests for `PeerRegistry`: registration, lookup, sync-target
/// selection, sync bookkeeping and the compaction safety check.
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashSet;

    #[test]
    fn register_and_list() {
        let mut reg = PeerRegistry::new();
        reg.register("a".into(), "tcp://a:7701".into());
        reg.register("b".into(), "tcp://b:7701".into());
        assert_eq!(reg.len(), 2);
        assert!(!reg.is_empty());

        // `list` must expose exactly what was registered, in id order.
        let listed: Vec<(&str, &str)> = reg
            .list()
            .into_iter()
            .map(|peer| (peer.peer_id.as_str(), peer.address.as_str()))
            .collect();
        assert_eq!(listed, vec![("a", "tcp://a:7701"), ("b", "tcp://b:7701")]);
    }

    #[test]
    fn register_same_id_replaces_address() {
        let mut reg = PeerRegistry::new();
        reg.register("a".into(), "tcp://a:7701".into());
        reg.register("a".into(), "tcp://a:7702".into());
        assert_eq!(reg.len(), 1);
        assert_eq!(reg.get("a").unwrap().address, "tcp://a:7702");
    }

    #[test]
    fn unregister_returns_true_if_existed() {
        let mut reg = PeerRegistry::new();
        reg.register("a".into(), "tcp://a:7701".into());
        assert!(reg.unregister("a"));
        assert!(!reg.unregister("a"));
        assert_eq!(reg.len(), 0);
    }

    #[test]
    fn with_instance_id_seeds_from_fnv1a_hash() {
        // Published 64-bit FNV-1a values for "" and "a".
        assert_eq!(
            PeerRegistry::with_instance_id("").instance_seed,
            0xcbf29ce484222325
        );
        assert_eq!(
            PeerRegistry::with_instance_id("a").instance_seed,
            0xaf63dc4c8601ec8c
        );
        assert!(PeerRegistry::with_instance_id("a").is_empty());
    }

    #[test]
    fn select_empty_returns_empty() {
        let reg = PeerRegistry::new();
        assert!(reg.select_sync_targets().is_empty());
    }

    #[test]
    fn select_one_peer_returns_it() {
        let mut reg = PeerRegistry::new();
        reg.register("only".into(), "tcp://only:7701".into());
        let targets = reg.select_sync_targets();
        assert_eq!(targets, vec!["only"]);
    }

    #[test]
    fn select_two_peers_returns_both() {
        let mut reg = PeerRegistry::new();
        reg.register("a".into(), "tcp://a:7701".into());
        reg.register("b".into(), "tcp://b:7701".into());
        let targets = reg.select_sync_targets();
        assert_eq!(targets.len(), 2);
    }

    #[test]
    fn select_logarithmic_fan_out() {
        let mut reg = PeerRegistry::new();
        for i in 0..1000 {
            reg.register(
                format!("peer-{i}"),
                format!("tcp://10.0.{0}.{1}:7701", i / 256, i % 256),
            );
        }
        let targets = reg.select_sync_targets();
        let expected = ((1000_f64).ln() + 1.0).ceil() as usize;
        assert_eq!(targets.len(), expected);
        for t in &targets {
            assert!(reg.get(t).is_some());
        }
        let unique: HashSet<&String> = targets.iter().collect();
        assert_eq!(unique.len(), targets.len());
    }

    #[test]
    fn select_targets_are_subset_of_registered() {
        let mut reg = PeerRegistry::new();
        for i in 0..50 {
            reg.register(format!("p{i}"), format!("addr-{i}"));
        }
        let targets = reg.select_sync_targets();
        for t in &targets {
            assert!(reg.get(t).is_some(), "target {t} not in registry");
        }
    }

    #[test]
    fn record_sync_updates_last_seen() {
        let mut reg = PeerRegistry::new();
        reg.register("a".into(), "tcp://a:7701".into());
        assert_eq!(reg.get("a").unwrap().last_seen_ms, 0);
        reg.record_sync("a");
        assert!(reg.get("a").unwrap().last_seen_ms > 0);
    }

    #[test]
    fn record_sync_ignores_unknown_peer() {
        let mut reg = PeerRegistry::new();
        reg.register("a".into(), "tcp://a:7701".into());
        reg.record_sync("ghost");
        assert_eq!(reg.len(), 1);
        assert!(reg.get("ghost").is_none());
        assert_eq!(reg.get("a").unwrap().last_seen_ms, 0);
    }

    #[test]
    fn verify_compaction_safe_on_empty_registry() {
        let reg = PeerRegistry::new();
        assert_eq!(reg.verify_compaction_safe(12345), (true, vec![]));
    }

    #[test]
    fn verify_compaction_safe_reports_lagging_peers() {
        let mut reg = PeerRegistry::new();
        reg.register("a".into(), "tcp://a:7701".into());
        reg.register("b".into(), "tcp://b:7701".into());
        reg.record_sync("a");

        let seen = reg.get("a").unwrap().last_seen_ms;
        assert!(seen > 0);
        let latest = seen + 1;

        let (safe, reasons) = reg.verify_compaction_safe(latest);
        assert!(!safe);
        assert_eq!(
            reasons,
            vec![
                format!("peer 'a' last synced at {seen}ms, but latest entry is at {latest}ms"),
                "peer 'b' has never synced".to_string(),
            ]
        );
    }

    #[test]
    fn verify_compaction_safe_when_all_peers_caught_up() {
        let mut reg = PeerRegistry::new();
        reg.register("a".into(), "tcp://a:7701".into());
        reg.register("b".into(), "tcp://b:7701".into());
        reg.record_sync("a");
        reg.record_sync("b");

        let oldest = reg
            .list()
            .into_iter()
            .map(|peer| peer.last_seen_ms)
            .min()
            .unwrap();
        assert_eq!(reg.verify_compaction_safe(oldest), (true, vec![]));
    }
}