brainwires_agent_network/network/
manager.rs1use std::collections::HashMap;
2use std::sync::Arc;
3
4use anyhow::{Result, bail};
5use tokio::sync::{RwLock, broadcast};
6use uuid::Uuid;
7
8use crate::discovery::Discovery;
9use crate::identity::AgentIdentity;
10use crate::network::event::TransportType;
11use crate::network::{MessageEnvelope, MessageTarget, NetworkEvent, Payload};
12use crate::routing::{BroadcastRouter, ContentRouter, DirectRouter, PeerTable, Router};
13use crate::transport::{Transport, TransportAddress};
14
15pub struct NetworkManager {
37 identity: AgentIdentity,
39 transports: HashMap<TransportType, Box<dyn Transport>>,
41 direct_router: DirectRouter,
43 broadcast_router: BroadcastRouter,
44 content_router: ContentRouter,
45 custom_router: Option<Box<dyn Router>>,
47 discoveries: Vec<Box<dyn Discovery>>,
49 peer_table: Arc<RwLock<PeerTable>>,
51 event_tx: broadcast::Sender<NetworkEvent>,
53}
54
55impl NetworkManager {
56 pub fn identity(&self) -> &AgentIdentity {
58 &self.identity
59 }
60
61 pub fn subscribe(&self) -> broadcast::Receiver<NetworkEvent> {
63 self.event_tx.subscribe()
64 }
65
66 pub async fn peer_table(&self) -> tokio::sync::RwLockReadGuard<'_, PeerTable> {
68 self.peer_table.read().await
69 }
70
71 pub async fn peer_table_mut(&self) -> tokio::sync::RwLockWriteGuard<'_, PeerTable> {
73 self.peer_table.write().await
74 }
75
76 pub async fn peers(&self) -> Vec<AgentIdentity> {
78 self.peer_table.read().await.all_peers().cloned().collect()
79 }
80
81 pub async fn send(&self, target: Uuid, payload: impl Into<Payload>) -> Result<()> {
83 let envelope = MessageEnvelope::direct(self.identity.id, target, payload);
84 self.send_envelope(envelope).await
85 }
86
87 pub async fn broadcast(&self, payload: impl Into<Payload>) -> Result<()> {
89 let envelope = MessageEnvelope::broadcast(self.identity.id, payload);
90 self.send_envelope(envelope).await
91 }
92
93 pub async fn publish(
95 &self,
96 topic: impl Into<String>,
97 payload: impl Into<Payload>,
98 ) -> Result<()> {
99 let envelope = MessageEnvelope::topic(self.identity.id, topic, payload);
100 self.send_envelope(envelope).await
101 }
102
103 pub async fn send_envelope(&self, envelope: MessageEnvelope) -> Result<()> {
105 let peer_table = self.peer_table.read().await;
106
107 let addresses = match &envelope.recipient {
109 MessageTarget::Direct(_) => {
110 if let Some(router) = &self.custom_router {
111 router.route(&envelope, &peer_table).await?
112 } else {
113 self.direct_router.route(&envelope, &peer_table).await?
114 }
115 }
116 MessageTarget::Broadcast => self.broadcast_router.route(&envelope, &peer_table).await?,
117 MessageTarget::Topic(_) => self.content_router.route(&envelope, &peer_table).await?,
118 };
119
120 drop(peer_table);
121
122 if addresses.is_empty() {
123 bail!("No delivery addresses resolved for message");
124 }
125
126 for addr in &addresses {
128 let transport_type = transport_type_for_address(addr);
129 if let Some(transport) = self.transports.get(&transport_type) {
130 transport.send(&envelope).await?;
131 } else {
132 tracing::warn!(
133 "No transport available for address {addr} (type {transport_type:?})"
134 );
135 }
136 }
137
138 Ok(())
139 }
140
141 pub fn add_transport(&mut self, transport: Box<dyn Transport>) {
143 let t = transport.transport_type();
144 self.transports.insert(t, transport);
145 }
146
147 pub fn set_custom_router(&mut self, router: Box<dyn Router>) {
150 self.custom_router = Some(router);
151 }
152
153 pub fn add_discovery(&mut self, discovery: Box<dyn Discovery>) {
155 self.discoveries.push(discovery);
156 }
157
158 pub async fn register_self(&self) -> Result<()> {
160 for d in &self.discoveries {
161 d.register(&self.identity).await?;
162 }
163 Ok(())
164 }
165
166 pub async fn deregister_self(&self) -> Result<()> {
168 for d in &self.discoveries {
169 d.deregister(&self.identity.id).await?;
170 }
171 Ok(())
172 }
173
174 pub async fn discover_peers(&self) -> Result<Vec<AgentIdentity>> {
176 let mut all_peers = Vec::new();
177
178 for d in &self.discoveries {
179 match d.discover().await {
180 Ok(peers) => all_peers.extend(peers),
181 Err(e) => {
182 tracing::warn!("Discovery via {:?} failed: {e}", d.protocol());
183 }
184 }
185 }
186
187 let mut seen = std::collections::HashSet::new();
189 all_peers.retain(|p| seen.insert(p.id));
190
191 let mut table = self.peer_table.write().await;
193 for peer in &all_peers {
194 if peer.id == self.identity.id {
195 continue; }
197 if table.get(&peer.id).is_none() {
198 let addrs = endpoint_to_addresses(peer);
200 table.upsert(peer.clone(), addrs);
201 let _ = self.event_tx.send(NetworkEvent::PeerJoined(peer.clone()));
202 }
203 }
204
205 Ok(all_peers)
206 }
207
208 pub fn emit(&self, event: NetworkEvent) {
210 let _ = self.event_tx.send(event);
211 }
212}
213
214pub struct NetworkManagerBuilder {
216 identity: AgentIdentity,
217 transports: HashMap<TransportType, Box<dyn Transport>>,
218 custom_router: Option<Box<dyn Router>>,
219 discoveries: Vec<Box<dyn Discovery>>,
220 event_buffer: usize,
221}
222
223impl NetworkManagerBuilder {
224 pub fn new(identity: AgentIdentity) -> Self {
226 Self {
227 identity,
228 transports: HashMap::new(),
229 custom_router: None,
230 discoveries: Vec::new(),
231 event_buffer: 256,
232 }
233 }
234
235 pub fn add_transport(mut self, transport: Box<dyn Transport>) -> Self {
237 let t = transport.transport_type();
238 self.transports.insert(t, transport);
239 self
240 }
241
242 pub fn with_router(mut self, router: Box<dyn Router>) -> Self {
244 self.custom_router = Some(router);
245 self
246 }
247
248 pub fn add_discovery(mut self, discovery: Box<dyn Discovery>) -> Self {
250 self.discoveries.push(discovery);
251 self
252 }
253
254 pub fn event_buffer(mut self, size: usize) -> Self {
256 self.event_buffer = size;
257 self
258 }
259
260 pub fn build(self) -> NetworkManager {
262 let (event_tx, _) = broadcast::channel(self.event_buffer);
263
264 NetworkManager {
265 identity: self.identity,
266 transports: self.transports,
267 direct_router: DirectRouter::new(),
268 broadcast_router: BroadcastRouter::new(),
269 content_router: ContentRouter::new(),
270 custom_router: self.custom_router,
271 discoveries: self.discoveries,
272 peer_table: Arc::new(RwLock::new(PeerTable::new())),
273 event_tx,
274 }
275 }
276}
277
278fn transport_type_for_address(addr: &TransportAddress) -> TransportType {
280 match addr {
281 TransportAddress::Unix(_) => TransportType::Ipc,
282 TransportAddress::Tcp(_) => TransportType::Tcp,
283 TransportAddress::Url(_) => TransportType::Remote,
284 TransportAddress::Channel(_) => TransportType::PubSub,
285 }
286}
287
288fn endpoint_to_addresses(identity: &AgentIdentity) -> Vec<TransportAddress> {
290 let Some(endpoint) = &identity.agent_card.endpoint else {
291 return Vec::new();
292 };
293
294 if let Some(path) = endpoint.strip_prefix("unix://") {
295 vec![TransportAddress::Unix(path.into())]
296 } else if let Some(addr) = endpoint.strip_prefix("tcp://") {
297 if let Ok(sock) = addr.parse() {
298 vec![TransportAddress::Tcp(sock)]
299 } else {
300 vec![TransportAddress::Url(endpoint.clone())]
301 }
302 } else {
303 vec![TransportAddress::Url(endpoint.clone())]
304 }
305}
306
307#[cfg(test)]
308mod tests {
309 use super::*;
310 use crate::discovery::ManualDiscovery;
311
312 #[tokio::test]
313 async fn builder_creates_manager() {
314 let identity = AgentIdentity::new("test-agent");
315 let manager = NetworkManagerBuilder::new(identity.clone())
316 .add_discovery(Box::new(ManualDiscovery::new()))
317 .build();
318
319 assert_eq!(manager.identity().name, "test-agent");
320 assert!(manager.peers().await.is_empty());
321 }
322
323 #[tokio::test]
324 async fn discover_peers_populates_table() {
325 let agent_a = AgentIdentity::new("agent-a");
326 let agent_b = AgentIdentity::new("agent-b");
327
328 let discovery = ManualDiscovery::with_peers(vec![agent_b.clone()]);
329
330 let manager = NetworkManagerBuilder::new(agent_a)
331 .add_discovery(Box::new(discovery))
332 .build();
333
334 let mut events = manager.subscribe();
335
336 let found = manager.discover_peers().await.unwrap();
337 assert_eq!(found.len(), 1);
338
339 let peers = manager.peers().await;
340 assert_eq!(peers.len(), 1);
341 assert_eq!(peers[0].name, "agent-b");
342
343 let event = events.try_recv().unwrap();
345 match event {
346 NetworkEvent::PeerJoined(p) => assert_eq!(p.id, agent_b.id),
347 _ => panic!("expected PeerJoined"),
348 }
349 }
350
351 #[tokio::test]
352 async fn register_and_deregister_self() {
353 let identity = AgentIdentity::new("self");
354 let discovery = ManualDiscovery::new();
355
356 let manager = NetworkManagerBuilder::new(identity.clone())
357 .add_discovery(Box::new(discovery.clone()))
358 .build();
359
360 manager.register_self().await.unwrap();
361
362 let peers = discovery.discover().await.unwrap();
363 assert_eq!(peers.len(), 1);
364
365 manager.deregister_self().await.unwrap();
366
367 let peers = discovery.discover().await.unwrap();
368 assert!(peers.is_empty());
369 }
370
371 #[test]
372 fn transport_type_inference() {
373 assert_eq!(
374 transport_type_for_address(&TransportAddress::Unix("/tmp/test.sock".into())),
375 TransportType::Ipc
376 );
377 assert_eq!(
378 transport_type_for_address(&TransportAddress::Tcp("127.0.0.1:9090".parse().unwrap())),
379 TransportType::Tcp
380 );
381 assert_eq!(
382 transport_type_for_address(&TransportAddress::Url("https://example.com".into())),
383 TransportType::Remote
384 );
385 assert_eq!(
386 transport_type_for_address(&TransportAddress::Channel("events".into())),
387 TransportType::PubSub
388 );
389 }
390
391 #[test]
392 fn endpoint_parsing() {
393 let mut identity = AgentIdentity::new("test");
394
395 identity.agent_card.endpoint = Some("unix:///tmp/agent.sock".into());
396 let addrs = endpoint_to_addresses(&identity);
397 assert_eq!(
398 addrs,
399 vec![TransportAddress::Unix("/tmp/agent.sock".into())]
400 );
401
402 identity.agent_card.endpoint = Some("tcp://127.0.0.1:9090".into());
403 let addrs = endpoint_to_addresses(&identity);
404 assert_eq!(
405 addrs,
406 vec![TransportAddress::Tcp("127.0.0.1:9090".parse().unwrap())]
407 );
408
409 identity.agent_card.endpoint = Some("https://api.example.com".into());
410 let addrs = endpoint_to_addresses(&identity);
411 assert_eq!(
412 addrs,
413 vec![TransportAddress::Url("https://api.example.com".into())]
414 );
415
416 identity.agent_card.endpoint = None;
417 let addrs = endpoint_to_addresses(&identity);
418 assert!(addrs.is_empty());
419 }
420}