pub mod circuit_breaker;
pub use circuit_breaker::{A2ACircuitBreaker, CircuitState};
use std::collections::HashMap;
use std::sync::Arc;
use anyhow::Result;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use tokio::sync::RwLock;
use uuid::Uuid;
use crate::event_bus::{EventBus, KernelEvent};
use crate::types::{AgentId, AgentStatus};
/// Message payloads that agents exchange through the A2A protocol.
///
/// Serde encodes this as an internally tagged enum: the variant is stored in a
/// `"type"` field using the snake_case form of the variant name.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum A2AMessage {
/// Asks the receiving agent to take on a task.
TaskDelegation {
/// Identifier of the task being delegated.
task_id: Uuid,
/// Human-readable description of the task.
description: String,
/// Arbitrary JSON input for the task.
payload: serde_json::Value,
/// Priority assigned to the task.
priority: TaskPriority,
},
/// Progress report for a task.
StatusUpdate {
/// Identifier of the task the update refers to.
task_id: Uuid,
/// Progress value. NOTE(review): presumably a 0-100 percentage — confirm with producers.
progress: u8,
/// Free-form status text.
message: String,
},
/// Carries the outcome of a task.
ResultSharing {
/// Identifier of the task the result belongs to.
task_id: Uuid,
/// Result data as arbitrary JSON.
result: serde_json::Value,
/// Short textual summary of the result.
summary: String,
},
/// Asks about agent capabilities.
CapabilityQuery {
/// Query text.
query: String,
/// Capability names the sender requires.
required_capabilities: Vec<String>,
},
/// Introduces the sending agent to a peer.
Handshake {
/// Identifier of the agent introducing itself.
agent_id: AgentId,
/// Name of that agent.
name: String,
/// Capabilities that agent advertises.
capabilities: Vec<String>,
},
}
impl A2AMessage {
pub fn type_name(&self) -> &'static str {
match self {
A2AMessage::TaskDelegation { .. } => "task_delegation",
A2AMessage::StatusUpdate { .. } => "status_update",
A2AMessage::ResultSharing { .. } => "result_sharing",
A2AMessage::CapabilityQuery { .. } => "capability_query",
A2AMessage::Handshake { .. } => "handshake",
}
}
}
/// Priority level attached to a task; the default is [`TaskPriority::Normal`].
///
/// Only equality traits are derived here, so levels can be compared for
/// equality but carry no built-in ordering.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
pub enum TaskPriority {
/// Low priority.
Low,
/// Normal priority (the `Default` value).
#[default]
Normal,
/// High priority.
High,
/// Critical priority.
Critical,
}
/// Description of a unit of work that one agent can hand to another.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskSpec {
/// Unique identifier of the task.
pub task_id: Uuid,
/// Human-readable description of the work.
pub description: String,
/// Arbitrary JSON input for the task.
pub payload: serde_json::Value,
/// Priority of the task.
pub priority: TaskPriority,
/// Optional UTC deadline; `None` when no deadline was set.
pub deadline: Option<DateTime<Utc>>,
}
impl TaskSpec {
    /// Builds a task with a freshly generated v4 UUID, the default priority
    /// and no deadline.
    pub fn new(description: impl Into<String>, payload: serde_json::Value) -> Self {
        let task_id = Uuid::new_v4();
        let description = description.into();
        Self {
            task_id,
            description,
            payload,
            priority: TaskPriority::default(),
            deadline: None,
        }
    }

    /// Returns the task with its priority replaced by `priority`.
    pub fn with_priority(self, priority: TaskPriority) -> Self {
        Self { priority, ..self }
    }

