Skip to main content

brainwires_agent_network/network/
manager.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use anyhow::{Result, bail};
5use tokio::sync::{RwLock, broadcast};
6use uuid::Uuid;
7
8use crate::discovery::Discovery;
9use crate::identity::AgentIdentity;
10use crate::network::event::TransportType;
11use crate::network::{MessageEnvelope, MessageTarget, NetworkEvent, Payload};
12use crate::routing::{BroadcastRouter, ContentRouter, DirectRouter, PeerTable, Router};
13use crate::transport::{Transport, TransportAddress};
14
15/// The user-facing API for the networking stack.
16///
17/// `NetworkManager` ties together all five layers (identity, transport,
18/// routing, discovery, application) and provides a simple interface for
19/// sending messages, discovering peers, and subscribing to network events.
20///
21/// # Example
22///
23/// ```rust,ignore
24/// let manager = NetworkManagerBuilder::new(my_identity)
25///     .add_transport(IpcTransport::new(None))
26///     .with_discovery(ManualDiscovery::new())
27///     .build();
28///
29/// manager.send(peer_id, "hello").await?;
30///
31/// let mut events = manager.subscribe();
32/// while let Ok(event) = events.recv().await {
33///     // handle events
34/// }
35/// ```
36pub struct NetworkManager {
37    /// This agent's identity.
38    identity: AgentIdentity,
39    /// Connected transports keyed by type.
40    transports: HashMap<TransportType, Box<dyn Transport>>,
41    /// Registered routers.
42    direct_router: DirectRouter,
43    broadcast_router: BroadcastRouter,
44    content_router: ContentRouter,
45    /// Custom router (optional override).
46    custom_router: Option<Box<dyn Router>>,
47    /// Discovery services.
48    discoveries: Vec<Box<dyn Discovery>>,
49    /// Peer table (shared with routers).
50    peer_table: Arc<RwLock<PeerTable>>,
51    /// Event broadcast channel.
52    event_tx: broadcast::Sender<NetworkEvent>,
53}
54
55impl NetworkManager {
56    /// Get this agent's identity.
57    pub fn identity(&self) -> &AgentIdentity {
58        &self.identity
59    }
60
61    /// Subscribe to network events.
62    pub fn subscribe(&self) -> broadcast::Receiver<NetworkEvent> {
63        self.event_tx.subscribe()
64    }
65
66    /// Get a read lock on the peer table.
67    pub async fn peer_table(&self) -> tokio::sync::RwLockReadGuard<'_, PeerTable> {
68        self.peer_table.read().await
69    }
70
71    /// Get a write lock on the peer table.
72    pub async fn peer_table_mut(&self) -> tokio::sync::RwLockWriteGuard<'_, PeerTable> {
73        self.peer_table.write().await
74    }
75
76    /// List all known peers.
77    pub async fn peers(&self) -> Vec<AgentIdentity> {
78        self.peer_table.read().await.all_peers().cloned().collect()
79    }
80
81    /// Send a message to a specific peer.
82    pub async fn send(&self, target: Uuid, payload: impl Into<Payload>) -> Result<()> {
83        let envelope = MessageEnvelope::direct(self.identity.id, target, payload);
84        self.send_envelope(envelope).await
85    }
86
87    /// Broadcast a message to all known peers.
88    pub async fn broadcast(&self, payload: impl Into<Payload>) -> Result<()> {
89        let envelope = MessageEnvelope::broadcast(self.identity.id, payload);
90        self.send_envelope(envelope).await
91    }
92
93    /// Publish a message to a topic.
94    pub async fn publish(
95        &self,
96        topic: impl Into<String>,
97        payload: impl Into<Payload>,
98    ) -> Result<()> {
99        let envelope = MessageEnvelope::topic(self.identity.id, topic, payload);
100        self.send_envelope(envelope).await
101    }
102
103    /// Send a pre-built envelope.
104    pub async fn send_envelope(&self, envelope: MessageEnvelope) -> Result<()> {
105        let peer_table = self.peer_table.read().await;
106
107        // Route the message
108        let addresses = match &envelope.recipient {
109            MessageTarget::Direct(_) => {
110                if let Some(router) = &self.custom_router {
111                    router.route(&envelope, &peer_table).await?
112                } else {
113                    self.direct_router.route(&envelope, &peer_table).await?
114                }
115            }
116            MessageTarget::Broadcast => self.broadcast_router.route(&envelope, &peer_table).await?,
117            MessageTarget::Topic(_) => self.content_router.route(&envelope, &peer_table).await?,
118        };
119
120        drop(peer_table);
121
122        if addresses.is_empty() {
123            bail!("No delivery addresses resolved for message");
124        }
125
126        // Deliver to each address via the appropriate transport
127        for addr in &addresses {
128            let transport_type = transport_type_for_address(addr);
129            if let Some(transport) = self.transports.get(&transport_type) {
130                transport.send(&envelope).await?;
131            } else {
132                tracing::warn!(
133                    "No transport available for address {addr} (type {transport_type:?})"
134                );
135            }
136        }
137
138        Ok(())
139    }
140
141    /// Add a transport to the manager.
142    pub fn add_transport(&mut self, transport: Box<dyn Transport>) {
143        let t = transport.transport_type();
144        self.transports.insert(t, transport);
145    }
146
147    /// Set a custom router (overrides the default direct router for
148    /// point-to-point messages).
149    pub fn set_custom_router(&mut self, router: Box<dyn Router>) {
150        self.custom_router = Some(router);
151    }
152
153    /// Add a discovery service.
154    pub fn add_discovery(&mut self, discovery: Box<dyn Discovery>) {
155        self.discoveries.push(discovery);
156    }
157
158    /// Register this agent with all discovery services.
159    pub async fn register_self(&self) -> Result<()> {
160        for d in &self.discoveries {
161            d.register(&self.identity).await?;
162        }
163        Ok(())
164    }
165
166    /// Deregister this agent from all discovery services.
167    pub async fn deregister_self(&self) -> Result<()> {
168        for d in &self.discoveries {
169            d.deregister(&self.identity.id).await?;
170        }
171        Ok(())
172    }
173
174    /// Run discovery across all services and update the peer table.
175    pub async fn discover_peers(&self) -> Result<Vec<AgentIdentity>> {
176        let mut all_peers = Vec::new();
177
178        for d in &self.discoveries {
179            match d.discover().await {
180                Ok(peers) => all_peers.extend(peers),
181                Err(e) => {
182                    tracing::warn!("Discovery via {:?} failed: {e}", d.protocol());
183                }
184            }
185        }
186
187        // Deduplicate by UUID
188        let mut seen = std::collections::HashSet::new();
189        all_peers.retain(|p| seen.insert(p.id));
190
191        // Update peer table and emit events
192        let mut table = self.peer_table.write().await;
193        for peer in &all_peers {
194            if peer.id == self.identity.id {
195                continue; // Don't add self
196            }
197            if table.get(&peer.id).is_none() {
198                // New peer
199                let addrs = endpoint_to_addresses(peer);
200                table.upsert(peer.clone(), addrs);
201                let _ = self.event_tx.send(NetworkEvent::PeerJoined(peer.clone()));
202            }
203        }
204
205        Ok(all_peers)
206    }
207
208    /// Emit a network event.
209    pub fn emit(&self, event: NetworkEvent) {
210        let _ = self.event_tx.send(event);
211    }
212}
213
214/// Builder for [`NetworkManager`].
215pub struct NetworkManagerBuilder {
216    identity: AgentIdentity,
217    transports: HashMap<TransportType, Box<dyn Transport>>,
218    custom_router: Option<Box<dyn Router>>,
219    discoveries: Vec<Box<dyn Discovery>>,
220    event_buffer: usize,
221}
222
223impl NetworkManagerBuilder {
224    /// Start building a NetworkManager for the given agent identity.
225    pub fn new(identity: AgentIdentity) -> Self {
226        Self {
227            identity,
228            transports: HashMap::new(),
229            custom_router: None,
230            discoveries: Vec::new(),
231            event_buffer: 256,
232        }
233    }
234
235    /// Add a transport.
236    pub fn add_transport(mut self, transport: Box<dyn Transport>) -> Self {
237        let t = transport.transport_type();
238        self.transports.insert(t, transport);
239        self
240    }
241
242    /// Set a custom router for direct messages.
243    pub fn with_router(mut self, router: Box<dyn Router>) -> Self {
244        self.custom_router = Some(router);
245        self
246    }
247
248    /// Add a discovery service.
249    pub fn add_discovery(mut self, discovery: Box<dyn Discovery>) -> Self {
250        self.discoveries.push(discovery);
251        self
252    }
253
254    /// Set the event broadcast buffer size (default: 256).
255    pub fn event_buffer(mut self, size: usize) -> Self {
256        self.event_buffer = size;
257        self
258    }
259
260    /// Build the [`NetworkManager`].
261    pub fn build(self) -> NetworkManager {
262        let (event_tx, _) = broadcast::channel(self.event_buffer);
263
264        NetworkManager {
265            identity: self.identity,
266            transports: self.transports,
267            direct_router: DirectRouter::new(),
268            broadcast_router: BroadcastRouter::new(),
269            content_router: ContentRouter::new(),
270            custom_router: self.custom_router,
271            discoveries: self.discoveries,
272            peer_table: Arc::new(RwLock::new(PeerTable::new())),
273            event_tx,
274        }
275    }
276}
277
278/// Infer the transport type from a transport address.
279fn transport_type_for_address(addr: &TransportAddress) -> TransportType {
280    match addr {
281        TransportAddress::Unix(_) => TransportType::Ipc,
282        TransportAddress::Tcp(_) => TransportType::Tcp,
283        TransportAddress::Url(_) => TransportType::Remote,
284        TransportAddress::Channel(_) => TransportType::PubSub,
285    }
286}
287
288/// Extract transport addresses from an agent's advertised endpoint.
289fn endpoint_to_addresses(identity: &AgentIdentity) -> Vec<TransportAddress> {
290    let Some(endpoint) = &identity.agent_card.endpoint else {
291        return Vec::new();
292    };
293
294    if let Some(path) = endpoint.strip_prefix("unix://") {
295        vec![TransportAddress::Unix(path.into())]
296    } else if let Some(addr) = endpoint.strip_prefix("tcp://") {
297        if let Ok(sock) = addr.parse() {
298            vec![TransportAddress::Tcp(sock)]
299        } else {
300            vec![TransportAddress::Url(endpoint.clone())]
301        }
302    } else {
303        vec![TransportAddress::Url(endpoint.clone())]
304    }
305}
306
307#[cfg(test)]
308mod tests {
309    use super::*;
310    use crate::discovery::ManualDiscovery;
311
312    #[tokio::test]
313    async fn builder_creates_manager() {
314        let identity = AgentIdentity::new("test-agent");
315        let manager = NetworkManagerBuilder::new(identity.clone())
316            .add_discovery(Box::new(ManualDiscovery::new()))
317            .build();
318
319        assert_eq!(manager.identity().name, "test-agent");
320        assert!(manager.peers().await.is_empty());
321    }
322
323    #[tokio::test]
324    async fn discover_peers_populates_table() {
325        let agent_a = AgentIdentity::new("agent-a");
326        let agent_b = AgentIdentity::new("agent-b");
327
328        let discovery = ManualDiscovery::with_peers(vec![agent_b.clone()]);
329
330        let manager = NetworkManagerBuilder::new(agent_a)
331            .add_discovery(Box::new(discovery))
332            .build();
333
334        let mut events = manager.subscribe();
335
336        let found = manager.discover_peers().await.unwrap();
337        assert_eq!(found.len(), 1);
338
339        let peers = manager.peers().await;
340        assert_eq!(peers.len(), 1);
341        assert_eq!(peers[0].name, "agent-b");
342
343        // Should have emitted PeerJoined
344        let event = events.try_recv().unwrap();
345        match event {
346            NetworkEvent::PeerJoined(p) => assert_eq!(p.id, agent_b.id),
347            _ => panic!("expected PeerJoined"),
348        }
349    }
350
351    #[tokio::test]
352    async fn register_and_deregister_self() {
353        let identity = AgentIdentity::new("self");
354        let discovery = ManualDiscovery::new();
355
356        let manager = NetworkManagerBuilder::new(identity.clone())
357            .add_discovery(Box::new(discovery.clone()))
358            .build();
359
360        manager.register_self().await.unwrap();
361
362        let peers = discovery.discover().await.unwrap();
363        assert_eq!(peers.len(), 1);
364
365        manager.deregister_self().await.unwrap();
366
367        let peers = discovery.discover().await.unwrap();
368        assert!(peers.is_empty());
369    }
370
371    #[test]
372    fn transport_type_inference() {
373        assert_eq!(
374            transport_type_for_address(&TransportAddress::Unix("/tmp/test.sock".into())),
375            TransportType::Ipc
376        );
377        assert_eq!(
378            transport_type_for_address(&TransportAddress::Tcp("127.0.0.1:9090".parse().unwrap())),
379            TransportType::Tcp
380        );
381        assert_eq!(
382            transport_type_for_address(&TransportAddress::Url("https://example.com".into())),
383            TransportType::Remote
384        );
385        assert_eq!(
386            transport_type_for_address(&TransportAddress::Channel("events".into())),
387            TransportType::PubSub
388        );
389    }
390
391    #[test]
392    fn endpoint_parsing() {
393        let mut identity = AgentIdentity::new("test");
394
395        identity.agent_card.endpoint = Some("unix:///tmp/agent.sock".into());
396        let addrs = endpoint_to_addresses(&identity);
397        assert_eq!(
398            addrs,
399            vec![TransportAddress::Unix("/tmp/agent.sock".into())]
400        );
401
402        identity.agent_card.endpoint = Some("tcp://127.0.0.1:9090".into());
403        let addrs = endpoint_to_addresses(&identity);
404        assert_eq!(
405            addrs,
406            vec![TransportAddress::Tcp("127.0.0.1:9090".parse().unwrap())]
407        );
408
409        identity.agent_card.endpoint = Some("https://api.example.com".into());
410        let addrs = endpoint_to_addresses(&identity);
411        assert_eq!(
412            addrs,
413            vec![TransportAddress::Url("https://api.example.com".into())]
414        );
415
416        identity.agent_card.endpoint = None;
417        let addrs = endpoint_to_addresses(&identity);
418        assert!(addrs.is_empty());
419    }
420}