1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
//! Remote agent proxy pool for routing A2A messages to agent binaries.
//!
//! Each running user agent is represented by an [`AgentProxy`] in the pool,
//! keyed by agent ID. The gateway looks up proxies here when routing inbound
//! messages to the correct child process.
use dashmap::DashMap;
use std::sync::Arc;
// ---------------------------------------------------------------------------
// Trait + concrete proxy
// ---------------------------------------------------------------------------
/// Trait representing a remote agent proxy that can receive A2A messages.
pub trait AgentProxy: Send + Sync + std::fmt::Debug {
/// The unique agent identifier.
#[allow(dead_code)]
fn agent_id(&self) -> &str;
/// The HTTP URL of the agent's A2A endpoint (e.g. `http://127.0.0.1:19001`).
fn agent_url(&self) -> &str;
}
/// A concrete proxy pointing at a remote agent binary's A2A endpoint.
#[derive(Debug, Clone)]
pub struct RemoteAgentProxy {
/// Agent identifier, used by the AgentProxy trait implementation.
#[allow(dead_code)]
pub id: String,
pub url: String,
/// Port the agent is listening on, stored for diagnostics and monitoring.
#[allow(dead_code)]
pub port: u16,
}
impl AgentProxy for RemoteAgentProxy {
fn agent_id(&self) -> &str {
&self.id
}
fn agent_url(&self) -> &str {
&self.url
}
}
// ---------------------------------------------------------------------------
// Pool
// ---------------------------------------------------------------------------
/// Thread-safe pool of remote agent proxies, keyed by agent ID.
pub struct RemoteAgentProxyPool {
proxies: DashMap<String, Arc<dyn AgentProxy>>,
}
impl RemoteAgentProxyPool {
/// Create an empty proxy pool.
pub fn new() -> Self {
Self {
proxies: DashMap::new(),
}
}
/// Register a new remote agent proxy for the given agent ID and port.
///
/// Creates a [`RemoteAgentProxy`] with URL `http://127.0.0.1:{port}` and
/// inserts it into the pool. If an entry already exists for `agent_id` it
/// is replaced.
pub fn register(&self, agent_id: &str, port: u16) {
let proxy = RemoteAgentProxy {
id: agent_id.to_string(),
url: format!("http://127.0.0.1:{}", port),
port,
};
self.proxies
.insert(agent_id.to_string(), Arc::new(proxy) as Arc<dyn AgentProxy>);
}
/// Remove the proxy for `agent_id`, returning `true` if it existed.
pub fn remove(&self, agent_id: &str) -> bool {
self.proxies.remove(agent_id).is_some()
}
/// Look up the proxy for `agent_id`.
pub fn get(&self, agent_id: &str) -> Option<Arc<dyn AgentProxy>> {
self.proxies.get(agent_id).map(|r| Arc::clone(r.value()))
}
/// Return the IDs of all currently registered agents.
/// Used by integration tests and monitoring.
#[allow(dead_code)]
pub fn agent_ids(&self) -> Vec<String> {
self.proxies.iter().map(|r| r.key().clone()).collect()
}
}
impl Default for RemoteAgentProxyPool {
fn default() -> Self {
Self::new()
}
}
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn register_and_get_round_trip() {
let pool = RemoteAgentProxyPool::new();
// Pool starts empty.
assert!(pool.get("research").is_none());
assert!(pool.agent_ids().is_empty());
// Register an agent.
pool.register("research", 19001);
// Retrieve it and verify fields.
let proxy = pool.get("research").expect("proxy should exist");
assert_eq!(proxy.agent_id(), "research");
assert_eq!(proxy.agent_url(), "http://127.0.0.1:19001");
// agent_ids should contain the registered ID.
let ids = pool.agent_ids();
assert_eq!(ids.len(), 1);
assert!(ids.contains(&"research".to_string()));
// Remove it.
assert!(pool.remove("research"));
assert!(pool.get("research").is_none());
assert!(pool.agent_ids().is_empty());
// Removing again returns false.
assert!(!pool.remove("research"));
}
#[test]
fn register_multiple_agents() {
let pool = RemoteAgentProxyPool::new();
pool.register("agent-a", 19001);
pool.register("agent-b", 19002);
pool.register("agent-c", 19003);
assert_eq!(pool.agent_ids().len(), 3);
let b = pool.get("agent-b").unwrap();
assert_eq!(b.agent_url(), "http://127.0.0.1:19002");
pool.remove("agent-b");
assert!(pool.get("agent-b").is_none());
assert_eq!(pool.agent_ids().len(), 2);
}
#[test]
fn register_replaces_existing() {
let pool = RemoteAgentProxyPool::new();
pool.register("research", 19001);
assert_eq!(
pool.get("research").unwrap().agent_url(),
"http://127.0.0.1:19001"
);
// Re-register on a different port.
pool.register("research", 19099);
assert_eq!(
pool.get("research").unwrap().agent_url(),
"http://127.0.0.1:19099"
);
// Still only one entry.
assert_eq!(pool.agent_ids().len(), 1);
}
}