    /// Returns the task with its deadline set to `deadline`.
    pub fn with_deadline(self, deadline: DateTime<Utc>) -> Self {
        Self {
            deadline: Some(deadline),
            ..self
        }
    }
}
/// Envelope around an [`A2AMessage`] travelling from one agent to another.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct A2ARequest {
/// Unique identifier of this request.
pub request_id: Uuid,
/// Sending agent.
pub from: AgentId,
/// Receiving agent.
pub to: AgentId,
/// The message being carried.
pub message: A2AMessage,
/// UTC time at which the request was created.
pub timestamp: DateTime<Utc>,
}
impl A2ARequest {
    /// Wraps `message` in a request with a new v4 UUID, stamped with the
    /// current UTC time.
    pub fn new(from: AgentId, to: AgentId, message: A2AMessage) -> Self {
        let request_id = Uuid::new_v4();
        let timestamp = Utc::now();
        Self {
            request_id,
            from,
            to,
            message,
            timestamp,
        }
    }
}
/// Reply to an [`A2ARequest`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct A2AResponse {
/// Unique identifier of this response.
pub response_id: Uuid,
/// Identifier of the request being answered.
pub request_id: Uuid,
/// Agent that produced the response.
pub from: AgentId,
/// Agent the response is addressed to.
pub to: AgentId,
/// `true` for responses built with `success`, `false` for those built with `error`.
pub accepted: bool,
/// Response data; for error responses this is a JSON object with an `"error"` string.
pub payload: serde_json::Value,
/// UTC time at which the response was created.
pub timestamp: DateTime<Utc>,
}
impl A2AResponse {
pub fn success(
request_id: Uuid,
from: AgentId,
to: AgentId,
payload: serde_json::Value,
) -> Self {
Self {
response_id: Uuid::new_v4(),
request_id,
from,
to,
accepted: true,
payload,
timestamp: Utc::now(),
}
}
pub fn error(request_id: Uuid, from: AgentId, to: AgentId, error: impl Into<String>) -> Self {
Self {
response_id: Uuid::new_v4(),
request_id,
from,
to,
accepted: false,
payload: serde_json::json!({ "error": error.into() }),
timestamp: Utc::now(),
}
}
}
/// A request waiting in a recipient's queue.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PendingMessage {
/// The queued request.
pub request: A2ARequest,
/// UTC time at which the request was placed in the queue.
pub queued_at: DateTime<Utc>,
}
impl PendingMessage {
    /// Wraps `request`, recording the current UTC time as its queue time.
    fn new(request: A2ARequest) -> Self {
        let queued_at = Utc::now();
        Self { request, queued_at }
    }
}
/// Public description of an agent as stored in the [`AgentCardRegistry`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AgentCard {
/// Identifier of the agent; used as the registry key.
pub agent_id: AgentId,
/// Name of the agent.
pub name: String,
/// Free-form description of the agent.
pub description: String,
/// Capability names, matched exactly by `has_capability`.
pub capabilities: Vec<String>,
/// Skill names, matched exactly by `has_skill`.
pub skills: Vec<String>,
/// Endpoint string; `AgentCard::new` sets it to `"local"`.
pub endpoint: String,
/// Current status; `AgentCard::new` sets it to `AgentStatus::Starting`.
pub status: AgentStatus,
}
impl AgentCard {
    /// Creates a card with no capabilities or skills, the `"local"` endpoint
    /// and the `Starting` status.
    pub fn new(agent_id: AgentId, name: impl Into<String>, description: impl Into<String>) -> Self {
        let name = name.into();
        let description = description.into();
        Self {
            agent_id,
            name,
            description,
            capabilities: Vec::new(),
            skills: Vec::new(),
            endpoint: String::from("local"),
            status: AgentStatus::Starting,
        }
    }

    /// Appends `capability` to the card's capability list.
    pub fn with_capability(self, capability: impl Into<String>) -> Self {
        let mut card = self;
        card.capabilities.push(capability.into());
        card
    }

    /// Appends `skill` to the card's skill list.
    pub fn with_skill(self, skill: impl Into<String>) -> Self {
        let mut card = self;
        card.skills.push(skill.into());
        card
    }

    /// Replaces the card's endpoint.
    pub fn with_endpoint(self, endpoint: impl Into<String>) -> Self {
        Self {
            endpoint: endpoint.into(),
            ..self
        }
    }

    /// Replaces the card's status.
    pub fn with_status(self, status: AgentStatus) -> Self {
        Self { status, ..self }
    }

    /// Reports whether `capability` appears in the capability list (exact match).
    pub fn has_capability(&self, capability: &str) -> bool {
        self.capabilities
            .iter()
            .map(String::as_str)
            .any(|owned| owned == capability)
    }

    /// Reports whether `skill` appears in the skill list (exact match).
    pub fn has_skill(&self, skill: &str) -> bool {
        self.skills
            .iter()
            .map(String::as_str)
            .any(|owned| owned == skill)
    }
}
/// Registry of [`AgentCard`]s keyed by agent id.
///
/// The card map sits behind an `Arc`, so clones of the registry share it.
#[derive(Clone)]
pub struct AgentCardRegistry {
/// Registered cards, keyed by agent id.
cards: Arc<RwLock<HashMap<AgentId, AgentCard>>>,
/// Bus on which registration and unregistration events are published.
event_bus: EventBus,
}
impl AgentCardRegistry {
    /// Creates an empty registry that publishes membership changes on `event_bus`.
    pub fn new(event_bus: EventBus) -> Self {
        Self {
            cards: Arc::new(RwLock::new(HashMap::new())),
            event_bus,
        }
    }

