1use std::collections::HashMap;
9use std::sync::{Arc, RwLock};
10use std::time::Instant;
11
12use chrono::Utc;
13use serde::{Deserialize, Serialize};
14
15#[derive(Clone, Debug, Deserialize, Serialize)]
17pub struct PeerConfig {
18 pub peers: Vec<PeerEndpoint>,
20 #[serde(default = "default_heartbeat_interval")]
22 pub heartbeat_interval_secs: u64,
23 #[serde(default = "default_peer_timeout_secs")]
25 pub peer_timeout_secs: u64,
26 #[serde(default = "default_fanout")]
28 pub gossip_fanout: usize,
29}
30
/// Serde fallback for `PeerConfig::heartbeat_interval_secs`: used when the
/// field is missing from the deserialized input.
fn default_heartbeat_interval() -> u64 {
    const HEARTBEAT_INTERVAL_SECS: u64 = 30;
    HEARTBEAT_INTERVAL_SECS
}
/// Serde fallback for `PeerConfig::peer_timeout_secs`: used when the field is
/// missing from the deserialized input.
fn default_peer_timeout_secs() -> u64 {
    const PEER_TIMEOUT_SECS: u64 = 10;
    PEER_TIMEOUT_SECS
}
/// Serde fallback for `PeerConfig::gossip_fanout`: used when the field is
/// missing from the deserialized input.
fn default_fanout() -> usize {
    const GOSSIP_FANOUT: usize = 3;
    GOSSIP_FANOUT
}
40
41#[derive(Clone, Debug, Deserialize, Serialize)]
43pub struct PeerEndpoint {
44 pub peer_id: String,
46 pub endpoint: String,
48 pub public_key: Option<String>,
50}
51
/// Liveness state tracked for each peer.
///
/// The transitions are driven by `PeerInfo::mark_success` and
/// `PeerInfo::mark_failure`.
///
/// Being a fieldless enum, it is `Copy`, fully `Eq` and `Hash`, so it can be
/// passed by value and used as a set/map key.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum PeerStatus {
    /// Initial state of a new peer, and the state after any recorded success.
    Active,
    /// At least one failure has been recorded since the last success, but
    /// fewer than the offline threshold.
    Suspected,
    /// The failure count has reached the offline threshold (three).
    Offline,
}
62
63#[derive(Clone, Debug)]
65pub struct PeerInfo {
66 pub endpoint: PeerEndpoint,
67 pub status: PeerStatus,
68 pub last_seen: Instant,
69 pub last_heartbeat: Option<Instant>,
70 pub failure_count: u32,
71}
72
73impl PeerInfo {
74 pub fn new(endpoint: PeerEndpoint) -> Self {
75 Self {
76 endpoint,
77 status: PeerStatus::Active,
78 last_seen: Instant::now(),
79 last_heartbeat: None,
80 failure_count: 0,
81 }
82 }
83
84 pub fn mark_failure(&mut self) {
85 self.failure_count += 1;
86 if self.failure_count >= 3 {
87 self.status = PeerStatus::Offline;
88 } else {
89 self.status = PeerStatus::Suspected;
90 }
91 }
92
93 pub fn mark_success(&mut self) {
94 self.failure_count = 0;
95 self.status = PeerStatus::Active;
96 self.last_seen = Instant::now();
97 }
98}
99
100#[derive(Clone, Debug, Deserialize, Serialize)]
102pub struct GossipMessage {
103 pub message_id: String,
105 pub origin_peer: String,
107 pub sequence: u64,
109 pub kind: GossipKind,
111 pub timestamp: String,
113 pub payload: String,
115}
116
117#[derive(Clone, Debug, Deserialize, Serialize)]
119#[serde(tag = "type", rename_all = "snake_case")]
120pub enum GossipKind {
121 Advertisement { peer_id: String, endpoint: String },
123 AssetUpdate {
125 asset_id: String,
126 asset_type: String,
127 },
128 SyncRequest { since_sequence: u64 },
130 SyncResponse { assets: Vec<String> },
132 Leave { peer_id: String },
134}
135
136#[derive(Clone)]
138pub struct PeerRegistry {
139 peers: Arc<RwLock<HashMap<String, PeerInfo>>>,
140 config: PeerConfig,
141 local_peer_id: String,
142}
143
144impl PeerRegistry {
145 pub fn new(config: PeerConfig, local_peer_id: String) -> Self {
147 let peers: HashMap<String, PeerInfo> = config
148 .peers
149 .iter()
150 .map(|e| (e.peer_id.clone(), PeerInfo::new(e.clone())))
151 .collect();
152
153 Self {
154 peers: Arc::new(RwLock::new(peers)),
155 config,
156 local_peer_id,
157 }
158 }
159
160 pub fn get_active_peers(&self) -> Vec<PeerEndpoint> {
162 self.peers
163 .read()
164 .unwrap()
165 .values()
166 .filter(|p| p.status == PeerStatus::Active)
167 .map(|p| p.endpoint.clone())
168 .collect()
169 }
170
171 pub fn get_gossip_peers(&self, count: usize) -> Vec<PeerEndpoint> {
173 let peers = self.peers.read().unwrap();
174 let active: Vec<_> = peers
175 .values()
176 .filter(|p| p.status == PeerStatus::Active)
177 .filter(|p| p.endpoint.peer_id != self.local_peer_id)
178 .map(|p| p.endpoint.clone())
179 .collect();
180
181 if active.is_empty() {
182 return vec![];
183 }
184
185 let count = count.min(active.len());
187 active.into_iter().take(count).collect()
188 }
189
190 pub fn update_peer_status(&self, peer_id: &str, is_alive: bool) {
192 let mut peers = self.peers.write().unwrap();
193 if let Some(peer) = peers.get_mut(peer_id) {
194 if is_alive {
195 peer.mark_success();
196 peer.last_heartbeat = Some(Instant::now());
197 } else {
198 peer.mark_failure();
199 }
200 }
201 }
202
203 pub fn add_peer(&self, endpoint: PeerEndpoint) {
205 let mut peers = self.peers.write().unwrap();
206 if !peers.contains_key(&endpoint.peer_id) {
207 peers.insert(endpoint.peer_id.clone(), PeerInfo::new(endpoint));
208 }
209 }
210
211 pub fn remove_peer(&self, peer_id: &str) {
213 let mut peers = self.peers.write().unwrap();
214 peers.remove(peer_id);
215 }
216
217 pub fn local_peer_id(&self) -> &str {
219 &self.local_peer_id
220 }
221
222 pub fn config(&self) -> &PeerConfig {
224 &self.config
225 }
226}
227
228pub struct GossipBuilder {
230 origin_peer: String,
231 sequence: u64,
232 kind: Option<GossipKind>,
233 payload: Option<String>,
234}
235
236impl GossipBuilder {
237 pub fn new(origin_peer: String, sequence: u64) -> Self {
238 Self {
239 origin_peer,
240 sequence,
241 kind: None,
242 payload: None,
243 }
244 }
245
246 pub fn advertisement(mut self, peer_id: String, endpoint: String) -> Self {
247 self.kind = Some(GossipKind::Advertisement { peer_id, endpoint });
248 self
249 }
250
251 pub fn asset_update(mut self, asset_id: String, asset_type: String) -> Self {
252 self.kind = Some(GossipKind::AssetUpdate {
253 asset_id,
254 asset_type,
255 });
256 self
257 }
258
259 pub fn sync_request(mut self, since_sequence: u64) -> Self {
260 self.kind = Some(GossipKind::SyncRequest { since_sequence });
261 self
262 }
263
264 pub fn sync_response(mut self, assets: Vec<String>) -> Self {
265 self.kind = Some(GossipKind::SyncResponse { assets });
266 self
267 }
268
269 pub fn leave(mut self, peer_id: String) -> Self {
270 self.kind = Some(GossipKind::Leave { peer_id });
271 self
272 }
273
274 pub fn payload(mut self, payload: String) -> Self {
275 self.payload = Some(payload);
276 self
277 }
278
279 pub fn build(self) -> Option<GossipMessage> {
280 let kind = self.kind?;
281 let payload = self
282 .payload
283 .unwrap_or_else(|| serde_json::to_string(&kind).unwrap_or_default());
284
285 Some(GossipMessage {
286 message_id: format!(
287 "gossip-{:x}",
288 Utc::now().timestamp_nanos_opt().unwrap_or_default()
289 ),
290 origin_peer: self.origin_peer,
291 sequence: self.sequence,
292 kind,
293 timestamp: Utc::now().to_rfc3339(),
294 payload,
295 })
296 }
297}
298
#[cfg(test)]
mod tests {
    use super::*;

