1use std::collections::HashMap;
7
8use chrono::{DateTime, Utc};
9use schemars::JsonSchema;
10use serde::{Deserialize, Serialize};
11use uuid::Uuid;
12
13#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, PartialEq, Eq)]
15#[serde(rename_all = "snake_case")]
16pub enum PeerStatus {
17 Discovered,
19 Connected,
21 Disconnected,
23 Removed,
25}
26
27#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, PartialEq, Eq)]
29#[serde(rename_all = "snake_case")]
30pub enum PeerTransport {
31 Tcp { host: String, port: u16 },
33 Relay { relay_url: String },
35}
36
37#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
39pub struct PeerInfo {
40 pub peer_id: Uuid,
41 pub name: String,
42 pub transport: PeerTransport,
43 pub status: PeerStatus,
44 pub last_seen: DateTime<Utc>,
45 pub last_sync: Option<DateTime<Utc>>,
46}
47
48#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
50pub struct PeerConnection {
51 pub peer: PeerInfo,
52 pub connected_at: DateTime<Utc>,
53 pub bytes_sent: u64,
54 pub bytes_received: u64,
55 pub syncs_completed: u64,
56}
57
58impl PeerConnection {
59 pub fn new(peer: PeerInfo) -> Self {
61 Self {
62 peer: PeerInfo {
63 status: PeerStatus::Connected,
64 ..peer
65 },
66 connected_at: Utc::now(),
67 bytes_sent: 0,
68 bytes_received: 0,
69 syncs_completed: 0,
70 }
71 }
72
73 pub fn record_sync(&mut self, sent: u64, received: u64) {
75 self.bytes_sent += sent;
76 self.bytes_received += received;
77 self.syncs_completed += 1;
78 self.peer.last_sync = Some(Utc::now());
79 self.peer.last_seen = Utc::now();
80 }
81}
82
83#[derive(Debug)]
85pub struct PeerDiscovery {
86 local_peer_id: Uuid,
87 local_name: String,
88 peers: HashMap<Uuid, PeerInfo>,
89 connections: HashMap<Uuid, PeerConnection>,
90}
91
92impl PeerDiscovery {
93 pub fn new(local_name: String) -> Self {
95 Self {
96 local_peer_id: Uuid::new_v4(),
97 local_name,
98 peers: HashMap::new(),
99 connections: HashMap::new(),
100 }
101 }
102
103 pub fn local_peer_id(&self) -> Uuid {
105 self.local_peer_id
106 }
107
108 pub fn local_name(&self) -> &str {
110 &self.local_name
111 }
112
113 pub fn add_peer(&mut self, peer: PeerInfo) {
115 self.peers.insert(peer.peer_id, peer);
116 }
117
118 pub fn remove_peer(&mut self, peer_id: &Uuid) -> Option<PeerInfo> {
120 self.connections.remove(peer_id);
121 self.peers.remove(peer_id)
122 }
123
124 pub fn get_peer(&self, peer_id: &Uuid) -> Option<&PeerInfo> {
126 self.peers.get(peer_id)
127 }
128
129 pub fn list_peers(&self) -> Vec<&PeerInfo> {
131 self.peers.values().collect()
132 }
133
134 pub fn connected_peers(&self) -> Vec<&PeerConnection> {
136 self.connections.values().collect()
137 }
138
139 pub fn connect(&mut self, peer_id: &Uuid) -> anyhow::Result<&PeerConnection> {
141 let peer = self
142 .peers
143 .get(peer_id)
144 .ok_or_else(|| anyhow::anyhow!("unknown peer: {peer_id}"))?
145 .clone();
146
147 let conn = PeerConnection::new(peer.clone());
148 self.peers.insert(
149 *peer_id,
150 PeerInfo {
151 status: PeerStatus::Connected,
152 last_seen: Utc::now(),
153 ..peer
154 },
155 );
156 self.connections.insert(*peer_id, conn);
157 Ok(self.connections.get(peer_id).unwrap())
158 }
159
160 pub fn disconnect(&mut self, peer_id: &Uuid) -> anyhow::Result<()> {
162 self.connections.remove(peer_id);
163 if let Some(peer) = self.peers.get_mut(peer_id) {
164 peer.status = PeerStatus::Disconnected;
165 }
166 Ok(())
167 }
168
169 pub fn record_sync(&mut self, peer_id: &Uuid, sent: u64, received: u64) -> anyhow::Result<()> {
171 let conn = self
172 .connections
173 .get_mut(peer_id)
174 .ok_or_else(|| anyhow::anyhow!("not connected to peer: {peer_id}"))?;
175 conn.record_sync(sent, received);
176 Ok(())
177 }
178}
179
180#[cfg(test)]
181mod tests {
182 use super::*;
183
184 fn make_peer_info(name: &str) -> PeerInfo {
185 PeerInfo {
186 peer_id: Uuid::new_v4(),
187 name: name.to_string(),
188 transport: PeerTransport::Tcp {
189 host: "127.0.0.1".into(),
190 port: 9900,
191 },
192 status: PeerStatus::Discovered,
193 last_seen: Utc::now(),
194 last_sync: None,
195 }
196 }
197
198 #[test]
199 fn test_peer_discovery_new() {
200 let disc = PeerDiscovery::new("alice".into());
201 assert_eq!(disc.local_name(), "alice");
202 assert!(disc.list_peers().is_empty());
203 assert!(disc.connected_peers().is_empty());
204 }
205
206 #[test]
207 fn test_add_and_list_peers() {
208 let mut disc = PeerDiscovery::new("alice".into());
209 let bob = make_peer_info("bob");
210 let bob_id = bob.peer_id;
211 disc.add_peer(bob);
212
213 assert_eq!(disc.list_peers().len(), 1);
214 assert_eq!(disc.get_peer(&bob_id).unwrap().name, "bob");
215 }
216
217 #[test]
218 fn test_remove_peer() {
219 let mut disc = PeerDiscovery::new("alice".into());
220 let bob = make_peer_info("bob");
221 let bob_id = bob.peer_id;
222 disc.add_peer(bob);
223
224 let removed = disc.remove_peer(&bob_id);
225 assert!(removed.is_some());
226 assert!(disc.list_peers().is_empty());
227 }
228
229 #[test]
230 fn test_connect_and_disconnect() {
231 let mut disc = PeerDiscovery::new("alice".into());
232 let bob = make_peer_info("bob");
233 let bob_id = bob.peer_id;
234 disc.add_peer(bob);
235
236 disc.connect(&bob_id).unwrap();
237 assert_eq!(disc.connected_peers().len(), 1);
238 assert_eq!(
239 disc.get_peer(&bob_id).unwrap().status,
240 PeerStatus::Connected
241 );
242
243 disc.disconnect(&bob_id).unwrap();
244 assert!(disc.connected_peers().is_empty());
245 assert_eq!(
246 disc.get_peer(&bob_id).unwrap().status,
247 PeerStatus::Disconnected
248 );
249 }
250
251 #[test]
252 fn test_connect_unknown_peer() {
253 let mut disc = PeerDiscovery::new("alice".into());
254 let result = disc.connect(&Uuid::new_v4());
255 assert!(result.is_err());
256 }
257
258 #[test]
259 fn test_record_sync() {
260 let mut disc = PeerDiscovery::new("alice".into());
261 let bob = make_peer_info("bob");
262 let bob_id = bob.peer_id;
263 disc.add_peer(bob);
264 disc.connect(&bob_id).unwrap();
265
266 disc.record_sync(&bob_id, 1024, 2048).unwrap();
267 let conn = &disc.connected_peers()[0];
268 assert_eq!(conn.bytes_sent, 1024);
269 assert_eq!(conn.bytes_received, 2048);
270 assert_eq!(conn.syncs_completed, 1);
271 assert!(conn.peer.last_sync.is_some());
272 }
273
274 #[test]
275 fn test_record_sync_not_connected() {
276 let mut disc = PeerDiscovery::new("alice".into());
277 assert!(disc.record_sync(&Uuid::new_v4(), 0, 0).is_err());
278 }
279
280 #[test]
281 fn test_peer_status_serialization() {
282 let statuses = vec![
283 PeerStatus::Discovered,
284 PeerStatus::Connected,
285 PeerStatus::Disconnected,
286 PeerStatus::Removed,
287 ];
288 for status in statuses {
289 let json = serde_json::to_string(&status).unwrap();
290 let back: PeerStatus = serde_json::from_str(&json).unwrap();
291 assert_eq!(status, back);
292 }
293 }
294
295 #[test]
296 fn test_peer_transport_serialization() {
297 let tcp = PeerTransport::Tcp {
298 host: "10.0.0.1".into(),
299 port: 8080,
300 };
301 let json = serde_json::to_string(&tcp).unwrap();
302 let back: PeerTransport = serde_json::from_str(&json).unwrap();
303 assert_eq!(tcp, back);
304
305 let relay = PeerTransport::Relay {
306 relay_url: "wss://relay.mur.run".into(),
307 };
308 let json = serde_json::to_string(&relay).unwrap();
309 let back: PeerTransport = serde_json::from_str(&json).unwrap();
310 assert_eq!(relay, back);
311 }
312
313 #[test]
314 fn test_peer_connection_new() {
315 let peer = make_peer_info("bob");
316 let conn = PeerConnection::new(peer);
317 assert_eq!(conn.peer.status, PeerStatus::Connected);
318 assert_eq!(conn.bytes_sent, 0);
319 assert_eq!(conn.bytes_received, 0);
320 assert_eq!(conn.syncs_completed, 0);
321 }
322}