Skip to main content

oxios_kernel/a2a/
mod.rs

1//! A2A (Agent-to-Agent) protocol for horizontal agent communication.
2//!
3//! A2A is Google's protocol for horizontal agent↔agent communication.
4//! Unlike MCP which is vertical (agent→tool), A2A enables agents to
5//! discover each other, delegate tasks, and share results.
6
7pub mod circuit_breaker;
8
9pub use circuit_breaker::{A2ACircuitBreaker, CircuitState};
10
11use std::collections::HashMap;
12use std::sync::Arc;
13
14use anyhow::Result;
15use chrono::{DateTime, Utc};
16use serde::{Deserialize, Serialize};
17use tokio::sync::RwLock;
18use uuid::Uuid;
19
20use crate::event_bus::{EventBus, KernelEvent};
21use crate::types::{AgentId, AgentStatus};
22
/// A2A Message types for inter-agent communication.
///
/// Serialized by serde as an internally tagged enum: the variant name is
/// written in `snake_case` under the `"type"` key, next to the variant's
/// own fields. The tag strings are the same ones returned by
/// [`A2AMessage::type_name`].
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum A2AMessage {
    /// Task delegation: "Here, do X"
    TaskDelegation {
        /// Unique task identifier.
        task_id: Uuid,
        /// Human-readable description of the task.
        description: String,
        /// Structured task payload.
        payload: serde_json::Value,
        /// Priority level.
        priority: TaskPriority,
    },
    /// Status update: "I'm working on X, status: Y%"
    StatusUpdate {
        /// Associated task identifier.
        task_id: Uuid,
        /// Progress percentage (0-100).
        ///
        /// The range is a convention only: the field is a plain `u8` and this
        /// enum does not reject values above 100.
        progress: u8,
        /// Status message.
        message: String,
    },
    /// Result sharing: "Here's the result of X"
    ResultSharing {
        /// Associated task identifier.
        task_id: Uuid,
        /// Result data.
        result: serde_json::Value,
        /// Human-readable summary.
        summary: String,
    },
    /// Capability query: "Who can do X?"
    CapabilityQuery {
        /// Query description.
        query: String,
        /// Required capabilities.
        required_capabilities: Vec<String>,
    },
    /// Handshake: "Hello, I can do Y"
    Handshake {
        /// Agent identifier.
        agent_id: AgentId,
        /// Agent name.
        name: String,
        /// Agent capabilities.
        capabilities: Vec<String>,
    },
}
73
74impl A2AMessage {
75    /// Returns the message type name for logging/debugging.
76    pub fn type_name(&self) -> &'static str {
77        match self {
78            A2AMessage::TaskDelegation { .. } => "task_delegation",
79            A2AMessage::StatusUpdate { .. } => "status_update",
80            A2AMessage::ResultSharing { .. } => "result_sharing",
81            A2AMessage::CapabilityQuery { .. } => "capability_query",
82            A2AMessage::Handshake { .. } => "handshake",
83        }
84    }
85}
86
87/// Priority level for delegated tasks.
88#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
89pub enum TaskPriority {
90    /// Low priority, best-effort.
91    Low,
92    /// Normal priority.
93    #[default]
94    Normal,
95    /// High priority, should be handled soon.
96    High,
97    /// Critical, immediate attention required.
98    Critical,
99}
100
/// Specification for a delegated task.
///
/// Built with [`TaskSpec::new`] and refined with the `with_*` builder methods.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskSpec {
    /// Unique task identifier.
    pub task_id: Uuid,
    /// Human-readable description of the task.
    pub description: String,
    /// Structured task payload.
    pub payload: serde_json::Value,
    /// Priority level.
    pub priority: TaskPriority,
    /// Deadline for task completion, if any.
    ///
    /// Note that `A2AMessage::TaskDelegation` has no deadline field, so this
    /// value is not part of the message produced by `A2AProtocol::delegate_task`.
    pub deadline: Option<DateTime<Utc>>,
}
115
116impl TaskSpec {
117    /// Creates a new task specification.
118    pub fn new(description: impl Into<String>, payload: serde_json::Value) -> Self {
119        Self {
120            task_id: Uuid::new_v4(),
121            description: description.into(),
122            payload,
123            priority: TaskPriority::default(),
124            deadline: None,
125        }
126    }
127
128    /// Sets the priority.
129    pub fn with_priority(mut self, priority: TaskPriority) -> Self {
130        self.priority = priority;
131        self
132    }
133
134    /// Sets the deadline.
135    pub fn with_deadline(mut self, deadline: DateTime<Utc>) -> Self {
136        self.deadline = Some(deadline);
137        self
138    }
139}
140
/// A request sent by one agent to another via A2A.
///
/// This is the envelope around an [`A2AMessage`]: it records who sent it,
/// who it is addressed to, and when it was created.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct A2ARequest {
    /// Unique request identifier.
    ///
    /// This identifies the envelope and is distinct from any `task_id`
    /// carried inside the message.
    pub request_id: Uuid,
    /// Sending agent's ID.
    pub from: AgentId,
    /// Receiving agent's ID.
    pub to: AgentId,
    /// The message being sent.
    pub message: A2AMessage,
    /// Timestamp when the request was created.
    pub timestamp: DateTime<Utc>,
}
155
156impl A2ARequest {
157    /// Creates a new A2A request.
158    pub fn new(from: AgentId, to: AgentId, message: A2AMessage) -> Self {
159        Self {
160            request_id: Uuid::new_v4(),
161            from,
162            to,
163            message,
164            timestamp: Utc::now(),
165        }
166    }
167}
168
/// A response from a target agent.
///
/// Construct with [`A2AResponse::success`] or [`A2AResponse::error`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct A2AResponse {
    /// Unique response identifier.
    pub response_id: Uuid,
    /// ID of the request this responds to.
    pub request_id: Uuid,
    /// Responding agent's ID.
    pub from: AgentId,
    /// Original requesting agent's ID.
    pub to: AgentId,
    /// Whether the request was accepted.
    ///
    /// `true` for responses built by `success`, `false` for those built by
    /// `error`.
    pub accepted: bool,
    /// Response payload (result, error, etc.).
    ///
    /// For `error` responses this is the JSON object `{ "error": <message> }`.
    pub payload: serde_json::Value,
    /// Timestamp when the response was created.
    pub timestamp: DateTime<Utc>,
}
187
188impl A2AResponse {
189    /// Creates a success response.
190    pub fn success(
191        request_id: Uuid,
192        from: AgentId,
193        to: AgentId,
194        payload: serde_json::Value,
195    ) -> Self {
196        Self {
197            response_id: Uuid::new_v4(),
198            request_id,
199            from,
200            to,
201            accepted: true,
202            payload,
203            timestamp: Utc::now(),
204        }
205    }
206
207    /// Creates an error response.
208    pub fn error(request_id: Uuid, from: AgentId, to: AgentId, error: impl Into<String>) -> Self {
209        Self {
210            response_id: Uuid::new_v4(),
211            request_id,
212            from,
213            to,
214            accepted: false,
215            payload: serde_json::json!({ "error": error.into() }),
216            timestamp: Utc::now(),
217        }
218    }
219}
220
/// A pending message waiting for an agent to receive it.
///
/// Pairs the original request with the time it entered the recipient's
/// queue, which may be later than the request's own `timestamp`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PendingMessage {
    /// The request that created this message.
    pub request: A2ARequest,
    /// Timestamp when the message was queued.
    pub queued_at: DateTime<Utc>,
}
229
230impl PendingMessage {
231    fn new(request: A2ARequest) -> Self {
232        Self {
233            request,
234            queued_at: Utc::now(),
235        }
236    }
237}
238
/// A card describing an agent's capabilities for discovery.
///
/// Each agent publishes an AgentCard to the registry, making its
/// capabilities discoverable by other agents via A2A.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AgentCard {
    /// Unique identifier for this agent.
    pub agent_id: AgentId,
    /// Human-readable name of the agent.
    pub name: String,
    /// Description of what the agent does.
    pub description: String,
    /// List of capabilities (e.g., ["code-review", "refactor"]).
    ///
    /// Compared by exact string equality in `has_capability`.
    pub capabilities: Vec<String>,
    /// List of skills (e.g., ["rust", "python"]).
    ///
    /// Compared by exact string equality in `has_skill`.
    pub skills: Vec<String>,
    /// Endpoint for communication (e.g., "local", "remote://...").
    ///
    /// `AgentCard::new` initializes this to `"local"`.
    pub endpoint: String,
    /// Current status of the agent.
    ///
    /// `AgentCard::new` initializes this to `AgentStatus::Starting`.
    pub status: AgentStatus,
}
260
261impl AgentCard {
262    /// Creates a new agent card.
263    pub fn new(agent_id: AgentId, name: impl Into<String>, description: impl Into<String>) -> Self {
264        Self {
265            agent_id,
266            name: name.into(),
267            description: description.into(),
268            capabilities: Vec::new(),
269            skills: Vec::new(),
270            endpoint: "local".into(),
271            status: AgentStatus::Starting,
272        }
273    }
274
275    /// Adds a capability.
276    pub fn with_capability(mut self, capability: impl Into<String>) -> Self {
277        self.capabilities.push(capability.into());
278        self
279    }
280
281    /// Adds a skill.
282    pub fn with_skill(mut self, skill: impl Into<String>) -> Self {
283        self.skills.push(skill.into());
284        self
285    }
286
287    /// Sets the endpoint.
288    pub fn with_endpoint(mut self, endpoint: impl Into<String>) -> Self {
289        self.endpoint = endpoint.into();
290        self
291    }
292
293    /// Sets the initial status.
294    pub fn with_status(mut self, status: AgentStatus) -> Self {
295        self.status = status;
296        self
297    }
298
299    /// Returns true if this agent has the given capability.
300    pub fn has_capability(&self, capability: &str) -> bool {
301        self.capabilities.iter().any(|c| c == capability)
302    }
303
304    /// Returns true if this agent has the given skill.
305    pub fn has_skill(&self, skill: &str) -> bool {
306        self.skills.iter().any(|s| s == skill)
307    }
308}
309
/// Global registry of available agents and their capability cards.
///
/// The registry enables agents to discover each other by capability,
/// supporting the A2A "handshake" pattern where agents query "who can do X?".
///
/// Cloning the registry is cheap with respect to the card map: the map lives
/// behind an `Arc`, so every clone observes and mutates the same set of cards.
#[derive(Clone)]
pub struct AgentCardRegistry {
    /// Map of agent ID to their card.
    cards: Arc<RwLock<HashMap<AgentId, AgentCard>>>,
    /// Event bus for publishing registry changes.
    event_bus: EventBus,
}
321
322impl AgentCardRegistry {
323    /// Creates a new empty registry.
324    pub fn new(event_bus: EventBus) -> Self {
325        Self {
326            cards: Arc::new(RwLock::new(HashMap::new())),
327            event_bus,
328        }
329    }
330
331    /// Registers an agent's card in the registry.
332    pub async fn register_agent(&self, card: AgentCard) -> Result<()> {
333        let agent_id = card.agent_id;
334        let mut cards = self.cards.write().await;
335        cards.insert(agent_id, card.clone());
336        drop(cards);
337
338        self.event_bus.publish(KernelEvent::AgentCreated {
339            id: agent_id,
340            name: card.name.clone(),
341        })?;
342
343        tracing::info!(agent_id = %agent_id, name = %card.name, "Agent registered in A2A registry");
344        Ok(())
345    }
346
347    /// Unregisters an agent from the registry.
348    pub async fn unregister_agent(&self, agent_id: AgentId) -> Result<()> {
349        let mut cards = self.cards.write().await;
350        if let Some(card) = cards.remove(&agent_id) {
351            tracing::info!(agent_id = %agent_id, name = %card.name, "Agent unregistered from A2A registry");
352            drop(cards);
353
354            self.event_bus.publish(KernelEvent::AgentStopped {
355                id: agent_id,
356                success: false,
357            })?;
358        }
359        Ok(())
360    }
361
362    /// Finds all agents that have the given capability.
363    pub async fn find_agents_by_capability(&self, capability: &str) -> Result<Vec<AgentCard>> {
364        let cards = self.cards.read().await;
365        let matches: Vec<AgentCard> = cards
366            .values()
367            .filter(|card| card.has_capability(capability))
368            .cloned()
369            .collect();
370        Ok(matches)
371    }
372
373    /// Finds all agents that have the given skill.
374    pub async fn find_agents_by_skill(&self, skill: &str) -> Result<Vec<AgentCard>> {
375        let cards = self.cards.read().await;
376        let matches: Vec<AgentCard> = cards
377            .values()
378            .filter(|card| card.has_skill(skill))
379            .cloned()
380            .collect();
381        Ok(matches)
382    }
383
384    /// Finds an agent by its ID.
385    pub async fn get_agent(&self, agent_id: AgentId) -> Option<AgentCard> {
386        let cards = self.cards.read().await;
387        cards.get(&agent_id).cloned()
388    }
389
390    /// Returns all registered agents.
391    pub async fn list_agents(&self) -> Vec<AgentCard> {
392        let cards = self.cards.read().await;
393        cards.values().cloned().collect()
394    }
395
396    /// Returns the count of registered agents.
397    pub async fn agent_count(&self) -> usize {
398        let cards = self.cards.read().await;
399        cards.len()
400    }
401
402    /// Updates an agent's status.
403    pub async fn update_status(&self, agent_id: AgentId, status: AgentStatus) -> Result<()> {
404        let mut cards = self.cards.write().await;
405        if let Some(card) = cards.get_mut(&agent_id) {
406            card.status = status;
407        }
408        Ok(())
409    }
410}
411
412impl std::fmt::Debug for AgentCardRegistry {
413    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
414        f.debug_struct("AgentCardRegistry").finish()
415    }
416}
417
/// Per-agent message queue with notification.
///
/// Each agent gets its own queue backed by `tokio::sync::Notify`
/// so consumers can `.await` new messages without polling.
struct AgentQueue {
    /// Buffered pending messages (behind a sync mutex for cheap push/drain).
    ///
    /// Because this is a synchronous `parking_lot` mutex, callers in this
    /// module take the lock only inside short, non-`await`ing scopes.
    messages: parking_lot::Mutex<Vec<PendingMessage>>,
    /// Notifier signalled when a new message is pushed.
    notify: tokio::sync::Notify,
}
428
429impl AgentQueue {
430    fn new() -> Self {
431        Self {
432            messages: parking_lot::Mutex::new(Vec::new()),
433            notify: tokio::sync::Notify::new(),
434        }
435    }
436}
437
/// Callback type invoked when a TaskDelegation message is received.
///
/// The dispatcher calls this with (from, to, task) and expects the
/// handler to execute the work and return the result.
///
/// The callback is reference-counted (`Arc`) and `Send + Sync`, and the
/// future it returns is boxed, pinned and `Send`, resolving to
/// `Result<serde_json::Value>`.
pub type DelegationHandler = Arc<
    dyn Fn(
            AgentId,
            AgentId,
            TaskSpec,
        )
            -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<serde_json::Value>> + Send>>
        + Send
        + Sync,
