Skip to main content

codetether_agent/bus/
relay.rs

1//! Protocol-first relay runtime for multi-agent handoff flows.
2//!
3//! This layer keeps orchestration transport-agnostic while using the local
4//! `AgentBus` as the default protocol transport. It provides:
5//! - Agent lifecycle registration/deregistration
6//! - Message handoff publication with correlation ids
7//! - Relay topic emission for observability (`relay.{relay_id}`)
8
9use super::{AgentBus, BusMessage};
10use crate::a2a::types::Part;
11use std::sync::Arc;
12use uuid::Uuid;
13
14/// Profile for a relay-participating agent.
15#[derive(Debug, Clone)]
16pub struct RelayAgentProfile {
17    pub name: String,
18    pub capabilities: Vec<String>,
19}
20
21/// Protocol runtime used by auto-chat and other relay-style orchestrators.
22#[derive(Clone)]
23pub struct ProtocolRelayRuntime {
24    bus: Arc<AgentBus>,
25    relay_id: String,
26}
27
28impl ProtocolRelayRuntime {
29    /// Build a relay runtime with an auto-generated relay id.
30    pub fn new(bus: Arc<AgentBus>) -> Self {
31        Self {
32            bus,
33            relay_id: format!("relay-{}", &Uuid::new_v4().to_string()[..8]),
34        }
35    }
36
37    /// Build a relay runtime with an explicit id.
38    pub fn with_relay_id(bus: Arc<AgentBus>, relay_id: impl Into<String>) -> Self {
39        Self {
40            bus,
41            relay_id: relay_id.into(),
42        }
43    }
44
45    /// The relay id used for correlation and topic publication.
46    pub fn relay_id(&self) -> &str {
47        &self.relay_id
48    }
49
50    /// Register relay agents on the protocol bus.
51    pub fn register_agents(&self, agents: &[RelayAgentProfile]) {
52        for agent in agents {
53            let mut capabilities = agent.capabilities.clone();
54            if !capabilities.iter().any(|c| c == "relay") {
55                capabilities.push("relay".to_string());
56            }
57            capabilities.push(format!("relay:{}", self.relay_id));
58
59            let handle = self.bus.handle(agent.name.clone());
60            handle.announce_ready(capabilities);
61        }
62    }
63
64    /// Deregister relay agents from the protocol bus.
65    pub fn shutdown_agents(&self, agent_ids: &[String]) {
66        for agent_id in agent_ids {
67            let handle = self.bus.handle(agent_id.clone());
68            handle.announce_shutdown();
69        }
70    }
71
72    /// Send one protocol handoff from `from` to `to`.
73    ///
74    /// Publishes to both `agent.{to}` and `relay.{relay_id}` so downstream
75    /// observers (TUI bus log, metrics) can trace the conversation flow.
76    pub fn send_handoff(&self, from: &str, to: &str, text: &str) -> String {
77        let payload = BusMessage::AgentMessage {
78            from: from.to_string(),
79            to: to.to_string(),
80            parts: vec![Part::Text {
81                text: text.to_string(),
82            }],
83        };
84
85        let correlation = format!("{}:{}", self.relay_id, Uuid::new_v4());
86        let handle = self.bus.handle(from.to_string());
87
88        handle.send_with_correlation(
89            format!("agent.{to}"),
90            payload.clone(),
91            Some(correlation.clone()),
92        );
93        handle.send_with_correlation(
94            format!("relay.{}", self.relay_id),
95            payload,
96            Some(correlation.clone()),
97        );
98        correlation
99    }
100}
101
102#[cfg(test)]
103mod tests {
104    use super::*;
105    use tokio::time::{Duration, timeout};
106
107    #[tokio::test]
108    async fn test_register_and_shutdown_agents() {
109        let bus = AgentBus::new().into_arc();
110        let relay = ProtocolRelayRuntime::with_relay_id(bus.clone(), "relay-test");
111
112        let agents = vec![
113            RelayAgentProfile {
114                name: "auto-planner".to_string(),
115                capabilities: vec!["planning".to_string()],
116            },
117            RelayAgentProfile {
118                name: "auto-coder".to_string(),
119                capabilities: vec!["coding".to_string()],
120            },
121        ];
122
123        relay.register_agents(&agents);
124
125        assert!(bus.registry.get("auto-planner").is_some());
126        assert!(bus.registry.get("auto-coder").is_some());
127
128        relay.shutdown_agents(&["auto-planner".to_string(), "auto-coder".to_string()]);
129
130        assert!(bus.registry.get("auto-planner").is_none());
131        assert!(bus.registry.get("auto-coder").is_none());
132    }
133
134    #[tokio::test]
135    async fn test_send_handoff_emits_agent_and_relay_topics() {
136        let bus = AgentBus::new().into_arc();
137        let relay = ProtocolRelayRuntime::with_relay_id(bus.clone(), "relay-test");
138        let mut observer = bus.handle("observer");
139
140        let correlation = relay.send_handoff("auto-planner", "auto-coder", "handoff payload");
141        assert!(correlation.starts_with("relay-test:"));
142
143        let first = timeout(Duration::from_millis(200), observer.recv())
144            .await
145            .expect("first envelope timeout")
146            .expect("first envelope missing");
147        let second = timeout(Duration::from_millis(200), observer.recv())
148            .await
149            .expect("second envelope timeout")
150            .expect("second envelope missing");
151
152        let topics = [first.topic, second.topic];
153        assert!(topics.iter().any(|t| t == "agent.auto-coder"));
154        assert!(topics.iter().any(|t| t == "relay.relay-test"));
155    }
156}