codetether_agent/bus/
relay.rs1use super::{AgentBus, BusMessage};
10use crate::a2a::types::Part;
11use std::sync::Arc;
12use uuid::Uuid;
13
/// Description of one agent that a relay can register on the bus.
///
/// Consumed by [`ProtocolRelayRuntime::register_agents`], which uses `name`
/// as the bus handle identifier and `capabilities` as the base capability
/// list it announces for the agent.
#[derive(Debug, Clone)]
pub struct RelayAgentProfile {
    /// Identifier the agent is registered under.
    pub name: String,
    /// Capability tags advertised for the agent, before any relay tags are added.
    pub capabilities: Vec<String>,
}
20
21#[derive(Clone)]
23pub struct ProtocolRelayRuntime {
24 bus: Arc<AgentBus>,
25 relay_id: String,
26}
27
28impl ProtocolRelayRuntime {
29 pub fn new(bus: Arc<AgentBus>) -> Self {
31 Self {
32 bus,
33 relay_id: format!("relay-{}", &Uuid::new_v4().to_string()[..8]),
34 }
35 }
36
37 pub fn with_relay_id(bus: Arc<AgentBus>, relay_id: impl Into<String>) -> Self {
39 Self {
40 bus,
41 relay_id: relay_id.into(),
42 }
43 }
44
45 pub fn relay_id(&self) -> &str {
47 &self.relay_id
48 }
49
50 pub fn register_agents(&self, agents: &[RelayAgentProfile]) {
52 for agent in agents {
53 let mut capabilities = agent.capabilities.clone();
54 if !capabilities.iter().any(|c| c == "relay") {
55 capabilities.push("relay".to_string());
56 }
57 capabilities.push(format!("relay:{}", self.relay_id));
58
59 let handle = self.bus.handle(agent.name.clone());
60 handle.announce_ready(capabilities);
61 }
62 }
63
64 pub fn shutdown_agents(&self, agent_ids: &[String]) {
66 for agent_id in agent_ids {
67 let handle = self.bus.handle(agent_id.clone());
68 handle.announce_shutdown();
69 }
70 }
71
72 pub fn send_handoff(&self, from: &str, to: &str, text: &str) -> String {
77 let payload = BusMessage::AgentMessage {
78 from: from.to_string(),
79 to: to.to_string(),
80 parts: vec![Part::Text {
81 text: text.to_string(),
82 }],
83 };
84
85 let correlation = format!("{}:{}", self.relay_id, Uuid::new_v4());
86 let handle = self.bus.handle(from.to_string());
87
88 handle.send_with_correlation(
89 format!("agent.{to}"),
90 payload.clone(),
91 Some(correlation.clone()),
92 );
93 handle.send_with_correlation(
94 format!("relay.{}", self.relay_id),
95 payload,
96 Some(correlation.clone()),
97 );
98 correlation
99 }
100}
101
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::time::{Duration, timeout};

    /// Registering profiles makes them visible in the bus registry, and
    /// shutting them down removes them again.
    #[tokio::test]
    async fn test_register_and_shutdown_agents() {
        let bus = AgentBus::new().into_arc();
        let relay = ProtocolRelayRuntime::with_relay_id(bus.clone(), "relay-test");

        // (agent name, single capability) pairs used to build the profiles.
        let fixtures = [("auto-planner", "planning"), ("auto-coder", "coding")];
        let profiles: Vec<RelayAgentProfile> = fixtures
            .iter()
            .map(|&(name, capability)| RelayAgentProfile {
                name: name.to_string(),
                capabilities: vec![capability.to_string()],
            })
            .collect();

        relay.register_agents(&profiles);
        for name in ["auto-planner", "auto-coder"] {
            assert!(bus.registry.get(name).is_some());
        }

        let ids: Vec<String> = profiles.iter().map(|p| p.name.clone()).collect();
        relay.shutdown_agents(&ids);
        for name in ["auto-planner", "auto-coder"] {
            assert!(bus.registry.get(name).is_none());
        }
    }

    /// A handoff is observable on both the target agent's topic and the
    /// relay's own topic, and its correlation id is prefixed with the relay id.
    #[tokio::test]
    async fn test_send_handoff_emits_agent_and_relay_topics() {
        let bus = AgentBus::new().into_arc();
        let relay = ProtocolRelayRuntime::with_relay_id(bus.clone(), "relay-test");
        let mut observer = bus.handle("observer");

        let correlation = relay.send_handoff("auto-planner", "auto-coder", "handoff payload");
        assert!(correlation.starts_with("relay-test:"));

        // Upper bound on how long to wait for each envelope.
        let wait = Duration::from_millis(200);

        let first = timeout(wait, observer.recv())
            .await
            .expect("first envelope timeout")
            .expect("first envelope missing");
        let second = timeout(wait, observer.recv())
            .await
            .expect("second envelope timeout")
            .expect("second envelope missing");

        // Arrival order is not asserted, only that both topics were seen.
        let topics = [first.topic, second.topic];
        for expected in ["agent.auto-coder", "relay.relay-test"] {
            assert!(topics.iter().any(|topic| topic == expected));
        }
    }
}