1pub mod circuit_breaker;
8
9pub use circuit_breaker::{A2ACircuitBreaker, CircuitState};
10
11use std::collections::HashMap;
12use std::sync::Arc;
13
14use anyhow::Result;
15use chrono::{DateTime, Utc};
16use serde::{Deserialize, Serialize};
17use tokio::sync::RwLock;
18use uuid::Uuid;
19
20use crate::event_bus::{EventBus, KernelEvent};
21use crate::types::{AgentId, AgentStatus};
22
/// A message exchanged between two agents over the A2A protocol.
///
/// Serde encodes this as an internally tagged enum: the variant is stored in a
/// `"type"` field, with variant names rendered in `snake_case`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum A2AMessage {
    /// Asks the recipient to take on a task.
    TaskDelegation {
        /// Identifier of the task being delegated.
        task_id: Uuid,
        /// Free-text description of the task.
        description: String,
        /// Arbitrary JSON data accompanying the task.
        payload: serde_json::Value,
        /// Priority assigned to the task.
        priority: TaskPriority,
    },
    /// Reports progress on a task.
    StatusUpdate {
        /// Identifier of the task the update refers to.
        task_id: Uuid,
        /// Progress indicator (presumably a percentage -- TODO confirm the expected range).
        progress: u8,
        /// Free-text status message.
        message: String,
    },
    /// Shares the result of a task.
    ResultSharing {
        /// Identifier of the task the result belongs to.
        task_id: Uuid,
        /// Arbitrary JSON result data.
        result: serde_json::Value,
        /// Free-text summary of the result.
        summary: String,
    },
    /// Asks about capabilities.
    CapabilityQuery {
        /// Free-text query.
        query: String,
        /// Capability names the sender requires.
        required_capabilities: Vec<String>,
    },
    /// Introduces the sending agent.
    Handshake {
        /// Identifier of the agent introducing itself.
        agent_id: AgentId,
        /// Name of that agent.
        name: String,
        /// Capability names the agent advertises.
        capabilities: Vec<String>,
    },
}
73
74impl A2AMessage {
75 pub fn type_name(&self) -> &'static str {
77 match self {
78 A2AMessage::TaskDelegation { .. } => "task_delegation",
79 A2AMessage::StatusUpdate { .. } => "status_update",
80 A2AMessage::ResultSharing { .. } => "result_sharing",
81 A2AMessage::CapabilityQuery { .. } => "capability_query",
82 A2AMessage::Handshake { .. } => "handshake",
83 }
84 }
85}
86
/// Priority attached to a delegated task.
///
/// [`TaskPriority::Normal`] is the `Default` value.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
pub enum TaskPriority {
    /// Lowest priority.
    Low,
    /// Default priority.
    #[default]
    Normal,
    /// Elevated priority.
    High,
    /// Highest priority.
    Critical,
}
100
/// Description of a task that one agent can delegate to another.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskSpec {
    /// Identifier of the task.
    pub task_id: Uuid,
    /// Free-text description of the task.
    pub description: String,
    /// Arbitrary JSON data accompanying the task.
    pub payload: serde_json::Value,
    /// Priority of the task.
    pub priority: TaskPriority,
    /// Optional deadline for the task, as a UTC timestamp.
    pub deadline: Option<DateTime<Utc>>,
}
115
116impl TaskSpec {
117 pub fn new(description: impl Into<String>, payload: serde_json::Value) -> Self {
119 Self {
120 task_id: Uuid::new_v4(),
121 description: description.into(),
122 payload,
123 priority: TaskPriority::default(),
124 deadline: None,
125 }
126 }
127
128 pub fn with_priority(mut self, priority: TaskPriority) -> Self {
130 self.priority = priority;
131 self
132 }
133
134 pub fn with_deadline(mut self, deadline: DateTime<Utc>) -> Self {
136 self.deadline = Some(deadline);
137 self
138 }
139}
140
/// Envelope around an [`A2AMessage`] sent from one agent to another.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct A2ARequest {
    /// Identifier of this request.
    pub request_id: Uuid,
    /// Sending agent.
    pub from: AgentId,
    /// Receiving agent.
    pub to: AgentId,
    /// The message being carried.
    pub message: A2AMessage,
    /// UTC time recorded for the request.
    pub timestamp: DateTime<Utc>,
}
155
156impl A2ARequest {
157 pub fn new(from: AgentId, to: AgentId, message: A2AMessage) -> Self {
159 Self {
160 request_id: Uuid::new_v4(),
161 from,
162 to,
163 message,
164 timestamp: Utc::now(),
165 }
166 }
167}
168
/// Reply to an [`A2ARequest`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct A2AResponse {
    /// Identifier of this response.
    pub response_id: Uuid,
    /// Identifier of the request being answered.
    pub request_id: Uuid,
    /// Agent sending the response.
    pub from: AgentId,
    /// Agent receiving the response.
    pub to: AgentId,
    /// `true` for responses built by `success`, `false` for those built by `error`.
    pub accepted: bool,
    /// JSON body; `error` responses carry an object with an `"error"` key.
    pub payload: serde_json::Value,
    /// UTC time recorded for the response.
    pub timestamp: DateTime<Utc>,
}
187
188impl A2AResponse {
189 pub fn success(
191 request_id: Uuid,
192 from: AgentId,
193 to: AgentId,
194 payload: serde_json::Value,
195 ) -> Self {
196 Self {
197 response_id: Uuid::new_v4(),
198 request_id,
199 from,
200 to,
201 accepted: true,
202 payload,
203 timestamp: Utc::now(),
204 }
205 }
206
207 pub fn error(request_id: Uuid, from: AgentId, to: AgentId, error: impl Into<String>) -> Self {
209 Self {
210 response_id: Uuid::new_v4(),
211 request_id,
212 from,
213 to,
214 accepted: false,
215 payload: serde_json::json!({ "error": error.into() }),
216 timestamp: Utc::now(),
217 }
218 }
219}
220
/// A request sitting in an agent's queue, together with the time it was queued.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PendingMessage {
    /// The queued request.
    pub request: A2ARequest,
    /// UTC time at which the request was queued.
    pub queued_at: DateTime<Utc>,
}
229
230impl PendingMessage {
231 fn new(request: A2ARequest) -> Self {
232 Self {
233 request,
234 queued_at: Utc::now(),
235 }
236 }
237}
238
/// Descriptor of an agent: identity, advertised capabilities and skills,
/// endpoint, and status.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AgentCard {
    /// Identifier of the agent.
    pub agent_id: AgentId,
    /// Name of the agent.
    pub name: String,
    /// Free-text description of the agent.
    pub description: String,
    /// Capability names matched exactly by `has_capability`.
    pub capabilities: Vec<String>,
    /// Skill names matched exactly by `has_skill`.
    pub skills: Vec<String>,
    /// Endpoint string; `AgentCard::new` sets it to `"local"`.
    pub endpoint: String,
    /// Current status of the agent.
    pub status: AgentStatus,
}
260
261impl AgentCard {
262 pub fn new(agent_id: AgentId, name: impl Into<String>, description: impl Into<String>) -> Self {
264 Self {
265 agent_id,
266 name: name.into(),
267 description: description.into(),
268 capabilities: Vec::new(),
269 skills: Vec::new(),
270 endpoint: "local".into(),
271 status: AgentStatus::Starting,
272 }
273 }
274
275 pub fn with_capability(mut self, capability: impl Into<String>) -> Self {
277 self.capabilities.push(capability.into());
278 self
279 }
280
281 pub fn with_skill(mut self, skill: impl Into<String>) -> Self {
283 self.skills.push(skill.into());
284 self
285 }
286
287 pub fn with_endpoint(mut self, endpoint: impl Into<String>) -> Self {
289 self.endpoint = endpoint.into();
290 self
291 }
292
293 pub fn with_status(mut self, status: AgentStatus) -> Self {
295 self.status = status;
296 self
297 }
298
299 pub fn has_capability(&self, capability: &str) -> bool {
301 self.capabilities.iter().any(|c| c == capability)
302 }
303
304 pub fn has_skill(&self, skill: &str) -> bool {
306 self.skills.iter().any(|s| s == skill)
307 }
308}
309
/// Registry of [`AgentCard`]s keyed by agent id.
///
/// The card map sits behind an `Arc<RwLock<..>>`, so clones of the registry
/// share the same map.
#[derive(Clone)]
pub struct AgentCardRegistry {
    /// Registered cards, keyed by agent id.
    cards: Arc<RwLock<HashMap<AgentId, AgentCard>>>,
    /// Bus on which registration and unregistration events are published.
    event_bus: EventBus,
}
321
322impl AgentCardRegistry {
323 pub fn new(event_bus: EventBus) -> Self {
325 Self {
326 cards: Arc::new(RwLock::new(HashMap::new())),
327 event_bus,
328 }
329 }
330
331 pub async fn register_agent(&self, card: AgentCard) -> Result<()> {
333 let agent_id = card.agent_id;
334 let mut cards = self.cards.write().await;
335 cards.insert(agent_id, card.clone());
336 drop(cards);
337
338 self.event_bus.publish(KernelEvent::AgentCreated {
339 id: agent_id,
340 name: card.name.clone(),
341 })?;
342
343 tracing::info!(agent_id = %agent_id, name = %card.name, "Agent registered in A2A registry");
344 Ok(())
345 }
346
347 pub async fn unregister_agent(&self, agent_id: AgentId) -> Result<()> {
349 let mut cards = self.cards.write().await;
350 if let Some(card) = cards.remove(&agent_id) {
351 tracing::info!(agent_id = %agent_id, name = %card.name, "Agent unregistered from A2A registry");
352 drop(cards);
353
354 self.event_bus
355 .publish(KernelEvent::AgentStopped { id: agent_id })?;
356 }
357 Ok(())
358 }
359
360 pub async fn find_agents_by_capability(&self, capability: &str) -> Result<Vec<AgentCard>> {
362 let cards = self.cards.read().await;
363 let matches: Vec<AgentCard> = cards
364 .values()
365 .filter(|card| card.has_capability(capability))
366 .cloned()
367 .collect();
368 Ok(matches)
369 }
370
371 pub async fn find_agents_by_skill(&self, skill: &str) -> Result<Vec<AgentCard>> {
373 let cards = self.cards.read().await;
374 let matches: Vec<AgentCard> = cards
375 .values()
376 .filter(|card| card.has_skill(skill))
377 .cloned()
378 .collect();
379 Ok(matches)
380 }
381
382 pub async fn get_agent(&self, agent_id: AgentId) -> Option<AgentCard> {
384 let cards = self.cards.read().await;
385 cards.get(&agent_id).cloned()
386 }
387
388 pub async fn list_agents(&self) -> Vec<AgentCard> {
390 let cards = self.cards.read().await;
391 cards.values().cloned().collect()
392 }
393
394 pub async fn agent_count(&self) -> usize {
396 let cards = self.cards.read().await;
397 cards.len()
398 }
399
400 pub async fn update_status(&self, agent_id: AgentId, status: AgentStatus) -> Result<()> {
402 let mut cards = self.cards.write().await;
403 if let Some(card) = cards.get_mut(&agent_id) {
404 card.status = status;
405 }
406 Ok(())
407 }
408}
409
410impl std::fmt::Debug for AgentCardRegistry {
411 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
412 f.debug_struct("AgentCardRegistry").finish()
413 }
414}
415
/// Per-agent inbox: queued messages plus a notifier signalled on each enqueue.
struct AgentQueue {
    /// Messages waiting for the agent, in arrival order.
    messages: parking_lot::Mutex<Vec<PendingMessage>>,
    /// Signalled with `notify_one` by `send_message` after a message is pushed.
    notify: tokio::sync::Notify,
}
426
427impl AgentQueue {
428 fn new() -> Self {
429 Self {
430 messages: parking_lot::Mutex::new(Vec::new()),
431 notify: tokio::sync::Notify::new(),
432 }
433 }
434}
435
/// Shared, thread-safe callback that carries out a delegated task.
///
/// `execute_delegation` invokes it as `handler(from, to, task)` and awaits the
/// returned boxed future, which resolves to a JSON value or an error.
pub type DelegationHandler = Arc<
    dyn Fn(
            AgentId,
            AgentId,
            TaskSpec,
        )
            -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<serde_json::Value>> + Send>>
        + Send
        + Sync,