    /// Stores `card` under its `agent_id`, replacing any card already stored
    /// for that id, then publishes [`KernelEvent::AgentCreated`].
    ///
    /// # Errors
    /// Returns the error from publishing the event. The card has already been
    /// stored at that point and is not rolled back.
    pub async fn register_agent(&self, card: AgentCard) -> Result<()> {
        let agent_id = card.agent_id;
        // Only the name is needed after the card moves into the map, so keep a
        // copy of the name instead of deep-cloning the whole card.
        let name = card.name.clone();
        // The write guard is a temporary and is released at the end of this statement.
        self.cards.write().await.insert(agent_id, card);
        self.event_bus.publish(KernelEvent::AgentCreated {
            id: agent_id,
            name: name.clone(),
        })?;
        tracing::info!(agent_id = %agent_id, name = %name, "Agent registered in A2A registry");
        Ok(())
    }

    /// Removes the card for `agent_id`. When a card was present, logs the
    /// removal and publishes [`KernelEvent::AgentStopped`]; an unknown id is a no-op.
    ///
    /// # Errors
    /// Returns the error from publishing the event.
    pub async fn unregister_agent(&self, agent_id: AgentId) -> Result<()> {
        // Release the write lock before logging and publishing.
        let removed = self.cards.write().await.remove(&agent_id);
        if let Some(card) = removed {
            tracing::info!(agent_id = %agent_id, name = %card.name, "Agent unregistered from A2A registry");
            self.event_bus
                .publish(KernelEvent::AgentStopped { id: agent_id })?;
        }
        Ok(())
    }

    /// Returns clones of all cards that list `capability` (exact match).
    pub async fn find_agents_by_capability(&self, capability: &str) -> Result<Vec<AgentCard>> {
        let cards = self.cards.read().await;
        let matches: Vec<AgentCard> = cards
            .values()
            .filter(|card| card.has_capability(capability))
            .cloned()
            .collect();
        Ok(matches)
    }

    /// Returns clones of all cards that list `skill` (exact match).
    pub async fn find_agents_by_skill(&self, skill: &str) -> Result<Vec<AgentCard>> {
        let cards = self.cards.read().await;
        let matches: Vec<AgentCard> = cards
            .values()
            .filter(|card| card.has_skill(skill))
            .cloned()
            .collect();
        Ok(matches)
    }

    /// Returns a clone of the card registered for `agent_id`, if any.
    pub async fn get_agent(&self, agent_id: AgentId) -> Option<AgentCard> {
        let cards = self.cards.read().await;
        cards.get(&agent_id).cloned()
    }

    /// Returns clones of every registered card, in the map's iteration order.
    pub async fn list_agents(&self) -> Vec<AgentCard> {
        let cards = self.cards.read().await;
        cards.values().cloned().collect()
    }

    /// Returns the number of registered cards.
    pub async fn agent_count(&self) -> usize {
        let cards = self.cards.read().await;
        cards.len()
    }