    /// Endpoint fixture: `http://<id>:8080`, no public key.
    fn endpoint(id: &str) -> PeerEndpoint {
        PeerEndpoint {
            peer_id: id.to_string(),
            endpoint: format!("http://{}:8080", id),
            public_key: None,
        }
    }

    /// Registry fixture seeded with `ids`; the local peer id is `local-peer`.
    fn registry_with(ids: &[&str]) -> PeerRegistry {
        let config = PeerConfig {
            peers: ids.iter().map(|id| endpoint(id)).collect(),
            heartbeat_interval_secs: 30,
            peer_timeout_secs: 10,
            gossip_fanout: 3,
        };
        PeerRegistry::new(config, "local-peer".to_string())
    }

    /// Reads the stored status of `peer_id` straight from the peer table.
    fn status_of(registry: &PeerRegistry, peer_id: &str) -> PeerStatus {
        registry.peers.read().unwrap()[peer_id].status.clone()
    }

    #[test]
    fn test_peer_registry_creation() {
        let registry = registry_with(&["peer1", "peer2"]);
        assert_eq!(registry.get_active_peers().len(), 2);
        assert_eq!(registry.local_peer_id(), "local-peer");
        assert_eq!(registry.config().gossip_fanout, 3);
    }

    #[test]
    fn test_peer_failure_tracking() {
        let registry = registry_with(&["peer1"]);

        registry.update_peer_status("peer1", false);
        registry.update_peer_status("peer1", false);

        // A suspected peer no longer counts as active.
        assert!(registry.get_active_peers().is_empty());

        // A single success brings it back.
        registry.update_peer_status("peer1", true);
        assert_eq!(registry.get_active_peers().len(), 1);
    }

