brainwires_network/routing/
peer_table.rs1use std::collections::{HashMap, HashSet};
2
3use uuid::Uuid;
4
5use crate::identity::AgentIdentity;
6use crate::transport::TransportAddress;
7
8#[derive(Debug, Default)]
14pub struct PeerTable {
15 peers: HashMap<Uuid, AgentIdentity>,
17 addresses: HashMap<Uuid, Vec<TransportAddress>>,
19 subscriptions: HashMap<String, HashSet<Uuid>>,
21}
22
23impl PeerTable {
24 pub fn new() -> Self {
26 Self::default()
27 }
28
29 pub fn upsert(&mut self, identity: AgentIdentity, addresses: Vec<TransportAddress>) {
31 let id = identity.id;
32 self.peers.insert(id, identity);
33 self.addresses.insert(id, addresses);
34 }
35
36 pub fn remove(&mut self, id: &Uuid) -> Option<AgentIdentity> {
38 self.addresses.remove(id);
39 for subs in self.subscriptions.values_mut() {
41 subs.remove(id);
42 }
43 self.peers.remove(id)
44 }
45
46 pub fn get(&self, id: &Uuid) -> Option<&AgentIdentity> {
48 self.peers.get(id)
49 }
50
51 pub fn addresses(&self, id: &Uuid) -> Option<&[TransportAddress]> {
53 self.addresses.get(id).map(|v| v.as_slice())
54 }
55
56 pub fn all_peers(&self) -> impl Iterator<Item = &AgentIdentity> {
58 self.peers.values()
59 }
60
61 pub fn all_peer_ids(&self) -> impl Iterator<Item = &Uuid> {
63 self.peers.keys()
64 }
65
66 pub fn len(&self) -> usize {
68 self.peers.len()
69 }
70
71 pub fn is_empty(&self) -> bool {
73 self.peers.is_empty()
74 }
75
76 pub fn subscribe(&mut self, peer_id: Uuid, topic: impl Into<String>) {
78 self.subscriptions
79 .entry(topic.into())
80 .or_default()
81 .insert(peer_id);
82 }
83
84 pub fn unsubscribe(&mut self, peer_id: &Uuid, topic: &str) {
86 if let Some(subs) = self.subscriptions.get_mut(topic) {
87 subs.remove(peer_id);
88 if subs.is_empty() {
89 self.subscriptions.remove(topic);
90 }
91 }
92 }
93
94 pub fn subscribers(&self, topic: &str) -> Vec<Uuid> {
96 self.subscriptions
97 .get(topic)
98 .map(|s| s.iter().copied().collect())
99 .unwrap_or_default()
100 }
101}
102
103#[cfg(test)]
104mod tests {
105 use super::*;
106
107 fn make_peer(name: &str) -> (AgentIdentity, Vec<TransportAddress>) {
108 let identity = AgentIdentity::new(name);
109 let addrs = vec![TransportAddress::Tcp("127.0.0.1:9090".parse().unwrap())];
110 (identity, addrs)
111 }
112
113 #[test]
114 fn upsert_and_get() {
115 let mut table = PeerTable::new();
116 let (identity, addrs) = make_peer("agent-a");
117 let id = identity.id;
118
119 table.upsert(identity, addrs);
120
121 assert_eq!(table.len(), 1);
122 assert!(!table.is_empty());
123 assert!(table.get(&id).is_some());
124 assert_eq!(table.get(&id).unwrap().name, "agent-a");
125 assert_eq!(table.addresses(&id).unwrap().len(), 1);
126 }
127
128 #[test]
129 fn remove_peer() {
130 let mut table = PeerTable::new();
131 let (identity, addrs) = make_peer("agent-b");
132 let id = identity.id;
133
134 table.upsert(identity, addrs);
135 let removed = table.remove(&id);
136 assert!(removed.is_some());
137 assert_eq!(table.len(), 0);
138 assert!(table.get(&id).is_none());
139 }
140
141 #[test]
142 fn topic_subscriptions() {
143 let mut table = PeerTable::new();
144 let (id_a, addrs_a) = make_peer("a");
145 let (id_b, addrs_b) = make_peer("b");
146 let uuid_a = id_a.id;
147 let uuid_b = id_b.id;
148
149 table.upsert(id_a, addrs_a);
150 table.upsert(id_b, addrs_b);
151
152 table.subscribe(uuid_a, "status");
153 table.subscribe(uuid_b, "status");
154 table.subscribe(uuid_a, "errors");
155
156 assert_eq!(table.subscribers("status").len(), 2);
157 assert_eq!(table.subscribers("errors").len(), 1);
158 assert!(table.subscribers("unknown").is_empty());
159
160 table.unsubscribe(&uuid_a, "status");
161 assert_eq!(table.subscribers("status").len(), 1);
162 }
163
164 #[test]
165 fn remove_peer_cleans_subscriptions() {
166 let mut table = PeerTable::new();
167 let (identity, addrs) = make_peer("agent-c");
168 let id = identity.id;
169
170 table.upsert(identity, addrs);
171 table.subscribe(id, "events");
172 assert_eq!(table.subscribers("events").len(), 1);
173
174 table.remove(&id);
175 assert!(table.subscribers("events").is_empty());
176 }
177}