Skip to main content

mur_core/team/
peer.rs

1//! Peer discovery and connection management for team sync.
2//!
3//! Handles finding team members on the network and maintaining
4//! connections for CRDT state exchange.
5
6use std::collections::HashMap;
7
8use chrono::{DateTime, Utc};
9use schemars::JsonSchema;
10use serde::{Deserialize, Serialize};
11use uuid::Uuid;
12
13/// Status of a peer connection.
14#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, PartialEq, Eq)]
15#[serde(rename_all = "snake_case")]
16pub enum PeerStatus {
17    /// Peer discovered but not yet connected.
18    Discovered,
19    /// Actively connected and syncing.
20    Connected,
21    /// Connection lost, will attempt reconnect.
22    Disconnected,
23    /// Peer explicitly removed from the team.
24    Removed,
25}
26
27/// Transport method for peer communication.
28#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, PartialEq, Eq)]
29#[serde(rename_all = "snake_case")]
30pub enum PeerTransport {
31    /// Direct TCP connection (LAN).
32    Tcp { host: String, port: u16 },
33    /// Via mur.run relay server.
34    Relay { relay_url: String },
35}
36
37/// A known team peer.
38#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
39pub struct PeerInfo {
40    pub peer_id: Uuid,
41    pub name: String,
42    pub transport: PeerTransport,
43    pub status: PeerStatus,
44    pub last_seen: DateTime<Utc>,
45    pub last_sync: Option<DateTime<Utc>>,
46}
47
48/// An active connection to a peer.
49#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
50pub struct PeerConnection {
51    pub peer: PeerInfo,
52    pub connected_at: DateTime<Utc>,
53    pub bytes_sent: u64,
54    pub bytes_received: u64,
55    pub syncs_completed: u64,
56}
57
58impl PeerConnection {
59    /// Create a new connection from peer info.
60    pub fn new(peer: PeerInfo) -> Self {
61        Self {
62            peer: PeerInfo {
63                status: PeerStatus::Connected,
64                ..peer
65            },
66            connected_at: Utc::now(),
67            bytes_sent: 0,
68            bytes_received: 0,
69            syncs_completed: 0,
70        }
71    }
72
73    /// Record a completed sync exchange.
74    pub fn record_sync(&mut self, sent: u64, received: u64) {
75        self.bytes_sent += sent;
76        self.bytes_received += received;
77        self.syncs_completed += 1;
78        self.peer.last_sync = Some(Utc::now());
79        self.peer.last_seen = Utc::now();
80    }
81}
82
83/// Discovery and connection management for team peers.
84#[derive(Debug)]
85pub struct PeerDiscovery {
86    local_peer_id: Uuid,
87    local_name: String,
88    peers: HashMap<Uuid, PeerInfo>,
89    connections: HashMap<Uuid, PeerConnection>,
90}
91
92impl PeerDiscovery {
93    /// Create a new peer discovery instance.
94    pub fn new(local_name: String) -> Self {
95        Self {
96            local_peer_id: Uuid::new_v4(),
97            local_name,
98            peers: HashMap::new(),
99            connections: HashMap::new(),
100        }
101    }
102
103    /// Get the local peer ID.
104    pub fn local_peer_id(&self) -> Uuid {
105        self.local_peer_id
106    }
107
108    /// Get the local peer name.
109    pub fn local_name(&self) -> &str {
110        &self.local_name
111    }
112
113    /// Register a discovered peer.
114    pub fn add_peer(&mut self, peer: PeerInfo) {
115        self.peers.insert(peer.peer_id, peer);
116    }
117
118    /// Remove a peer and disconnect if connected.
119    pub fn remove_peer(&mut self, peer_id: &Uuid) -> Option<PeerInfo> {
120        self.connections.remove(peer_id);
121        self.peers.remove(peer_id)
122    }
123
124    /// Get info about a specific peer.
125    pub fn get_peer(&self, peer_id: &Uuid) -> Option<&PeerInfo> {
126        self.peers.get(peer_id)
127    }
128
129    /// List all known peers.
130    pub fn list_peers(&self) -> Vec<&PeerInfo> {
131        self.peers.values().collect()
132    }
133
134    /// List only connected peers.
135    pub fn connected_peers(&self) -> Vec<&PeerConnection> {
136        self.connections.values().collect()
137    }
138
139    /// Establish a connection to a peer.
140    pub fn connect(&mut self, peer_id: &Uuid) -> anyhow::Result<&PeerConnection> {
141        let peer = self
142            .peers
143            .get(peer_id)
144            .ok_or_else(|| anyhow::anyhow!("unknown peer: {peer_id}"))?
145            .clone();
146
147        let conn = PeerConnection::new(peer.clone());
148        self.peers.insert(
149            *peer_id,
150            PeerInfo {
151                status: PeerStatus::Connected,
152                last_seen: Utc::now(),
153                ..peer
154            },
155        );
156        self.connections.insert(*peer_id, conn);
157        Ok(self.connections.get(peer_id).unwrap())
158    }
159
160    /// Disconnect from a peer.
161    pub fn disconnect(&mut self, peer_id: &Uuid) -> anyhow::Result<()> {
162        self.connections.remove(peer_id);
163        if let Some(peer) = self.peers.get_mut(peer_id) {
164            peer.status = PeerStatus::Disconnected;
165        }
166        Ok(())
167    }
168
169    /// Record a sync exchange with a peer.
170    pub fn record_sync(&mut self, peer_id: &Uuid, sent: u64, received: u64) -> anyhow::Result<()> {
171        let conn = self
172            .connections
173            .get_mut(peer_id)
174            .ok_or_else(|| anyhow::anyhow!("not connected to peer: {peer_id}"))?;
175        conn.record_sync(sent, received);
176        Ok(())
177    }
178}
179
180#[cfg(test)]
181mod tests {
182    use super::*;
183
184    fn make_peer_info(name: &str) -> PeerInfo {
185        PeerInfo {
186            peer_id: Uuid::new_v4(),
187            name: name.to_string(),
188            transport: PeerTransport::Tcp {
189                host: "127.0.0.1".into(),
190                port: 9900,
191            },
192            status: PeerStatus::Discovered,
193            last_seen: Utc::now(),
194            last_sync: None,
195        }
196    }
197
198    #[test]
199    fn test_peer_discovery_new() {
200        let disc = PeerDiscovery::new("alice".into());
201        assert_eq!(disc.local_name(), "alice");
202        assert!(disc.list_peers().is_empty());
203        assert!(disc.connected_peers().is_empty());
204    }
205
206    #[test]
207    fn test_add_and_list_peers() {
208        let mut disc = PeerDiscovery::new("alice".into());
209        let bob = make_peer_info("bob");
210        let bob_id = bob.peer_id;
211        disc.add_peer(bob);
212
213        assert_eq!(disc.list_peers().len(), 1);
214        assert_eq!(disc.get_peer(&bob_id).unwrap().name, "bob");
215    }
216
217    #[test]
218    fn test_remove_peer() {
219        let mut disc = PeerDiscovery::new("alice".into());
220        let bob = make_peer_info("bob");
221        let bob_id = bob.peer_id;
222        disc.add_peer(bob);
223
224        let removed = disc.remove_peer(&bob_id);
225        assert!(removed.is_some());
226        assert!(disc.list_peers().is_empty());
227    }
228
229    #[test]
230    fn test_connect_and_disconnect() {
231        let mut disc = PeerDiscovery::new("alice".into());
232        let bob = make_peer_info("bob");
233        let bob_id = bob.peer_id;
234        disc.add_peer(bob);
235
236        disc.connect(&bob_id).unwrap();
237        assert_eq!(disc.connected_peers().len(), 1);
238        assert_eq!(
239            disc.get_peer(&bob_id).unwrap().status,
240            PeerStatus::Connected
241        );
242
243        disc.disconnect(&bob_id).unwrap();
244        assert!(disc.connected_peers().is_empty());
245        assert_eq!(
246            disc.get_peer(&bob_id).unwrap().status,
247            PeerStatus::Disconnected
248        );
249    }
250
251    #[test]
252    fn test_connect_unknown_peer() {
253        let mut disc = PeerDiscovery::new("alice".into());
254        let result = disc.connect(&Uuid::new_v4());
255        assert!(result.is_err());
256    }
257
258    #[test]
259    fn test_record_sync() {
260        let mut disc = PeerDiscovery::new("alice".into());
261        let bob = make_peer_info("bob");
262        let bob_id = bob.peer_id;
263        disc.add_peer(bob);
264        disc.connect(&bob_id).unwrap();
265
266        disc.record_sync(&bob_id, 1024, 2048).unwrap();
267        let conn = &disc.connected_peers()[0];
268        assert_eq!(conn.bytes_sent, 1024);
269        assert_eq!(conn.bytes_received, 2048);
270        assert_eq!(conn.syncs_completed, 1);
271        assert!(conn.peer.last_sync.is_some());
272    }
273
274    #[test]
275    fn test_record_sync_not_connected() {
276        let mut disc = PeerDiscovery::new("alice".into());
277        assert!(disc.record_sync(&Uuid::new_v4(), 0, 0).is_err());
278    }
279
280    #[test]
281    fn test_peer_status_serialization() {
282        let statuses = vec![
283            PeerStatus::Discovered,
284            PeerStatus::Connected,
285            PeerStatus::Disconnected,
286            PeerStatus::Removed,
287        ];
288        for status in statuses {
289            let json = serde_json::to_string(&status).unwrap();
290            let back: PeerStatus = serde_json::from_str(&json).unwrap();
291            assert_eq!(status, back);
292        }
293    }
294
295    #[test]
296    fn test_peer_transport_serialization() {
297        let tcp = PeerTransport::Tcp {
298            host: "10.0.0.1".into(),
299            port: 8080,
300        };
301        let json = serde_json::to_string(&tcp).unwrap();
302        let back: PeerTransport = serde_json::from_str(&json).unwrap();
303        assert_eq!(tcp, back);
304
305        let relay = PeerTransport::Relay {
306            relay_url: "wss://relay.mur.run".into(),
307        };
308        let json = serde_json::to_string(&relay).unwrap();
309        let back: PeerTransport = serde_json::from_str(&json).unwrap();
310        assert_eq!(relay, back);
311    }
312
313    #[test]
314    fn test_peer_connection_new() {
315        let peer = make_peer_info("bob");
316        let conn = PeerConnection::new(peer);
317        assert_eq!(conn.peer.status, PeerStatus::Connected);
318        assert_eq!(conn.bytes_sent, 0);
319        assert_eq!(conn.bytes_received, 0);
320        assert_eq!(conn.syncs_completed, 0);
321    }
322}