>;
452
/// A single entry in the A2A message log.
///
/// Records every message that passes through the protocol for
/// observability and debugging. The log is append-only and bounded
/// to [`A2AProtocol::MAX_LOG_ENTRIES`] entries (oldest are pruned).
///
/// Only a summary is stored here, not the full message payload.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct A2AMessageLogEntry {
    /// Sending agent's ID.
    pub from: AgentId,
    /// Receiving agent's ID.
    pub to: AgentId,
    /// Message type name (e.g. "task_delegation", "handshake").
    pub message_type: String,
    /// When this message was logged.
    pub timestamp: DateTime<Utc>,
    /// Short human-readable content summary.
    pub content: String,
}
471
/// A node in the A2A communication topology.
///
/// Represents a single agent, derived from the agent card registry.
///
/// NOTE(review): the code that builds these nodes is outside this part of
/// the file; the field descriptions below restate the intended contract and
/// should be checked against that builder.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TopologyNode {
    /// Stable identifier (the agent name, used by the frontend as a node id).
    pub id: String,
    /// Display label.
    pub label: String,
    /// Lowercased status (e.g. "running", "idle", "stopped", "starting").
    pub status: String,
    /// Agent capabilities (e.g. ["code-review"]).
    pub capabilities: Vec<String>,
    /// Agent skills (e.g. ["rust", "python"]).
    pub skills: Vec<String>,
    /// ISO-8601 timestamp of the last observed message involving this
    /// agent, or `None` if no recent activity.
    pub last_seen: Option<String>,
}
491
/// An edge in the A2A communication topology.
///
/// Aggregates messages between a pair of agents over a recent
/// time window. The `last_kind` is the type of the most recent
/// message along this edge — useful for color-coding the edge.
///
/// NOTE(review): the aggregation itself happens outside this part of the
/// file; the `_5m` suffix suggests a five-minute window — confirm against
/// the code that fills this struct.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TopologyEdge {
    /// Source agent identifier (matches `TopologyNode.id`).
    pub from: String,
    /// Target agent identifier (matches `TopologyNode.id`).
    pub to: String,
    /// Number of messages between `from` and `to` in the window.
    pub message_count_5m: u32,
    /// Type of the most recent message along this edge.
    pub last_kind: String,
}
508
/// Response shape for `/api/a2a/topology`.
///
/// A plain graph description: the agents as nodes plus the aggregated
/// communication edges between them.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TopologyResponse {
    /// Agents in the topology (nodes).
    pub nodes: Vec<TopologyNode>,
    /// Communication edges aggregated from the recent message log.
    pub edges: Vec<TopologyEdge>,
}
517
/// A2A Protocol handler for inter-agent communication.
///
/// The queues, delegation handler and message log each live behind an
/// `Arc`, so clones of the handler share them.
#[derive(Clone)]
pub struct A2AProtocol {
    /// The registry for agent capability discovery.
    registry: AgentCardRegistry,
    /// Per-agent queues with notification support.
    ///
    /// Queues are created lazily the first time an agent is addressed.
    queues: Arc<RwLock<HashMap<AgentId, Arc<AgentQueue>>>>,
    /// Event bus for kernel events.
    event_bus: EventBus,
    /// Optional handler invoked when a TaskDelegation message is received.
    delegation_handler: Arc<RwLock<Option<DelegationHandler>>>,
    /// Append-only message log for observability.
    ///
    /// Held in memory only and capped at `MAX_LOG_ENTRIES` entries.
    message_log: Arc<parking_lot::RwLock<Vec<A2AMessageLogEntry>>>,
}
532
533impl A2AProtocol {
534    /// Maximum number of log entries retained before pruning.
535    pub const MAX_LOG_ENTRIES: usize = 10_000;
536
537    /// Creates a new A2A protocol handler.
538    pub fn new(event_bus: EventBus) -> Self {
539        let registry = AgentCardRegistry::new(event_bus.clone());
540        Self {
541            registry,
542            queues: Arc::new(RwLock::new(HashMap::new())),
543            event_bus,
544            delegation_handler: Arc::new(RwLock::new(None)),
545            message_log: Arc::new(parking_lot::RwLock::new(Vec::with_capacity(256))),
546        }
547    }
548
549    /// Register a handler that executes delegated tasks.
550    ///
551    /// When a `TaskDelegation` message arrives and a handler is set,
552    /// the protocol spawns a background task to execute it and sends
553    /// the result back as a `ResultSharing` message.
554    pub async fn set_delegation_handler(&self, handler: DelegationHandler) {
555        let mut h = self.delegation_handler.write().await;
556        *h = Some(handler);
557    }
558
559    /// Append an entry to the message log, pruning if over capacity.
560    fn append_log(&self, entry: A2AMessageLogEntry) {
561        let mut log = self.message_log.write();
562        log.push(entry);
563        if log.len() > Self::MAX_LOG_ENTRIES {
564            let excess = log.len() - Self::MAX_LOG_ENTRIES;
565            log.drain(..excess);
566        }
567    }
568
569    /// Returns recent message log entries, most recent last.
570    ///
571    /// If `limit` is `Some(n)`, returns at most the last `n` entries.
572    pub fn get_message_log(&self, limit: Option<usize>) -> Vec<A2AMessageLogEntry> {
573        let log = self.message_log.read();
574        match limit {
575            Some(n) => log
576                .iter()
577                .rev()
578                .take(n)
579                .cloned()
580                .collect::<Vec<_>>()
581                .into_iter()
582                .rev()
583                .collect(),
584            None => log.clone(),
585        }
586    }
587
588    /// Returns message-log entries whose timestamp is within the last
589    /// `secs` seconds, most recent last.
590    ///
591    /// Used by the topology endpoint to derive edges from a sliding
592    /// window of recent activity.
593    pub fn recent_messages(&self, secs: u64) -> Vec<A2AMessageLogEntry> {
594        let now = Utc::now();
595        let cutoff = now - chrono::Duration::seconds(secs as i64);
596        let log = self.message_log.read();
597        log.iter()
598            .filter(|entry| entry.timestamp >= cutoff)
599            .cloned()
600            .collect()
601    }
602
603    /// Get or create a queue for the given agent.
604    async fn get_or_create_queue(&self, agent_id: AgentId) -> Arc<AgentQueue> {
605        let mut queues = self.queues.write().await;
606        queues
607            .entry(agent_id)
608            .or_insert_with(|| Arc::new(AgentQueue::new()))
609            .clone()
610    }
611
    /// Returns the agent card registry.
    ///
    /// This is a borrow of the registry owned by this protocol handler, the
    /// same one used by `query_capabilities` and `send_handshake`.
    pub fn registry(&self) -> &AgentCardRegistry {
        &self.registry
    }
616
617    /// Execute a delegated task through the registered handler (blocking).
618    ///
619    /// Also enqueues the delegation message and publishes events for
620    /// audit trail purposes, then calls the handler directly and waits.
621    ///
622    /// Returns `None` if no handler is registered.
623    pub async fn execute_delegation(
624        &self,
625        from: AgentId,
626        to: AgentId,
627        task: TaskSpec,
628    ) -> Option<Result<serde_json::Value>> {
629        let handler = self.delegation_handler.read().await;
630        let handler_ref = handler.as_ref()?;
631
632        // Publish audit event.
633        let _ = self.event_bus.publish(KernelEvent::MessageReceived {
634            from,
635            content: format!("[task_delegation] {:?}", task.task_id),
636        });
637
638        // Log for observability.
639        self.append_log(A2AMessageLogEntry {
640            from,
641            to,
642            message_type: "task_delegation".to_string(),
643            timestamp: Utc::now(),
644            content: task.description.clone(),
645        });
646
647        tracing::info!(
648            from = %from,
649            to = %to,
650            task_id = %task.task_id,
651            "A2A execute_delegation: starting"
652        );
653
654        let result = handler_ref(from, to, task).await;
655
656        tracing::info!(
657            from = %from,
658            to = %to,
659            success = result.is_ok(),
660            "A2A execute_delegation: completed"
661        );
662
663        Some(result)
664    }
665
666    /// Sends a message from one agent to another.
667    pub async fn send_message(
668        &self,
669        from: AgentId,
670        to: AgentId,
671        message: A2AMessage,
672    ) -> Result<Uuid> {
673        let msg_type = message.type_name();
674        let request = A2ARequest::new(from, to, message.clone());
675        let request_id = request.request_id;
676
677        // Log the message for observability.
678        let content_summary = match &request.message {
679            A2AMessage::TaskDelegation { description, .. } => description.clone(),
680            A2AMessage::StatusUpdate { message, .. } => message.clone(),
681            A2AMessage::ResultSharing { summary, .. } => summary.clone(),
682            A2AMessage::CapabilityQuery { query, .. } => query.clone(),
683            A2AMessage::Handshake { name, .. } => format!("handshake from {name}"),
684        };
685        self.append_log(A2AMessageLogEntry {
686            from,
687            to,
688            message_type: msg_type.to_string(),
689            timestamp: Utc::now(),
690            content: content_summary,
691        });
692
693        // Push to the target agent's queue and notify.
694        let queue = self.get_or_create_queue(to).await;
695        queue
696            .messages
697            .lock()
698            .push(PendingMessage::new(request.clone()));
699        queue.notify.notify_one();
700
701        // The event bus is an auxiliary observability/coordination channel;
702        // `append_log` above already records the message durably. If publish
703        // fails we must NOT return Err — the message has already been pushed
704        // to the recipient's queue, so propagating would cause the caller to
705        // retry and produce a duplicate delivery.
706        if let Err(e) = self.event_bus.publish(KernelEvent::MessageReceived {
707            from,
708            content: format!("[{msg_type}] {request_id:?}"),
709        }) {
710            tracing::warn!(
711                error = %e,
712                from = %from,
713                to = %to,
714                request_id = %request_id,
715                "a2a: failed to publish MessageReceived event (message was still delivered)"
716            );
717        }
718
719        tracing::debug!(
720            from = %from,
721            to = %to,
722            request_id = %request_id,
723            msg_type,
724            "A2A message sent"
725        );
726
727        Ok(request_id)
728    }
729
730    /// Delegates a task from one agent to another.
731    pub async fn delegate_task(&self, from: AgentId, to: AgentId, task: TaskSpec) -> Result<Uuid> {
732        let message = A2AMessage::TaskDelegation {
733            task_id: task.task_id,
734            description: task.description.clone(),
735            payload: task.payload.clone(),
736            priority: task.priority,
737        };
738
739        self.send_message(from, to, message).await
740    }
741
742    /// Sends a status update from one agent to another.
743    pub async fn send_status_update(
744        &self,
745        from: AgentId,
746        to: AgentId,
747        task_id: Uuid,
748        progress: u8,
749        message: String,
750    ) -> Result<Uuid> {
751        let message = A2AMessage::StatusUpdate {
752            task_id,
753            progress,
754            message,
755        };
756
757        self.send_message(from, to, message).await
758    }
759
760    /// Shares a result from one agent to another.
761    pub async fn share_result(
762        &self,
763        from: AgentId,
764        to: AgentId,
765        task_id: Uuid,
766        result: serde_json::Value,
767        summary: String,
768    ) -> Result<Uuid> {
769        let message = A2AMessage::ResultSharing {
770            task_id,
771            result,
772            summary,
773        };
774
775        self.send_message(from, to, message).await
776    }
777
778    /// Queries the registry for agents that can perform a capability.
779    pub async fn query_capabilities(&self, capability: &str) -> Result<Vec<AgentCard>> {
780        self.registry.find_agents_by_capability(capability).await
781    }
782
783    /// Initiates a handshake with another agent.
784    pub async fn send_handshake(&self, from: AgentId, to: AgentId) -> Result<Uuid> {
785        let card = self.registry.get_agent(from).await;
786
787        let (name, capabilities) = if let Some(card) = card {
788            (card.name, card.capabilities.clone())
789        } else {
790            ("unknown".into(), Vec::new())
791        };
792
793        let message = A2AMessage::Handshake {
794            agent_id: from,
795            name,
796            capabilities,
797        };
798
799        self.send_message(from, to, message).await
800    }
801
802    /// Receives all pending messages for an agent, draining the queue.
803    pub async fn receive_messages(&self, agent_id: AgentId) -> Vec<A2ARequest> {
804        let queues = self.queues.read().await;
805        if let Some(queue) = queues.get(&agent_id) {
806            let drained: Vec<PendingMessage> = queue.messages.lock().drain(..).collect();
807            drained.into_iter().map(|m| m.request).collect()
808        } else {
809            Vec::new()
810        }
811    }
812
813    /// Returns the number of pending messages for an agent.
814    pub async fn pending_count(&self, agent_id: AgentId) -> usize {
815        let queues = self.queues.read().await;
816        queues
817            .get(&agent_id)
818            .map(|q| q.messages.lock().len())
819            .unwrap_or(0)
820    }
821
822    /// Returns true if the agent has any pending messages.
823    pub async fn has_messages(&self, agent_id: AgentId) -> bool {
824        self.pending_count(agent_id).await > 0
825    }
826
    /// Deliver all pending messages to an agent.
    ///
    /// This is a thin wrapper around `receive_messages`: it drains the
    /// agent's queue in exactly the same way and wraps the result in `Ok`.
    /// No `MessageReceived` events are published here; those are published
    /// by `send_message` when each message is originally sent. The current
    /// implementation never returns `Err`.
    pub async fn deliver_pending_messages(&self, agent_id: AgentId) -> Result<Vec<A2ARequest>> {
        Ok(self.receive_messages(agent_id).await)
    }
835
836    /// Send a message and wait for a response within a timeout.
837    ///
838    /// Uses `tokio::select!` with `Notify` instead of polling.
839    /// Matches `ResultSharing` messages by checking if `task_id` equals the
840    /// **delegated task's ID** (not the envelope request_id). This works because
841    /// `delegate_task` creates a `TaskDelegation { task_id: task.task_id, ... }`
842    /// message, and the handler responds with `ResultSharing { task_id: task.task_id }`.
843    pub async fn send_and_wait(
844        &self,
845        from: AgentId,
846        to: AgentId,
847        message: A2AMessage,
848        timeout: std::time::Duration,
849    ) -> Result<A2AResponse> {
850        // Extract the task_id from the outgoing message so we can match the response.
851        let wait_task_id = match &message {
852            A2AMessage::TaskDelegation { task_id, .. } => Some(*task_id),
853            _ => None,
854        };
855
856        let request_id = self.send_message(from, to, message).await?;
857        let queue = self.get_or_create_queue(from).await;
858        let deadline = tokio::time::Instant::now() + timeout;
859
860        loop {
861            // First, check if a matching response is already in the queue.
862            {
863                let mut msgs = queue.messages.lock();
864                let match_idx = msgs.iter().position(|p| {
865                    match (&p.request.message, wait_task_id) {
866                        // For TaskDelegation: match by the delegated task_id.
867                        (A2AMessage::ResultSharing { task_id, .. }, Some(wait_id)) => {
868                            *task_id == wait_id
869                        }
870                        // For non-delegation messages: match by request_id echoed in payload.
871                        (A2AMessage::ResultSharing { result, .. }, None) => {
872                            result.get("request_id").and_then(|v| v.as_str())
873                                == Some(&request_id.to_string())
874                        }
875                        _ => false,
876                    }
877                });
878                if let Some(idx) = match_idx {
879                    let matched = msgs.remove(idx);
880                    if let A2AMessage::ResultSharing { result, .. } = matched.request.message {
881                        return Ok(A2AResponse::success(request_id, to, from, result));
882                    }
883                }
884            }
885
886            // No match yet — wait for notification or timeout.
887            let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
888            if remaining.is_zero() {
889                anyhow::bail!("A2A response timeout after {timeout:?}");
890            }
891
892            tokio::select! {
893                _ = queue.notify.notified() => {
894                    // A new message arrived — loop to check for a match.
895                }
896                _ = tokio::time::sleep(remaining) => {
897                    anyhow::bail!("A2A response timeout after {timeout:?}");
898                }
899            }
900        }
901    }
902}
903
904impl std::fmt::Debug for A2AProtocol {
905    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
906        f.debug_struct("A2AProtocol")
907            .field("registry", &self.registry)
908            .finish()
909    }
910}
911
912#[cfg(test)]
913mod tests {
914    use super::*;
915
916    fn create_test_event_bus() -> EventBus {
917        EventBus::new(256)
918    }
919
920    fn create_test_agent_id() -> AgentId {
921        Uuid::new_v4()
922    }
923
924    #[tokio::test]
925    async fn test_agent_card_creation() {
926        let agent_id = create_test_agent_id();
927        let card = AgentCard::new(agent_id, "test-agent", "A test agent")
928            .with_capability("code-review")
929            .with_capability("lint")
930            .with_skill("rust")
931            .with_endpoint("local");
932
933        assert_eq!(card.agent_id, agent_id);
934        assert_eq!(card.name, "test-agent");
935        assert!(card.has_capability("code-review"));
936        assert!(card.has_capability("lint"));
937        assert!(!card.has_capability("refactor"));
938        assert!(card.has_skill("rust"));
939        assert!(!card.has_skill("python"));
940    }
941
942    #[tokio::test]
943    async fn test_registry_register_unregister() {
944        let bus = create_test_event_bus();
945        let registry = AgentCardRegistry::new(bus);
946
947        let agent_id = create_test_agent_id();
948        let card = AgentCard::new(agent_id, "register-test", "Test agent").with_capability("test");
949
950        registry.register_agent(card.clone()).await.unwrap();
951        assert_eq!(registry.agent_count().await, 1);
952
953        let found = registry.get_agent(agent_id).await;
954        assert!(found.is_some());
955        assert_eq!(found.unwrap().name, "register-test");
956
957        registry.unregister_agent(agent_id).await.unwrap();
958        assert_eq!(registry.agent_count().await, 0);
959
960        let found = registry.get_agent(agent_id).await;
961        assert!(found.is_none());
962    }
963
964    #[tokio::test]
965    async fn test_registry_find_by_capability() {
966        let bus = create_test_event_bus();
967        let registry = AgentCardRegistry::new(bus);
968
969        let id1 = Uuid::new_v4();
970        let id2 = Uuid::new_v4();
971
972        registry
973            .register_agent(
974                AgentCard::new(id1, "agent-1", "First agent").with_capability("code-review"),
975            )
976            .await
977            .unwrap();
978
979        registry
980            .register_agent(
981                AgentCard::new(id2, "agent-2", "Second agent")
982                    .with_capability("code-review")
983                    .with_capability("refactor"),
984            )
985            .await
986            .unwrap();
987
988        let reviewers = registry
989            .find_agents_by_capability("code-review")
990            .await
991            .unwrap();
992        assert_eq!(reviewers.len(), 2);
993    }
994
995    #[tokio::test]
996    async fn test_a2a_protocol_send_receive() {
997        let bus = create_test_event_bus();
998        let a2a = A2AProtocol::new(bus);
999
1000        let from = create_test_agent_id();
1001        let to = create_test_agent_id();
1002
1003        let message = A2AMessage::Handshake {
1004            agent_id: from,
1005            name: "sender".into(),
1006            capabilities: vec!["test".into()],
1007        };
1008
1009        a2a.send_message(from, to, message).await.unwrap();
1010        assert_eq!(a2a.pending_count(to).await, 1);
1011
1012        let messages = a2a.receive_messages(to).await;
1013        assert_eq!(messages.len(), 1);
1014        assert_eq!(messages[0].from, from);
1015        assert_eq!(messages[0].to, to);
1016        assert_eq!(a2a.pending_count(to).await, 0);
1017    }
1018
1019    #[tokio::test]
1020    async fn test_delegate_task() {
1021        let bus = create_test_event_bus();
1022        let a2a = A2AProtocol::new(bus);
1023
1024        let from = create_test_agent_id();
1025        let to = create_test_agent_id();
1026
1027        let task = TaskSpec::new("Review PR", serde_json::json!({ "pr": 42 }));
1028
1029        let request_id = a2a.delegate_task(from, to, task).await.unwrap();
1030        assert!(request_id != Uuid::nil());
1031
1032        let messages = a2a.receive_messages(to).await;
1033        assert_eq!(messages.len(), 1);
1034    }
1035
1036    #[test]
1037    fn test_recent_messages_filters_by_window() {
1038        let bus = create_test_event_bus();
1039        let a2a = A2AProtocol::new(bus);
1040
1041        // Append a recent log entry directly.
1042        let recent_ts = Utc::now();
1043        a2a.append_log(A2AMessageLogEntry {
1044            from: Uuid::new_v4(),
1045            to: Uuid::new_v4(),
1046            message_type: "task_delegation".into(),
1047            timestamp: recent_ts,
1048            content: "recent".into(),
1049        });
1050
1051        // Append an old log entry (10 minutes ago).
1052        let old_ts = Utc::now() - chrono::Duration::seconds(600);
1053        a2a.append_log(A2AMessageLogEntry {
1054            from: Uuid::new_v4(),
1055            to: Uuid::new_v4(),
1056            message_type: "handshake".into(),
1057            timestamp: old_ts,
1058            content: "old".into(),
1059        });
1060
1061        // 5-minute window should include only the recent entry.
1062        let window = a2a.recent_messages(300);
1063        assert_eq!(window.len(), 1);
1064        assert_eq!(window[0].content, "recent");
1065        assert_eq!(window[0].message_type, "task_delegation");
1066
1067        // 15-minute window should include both.
1068        let wider = a2a.recent_messages(900);
1069        assert_eq!(wider.len(), 2);
1070
1071        // 1-second window should include only very recent entries.
1072        let narrow = a2a.recent_messages(1);
1073        assert_eq!(narrow.len(), 1);
1074        assert_eq!(narrow[0].content, "recent");
1075    }
1076
1077    #[tokio::test]
1078    async fn test_recent_messages_aggregates_fan_in_fan_out() {
1079        // Mixed message kinds, multi-agent fan-in / fan-out aggregation.
1080        let bus = create_test_event_bus();
1081        let a2a = A2AProtocol::new(bus);
1082
1083        // Register three agents so the registry has names.
1084        let orch = Uuid::new_v4();
1085        let worker_a = Uuid::new_v4();
1086        let worker_b = Uuid::new_v4();
1087        for (id, name) in [
1088            (orch, "orchestrator"),
1089            (worker_a, "worker-a"),
1090            (worker_b, "worker-b"),
1091        ] {
1092            a2a.registry
1093                .register_agent(AgentCard::new(id, name, "test").with_status(AgentStatus::Running))
1094                .await
1095                .unwrap();
1096        }
1097
1098        // orchestrator -> worker-a: 2x TaskDelegation
1099        for _ in 0..2 {
1100            a2a.append_log(A2AMessageLogEntry {
1101                from: orch,
1102                to: worker_a,
1103                message_type: "task_delegation".into(),
1104                timestamp: Utc::now(),
1105                content: "do work".into(),
1106            });
1107        }
1108
1109        // orchestrator -> worker-b: 1x TaskDelegation, 1x StatusUpdate
1110        a2a.append_log(A2AMessageLogEntry {
1111            from: orch,
1112            to: worker_b,
1113            message_type: "task_delegation".into(),
1114            timestamp: Utc::now(),
1115            content: "do work b".into(),
1116        });
1117        a2a.append_log(A2AMessageLogEntry {
1118            from: worker_b,
1119            to: orch,
1120            message_type: "status_update".into(),
1121            timestamp: Utc::now(),
1122            content: "50%".into(),
1123        });
1124
1125        // worker-a -> orchestrator: 1x ResultSharing (fan-in)
1126        a2a.append_log(A2AMessageLogEntry {
1127            from: worker_a,
1128            to: orch,
1129            message_type: "result_sharing".into(),
1130            timestamp: Utc::now(),
1131            content: "done".into(),
1132        });
1133
1134        // Now aggregate: 3 distinct (from,to) pairs, with the expected counts
1135        // and the most-recent message_type for each edge.
1136        let entries = a2a.recent_messages(300);
1137        let mut aggregates: HashMap<(AgentId, AgentId), (u32, String)> = HashMap::new();
1138        for entry in &entries {
1139            let agg = aggregates
1140                .entry((entry.from, entry.to))
1141                .or_insert((0, String::new()));
1142            agg.0 = agg.0.saturating_add(1);
1143            agg.1 = entry.message_type.clone();
1144        }
1145
1146        // orchestrator -> worker-a: count=2, last_kind=task_delegation
1147        let e1 = aggregates.get(&(orch, worker_a)).expect("edge 1 missing");
1148        assert_eq!(e1.0, 2, "orch->worker_a count");
1149        assert_eq!(e1.1, "task_delegation", "orch->worker_a last_kind");
1150
1151        // orchestrator -> worker-b: count=1, last_kind=task_delegation
1152        let e2 = aggregates.get(&(orch, worker_b)).expect("edge 2 missing");
1153        assert_eq!(e2.0, 1, "orch->worker_b count");
1154        assert_eq!(e2.1, "task_delegation", "orch->worker_b last_kind");
1155
1156        // worker-b -> orchestrator: count=1, last_kind=status_update
1157        let e3 = aggregates.get(&(worker_b, orch)).expect("edge 3 missing");
1158        assert_eq!(e3.0, 1, "worker_b->orch count");
1159        assert_eq!(e3.1, "status_update", "worker_b->orch last_kind");
1160
1161        // worker-a -> orchestrator: count=1, last_kind=result_sharing (fan-in)
1162        let e4 = aggregates.get(&(worker_a, orch)).expect("edge 4 missing");
1163        assert_eq!(e4.0, 1, "worker_a->orch count");
1164        assert_eq!(e4.1, "result_sharing", "worker_a->orch last_kind");
1165
1166        // Total of 4 distinct edges.
1167        assert_eq!(aggregates.len(), 4);
1168    }
1169}