>;
450
/// One entry in the in-memory A2A message log.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct A2AMessageLogEntry {
    /// Sending agent.
    pub from: AgentId,
    /// Receiving agent.
    pub to: AgentId,
    /// Message kind, e.g. the value returned by `A2AMessage::type_name`.
    pub message_type: String,
    /// UTC time at which the entry was recorded.
    pub timestamp: DateTime<Utc>,
    /// Short textual summary of the message.
    pub content: String,
}
469
/// A node in the agent topology view.
///
/// NOTE(review): nothing in this file constructs this type; the field meanings
/// below are inferred from the names and should be confirmed against the producer.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TopologyNode {
    /// Node identifier (presumably the agent id rendered as a string).
    pub id: String,
    /// Display label for the node.
    pub label: String,
    /// Status rendered as a string.
    pub status: String,
    /// Capability names of the node.
    pub capabilities: Vec<String>,
    /// Skill names of the node.
    pub skills: Vec<String>,
    /// When the node was last seen, if known (string format not specified here).
    pub last_seen: Option<String>,
}
489
/// A directed edge in the agent topology view.
///
/// NOTE(review): nothing in this file constructs this type; the field meanings
/// below are inferred from the names and should be confirmed against the producer.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TopologyEdge {
    /// Identifier of the source node.
    pub from: String,
    /// Identifier of the destination node.
    pub to: String,
    /// Message count for this edge (presumably over the last five minutes -- TODO confirm).
    pub message_count_5m: u32,
    /// Kind of the most recent message on this edge (presumably a message type name).
    pub last_kind: String,
}
506
/// Topology payload: a list of nodes and the edges between them.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TopologyResponse {
    /// Nodes of the topology.
    pub nodes: Vec<TopologyNode>,
    /// Edges of the topology.
    pub edges: Vec<TopologyEdge>,
}
515
/// Agent-to-agent messaging hub: per-agent queues, an agent registry, an
/// optional delegation handler, and a bounded message log.
///
/// All mutable state sits behind `Arc`s, so clones share the same state.
#[derive(Clone)]
pub struct A2AProtocol {
    /// Registry of agent cards.
    registry: AgentCardRegistry,
    /// Message queues keyed by receiving agent; created on first use.
    queues: Arc<RwLock<HashMap<AgentId, Arc<AgentQueue>>>>,
    /// Bus on which `MessageReceived` events are published.
    event_bus: EventBus,
    /// Handler used by `execute_delegation`; `None` until one is installed.
    delegation_handler: Arc<RwLock<Option<DelegationHandler>>>,
    /// Log of sent messages, trimmed to `MAX_LOG_ENTRIES` on append.
    message_log: Arc<parking_lot::RwLock<Vec<A2AMessageLogEntry>>>,
}
530
531impl A2AProtocol {
    /// Maximum number of entries kept in the message log; `append_log` drops
    /// the oldest entries once this is exceeded.
    pub const MAX_LOG_ENTRIES: usize = 10_000;
534
535 pub fn new(event_bus: EventBus) -> Self {
537 let registry = AgentCardRegistry::new(event_bus.clone());
538 Self {
539 registry,
540 queues: Arc::new(RwLock::new(HashMap::new())),
541 event_bus,
542 delegation_handler: Arc::new(RwLock::new(None)),
543 message_log: Arc::new(parking_lot::RwLock::new(Vec::with_capacity(256))),
544 }
545 }
546
547 pub async fn set_delegation_handler(&self, handler: DelegationHandler) {
553 let mut h = self.delegation_handler.write().await;
554 *h = Some(handler);
555 }
556
557 fn append_log(&self, entry: A2AMessageLogEntry) {
559 let mut log = self.message_log.write();
560 log.push(entry);
561 if log.len() > Self::MAX_LOG_ENTRIES {
562 let excess = log.len() - Self::MAX_LOG_ENTRIES;
563 log.drain(..excess);
564 }
565 }
566
567 pub fn get_message_log(&self, limit: Option<usize>) -> Vec<A2AMessageLogEntry> {
571 let log = self.message_log.read();
572 match limit {
573 Some(n) => log
574 .iter()
575 .rev()
576 .take(n)
577 .cloned()
578 .collect::<Vec<_>>()
579 .into_iter()
580 .rev()
581 .collect(),
582 None => log.clone(),
583 }
584 }
585
586 pub fn recent_messages(&self, secs: u64) -> Vec<A2AMessageLogEntry> {
592 let now = Utc::now();
593 let cutoff = now - chrono::Duration::seconds(secs as i64);
594 let log = self.message_log.read();
595 log.iter()
596 .filter(|entry| entry.timestamp >= cutoff)
597 .cloned()
598 .collect()
599 }
600
601 async fn get_or_create_queue(&self, agent_id: AgentId) -> Arc<AgentQueue> {
603 let mut queues = self.queues.write().await;
604 queues
605 .entry(agent_id)
606 .or_insert_with(|| Arc::new(AgentQueue::new()))
607 .clone()
608 }
609
610 pub fn registry(&self) -> &AgentCardRegistry {
612 &self.registry
613 }
614
615 pub async fn execute_delegation(
622 &self,
623 from: AgentId,
624 to: AgentId,
625 task: TaskSpec,
626 ) -> Option<Result<serde_json::Value>> {
627 let handler = self.delegation_handler.read().await;
628 let handler_ref = handler.as_ref()?;
629
630 let _ = self.event_bus.publish(KernelEvent::MessageReceived {
632 from,
633 content: format!("[task_delegation] {:?}", task.task_id),
634 });
635
636 self.append_log(A2AMessageLogEntry {
638 from,
639 to,
640 message_type: "task_delegation".to_string(),
641 timestamp: Utc::now(),
642 content: task.description.clone(),
643 });
644
645 tracing::info!(
646 from = %from,
647 to = %to,
648 task_id = %task.task_id,
649 "A2A execute_delegation: starting"
650 );
651
652 let result = handler_ref(from, to, task).await;
653
654 tracing::info!(
655 from = %from,
656 to = %to,
657 success = result.is_ok(),
658 "A2A execute_delegation: completed"
659 );
660
661 Some(result)
662 }
663
664 pub async fn send_message(
666 &self,
667 from: AgentId,
668 to: AgentId,
669 message: A2AMessage,
670 ) -> Result<Uuid> {
671 let msg_type = message.type_name();
672 let request = A2ARequest::new(from, to, message.clone());
673 let request_id = request.request_id;
674
675 let content_summary = match &request.message {
677 A2AMessage::TaskDelegation { description, .. } => description.clone(),
678 A2AMessage::StatusUpdate { message, .. } => message.clone(),
679 A2AMessage::ResultSharing { summary, .. } => summary.clone(),
680 A2AMessage::CapabilityQuery { query, .. } => query.clone(),
681 A2AMessage::Handshake { name, .. } => format!("handshake from {name}"),
682 };
683 self.append_log(A2AMessageLogEntry {
684 from,
685 to,
686 message_type: msg_type.to_string(),
687 timestamp: Utc::now(),
688 content: content_summary,
689 });
690
691 let queue = self.get_or_create_queue(to).await;
693 queue
694 .messages
695 .lock()
696 .push(PendingMessage::new(request.clone()));
697 queue.notify.notify_one();
698
699 if let Err(e) = self.event_bus.publish(KernelEvent::MessageReceived {
705 from,
706 content: format!("[{msg_type}] {request_id:?}"),
707 }) {
708 tracing::warn!(
709 error = %e,
710 from = %from,
711 to = %to,
712 request_id = %request_id,
713 "a2a: failed to publish MessageReceived event (message was still delivered)"
714 );
715 }
716
717 tracing::debug!(
718 from = %from,
719 to = %to,
720 request_id = %request_id,
721 msg_type,
722 "A2A message sent"
723 );
724
725 Ok(request_id)
726 }
727
728 pub async fn delegate_task(&self, from: AgentId, to: AgentId, task: TaskSpec) -> Result<Uuid> {
730 let message = A2AMessage::TaskDelegation {
731 task_id: task.task_id,
732 description: task.description.clone(),
733 payload: task.payload.clone(),
734 priority: task.priority,
735 };
736
737 self.send_message(from, to, message).await
738 }
739
740 pub async fn send_status_update(
742 &self,
743 from: AgentId,
744 to: AgentId,
745 task_id: Uuid,
746 progress: u8,
747 message: String,
748 ) -> Result<Uuid> {
749 let message = A2AMessage::StatusUpdate {
750 task_id,
751 progress,
752 message,
753 };
754
755 self.send_message(from, to, message).await
756 }
757
758 pub async fn share_result(
760 &self,
761 from: AgentId,
762 to: AgentId,
763 task_id: Uuid,
764 result: serde_json::Value,
765 summary: String,
766 ) -> Result<Uuid> {
767 let message = A2AMessage::ResultSharing {
768 task_id,
769 result,
770 summary,
771 };
772
773 self.send_message(from, to, message).await
774 }
775
776 pub async fn query_capabilities(&self, capability: &str) -> Result<Vec<AgentCard>> {
778 self.registry.find_agents_by_capability(capability).await
779 }
780
781 pub async fn send_handshake(&self, from: AgentId, to: AgentId) -> Result<Uuid> {
783 let card = self.registry.get_agent(from).await;
784
785 let (name, capabilities) = if let Some(card) = card {
786 (card.name, card.capabilities.clone())
787 } else {
788 ("unknown".into(), Vec::new())
789 };
790
791 let message = A2AMessage::Handshake {
792 agent_id: from,
793 name,
794 capabilities,
795 };
796
797 self.send_message(from, to, message).await
798 }
799
800 pub async fn receive_messages(&self, agent_id: AgentId) -> Vec<A2ARequest> {
802 let queues = self.queues.read().await;
803 if let Some(queue) = queues.get(&agent_id) {
804 let drained: Vec<PendingMessage> = queue.messages.lock().drain(..).collect();
805 drained.into_iter().map(|m| m.request).collect()
806 } else {
807 Vec::new()
808 }
809 }
810
811 pub async fn pending_count(&self, agent_id: AgentId) -> usize {
813 let queues = self.queues.read().await;
814 queues
815 .get(&agent_id)
816 .map(|q| q.messages.lock().len())
817 .unwrap_or(0)
818 }
819
820 pub async fn has_messages(&self, agent_id: AgentId) -> bool {
822 self.pending_count(agent_id).await > 0
823 }
824
825 pub async fn deliver_pending_messages(&self, agent_id: AgentId) -> Result<Vec<A2ARequest>> {
831 Ok(self.receive_messages(agent_id).await)
832 }
833
834 pub async fn send_and_wait(
842 &self,
843 from: AgentId,
844 to: AgentId,
845 message: A2AMessage,
846 timeout: std::time::Duration,
847 ) -> Result<A2AResponse> {
848 let wait_task_id = match &message {
850 A2AMessage::TaskDelegation { task_id, .. } => Some(*task_id),
851 _ => None,
852 };
853
854 let request_id = self.send_message(from, to, message).await?;
855 let queue = self.get_or_create_queue(from).await;
856 let deadline = tokio::time::Instant::now() + timeout;
857
858 loop {
859 {
861 let mut msgs = queue.messages.lock();
862 let match_idx = msgs.iter().position(|p| {
863 match (&p.request.message, wait_task_id) {
864 (A2AMessage::ResultSharing { task_id, .. }, Some(wait_id)) => {
866 *task_id == wait_id
867 }
868 (A2AMessage::ResultSharing { result, .. }, None) => {
870 result.get("request_id").and_then(|v| v.as_str())
871 == Some(&request_id.to_string())
872 }
873 _ => false,
874 }
875 });
876 if let Some(idx) = match_idx {
877 let matched = msgs.remove(idx);
878 if let A2AMessage::ResultSharing { result, .. } = matched.request.message {
879 return Ok(A2AResponse::success(request_id, to, from, result));
880 }
881 }
882 }
883
884 let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
886 if remaining.is_zero() {
887 anyhow::bail!("A2A response timeout after {timeout:?}");
888 }
889
890 tokio::select! {
891 _ = queue.notify.notified() => {
892 }
894 _ = tokio::time::sleep(remaining) => {
895 anyhow::bail!("A2A response timeout after {timeout:?}");
896 }
897 }
898 }
899 }
900}
901
902impl std::fmt::Debug for A2AProtocol {
903 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
904 f.debug_struct("A2AProtocol")
905 .field("registry", &self.registry)
906 .finish()
907 }
908}
909
910#[cfg(test)]
911mod tests {
912 use super::*;
913
914 fn create_test_event_bus() -> EventBus {
915 EventBus::new(256)
916 }
917
918 fn create_test_agent_id() -> AgentId {
919 Uuid::new_v4()
920 }
921
922 #[tokio::test]
923 async fn test_agent_card_creation() {
924 let agent_id = create_test_agent_id();
925 let card = AgentCard::new(agent_id, "test-agent", "A test agent")
926 .with_capability("code-review")
927 .with_capability("lint")
928 .with_skill("rust")
929 .with_endpoint("local");
930
931 assert_eq!(card.agent_id, agent_id);
932 assert_eq!(card.name, "test-agent");
933 assert!(card.has_capability("code-review"));
934 assert!(card.has_capability("lint"));
935 assert!(!card.has_capability("refactor"));
936 assert!(card.has_skill("rust"));
937 assert!(!card.has_skill("python"));
938 }
939
940 #[tokio::test]
941 async fn test_registry_register_unregister() {
942 let bus = create_test_event_bus();
943 let registry = AgentCardRegistry::new(bus);
944
945 let agent_id = create_test_agent_id();
946 let card = AgentCard::new(agent_id, "register-test", "Test agent").with_capability("test");
947
948 registry.register_agent(card.clone()).await.unwrap();
949 assert_eq!(registry.agent_count().await, 1);
950
951 let found = registry.get_agent(agent_id).await;
952 assert!(found.is_some());
953 assert_eq!(found.unwrap().name, "register-test");
954
955 registry.unregister_agent(agent_id).await.unwrap();
956 assert_eq!(registry.agent_count().await, 0);
957
958 let found = registry.get_agent(agent_id).await;
959 assert!(found.is_none());
960 }
961
962 #[tokio::test]
963 async fn test_registry_find_by_capability() {
964 let bus = create_test_event_bus();
965 let registry = AgentCardRegistry::new(bus);
966
967 let id1 = Uuid::new_v4();
968 let id2 = Uuid::new_v4();
969
970 registry
971 .register_agent(
972 AgentCard::new(id1, "agent-1", "First agent").with_capability("code-review"),
973 )
974 .await
975 .unwrap();
976
977 registry
978 .register_agent(
979 AgentCard::new(id2, "agent-2", "Second agent")
980 .with_capability("code-review")
981 .with_capability("refactor"),
982 )
983 .await
984 .unwrap();
985
986 let reviewers = registry
987 .find_agents_by_capability("code-review")
988 .await
989 .unwrap();
990 assert_eq!(reviewers.len(), 2);
991 }
992
993 #[tokio::test]
994 async fn test_a2a_protocol_send_receive() {
995 let bus = create_test_event_bus();
996 let a2a = A2AProtocol::new(bus);
997
998 let from = create_test_agent_id();
999 let to = create_test_agent_id();
1000
1001 let message = A2AMessage::Handshake {
1002 agent_id: from,
1003 name: "sender".into(),
1004 capabilities: vec!["test".into()],
1005 };
1006
1007 a2a.send_message(from, to, message).await.unwrap();
1008 assert_eq!(a2a.pending_count(to).await, 1);
1009
1010 let messages = a2a.receive_messages(to).await;
1011 assert_eq!(messages.len(), 1);
1012 assert_eq!(messages[0].from, from);
1013 assert_eq!(messages[0].to, to);
1014 assert_eq!(a2a.pending_count(to).await, 0);
1015 }
1016
1017 #[tokio::test]
1018 async fn test_delegate_task() {
1019 let bus = create_test_event_bus();
1020 let a2a = A2AProtocol::new(bus);
1021
1022 let from = create_test_agent_id();
1023 let to = create_test_agent_id();
1024
1025 let task = TaskSpec::new("Review PR", serde_json::json!({ "pr": 42 }));
1026
1027 let request_id = a2a.delegate_task(from, to, task).await.unwrap();
1028 assert!(request_id != Uuid::nil());
1029
1030 let messages = a2a.receive_messages(to).await;
1031 assert_eq!(messages.len(), 1);
1032 }
1033
1034 #[test]
1035 fn test_recent_messages_filters_by_window() {
1036 let bus = create_test_event_bus();
1037 let a2a = A2AProtocol::new(bus);
1038
1039 let recent_ts = Utc::now();
1041 a2a.append_log(A2AMessageLogEntry {
1042 from: Uuid::new_v4(),
1043 to: Uuid::new_v4(),
1044 message_type: "task_delegation".into(),
1045 timestamp: recent_ts,
1046 content: "recent".into(),
1047 });
1048
1049 let old_ts = Utc::now() - chrono::Duration::seconds(600);
1051 a2a.append_log(A2AMessageLogEntry {
1052 from: Uuid::new_v4(),
1053 to: Uuid::new_v4(),
1054 message_type: "handshake".into(),
1055 timestamp: old_ts,
1056 content: "old".into(),
1057 });
1058
1059 let window = a2a.recent_messages(300);
1061 assert_eq!(window.len(), 1);
1062 assert_eq!(window[0].content, "recent");
1063 assert_eq!(window[0].message_type, "task_delegation");
1064
1065 let wider = a2a.recent_messages(900);
1067 assert_eq!(wider.len(), 2);
1068
1069 let narrow = a2a.recent_messages(1);
1071 assert_eq!(narrow.len(), 1);
1072 assert_eq!(narrow[0].content, "recent");
1073 }
1074
1075 #[tokio::test]
1076 async fn test_recent_messages_aggregates_fan_in_fan_out() {
1077 let bus = create_test_event_bus();
1079 let a2a = A2AProtocol::new(bus);
1080
1081 let orch = Uuid::new_v4();
1083 let worker_a = Uuid::new_v4();
1084 let worker_b = Uuid::new_v4();
1085 for (id, name) in [
1086 (orch, "orchestrator"),
1087 (worker_a, "worker-a"),
1088 (worker_b, "worker-b"),
1089 ] {
1090 a2a.registry
1091 .register_agent(AgentCard::new(id, name, "test").with_status(AgentStatus::Running))
1092 .await
1093 .unwrap();
1094 }
1095
1096 for _ in 0..2 {
1098 a2a.append_log(A2AMessageLogEntry {
1099 from: orch,
1100 to: worker_a,
1101 message_type: "task_delegation".into(),
1102 timestamp: Utc::now(),
1103 content: "do work".into(),
1104 });
1105 }
1106
1107 a2a.append_log(A2AMessageLogEntry {
1109 from: orch,
1110 to: worker_b,
1111 message_type: "task_delegation".into(),
1112 timestamp: Utc::now(),
1113 content: "do work b".into(),
1114 });
1115 a2a.append_log(A2AMessageLogEntry {
1116 from: worker_b,
1117 to: orch,
1118 message_type: "status_update".into(),
1119 timestamp: Utc::now(),
1120 content: "50%".into(),
1121 });
1122
1123 a2a.append_log(A2AMessageLogEntry {
1125 from: worker_a,
1126 to: orch,
1127 message_type: "result_sharing".into(),
1128 timestamp: Utc::now(),
1129 content: "done".into(),
1130 });
1131
1132 let entries = a2a.recent_messages(300);
1135 let mut aggregates: HashMap<(AgentId, AgentId), (u32, String)> = HashMap::new();
1136 for entry in &entries {
1137 let agg = aggregates
1138 .entry((entry.from, entry.to))
1139 .or_insert((0, String::new()));
1140 agg.0 = agg.0.saturating_add(1);
1141 agg.1 = entry.message_type.clone();
1142 }
1143
1144 let e1 = aggregates.get(&(orch, worker_a)).expect("edge 1 missing");
1146 assert_eq!(e1.0, 2, "orch->worker_a count");
1147 assert_eq!(e1.1, "task_delegation", "orch->worker_a last_kind");
1148
1149 let e2 = aggregates.get(&(orch, worker_b)).expect("edge 2 missing");
1151 assert_eq!(e2.0, 1, "orch->worker_b count");
1152 assert_eq!(e2.1, "task_delegation", "orch->worker_b last_kind");
1153
1154 let e3 = aggregates.get(&(worker_b, orch)).expect("edge 3 missing");
1156 assert_eq!(e3.0, 1, "worker_b->orch count");
1157 assert_eq!(e3.1, "status_update", "worker_b->orch last_kind");
1158
1159 let e4 = aggregates.get(&(worker_a, orch)).expect("edge 4 missing");
1161 assert_eq!(e4.0, 1, "worker_a->orch count");
1162 assert_eq!(e4.1, "result_sharing", "worker_a->orch last_kind");
1163
1164 assert_eq!(aggregates.len(), 4);
1166 }
1167}