    /// Sets the status on the card for `agent_id`.
    ///
    /// An unknown `agent_id` is ignored and still yields `Ok(())`.
    pub async fn update_status(&self, agent_id: AgentId, status: AgentStatus) -> Result<()> {
        let mut cards = self.cards.write().await;
        if let Some(card) = cards.get_mut(&agent_id) {
            card.status = status;
        }
        Ok(())
    }
}
impl std::fmt::Debug for AgentCardRegistry {
    /// Prints only the type name; no field is included in the output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // A field-less `debug_struct(..).finish()` emits just the name, so
        // writing the name directly produces the same text.
        f.write_str("AgentCardRegistry")
    }
}
/// Per-recipient message queue with a wake-up signal.
struct AgentQueue {
/// Queued messages; `send_message` pushes new ones at the back.
messages: parking_lot::Mutex<Vec<PendingMessage>>,
/// Signalled with `notify_one` after each push; awaited by `send_and_wait`.
notify: tokio::sync::Notify,
}
impl AgentQueue {
    /// Creates an empty queue with a fresh notifier.
    fn new() -> Self {
        let messages = parking_lot::Mutex::new(Vec::new());
        let notify = tokio::sync::Notify::new();
        Self { messages, notify }
    }
}
/// Shared callback that carries out a delegated task.
///
/// It is called with `(from, to, task)` and returns a boxed, `Send` future that
/// resolves to the task's JSON result or an error. The callback itself must be
/// `Send + Sync`.
pub type DelegationHandler = Arc<
dyn Fn(
AgentId,
AgentId,
TaskSpec,
)
-> std::pin::Pin<Box<dyn std::future::Future<Output = Result<serde_json::Value>> + Send>>
+ Send
+ Sync,
>;
/// One record in the protocol's in-memory message log.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct A2AMessageLogEntry {
/// Sending agent.
pub from: AgentId,
/// Receiving agent.
pub to: AgentId,
/// Message type label, e.g. the value returned by `A2AMessage::type_name`.
pub message_type: String,
/// UTC time recorded for the entry.
pub timestamp: DateTime<Utc>,
/// Short textual content or summary of the message.
pub content: String,
}
/// Node of a topology view. NOTE(review): nothing in this part of the file
/// constructs it; field meanings below are inferred from names — confirm with the producer.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TopologyNode {
/// Node identifier (presumably the agent id rendered as a string).
pub id: String,
/// Display label.
pub label: String,
/// Status rendered as a string.
pub status: String,
/// Capability names.
pub capabilities: Vec<String>,
/// Skill names.
pub skills: Vec<String>,
/// Optional last-seen marker; format not established here.
pub last_seen: Option<String>,
}
/// Directed edge of a topology view. NOTE(review): nothing in this part of the
/// file constructs it; field meanings below are inferred from names — confirm with the producer.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TopologyEdge {
/// Source node identifier.
pub from: String,
/// Destination node identifier.
pub to: String,
/// Message count (presumably over the last five minutes — TODO confirm).
pub message_count_5m: u32,
/// Kind of the most recent message on this edge (presumably a message type label).
pub last_kind: String,
}
/// Serializable pair of topology nodes and edges.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TopologyResponse {
/// Nodes of the view.
pub nodes: Vec<TopologyNode>,
/// Edges of the view.
pub edges: Vec<TopologyEdge>,
}
/// Agent-to-agent messaging facade: per-agent queues, a card registry, an
/// optional delegation handler and a bounded message log.
///
/// The queues, handler slot and log are each behind an `Arc`, so clones share them.
#[derive(Clone)]
pub struct A2AProtocol {
/// Registry of agent cards; built in `new` from a clone of the event bus.
registry: AgentCardRegistry,
/// Per-recipient queues, created on demand by `get_or_create_queue`.
queues: Arc<RwLock<HashMap<AgentId, Arc<AgentQueue>>>>,
/// Bus on which `MessageReceived` events are published.
event_bus: EventBus,
/// Optional callback used by `execute_delegation`; `None` until one is set.
delegation_handler: Arc<RwLock<Option<DelegationHandler>>>,
/// Log of messages, capped at `MAX_LOG_ENTRIES` by `append_log`.
message_log: Arc<parking_lot::RwLock<Vec<A2AMessageLogEntry>>>,
}
impl A2AProtocol {
/// Maximum number of entries kept in the message log; `append_log` discards
/// the oldest entries once this is exceeded.
pub const MAX_LOG_ENTRIES: usize = 10_000;
/// Creates a protocol instance with an empty registry, no queues, no
/// delegation handler and an empty log pre-sized for 256 entries.
pub fn new(event_bus: EventBus) -> Self {
    let message_log = Arc::new(parking_lot::RwLock::new(Vec::with_capacity(256)));
    let queues = Arc::new(RwLock::new(HashMap::new()));
    let delegation_handler = Arc::new(RwLock::new(None));
    Self {
        // The registry publishes on the same bus as the protocol.
        registry: AgentCardRegistry::new(event_bus.clone()),
        queues,
        event_bus,
        delegation_handler,
        message_log,
    }
}
/// Installs `handler` as the delegation callback, replacing any previous one.
pub async fn set_delegation_handler(&self, handler: DelegationHandler) {
    *self.delegation_handler.write().await = Some(handler);
}
/// Appends `entry` to the log, then drops the oldest entries so that at most
/// `MAX_LOG_ENTRIES` remain.
fn append_log(&self, entry: A2AMessageLogEntry) {
    let mut entries = self.message_log.write();
    entries.push(entry);
    let overflow = entries.len().saturating_sub(Self::MAX_LOG_ENTRIES);
    if overflow > 0 {
        // Oldest entries sit at the front of the vector.
        entries.drain(..overflow);
    }
}
/// Returns a copy of the message log, oldest entry first.
///
/// With `Some(n)` only the `n` most recent entries are returned (all of them
/// when fewer than `n` exist, none when `n` is 0); with `None` the whole log
/// is returned.
pub fn get_message_log(&self, limit: Option<usize>) -> Vec<A2AMessageLogEntry> {
    let log = self.message_log.read();
    // Index of the first entry to keep; a tail slice preserves chronological
    // order without building and re-reversing an intermediate vector.
    let start = limit.map_or(0, |n| log.len().saturating_sub(n));
    log[start..].to_vec()
}
/// Returns copies of the log entries whose timestamp lies within the last
/// `secs` seconds (inclusive of the cutoff), in log order.
///
/// Very large windows are handled without panicking or wrapping: the window
/// is clamped to what `chrono::Duration::seconds` accepts, and if the cutoff
/// would fall before the earliest representable time every entry qualifies.
pub fn recent_messages(&self, secs: u64) -> Vec<A2AMessageLogEntry> {
    // `chrono::Duration::seconds` panics above `i64::MAX / 1000` seconds, and a
    // plain `secs as i64` cast would wrap negative for values above `i64::MAX`.
    let window_secs = i64::try_from(secs)
        .unwrap_or(i64::MAX)
        .min(i64::MAX / 1_000);
    let window = chrono::Duration::seconds(window_secs);
    // `None` means the window reaches further back than `DateTime` can express.
    let cutoff = Utc::now().checked_sub_signed(window);
    let log = self.message_log.read();
    log.iter()
        .filter(|entry| cutoff.map_or(true, |oldest| entry.timestamp >= oldest))
        .cloned()
        .collect()
}
/// Returns the queue for `agent_id`, creating an empty one on first use.
async fn get_or_create_queue(&self, agent_id: AgentId) -> Arc<AgentQueue> {
    // Fast path: an existing queue only needs the shared read lock, so
    // concurrent senders do not serialise on the write lock.
    {
        let queues = self.queues.read().await;
        if let Some(queue) = queues.get(&agent_id) {
            return queue.clone();
        }
    }
    // Slow path: `entry` re-checks under the write lock, so a queue created by
    // another task between the two lock acquisitions is reused, not replaced.
    let mut queues = self.queues.write().await;
    let queue = queues
        .entry(agent_id)
        .or_insert_with(|| Arc::new(AgentQueue::new()));
    queue.clone()
}
/// Returns a reference to the agent card registry owned by this protocol.
pub fn registry(&self) -> &AgentCardRegistry {
&self.registry
}
pub async fn execute_delegation(
&self,
from: AgentId,
to: AgentId,
task: TaskSpec,
) -> Option<Result<serde_json::Value>> {
let handler = self.delegation_handler.read().await;
let handler_ref = handler.as_ref()?;
let _ = self.event_bus.publish(KernelEvent::MessageReceived {
from,
content: format!("[task_delegation] {:?}", task.task_id),
});
self.append_log(A2AMessageLogEntry {
from,
to,
message_type: "task_delegation".to_string(),
timestamp: Utc::now(),
content: task.description.clone(),
});
tracing::info!(
from = %from,
to = %to,
task_id = %task.task_id,
"A2A execute_delegation: starting"
);
let result = handler_ref(from, to, task).await;
tracing::info!(
from = %from,
to = %to,
success = result.is_ok(),
"A2A execute_delegation: completed"
);
Some(result)
}
/// Queues `message` for agent `to` and returns the id of the created request.
///
/// The message is recorded in the message log, pushed onto the recipient's
/// queue (created on demand), one waiter on that queue is notified, and a
/// `MessageReceived` event is published.
///
/// # Errors
/// Returns the error from publishing the event. The message has already been
/// logged and queued at that point and is not withdrawn.
pub async fn send_message(
    &self,
    from: AgentId,
    to: AgentId,
    message: A2AMessage,
) -> Result<Uuid> {
    let msg_type = message.type_name();
    // `message` is not needed afterwards, so it moves into the request
    // instead of being deep-cloned (payloads may be large JSON values).
    let request = A2ARequest::new(from, to, message);
    let request_id = request.request_id;
    let content_summary = match &request.message {
        A2AMessage::TaskDelegation { description, .. } => description.clone(),
        A2AMessage::StatusUpdate { message, .. } => message.clone(),
        A2AMessage::ResultSharing { summary, .. } => summary.clone(),
        A2AMessage::CapabilityQuery { query, .. } => query.clone(),
        A2AMessage::Handshake { name, .. } => format!("handshake from {name}"),
    };
    self.append_log(A2AMessageLogEntry {
        from,
        to,
        message_type: msg_type.to_string(),
        timestamp: Utc::now(),
        content: content_summary,
    });
    let queue = self.get_or_create_queue(to).await;
    // Likewise the request itself moves into the queue.
    queue.messages.lock().push(PendingMessage::new(request));
    queue.notify.notify_one();
    self.event_bus.publish(KernelEvent::MessageReceived {
        from,
        content: format!("[{msg_type}] {request_id:?}"),
    })?;
    tracing::debug!(
        from = %from,
        to = %to,
        request_id = %request_id,
        msg_type,
        "A2A message sent"
    );
    Ok(request_id)
}
pub async fn delegate_task(&self, from: AgentId, to: AgentId, task: TaskSpec) -> Result<Uuid> {
let message = A2AMessage::TaskDelegation {
task_id: task.task_id,
description: task.description.clone(),
payload: task.payload.clone(),
priority: task.priority,
};
self.send_message(from, to, message).await
}
/// Sends a `StatusUpdate` for `task_id` to agent `to` and returns the request id.
///
/// # Errors
/// Propagates any error from `send_message`.
pub async fn send_status_update(
    &self,
    from: AgentId,
    to: AgentId,
    task_id: Uuid,
    progress: u8,
    message: String,
) -> Result<Uuid> {
    self.send_message(
        from,
        to,
        A2AMessage::StatusUpdate {
            task_id,
            progress,
            message,
        },
    )
    .await
}
/// Sends a `ResultSharing` message for `task_id` to agent `to` and returns the
/// request id.
///
/// # Errors
/// Propagates any error from `send_message`.
pub async fn share_result(
    &self,
    from: AgentId,
    to: AgentId,
    task_id: Uuid,
    result: serde_json::Value,
    summary: String,
) -> Result<Uuid> {
    self.send_message(
        from,
        to,
        A2AMessage::ResultSharing {
            task_id,
            result,
            summary,
        },
    )
    .await
}
/// Returns the registered agent cards that list `capability`; delegates to
/// the registry's `find_agents_by_capability`.
pub async fn query_capabilities(&self, capability: &str) -> Result<Vec<AgentCard>> {
self.registry.find_agents_by_capability(capability).await
}
/// Sends a `Handshake` from `from` to `to`, filled in from the sender's
/// registered card, and returns the request id.
///
/// When `from` has no card in the registry the handshake carries the name
/// `"unknown"` and an empty capability list.
///
/// # Errors
/// Propagates any error from `send_message`.
pub async fn send_handshake(&self, from: AgentId, to: AgentId) -> Result<Uuid> {
    // `get_agent` hands back an owned card, so its fields can be moved out
    // rather than cloned.
    let (name, capabilities) = match self.registry.get_agent(from).await {
        Some(card) => (card.name, card.capabilities),
        None => (String::from("unknown"), Vec::new()),
    };
    let message = A2AMessage::Handshake {
        agent_id: from,
        name,
        capabilities,
    };
    self.send_message(from, to, message).await
}
/// Removes and returns every request queued for `agent_id`, oldest first.
///
/// Returns an empty vector when the agent has no queue.
pub async fn receive_messages(&self, agent_id: AgentId) -> Vec<A2ARequest> {
    let queues = self.queues.read().await;
    match queues.get(&agent_id) {
        Some(queue) => {
            // Bound to a local so the mutex guard is released at the end of
            // this statement.
            let requests: Vec<A2ARequest> = queue
                .messages
                .lock()
                .drain(..)
                .map(|pending| pending.request)
                .collect();
            requests
        }
        None => Vec::new(),
    }
}
/// Returns how many messages are queued for `agent_id` (0 when it has no queue).
pub async fn pending_count(&self, agent_id: AgentId) -> usize {
    let queues = self.queues.read().await;
    match queues.get(&agent_id) {
        Some(queue) => {
            let queued = queue.messages.lock().len();
            queued
        }
        None => 0,
    }
}
/// Reports whether at least one message is queued for `agent_id`.
pub async fn has_messages(&self, agent_id: AgentId) -> bool {
    let queued = self.pending_count(agent_id).await;
    queued != 0
}
/// Drains the queue for `agent_id` via `receive_messages` and wraps the
/// result in `Ok`; this function itself never produces an error.
pub async fn deliver_pending_messages(&self, agent_id: AgentId) -> Result<Vec<A2ARequest>> {
    let delivered = self.receive_messages(agent_id).await;
    Ok(delivered)
}
/// Sends `message` to `to`, then waits until a matching `ResultSharing`
/// message appears in `from`'s own queue or `timeout` elapses.
///
/// Matching rule:
/// * if `message` was a `TaskDelegation`, a reply matches when its `task_id`
///   equals the delegated task's id;
/// * otherwise a reply matches when its `result` JSON has a string field
///   `"request_id"` equal to the id of the request that was sent.
///
/// The matching message is removed from the queue and returned as a
/// successful [`A2AResponse`]; other queued messages are left in place.
///
/// # Errors
/// Propagates any error from `send_message`, and fails with a timeout error
/// when no matching reply arrives in time. A `timeout` too large to be added
/// to the current instant is treated as "no deadline".
pub async fn send_and_wait(
    &self,
    from: AgentId,
    to: AgentId,
    message: A2AMessage,
    timeout: std::time::Duration,
) -> Result<A2AResponse> {
    let wait_task_id = match &message {
        A2AMessage::TaskDelegation { task_id, .. } => Some(*task_id),
        _ => None,
    };
    let request_id = self.send_message(from, to, message).await?;
    // Rendered once here; the scan below would otherwise allocate this string
    // for every queued message on every wake-up.
    let request_id_text = request_id.to_string();
    let queue = self.get_or_create_queue(from).await;
    // `Instant + Duration` panics on overflow; `None` means "no deadline".
    let deadline = tokio::time::Instant::now().checked_add(timeout);
    loop {
        {
            // The mutex guard lives only inside this block, so it is released
            // before any `.await` below.
            let mut msgs = queue.messages.lock();
            let match_idx = msgs.iter().position(|p| {
                match (&p.request.message, wait_task_id) {
                    (A2AMessage::ResultSharing { task_id, .. }, Some(wait_id)) => {
                        *task_id == wait_id
                    }
                    (A2AMessage::ResultSharing { result, .. }, None) => {
                        result.get("request_id").and_then(|v| v.as_str())
                            == Some(request_id_text.as_str())
                    }
                    _ => false,
                }
            });
            if let Some(idx) = match_idx {
                let matched = msgs.remove(idx);
                if let A2AMessage::ResultSharing { result, .. } = matched.request.message {
                    // The reply travels in the opposite direction of the request.
                    return Ok(A2AResponse::success(request_id, to, from, result));
                }
            }
        }
        match deadline {
            Some(deadline) => {
                let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
                if remaining.is_zero() {
                    anyhow::bail!("A2A response timeout after {timeout:?}");
                }
                tokio::select! {
                    // Woken by a push to this queue: loop around and rescan.
                    _ = queue.notify.notified() => {
                    }
                    _ = tokio::time::sleep(remaining) => {
                        anyhow::bail!("A2A response timeout after {timeout:?}");
                    }
                }
            }
            None => queue.notify.notified().await,
        }
    }
}
}
impl std::fmt::Debug for A2AProtocol {
    /// Prints the type name with the `registry` field only; the remaining
    /// fields are left out of the output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut repr = f.debug_struct("A2AProtocol");
        repr.field("registry", &self.registry);
        repr.finish()
    }
}
#[cfg(test)]
mod tests {
use super::*;
// Builds the event bus used by the tests; 256 is the argument passed to `EventBus::new`.
fn create_test_event_bus() -> EventBus {
EventBus::new(256)
}
// Produces a fresh random agent id for a test.
fn create_test_agent_id() -> AgentId {
Uuid::new_v4()
}
// Builder methods populate the card, and the lookups match names exactly.
// Nothing here awaits, so a plain synchronous test is enough and no async
// runtime has to be started.
#[test]
fn test_agent_card_creation() {
    let agent_id = create_test_agent_id();
    let card = AgentCard::new(agent_id, "test-agent", "A test agent")
        .with_capability("code-review")
        .with_capability("lint")
        .with_skill("rust")
        .with_endpoint("local");
    assert_eq!(card.agent_id, agent_id);
    assert_eq!(card.name, "test-agent");
    assert!(card.has_capability("code-review"));
    assert!(card.has_capability("lint"));
    assert!(!card.has_capability("refactor"));
    assert!(card.has_skill("rust"));
    assert!(!card.has_skill("python"));
}
// Registering makes the card retrievable; unregistering removes it again.
#[tokio::test]
async fn test_registry_register_unregister() {
    let registry = AgentCardRegistry::new(create_test_event_bus());
    let agent_id = create_test_agent_id();
    let card = AgentCard::new(agent_id, "register-test", "Test agent").with_capability("test");

    registry.register_agent(card).await.unwrap();
    assert_eq!(registry.agent_count().await, 1);
    let registered_name = registry.get_agent(agent_id).await.map(|found| found.name);
    assert_eq!(registered_name.as_deref(), Some("register-test"));

    registry.unregister_agent(agent_id).await.unwrap();
    assert_eq!(registry.agent_count().await, 0);
    assert!(registry.get_agent(agent_id).await.is_none());
}
// Both agents advertise "code-review", so the capability lookup returns two cards.
#[tokio::test]
async fn test_registry_find_by_capability() {
    let registry = AgentCardRegistry::new(create_test_event_bus());
    let first =
        AgentCard::new(Uuid::new_v4(), "agent-1", "First agent").with_capability("code-review");
    let second = AgentCard::new(Uuid::new_v4(), "agent-2", "Second agent")
        .with_capability("code-review")
        .with_capability("refactor");
    for card in [first, second] {
        registry.register_agent(card).await.unwrap();
    }
    let reviewers = registry
        .find_agents_by_capability("code-review")
        .await
        .unwrap();
    assert_eq!(reviewers.len(), 2);
}
// A sent message is queued for the recipient and removed once received.
#[tokio::test]
async fn test_a2a_protocol_send_receive() {
    let a2a = A2AProtocol::new(create_test_event_bus());
    let sender = create_test_agent_id();
    let recipient = create_test_agent_id();
    let handshake = A2AMessage::Handshake {
        agent_id: sender,
        name: "sender".into(),
        capabilities: vec!["test".into()],
    };

    a2a.send_message(sender, recipient, handshake).await.unwrap();
    assert_eq!(a2a.pending_count(recipient).await, 1);

    let received = a2a.receive_messages(recipient).await;
    assert_eq!(received.len(), 1);
    assert_eq!(received[0].from, sender);
    assert_eq!(received[0].to, recipient);
    assert_eq!(a2a.pending_count(recipient).await, 0);
}
// Delegating a task yields a non-nil request id and queues one message for the target.
#[tokio::test]
async fn test_delegate_task() {
    let a2a = A2AProtocol::new(create_test_event_bus());
    let delegator = create_test_agent_id();
    let worker = create_test_agent_id();
    let task = TaskSpec::new("Review PR", serde_json::json!({ "pr": 42 }));

    let request_id = a2a.delegate_task(delegator, worker, task).await.unwrap();
    assert_ne!(request_id, Uuid::nil());

    let queued = a2a.receive_messages(worker).await;
    assert_eq!(queued.len(), 1);
}
// `recent_messages` keeps only entries whose timestamp falls inside the window.
#[test]
fn test_recent_messages_filters_by_window() {
let bus = create_test_event_bus();
let a2a = A2AProtocol::new(bus);
// Entry stamped "now": inside every window used below.
let recent_ts = Utc::now();
a2a.append_log(A2AMessageLogEntry {
from: Uuid::new_v4(),
to: Uuid::new_v4(),
message_type: "task_delegation".into(),
timestamp: recent_ts,
content: "recent".into(),
});
// Entry stamped 600 s in the past: outside the 300 s window, inside the 900 s one.
let old_ts = Utc::now() - chrono::Duration::seconds(600);
a2a.append_log(A2AMessageLogEntry {
from: Uuid::new_v4(),
to: Uuid::new_v4(),
message_type: "handshake".into(),
timestamp: old_ts,
content: "old".into(),
});
let window = a2a.recent_messages(300);
assert_eq!(window.len(), 1);
assert_eq!(window[0].content, "recent");
assert_eq!(window[0].message_type, "task_delegation");
let wider = a2a.recent_messages(900);
assert_eq!(wider.len(), 2);
// A 1 s window still contains the entry that was just appended.
let narrow = a2a.recent_messages(1);
assert_eq!(narrow.len(), 1);
assert_eq!(narrow[0].content, "recent");
}
// Log entries returned by `recent_messages` can be grouped per (from, to)
// pair into a message count plus the type of the last message seen.
#[tokio::test]
async fn test_recent_messages_aggregates_fan_in_fan_out() {
let bus = create_test_event_bus();
let a2a = A2AProtocol::new(bus);
let orch = Uuid::new_v4();
let worker_a = Uuid::new_v4();
let worker_b = Uuid::new_v4();
// Register the three agents as running.
for (id, name) in [
(orch, "orchestrator"),
(worker_a, "worker-a"),
(worker_b, "worker-b"),
] {
a2a.registry
.register_agent(AgentCard::new(id, name, "test").with_status(AgentStatus::Running))
.await
.unwrap();
}
// Fan-out: two delegations to worker_a and one to worker_b.
for _ in 0..2 {
a2a.append_log(A2AMessageLogEntry {
from: orch,
to: worker_a,
message_type: "task_delegation".into(),
timestamp: Utc::now(),
content: "do work".into(),
});
}
a2a.append_log(A2AMessageLogEntry {
from: orch,
to: worker_b,
message_type: "task_delegation".into(),
timestamp: Utc::now(),
content: "do work b".into(),
});
// Fan-in: one message from each worker back to the orchestrator.
a2a.append_log(A2AMessageLogEntry {
from: worker_b,
to: orch,
message_type: "status_update".into(),
timestamp: Utc::now(),
content: "50%".into(),
});
a2a.append_log(A2AMessageLogEntry {
from: worker_a,
to: orch,
message_type: "result_sharing".into(),
timestamp: Utc::now(),
content: "done".into(),
});
let entries = a2a.recent_messages(300);
// Group by directed pair; entries arrive in log order, so the stored type
// ends up being that of the pair's latest entry.
let mut aggregates: HashMap<(AgentId, AgentId), (u32, String)> = HashMap::new();
for entry in &entries {
let agg = aggregates
.entry((entry.from, entry.to))
.or_insert((0, String::new()));
agg.0 = agg.0.saturating_add(1);
agg.1 = entry.message_type.clone();
}
let e1 = aggregates.get(&(orch, worker_a)).expect("edge 1 missing");
assert_eq!(e1.0, 2, "orch->worker_a count");
assert_eq!(e1.1, "task_delegation", "orch->worker_a last_kind");
let e2 = aggregates.get(&(orch, worker_b)).expect("edge 2 missing");
assert_eq!(e2.0, 1, "orch->worker_b count");
assert_eq!(e2.1, "task_delegation", "orch->worker_b last_kind");
let e3 = aggregates.get(&(worker_b, orch)).expect("edge 3 missing");
assert_eq!(e3.0, 1, "worker_b->orch count");
assert_eq!(e3.1, "status_update", "worker_b->orch last_kind");
let e4 = aggregates.get(&(worker_a, orch)).expect("edge 4 missing");
assert_eq!(e4.0, 1, "worker_a->orch count");
assert_eq!(e4.1, "result_sharing", "worker_a->orch last_kind");
// Exactly four distinct directed pairs were logged.
assert_eq!(aggregates.len(), 4);
}
}