    #[test]
    fn test_peer_goes_offline_after_three_failures() {
        let registry = registry_with(&["peer1"]);

        registry.update_peer_status("peer1", false);
        assert_eq!(status_of(&registry, "peer1"), PeerStatus::Suspected);
        registry.update_peer_status("peer1", false);
        assert_eq!(status_of(&registry, "peer1"), PeerStatus::Suspected);
        registry.update_peer_status("peer1", false);
        assert_eq!(status_of(&registry, "peer1"), PeerStatus::Offline);
        assert!(registry.get_active_peers().is_empty());

        registry.update_peer_status("peer1", true);
        assert_eq!(status_of(&registry, "peer1"), PeerStatus::Active);
        let peers = registry.peers.read().unwrap();
        assert_eq!(peers["peer1"].failure_count, 0);
        assert!(peers["peer1"].last_heartbeat.is_some());
    }

    #[test]
    fn test_gossip_peers_exclude_local_and_respect_count() {
        let registry = registry_with(&["local-peer", "peer1", "peer2"]);

        let all = registry.get_gossip_peers(10);
        assert_eq!(all.len(), 2);
        assert!(all.iter().all(|e| e.peer_id != "local-peer"));

        assert_eq!(registry.get_gossip_peers(1).len(), 1);
        assert!(registry.get_gossip_peers(0).is_empty());

        registry.update_peer_status("peer1", false);
        let remaining = registry.get_gossip_peers(10);
        assert_eq!(remaining.len(), 1);
        assert_eq!(remaining[0].peer_id, "peer2");
    }

    #[test]
    fn test_add_and_remove_peer() {
        let registry = registry_with(&["peer1"]);

        // Re-adding a known id must not replace the stored record.
        let mut moved = endpoint("peer1");
        moved.endpoint = "http://elsewhere:9000".to_string();
        registry.add_peer(moved);
        let active = registry.get_active_peers();
        assert_eq!(active.len(), 1);
        assert_eq!(active[0].endpoint, "http://peer1:8080");

        registry.add_peer(endpoint("peer2"));
        assert_eq!(registry.get_active_peers().len(), 2);

        registry.remove_peer("peer2");
        registry.remove_peer("unknown");
        assert_eq!(registry.get_active_peers().len(), 1);
    }

    #[test]
    fn test_gossip_builder() {
        let msg = GossipBuilder::new("peer1".to_string(), 1)
            .asset_update("asset-123".to_string(), "gene".to_string())
            .build();

        assert!(msg.is_some());
        let msg = msg.unwrap();
        assert_eq!(msg.origin_peer, "peer1");
        assert_eq!(msg.sequence, 1);
        assert!(msg.message_id.starts_with("gossip-"));
        assert!(!msg.timestamp.is_empty());
        assert!(matches!(msg.kind, GossipKind::AssetUpdate { .. }));
        // Without an explicit payload the kind is serialized as JSON.
        assert!(msg.payload.contains("asset_update"));
        assert!(msg.payload.contains("asset-123"));
    }

    #[test]
    fn test_gossip_builder_requires_kind() {
        let msg = GossipBuilder::new("peer1".to_string(), 1)
            .payload("ignored".to_string())
            .build();
        assert!(msg.is_none());
    }

    #[test]
    fn test_gossip_builder_explicit_payload() {
        let msg = GossipBuilder::new("peer1".to_string(), 2)
            .leave("peer1".to_string())
            .payload("bye".to_string())
            .build()
            .unwrap();
        assert_eq!(msg.payload, "bye");
        assert_eq!(msg.sequence, 2);
        assert!(matches!(msg.kind, GossipKind::Leave { .. }));
    }
}